1pub mod config;
4pub mod http;
5mod listener;
6mod safety;
7mod tls;
8
9#[cfg(test)]
10mod data_plane_tests;
11
12use std::collections::HashMap;
13use std::sync::Arc;
14
15use tokio::sync::{broadcast, watch, Mutex};
16use tokio_util::sync::CancellationToken;
17
18use koi_common::capability::{Capability, CapabilityStatus};
19
20use listener::{spawn_listener, ListenerStatus};
21
22pub use config::ProxyEntry;
23pub use safety::{ensure_backend_allowed, parse_backend};
24
/// Capacity passed to `broadcast::channel` when a [`ProxyCore`] creates its
/// [`ProxyEvent`] channel.
///
/// Per tokio's broadcast semantics, a subscriber that falls further behind
/// than this many events observes a lag error instead of the oldest events.
const BROADCAST_CHANNEL_CAPACITY: usize = 256;
27
/// Change notification published on the [`ProxyCore`] broadcast channel.
///
/// Events are emitted by [`ProxyCore::upsert`] and [`ProxyCore::remove`] after
/// the corresponding configuration change succeeded.
#[derive(Debug, Clone)]
pub enum ProxyEvent {
    /// An entry was created or replaced; carries the entry passed to `upsert`.
    EntryUpdated { entry: ProxyEntry },
    /// The entry with this name was removed.
    EntryRemoved { name: String },
}
36
/// Errors returned by the proxy configuration and runtime APIs.
///
/// Every variant carries a human-readable detail string that is interpolated
/// into the `Display` message.
#[derive(Debug, thiserror::Error)]
pub enum ProxyError {
    /// Configuration-level failure (rendered as "proxy config error: …").
    #[error("proxy config error: {0}")]
    Config(String),

    /// I/O-level failure; also used in this module when a blocking config
    /// task cannot be joined.
    #[error("proxy io error: {0}")]
    Io(String),

    /// The supplied configuration was rejected as invalid.
    #[error("proxy invalid config: {0}")]
    InvalidConfig(String),

