1pub mod config;
4pub mod http;
5mod listener;
6mod safety;
7mod tls;
8
9#[cfg(test)]
10mod data_plane_tests;
11
12use std::collections::HashMap;
13use std::sync::Arc;
14
15use tokio::sync::{broadcast, watch, Mutex};
16use tokio_util::sync::CancellationToken;
17
18use koi_common::capability::{Capability, CapabilityStatus};
19
20use listener::{spawn_listener, ListenerStatus};
21
22pub use config::ProxyEntry;
23pub use safety::{ensure_backend_allowed, parse_backend};
24
25#[derive(Debug, Clone)]
27pub enum ProxyEvent {
28 EntryUpdated { entry: ProxyEntry },
30 EntryRemoved { name: String },
32}
33
34#[derive(Debug, thiserror::Error)]
35pub enum ProxyError {
36 #[error("proxy config error: {0}")]
37 Config(String),
38
39 #[error("proxy io error: {0}")]
40 Io(String),
41
42 #[error("proxy invalid config: {0}")]
43 InvalidConfig(String),
44
45 #[error("proxy entry not found: {0}")]
46 NotFound(String),
47}
48
49#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
55pub struct ProxyStatus {
56 pub name: String,
57 pub listen_port: u16,
58 pub backend: String,
59 pub allow_remote: bool,
60 pub cert_source: String,
62 pub state: String,
64 #[serde(skip_serializing_if = "Option::is_none")]
66 pub error: Option<String>,
67}
68
69pub struct ProxyCore {
70 entries: Arc<Mutex<Vec<ProxyEntry>>>,
71 event_tx: broadcast::Sender<ProxyEvent>,
72 data_dir: Option<std::path::PathBuf>,
73}
74
75impl ProxyCore {
76 pub fn new() -> Result<Self, ProxyError> {
77 let entries = config::load_entries()?;
78 Ok(Self {
79 entries: Arc::new(Mutex::new(entries)),
80 event_tx: koi_common::events::event_channel().0,
81 data_dir: None,
82 })
83 }
84
85 pub fn with_data_dir(data_dir: &std::path::Path) -> Result<Self, ProxyError> {
87 let entries = config::load_entries_with_data_dir(Some(data_dir))?;
88 Ok(Self {
89 entries: Arc::new(Mutex::new(entries)),
90 event_tx: koi_common::events::event_channel().0,
91 data_dir: Some(data_dir.to_path_buf()),
92 })
93 }
94
95 pub async fn entries(&self) -> Vec<ProxyEntry> {
96 self.entries.lock().await.clone()
97 }
98
99 pub async fn reload(&self) -> Result<Vec<ProxyEntry>, ProxyError> {
100 let data_dir = self.data_dir.clone();
101 let entries = tokio::task::spawn_blocking(move || {
102 config::load_entries_with_data_dir(data_dir.as_deref())
103 })
104 .await
105 .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
106 let mut guard = self.entries.lock().await;
107 *guard = entries.clone();
108 Ok(entries)
109 }
110
111 pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, ProxyError> {
112 let data_dir = self.data_dir.clone();
113 let entry_for_io = entry.clone();
114 let entries = tokio::task::spawn_blocking(move || {
115 config::upsert_entry_with_data_dir(entry_for_io, data_dir.as_deref())
116 })
117 .await
118 .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
119 let mut guard = self.entries.lock().await;
120 *guard = entries.clone();
121 let _ = self.event_tx.send(ProxyEvent::EntryUpdated { entry });
122 Ok(entries)
123 }
124
125 pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, ProxyError> {
126 let data_dir = self.data_dir.clone();
127 let name_owned = name.to_string();
128 let entries = tokio::task::spawn_blocking(move || {
129 config::remove_entry_with_data_dir(&name_owned, data_dir.as_deref())
130 })
131 .await
132 .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
133 let mut guard = self.entries.lock().await;
134 *guard = entries.clone();
135 let _ = self.event_tx.send(ProxyEvent::EntryRemoved {
136 name: name.to_string(),
137 });
138 Ok(entries)
139 }
140
141 pub fn subscribe(&self) -> broadcast::Receiver<ProxyEvent> {
143 self.event_tx.subscribe()
144 }
145}
146
147#[async_trait::async_trait]
148impl Capability for ProxyCore {
149 fn name(&self) -> &str {
150 "proxy"
151 }
152
153 async fn status(&self) -> CapabilityStatus {
154 CapabilityStatus {
155 name: "proxy".to_string(),
156 summary: "configured".to_string(),
157 healthy: true,
158 }
159 }
160}
161
162struct ProxyInstance {
163 entry: ProxyEntry,
164 cancel: CancellationToken,
165 status: watch::Receiver<ListenerStatus>,
166}
167
168pub struct ProxyRuntime {
170 core: Arc<ProxyCore>,
171 instances: Arc<Mutex<HashMap<String, ProxyInstance>>>,
172}
173
174impl ProxyRuntime {
175 pub fn new(core: Arc<ProxyCore>) -> Self {
176 Self {
177 core,
178 instances: Arc::new(Mutex::new(HashMap::new())),
179 }
180 }
181
182 pub fn core(&self) -> Arc<ProxyCore> {
183 Arc::clone(&self.core)
184 }
185
186 pub async fn start_all(&self) -> Result<(), ProxyError> {
187 let entries = self.core.entries().await;
188 self.apply_entries(entries).await
189 }
190
191 pub async fn reload(&self) -> Result<(), ProxyError> {
192 let entries = self.core.reload().await?;
193 self.apply_entries(entries).await
194 }
195
196 async fn apply_entries(&self, entries: Vec<ProxyEntry>) -> Result<(), ProxyError> {
197 let mut guard = self.instances.lock().await;
198 let mut seen = HashMap::new();
199
200 for entry in entries {
201 seen.insert(entry.name.clone(), entry.clone());
202 let entry_name = entry.name.clone();
203 let needs_restart = match guard.get(&entry.name) {
204 Some(existing) => existing.entry != entry,
205 None => true,
206 };
207 if needs_restart {
208 if let Some(existing) = guard.remove(&entry.name) {
209 existing.cancel.cancel();
210 }
211 let cancel = CancellationToken::new();
212 let status = spawn_listener(entry.clone(), cancel.clone());
213 guard.insert(
214 entry_name,
215 ProxyInstance {
216 entry,
217 cancel,
218 status,
219 },
220 );
221 }
222 }
223
224 let remove_names: Vec<String> = guard
225 .keys()
226 .filter(|name| !seen.contains_key(*name))
227 .cloned()
228 .collect();
229 for name in remove_names {
230 if let Some(instance) = guard.remove(&name) {
231 instance.cancel.cancel();
232 }
233 }
234
235 Ok(())
236 }
237
238 pub async fn stop_all(&self) {
239 let mut guard = self.instances.lock().await;
240 for instance in guard.values() {
241 instance.cancel.cancel();
242 }
243 guard.clear();
244 }
245
246 pub async fn status(&self) -> Vec<ProxyStatus> {
247 let guard = self.instances.lock().await;
248 guard
249 .values()
250 .map(|instance| {
251 let status = instance.status.borrow();
252 ProxyStatus {
253 name: instance.entry.name.clone(),
254 listen_port: instance.entry.listen_port,
255 backend: instance.entry.backend.clone(),
256 allow_remote: instance.entry.allow_remote,
257 cert_source: status.cert_source.as_str().to_string(),
258 state: status.state.as_str().to_string(),
259 error: status.error.clone(),
260 }
261 })
262 .collect()
263 }
264}
265
266impl Clone for ProxyRuntime {
267 fn clone(&self) -> Self {
268 Self {
269 core: Arc::clone(&self.core),
270 instances: Arc::clone(&self.instances),
271 }
272 }
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278
279 fn test_core() -> ProxyCore {
282 let dir = std::env::temp_dir().join(format!(
283 "koi-proxy-test-{}",
284 koi_common::id::generate_short_id()
285 ));
286 std::fs::create_dir_all(&dir).expect("temp dir");
287 ProxyCore::with_data_dir(&dir).expect("core should build")
288 }
289
290 fn sample_entry(name: &str) -> ProxyEntry {
291 ProxyEntry {
292 name: name.to_string(),
293 listen_port: 9090,
294 backend: "http://127.0.0.1:8080".to_string(),
295 allow_remote: false,
296 }
297 }
298
299 #[tokio::test]
303 async fn upsert_emits_entry_updated_through_core() {
304 let core = test_core();
305 let mut rx = core.subscribe();
306
307 core.upsert(sample_entry("test-svc"))
308 .await
309 .expect("upsert should succeed");
310
311 match rx.try_recv().expect("should receive event") {
312 ProxyEvent::EntryUpdated { entry } => {
313 assert_eq!(entry.name, "test-svc");
314 assert_eq!(entry.listen_port, 9090);
315 assert_eq!(entry.backend, "http://127.0.0.1:8080");
316 }
317 other => panic!("expected EntryUpdated, got {other:?}"),
318 }
319 }
320
321 #[tokio::test]
323 async fn remove_emits_entry_removed_through_core() {
324 let core = test_core();
325 core.upsert(sample_entry("rm-svc"))
326 .await
327 .expect("upsert should succeed");
328
329 let mut rx = core.subscribe();
330 core.remove("rm-svc").await.expect("remove should succeed");
331
332 match rx.try_recv().expect("should receive event") {
333 ProxyEvent::EntryRemoved { name } => assert_eq!(name, "rm-svc"),
334 other => panic!("expected EntryRemoved, got {other:?}"),
335 }
336 }
337
338 #[tokio::test]
340 async fn multiple_subscribers_each_receive_core_event() {
341 let core = test_core();
342 let mut rx1 = core.subscribe();
343 let mut rx2 = core.subscribe();
344
345 core.upsert(sample_entry("multi"))
346 .await
347 .expect("upsert should succeed");
348
349 assert!(rx1.try_recv().is_ok());
350 assert!(rx2.try_recv().is_ok());
351 }
352}