Skip to main content

koi_proxy/
lib.rs

1//! Koi Proxy - TLS-terminating reverse proxy (Phase 8).
2
3pub mod config;
4pub mod http;
5mod listener;
6mod safety;
7mod tls;
8
9#[cfg(test)]
10mod data_plane_tests;
11
12use std::collections::HashMap;
13use std::sync::Arc;
14
15use tokio::sync::{broadcast, watch, Mutex};
16use tokio_util::sync::CancellationToken;
17
18use koi_common::capability::{Capability, CapabilityStatus};
19
20use listener::{spawn_listener, ListenerStatus};
21
22pub use config::ProxyEntry;
23pub use safety::{ensure_backend_allowed, parse_backend};
24
25/// Events emitted by the proxy subsystem when entries change.
26#[derive(Debug, Clone)]
27pub enum ProxyEvent {
28    /// A proxy entry was added or updated.
29    EntryUpdated { entry: ProxyEntry },
30    /// A proxy entry was removed.
31    EntryRemoved { name: String },
32}
33
34#[derive(Debug, thiserror::Error)]
35pub enum ProxyError {
36    #[error("proxy config error: {0}")]
37    Config(String),
38
39    #[error("proxy io error: {0}")]
40    Io(String),
41
42    #[error("proxy invalid config: {0}")]
43    InvalidConfig(String),
44
45    #[error("proxy entry not found: {0}")]
46    NotFound(String),
47}
48
49/// Runtime status of a single proxy listener.
50///
51/// `state`/`error` reflect the listener task's real liveness (bind/accept outcome),
52/// and `cert_source` records which certificate the listener is serving. This
53/// replaces the old hardcoded `running: true`.
54#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
55pub struct ProxyStatus {
56    pub name: String,
57    pub listen_port: u16,
58    pub backend: String,
59    pub allow_remote: bool,
60    /// "certmesh" (cert file found on disk) or "self-signed" (generated fallback).
61    pub cert_source: String,
62    /// "starting" | "running" | "error" | "stopped".
63    pub state: String,
64    /// Error detail, present only when `state == "error"`.
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub error: Option<String>,
67}
68
69pub struct ProxyCore {
70    entries: Arc<Mutex<Vec<ProxyEntry>>>,
71    event_tx: broadcast::Sender<ProxyEvent>,
72    data_dir: Option<std::path::PathBuf>,
73}
74
75impl ProxyCore {
76    pub fn new() -> Result<Self, ProxyError> {
77        let entries = config::load_entries()?;
78        Ok(Self {
79            entries: Arc::new(Mutex::new(entries)),
80            event_tx: koi_common::events::event_channel().0,
81            data_dir: None,
82        })
83    }
84
85    /// Create a ProxyCore that reads/writes config from a custom data directory.
86    pub fn with_data_dir(data_dir: &std::path::Path) -> Result<Self, ProxyError> {
87        let entries = config::load_entries_with_data_dir(Some(data_dir))?;
88        Ok(Self {
89            entries: Arc::new(Mutex::new(entries)),
90            event_tx: koi_common::events::event_channel().0,
91            data_dir: Some(data_dir.to_path_buf()),
92        })
93    }
94
95    pub async fn entries(&self) -> Vec<ProxyEntry> {
96        self.entries.lock().await.clone()
97    }
98
99    pub async fn reload(&self) -> Result<Vec<ProxyEntry>, ProxyError> {
100        let data_dir = self.data_dir.clone();
101        let entries = tokio::task::spawn_blocking(move || {
102            config::load_entries_with_data_dir(data_dir.as_deref())
103        })
104        .await
105        .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
106        let mut guard = self.entries.lock().await;
107        *guard = entries.clone();
108        Ok(entries)
109    }
110
111    pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, ProxyError> {
112        let data_dir = self.data_dir.clone();
113        let entry_for_io = entry.clone();
114        let entries = tokio::task::spawn_blocking(move || {
115            config::upsert_entry_with_data_dir(entry_for_io, data_dir.as_deref())
116        })
117        .await
118        .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
119        let mut guard = self.entries.lock().await;
120        *guard = entries.clone();
121        let _ = self.event_tx.send(ProxyEvent::EntryUpdated { entry });
122        Ok(entries)
123    }
124
125    pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, ProxyError> {
126        let data_dir = self.data_dir.clone();
127        let name_owned = name.to_string();
128        let entries = tokio::task::spawn_blocking(move || {
129            config::remove_entry_with_data_dir(&name_owned, data_dir.as_deref())
130        })
131        .await
132        .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
133        let mut guard = self.entries.lock().await;
134        *guard = entries.clone();
135        let _ = self.event_tx.send(ProxyEvent::EntryRemoved {
136            name: name.to_string(),
137        });
138        Ok(entries)
139    }
140
141    /// Subscribe to proxy events.
142    pub fn subscribe(&self) -> broadcast::Receiver<ProxyEvent> {
143        self.event_tx.subscribe()
144    }
145}
146
147#[async_trait::async_trait]
148impl Capability for ProxyCore {
149    fn name(&self) -> &str {
150        "proxy"
151    }
152
153    async fn status(&self) -> CapabilityStatus {
154        CapabilityStatus {
155            name: "proxy".to_string(),
156            summary: "configured".to_string(),
157            healthy: true,
158        }
159    }
160}
161
162struct ProxyInstance {
163    entry: ProxyEntry,
164    cancel: CancellationToken,
165    status: watch::Receiver<ListenerStatus>,
166}
167
168/// Runtime controller for proxy listeners.
169pub struct ProxyRuntime {
170    core: Arc<ProxyCore>,
171    instances: Arc<Mutex<HashMap<String, ProxyInstance>>>,
172}
173
174impl ProxyRuntime {
175    pub fn new(core: Arc<ProxyCore>) -> Self {
176        Self {
177            core,
178            instances: Arc::new(Mutex::new(HashMap::new())),
179        }
180    }
181
182    pub fn core(&self) -> Arc<ProxyCore> {
183        Arc::clone(&self.core)
184    }
185
186    pub async fn start_all(&self) -> Result<(), ProxyError> {
187        let entries = self.core.entries().await;
188        self.apply_entries(entries).await
189    }
190
191    pub async fn reload(&self) -> Result<(), ProxyError> {
192        let entries = self.core.reload().await?;
193        self.apply_entries(entries).await
194    }
195
196    async fn apply_entries(&self, entries: Vec<ProxyEntry>) -> Result<(), ProxyError> {
197        let mut guard = self.instances.lock().await;
198        let mut seen = HashMap::new();
199
200        for entry in entries {
201            seen.insert(entry.name.clone(), entry.clone());
202            let entry_name = entry.name.clone();
203            let needs_restart = match guard.get(&entry.name) {
204                Some(existing) => existing.entry != entry,
205                None => true,
206            };
207            if needs_restart {
208                if let Some(existing) = guard.remove(&entry.name) {
209                    existing.cancel.cancel();
210                }
211                let cancel = CancellationToken::new();
212                let status = spawn_listener(entry.clone(), cancel.clone());
213                guard.insert(
214                    entry_name,
215                    ProxyInstance {
216                        entry,
217                        cancel,
218                        status,
219                    },
220                );
221            }
222        }
223
224        let remove_names: Vec<String> = guard
225            .keys()
226            .filter(|name| !seen.contains_key(*name))
227            .cloned()
228            .collect();
229        for name in remove_names {
230            if let Some(instance) = guard.remove(&name) {
231                instance.cancel.cancel();
232            }
233        }
234
235        Ok(())
236    }
237
238    pub async fn stop_all(&self) {
239        let mut guard = self.instances.lock().await;
240        for instance in guard.values() {
241            instance.cancel.cancel();
242        }
243        guard.clear();
244    }
245
246    pub async fn status(&self) -> Vec<ProxyStatus> {
247        let guard = self.instances.lock().await;
248        guard
249            .values()
250            .map(|instance| {
251                let status = instance.status.borrow();
252                ProxyStatus {
253                    name: instance.entry.name.clone(),
254                    listen_port: instance.entry.listen_port,
255                    backend: instance.entry.backend.clone(),
256                    allow_remote: instance.entry.allow_remote,
257                    cert_source: status.cert_source.as_str().to_string(),
258                    state: status.state.as_str().to_string(),
259                    error: status.error.clone(),
260                }
261            })
262            .collect()
263    }
264}
265
266impl Clone for ProxyRuntime {
267    fn clone(&self) -> Self {
268        Self {
269            core: Arc::clone(&self.core),
270            instances: Arc::clone(&self.instances),
271        }
272    }
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278
279    /// Build a ProxyCore backed by a throwaway data dir so tests never touch the
280    /// real on-disk proxy config.
281    fn test_core() -> ProxyCore {
282        let dir = std::env::temp_dir().join(format!(
283            "koi-proxy-test-{}",
284            koi_common::id::generate_short_id()
285        ));
286        std::fs::create_dir_all(&dir).expect("temp dir");
287        ProxyCore::with_data_dir(&dir).expect("core should build")
288    }
289
290    fn sample_entry(name: &str) -> ProxyEntry {
291        ProxyEntry {
292            name: name.to_string(),
293            listen_port: 9090,
294            backend: "http://127.0.0.1:8080".to_string(),
295            allow_remote: false,
296        }
297    }
298
299    /// Drives the real ProxyCore command path: subscribe → upsert → assert the
300    /// EntryUpdated event is broadcast through the core's own channel. Fails if
301    /// `upsert` stops emitting (tests Koi wiring, not tokio).
302    #[tokio::test]
303    async fn upsert_emits_entry_updated_through_core() {
304        let core = test_core();
305        let mut rx = core.subscribe();
306
307        core.upsert(sample_entry("test-svc"))
308            .await
309            .expect("upsert should succeed");
310
311        match rx.try_recv().expect("should receive event") {
312            ProxyEvent::EntryUpdated { entry } => {
313                assert_eq!(entry.name, "test-svc");
314                assert_eq!(entry.listen_port, 9090);
315                assert_eq!(entry.backend, "http://127.0.0.1:8080");
316            }
317            other => panic!("expected EntryUpdated, got {other:?}"),
318        }
319    }
320
321    /// remove() on an existing entry emits EntryRemoved through the core.
322    #[tokio::test]
323    async fn remove_emits_entry_removed_through_core() {
324        let core = test_core();
325        core.upsert(sample_entry("rm-svc"))
326            .await
327            .expect("upsert should succeed");
328
329        let mut rx = core.subscribe();
330        core.remove("rm-svc").await.expect("remove should succeed");
331
332        match rx.try_recv().expect("should receive event") {
333            ProxyEvent::EntryRemoved { name } => assert_eq!(name, "rm-svc"),
334            other => panic!("expected EntryRemoved, got {other:?}"),
335        }
336    }
337
338    /// Two subscribers to the same core each receive a core-emitted event.
339    #[tokio::test]
340    async fn multiple_subscribers_each_receive_core_event() {
341        let core = test_core();
342        let mut rx1 = core.subscribe();
343        let mut rx2 = core.subscribe();
344
345        core.upsert(sample_entry("multi"))
346            .await
347            .expect("upsert should succeed");
348
349        assert!(rx1.try_recv().is_ok());
350        assert!(rx2.try_recv().is_ok());
351    }
352}