    /// No entry exists for the given identifier.
    #[error("proxy entry not found: {0}")]
    NotFound(String),
}
51
/// Serializable snapshot of one running proxy instance, as produced by
/// [`ProxyRuntime::status`].
#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
pub struct ProxyStatus {
    /// Entry name (copied from the instance's `ProxyEntry`).
    pub name: String,
    /// Port the entry is configured to listen on.
    pub listen_port: u16,
    /// Backend the entry is configured to forward to.
    pub backend: String,
    /// The entry's `allow_remote` flag.
    pub allow_remote: bool,
    /// String form of the listener's reported certificate source.
    pub cert_source: String,
    /// String form of the listener's reported state.
    pub state: String,
    /// Error reported by the listener, if any; omitted from the serialized
    /// output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
71
/// Shared proxy configuration state: a cached entry list plus a broadcast
/// channel that announces changes made through `upsert` and `remove`.
pub struct ProxyCore {
    /// In-memory copy of the entry list most recently loaded from, or written
    /// through, the `config` module.
    entries: Arc<Mutex<Vec<ProxyEntry>>>,
    /// Sending half of the change-event channel; receivers are handed out by
    /// `subscribe`.
    event_tx: broadcast::Sender<ProxyEvent>,
    /// Data directory forwarded to the `config` helpers; `None` when the core
    /// was built with `new`.
    data_dir: Option<std::path::PathBuf>,
}
77
78impl ProxyCore {
79 pub fn new() -> Result<Self, ProxyError> {
80 let entries = config::load_entries()?;
81 Ok(Self {
82 entries: Arc::new(Mutex::new(entries)),
83 event_tx: broadcast::channel(BROADCAST_CHANNEL_CAPACITY).0,
84 data_dir: None,
85 })
86 }
87
88 pub fn with_data_dir(data_dir: &std::path::Path) -> Result<Self, ProxyError> {
90 let entries = config::load_entries_with_data_dir(Some(data_dir))?;
91 Ok(Self {
92 entries: Arc::new(Mutex::new(entries)),
93 event_tx: broadcast::channel(BROADCAST_CHANNEL_CAPACITY).0,
94 data_dir: Some(data_dir.to_path_buf()),
95 })
96 }
97
98 pub async fn entries(&self) -> Vec<ProxyEntry> {
99 self.entries.lock().await.clone()
100 }
101
102 pub async fn reload(&self) -> Result<Vec<ProxyEntry>, ProxyError> {
103 let data_dir = self.data_dir.clone();
104 let entries = tokio::task::spawn_blocking(move || {
105 config::load_entries_with_data_dir(data_dir.as_deref())
106 })
107 .await
108 .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
109 let mut guard = self.entries.lock().await;
110 *guard = entries.clone();
111 Ok(entries)
112 }
113
114 pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, ProxyError> {
115 let data_dir = self.data_dir.clone();
116 let entry_for_io = entry.clone();
117 let entries = tokio::task::spawn_blocking(move || {
118 config::upsert_entry_with_data_dir(entry_for_io, data_dir.as_deref())
119 })
120 .await
121 .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
122 let mut guard = self.entries.lock().await;
123 *guard = entries.clone();
124 let _ = self.event_tx.send(ProxyEvent::EntryUpdated { entry });
125 Ok(entries)
126 }
127
128 pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, ProxyError> {
129 let data_dir = self.data_dir.clone();
130 let name_owned = name.to_string();
131 let entries = tokio::task::spawn_blocking(move || {
132 config::remove_entry_with_data_dir(&name_owned, data_dir.as_deref())
133 })
134 .await
135 .map_err(|e| ProxyError::Io(format!("config task: {e}")))??;
136 let mut guard = self.entries.lock().await;
137 *guard = entries.clone();
138 let _ = self.event_tx.send(ProxyEvent::EntryRemoved {
139 name: name.to_string(),
140 });
141 Ok(entries)
142 }
143
144 pub fn subscribe(&self) -> broadcast::Receiver<ProxyEvent> {
146 self.event_tx.subscribe()
147 }
148}
149
150impl Capability for ProxyCore {
151 fn name(&self) -> &str {
152 "proxy"
153 }
154
155 fn status(&self) -> CapabilityStatus {
156 CapabilityStatus {
157 name: "proxy".to_string(),
158 summary: "configured".to_string(),
159 healthy: true,
160 }
161 }
162}
163
/// Bookkeeping for one spawned listener.
struct ProxyInstance {
    /// Entry the listener was spawned with; compared against incoming
    /// configuration to decide whether a restart is needed.
    entry: ProxyEntry,
    /// Token cancelled when the instance is replaced, removed or stopped.
    cancel: CancellationToken,
    /// Receiver returned by `spawn_listener`; read when building `ProxyStatus`.
    status: watch::Receiver<ListenerStatus>,
}
169
/// Owns the set of running listeners and keeps it in line with the entries
/// held by a [`ProxyCore`].
///
/// Both fields are `Arc`s, so clones of a runtime share the same core and the
/// same instance table.
pub struct ProxyRuntime {
    /// Configuration source for the listeners.
    core: Arc<ProxyCore>,
    /// Running instances keyed by entry name.
    instances: Arc<Mutex<HashMap<String, ProxyInstance>>>,
}
175
176impl ProxyRuntime {
177 pub fn new(core: Arc<ProxyCore>) -> Self {
178 Self {
179 core,
180 instances: Arc::new(Mutex::new(HashMap::new())),
181 }
182 }
183
184 pub fn core(&self) -> Arc<ProxyCore> {
185 Arc::clone(&self.core)
186 }
187
188 pub async fn start_all(&self) -> Result<(), ProxyError> {
189 let entries = self.core.entries().await;
190 self.apply_entries(entries).await
191 }
192
193 pub async fn reload(&self) -> Result<(), ProxyError> {
194 let entries = self.core.reload().await?;
195 self.apply_entries(entries).await
196 }
197
198 async fn apply_entries(&self, entries: Vec<ProxyEntry>) -> Result<(), ProxyError> {
199 let mut guard = self.instances.lock().await;
200 let mut seen = HashMap::new();
201
202 for entry in entries {
203 seen.insert(entry.name.clone(), entry.clone());
204 let entry_name = entry.name.clone();
205 let needs_restart = match guard.get(&entry.name) {
206 Some(existing) => existing.entry != entry,
207 None => true,
208 };
209 if needs_restart {
210 if let Some(existing) = guard.remove(&entry.name) {
211 existing.cancel.cancel();
212 }
213 let cancel = CancellationToken::new();
214 let status = spawn_listener(entry.clone(), cancel.clone());
215 guard.insert(
216 entry_name,
217 ProxyInstance {
218 entry,
219 cancel,
220 status,
221 },
222 );
223 }
224 }
225
226 let remove_names: Vec<String> = guard
227 .keys()
228 .filter(|name| !seen.contains_key(*name))
229 .cloned()
230 .collect();
231 for name in remove_names {
232 if let Some(instance) = guard.remove(&name) {
233 instance.cancel.cancel();
234 }
235 }
236
237 Ok(())
238 }
239
240 pub async fn stop_all(&self) {
241 let mut guard = self.instances.lock().await;
242 for instance in guard.values() {
243 instance.cancel.cancel();
244 }
245 guard.clear();
246 }
247
248 pub async fn status(&self) -> Vec<ProxyStatus> {
249 let guard = self.instances.lock().await;
250 guard
251 .values()
252 .map(|instance| {
253 let status = instance.status.borrow();
254 ProxyStatus {
255 name: instance.entry.name.clone(),
256 listen_port: instance.entry.listen_port,
257 backend: instance.entry.backend.clone(),
258 allow_remote: instance.entry.allow_remote,
259 cert_source: status.cert_source.as_str().to_string(),
260 state: status.state.as_str().to_string(),
261 error: status.error.clone(),
262 }
263 })
264 .collect()
265 }
266}
267
268impl Clone for ProxyRuntime {
269 fn clone(&self) -> Self {
270 Self {
271 core: Arc::clone(&self.core),
272 instances: Arc::clone(&self.instances),
273 }
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a 16-slot event channel and one receiver obtained through
    /// `subscribe` (the channel's initial receiver is dropped).
    fn subscribed_channel() -> (
        broadcast::Sender<ProxyEvent>,
        broadcast::Receiver<ProxyEvent>,
    ) {
        let (tx, _) = broadcast::channel::<ProxyEvent>(16);
        let rx = tx.subscribe();
        (tx, rx)
    }

    // These tests exercise the broadcast channel carrying `ProxyEvent`
    // directly; they do not go through `ProxyCore`.

    #[test]
    fn subscribe_receives_emitted_entry_updated() {
        let (sender, mut receiver) = subscribed_channel();

        let update = ProxyEvent::EntryUpdated {
            entry: ProxyEntry {
                name: "test-svc".to_string(),
                listen_port: 9090,
                backend: "http://127.0.0.1:8080".to_string(),
                allow_remote: false,
            },
        };
        let _ = sender.send(update);

        match receiver.try_recv().expect("should receive event") {
            ProxyEvent::EntryUpdated { entry: received } => {
                assert_eq!(received.name, "test-svc");
                assert_eq!(received.listen_port, 9090);
                assert_eq!(received.backend, "http://127.0.0.1:8080");
            }
            other => panic!("expected EntryUpdated, got {other:?}"),
        }
    }

    #[test]
    fn subscribe_receives_emitted_entry_removed() {
        let (sender, mut receiver) = subscribed_channel();

        let removal = ProxyEvent::EntryRemoved {
            name: "rm-svc".to_string(),
        };
        let _ = sender.send(removal);

        match receiver.try_recv().expect("should receive event") {
            ProxyEvent::EntryRemoved { name } => assert_eq!(name, "rm-svc"),
            other => panic!("expected EntryRemoved, got {other:?}"),
        }
    }

    #[test]
    fn multiple_subscribers_each_receive_event() {
        let (sender, mut first) = subscribed_channel();
        let mut second = sender.subscribe();

        let _ = sender.send(ProxyEvent::EntryRemoved {
            name: "multi".to_string(),
        });

        for receiver in [&mut first, &mut second] {
            assert!(receiver.try_recv().is_ok());
        }
    }
}