Skip to main content

koi_proxy/
lib.rs

1//! Koi Proxy - TLS-terminating reverse proxy (Phase 8).
2
3pub mod config;
4pub mod http;
5mod listener;
6mod safety;
7mod tls;
8
9#[cfg(test)]
10mod data_plane_tests;
11
12use std::collections::HashMap;
13use std::sync::Arc;
14
15use tokio::sync::{broadcast, watch, Mutex};
16use tokio_util::sync::CancellationToken;
17
18use koi_common::capability::{Capability, CapabilityStatus};
19
20use listener::{spawn_listener, ListenerStatus};
21
22pub use config::ProxyEntry;
23pub use safety::{ensure_backend_allowed, parse_backend};
24
/// Capacity handed to `broadcast::channel` when a `ProxyCore` creates its
/// event channel.
const BROADCAST_CHANNEL_CAPACITY: usize = 256;
27
28/// Events emitted by the proxy subsystem when entries change.
29#[derive(Debug, Clone)]
30pub enum ProxyEvent {
31    /// A proxy entry was added or updated.
32    EntryUpdated { entry: ProxyEntry },
33    /// A proxy entry was removed.
34    EntryRemoved { name: String },
35}
36
37#[derive(Debug, thiserror::Error)]
38pub enum ProxyError {
39    #[error("proxy config error: {0}")]
40    Config(String),
41
42    #[error("proxy io error: {0}")]
43    Io(String),
44
45    #[error("proxy invalid config: {0}")]
46    InvalidConfig(String),
47
48    #[error("proxy entry not found: {0}")]
49    NotFound(String),
50}
51
52/// Runtime status of a single proxy listener.
53///
54/// `state`/`error` reflect the listener task's real liveness (bind/accept outcome),
55/// and `cert_source` records which certificate the listener is serving. This
56/// replaces the old hardcoded `running: true`.
57#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
58pub struct ProxyStatus {
59    pub name: String,
60    pub listen_port: u16,
61    pub backend: String,
62    pub allow_remote: bool,
63    /// "certmesh" (cert file found on disk) or "self-signed" (generated fallback).
64    pub cert_source: String,
65    /// "starting" | "running" | "error" | "stopped".
66    pub state: String,
67    /// Error detail, present only when `state == "error"`.
68    #[serde(skip_serializing_if = "Option::is_none")]
69    pub error: Option<String>,
70}
71
72pub struct ProxyCore {
73    entries: Arc<Mutex<Vec<ProxyEntry>>>,
74    event_tx: broadcast::Sender<ProxyEvent>,
75    data_dir: Option<std::path::PathBuf>,
76}
77
78impl ProxyCore {
79    pub fn new() -> Result<Self, ProxyError> {
80        let entries = config::load_entries()?;
81        Ok(Self {
82            entries: Arc::new(Mutex::new(entries)),
83            event_tx: broadcast::channel(BROADCAST_CHANNEL_CAPACITY).0,
84            data_dir: None,
85        })
86    }
87
88    /// Create a ProxyCore that reads/writes config from a custom data directory.
89    pub fn with_data_dir(data_dir: &std::path::Path) -> Result<Self, ProxyError> {
90        let entries = config::load_entries_with_data_dir(Some(data_dir))?;
91        Ok(Self {
92            entries: Arc::new(Mutex::new(entries)),
93            event_tx: broadcast::channel(BROADCAST_CHANNEL_CAPACITY).0,
94            data_dir: Some(data_dir.to_path_buf()),
95        })
96    }
97
98    pub async fn entries(&self) -> Vec<ProxyEntry> {
99        self.entries.lock().await.clone()
100    }
101
102    pub async fn reload(&self) -> Result<Vec<ProxyEntry>, ProxyError> {
103        let data_dir = self.data_dir.clone();
104        let entries = tokio::task::spawn_blocking(move || {
105            config::load_entries_with_data_dir(data_dir.as_deref())
106        })
107        .await
108        .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
109        let mut guard = self.entries.lock().await;
110        *guard = entries.clone();
111        Ok(entries)
112    }
113
114    pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, ProxyError> {
115        let data_dir = self.data_dir.clone();
116        let entry_for_io = entry.clone();
117        let entries = tokio::task::spawn_blocking(move || {
118            config::upsert_entry_with_data_dir(entry_for_io, data_dir.as_deref())
119        })
120        .await
121        .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
122        let mut guard = self.entries.lock().await;
123        *guard = entries.clone();
124        let _ = self.event_tx.send(ProxyEvent::EntryUpdated { entry });
125        Ok(entries)
126    }
127
128    pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, ProxyError> {
129        let data_dir = self.data_dir.clone();
130        let name_owned = name.to_string();
131        let entries = tokio::task::spawn_blocking(move || {
132            config::remove_entry_with_data_dir(&name_owned, data_dir.as_deref())
133        })
134        .await
135        .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
136        let mut guard = self.entries.lock().await;
137        *guard = entries.clone();
138        let _ = self.event_tx.send(ProxyEvent::EntryRemoved {
139            name: name.to_string(),
140        });
141        Ok(entries)
142    }
143
144    /// Subscribe to proxy events.
145    pub fn subscribe(&self) -> broadcast::Receiver<ProxyEvent> {
146        self.event_tx.subscribe()
147    }
148}
149
150impl Capability for ProxyCore {
151    fn name(&self) -> &str {
152        "proxy"
153    }
154
155    fn status(&self) -> CapabilityStatus {
156        CapabilityStatus {
157            name: "proxy".to_string(),
158            summary: "configured".to_string(),
159            healthy: true,
160        }
161    }
162}
163
164struct ProxyInstance {
165    entry: ProxyEntry,
166    cancel: CancellationToken,
167    status: watch::Receiver<ListenerStatus>,
168}
169
170/// Runtime controller for proxy listeners.
171pub struct ProxyRuntime {
172    core: Arc<ProxyCore>,
173    instances: Arc<Mutex<HashMap<String, ProxyInstance>>>,
174}
175
176impl ProxyRuntime {
177    pub fn new(core: Arc<ProxyCore>) -> Self {
178        Self {
179            core,
180            instances: Arc::new(Mutex::new(HashMap::new())),
181        }
182    }
183
184    pub fn core(&self) -> Arc<ProxyCore> {
185        Arc::clone(&self.core)
186    }
187
188    pub async fn start_all(&self) -> Result<(), ProxyError> {
189        let entries = self.core.entries().await;
190        self.apply_entries(entries).await
191    }
192
193    pub async fn reload(&self) -> Result<(), ProxyError> {
194        let entries = self.core.reload().await?;
195        self.apply_entries(entries).await
196    }
197
198    async fn apply_entries(&self, entries: Vec<ProxyEntry>) -> Result<(), ProxyError> {
199        let mut guard = self.instances.lock().await;
200        let mut seen = HashMap::new();
201
202        for entry in entries {
203            seen.insert(entry.name.clone(), entry.clone());
204            let entry_name = entry.name.clone();
205            let needs_restart = match guard.get(&entry.name) {
206                Some(existing) => existing.entry != entry,
207                None => true,
208            };
209            if needs_restart {
210                if let Some(existing) = guard.remove(&entry.name) {
211                    existing.cancel.cancel();
212                }
213                let cancel = CancellationToken::new();
214                let status = spawn_listener(entry.clone(), cancel.clone());
215                guard.insert(
216                    entry_name,
217                    ProxyInstance {
218                        entry,
219                        cancel,
220                        status,
221                    },
222                );
223            }
224        }
225
226        let remove_names: Vec<String> = guard
227            .keys()
228            .filter(|name| !seen.contains_key(*name))
229            .cloned()
230            .collect();
231        for name in remove_names {
232            if let Some(instance) = guard.remove(&name) {
233                instance.cancel.cancel();
234            }
235        }
236
237        Ok(())
238    }
239
240    pub async fn stop_all(&self) {
241        let mut guard = self.instances.lock().await;
242        for instance in guard.values() {
243            instance.cancel.cancel();
244        }
245        guard.clear();
246    }
247
248    pub async fn status(&self) -> Vec<ProxyStatus> {
249        let guard = self.instances.lock().await;
250        guard
251            .values()
252            .map(|instance| {
253                let status = instance.status.borrow();
254                ProxyStatus {
255                    name: instance.entry.name.clone(),
256                    listen_port: instance.entry.listen_port,
257                    backend: instance.entry.backend.clone(),
258                    allow_remote: instance.entry.allow_remote,
259                    cert_source: status.cert_source.as_str().to_string(),
260                    state: status.state.as_str().to_string(),
261                    error: status.error.clone(),
262                }
263            })
264            .collect()
265    }
266}
267
268impl Clone for ProxyRuntime {
269    fn clone(&self) -> Self {
270        Self {
271            core: Arc::clone(&self.core),
272            instances: Arc::clone(&self.instances),
273        }
274    }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Open a small event channel and return its sender.
    fn event_sender() -> broadcast::Sender<ProxyEvent> {
        let (tx, _) = broadcast::channel::<ProxyEvent>(16);
        tx
    }

    #[test]
    fn subscribe_receives_emitted_entry_updated() {
        let tx = event_sender();
        let mut rx = tx.subscribe();

        let entry = ProxyEntry {
            name: "test-svc".to_string(),
            listen_port: 9090,
            backend: "http://127.0.0.1:8080".to_string(),
            allow_remote: false,
        };
        let _ = tx.send(ProxyEvent::EntryUpdated { entry });

        match rx.try_recv().expect("should receive event") {
            ProxyEvent::EntryUpdated { entry: received } => {
                assert_eq!(received.name, "test-svc");
                assert_eq!(received.listen_port, 9090);
                assert_eq!(received.backend, "http://127.0.0.1:8080");
            }
            other => panic!("expected EntryUpdated, got {other:?}"),
        }
    }

    #[test]
    fn subscribe_receives_emitted_entry_removed() {
        let tx = event_sender();
        let mut rx = tx.subscribe();

        let removed = ProxyEvent::EntryRemoved {
            name: "rm-svc".to_string(),
        };
        let _ = tx.send(removed);

        match rx.try_recv().expect("should receive event") {
            ProxyEvent::EntryRemoved { name } => assert_eq!(name, "rm-svc"),
            other => panic!("expected EntryRemoved, got {other:?}"),
        }
    }

    #[test]
    fn multiple_subscribers_each_receive_event() {
        let tx = event_sender();
        let mut first = tx.subscribe();
        let mut second = tx.subscribe();

        let _ = tx.send(ProxyEvent::EntryRemoved {
            name: "multi".to_string(),
        });

        // Each subscriber must observe the event independently.
        assert!(first.try_recv().is_ok());
        assert!(second.try_recv().is_ok());
    }
}