Skip to main content

koi_embedded/
lib.rs

1mod config;
2mod events;
3mod handle;
4
5use std::sync::Arc;
6
7use tokio::sync::broadcast;
8use tokio::task::JoinHandle;
9use tokio_util::sync::CancellationToken;
10
11use koi_client::KoiClient;
12
13pub use config::{DnsConfigBuilder, KoiConfig, ServiceMode};
14pub use events::KoiEvent;
15pub use handle::{CertmeshHandle, DnsHandle, HealthHandle, KoiHandle, MdnsHandle, ProxyHandle};
16
17// Re-export types needed by downstream consumers (registration, discovery, DNS, proxy, health)
18pub use koi_common::types::ServiceRecord;
19pub use koi_config::state::DnsEntry;
20pub use koi_health::{HealthCheck, HealthSnapshot, ServiceCheckKind};
21pub use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
22pub use koi_mdns::MdnsEvent;
23pub use koi_proxy::ProxyEntry;
24
25pub type Result<T> = std::result::Result<T, KoiError>;
26
27#[derive(Debug, thiserror::Error)]
28pub enum KoiError {
29    #[error("capability disabled: {0}")]
30    DisabledCapability(&'static str),
31    #[error("mdns error: {0}")]
32    Mdns(#[from] koi_mdns::MdnsError),
33    #[error("dns error: {0}")]
34    Dns(#[from] koi_dns::DnsError),
35    #[error("health error: {0}")]
36    Health(#[from] koi_health::HealthError),
37    #[error("proxy error: {0}")]
38    Proxy(#[from] koi_proxy::ProxyError),
39    #[error("certmesh error: {0}")]
40    Certmesh(#[from] koi_certmesh::CertmeshError),
41    #[error("client error: {0}")]
42    Client(#[from] koi_client::ClientError),
43    #[error("io error: {0}")]
44    Io(#[from] std::io::Error),
45}
46
47pub struct Builder {
48    config: KoiConfig,
49    event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
50}
51
52impl Builder {
53    pub fn new() -> Self {
54        Self {
55            config: KoiConfig::default(),
56            event_handler: None,
57        }
58    }
59
60    pub fn data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
61        self.config.data_dir = Some(path.into());
62        self
63    }
64
65    pub fn service_endpoint(mut self, endpoint: impl Into<String>) -> Self {
66        self.config.service_endpoint = endpoint.into();
67        self
68    }
69
70    pub fn service_mode(mut self, mode: ServiceMode) -> Self {
71        self.config.service_mode = mode;
72        self
73    }
74
75    pub fn http(mut self, enabled: bool) -> Self {
76        self.config.http_enabled = enabled;
77        self
78    }
79
80    pub fn mdns(mut self, enabled: bool) -> Self {
81        self.config.mdns_enabled = enabled;
82        self
83    }
84
85    pub fn dns<F>(mut self, configure: F) -> Self
86    where
87        F: FnOnce(DnsConfigBuilder) -> DnsConfigBuilder,
88    {
89        let builder = DnsConfigBuilder::new(self.config.dns_config.clone());
90        self.config.dns_config = configure(builder).build();
91        self
92    }
93
94    pub fn dns_enabled(mut self, enabled: bool) -> Self {
95        self.config.dns_enabled = enabled;
96        self
97    }
98
99    pub fn dns_auto_start(mut self, enabled: bool) -> Self {
100        self.config.dns_auto_start = enabled;
101        self
102    }
103
104    pub fn health(mut self, enabled: bool) -> Self {
105        self.config.health_enabled = enabled;
106        self
107    }
108
109    pub fn health_auto_start(mut self, enabled: bool) -> Self {
110        self.config.health_auto_start = enabled;
111        self
112    }
113
114    pub fn certmesh(mut self, enabled: bool) -> Self {
115        self.config.certmesh_enabled = enabled;
116        self
117    }
118
119    pub fn proxy(mut self, enabled: bool) -> Self {
120        self.config.proxy_enabled = enabled;
121        self
122    }
123
124    pub fn proxy_auto_start(mut self, enabled: bool) -> Self {
125        self.config.proxy_auto_start = enabled;
126        self
127    }
128
129    pub fn events<F>(mut self, handler: F) -> Self
130    where
131        F: Fn(KoiEvent) + Send + Sync + 'static,
132    {
133        self.event_handler = Some(Arc::new(handler));
134        self
135    }
136
137    pub fn build(self) -> Result<KoiEmbedded> {
138        Ok(KoiEmbedded {
139            config: self.config,
140            event_handler: self.event_handler,
141        })
142    }
143}
144
145impl Default for Builder {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151pub struct KoiEmbedded {
152    config: KoiConfig,
153    event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
154}
155
156impl KoiEmbedded {
157    pub async fn start(self) -> Result<KoiHandle> {
158        if let Some(dir) = &self.config.data_dir {
159            std::env::set_var("KOI_DATA_DIR", dir);
160        }
161
162        let cancel = CancellationToken::new();
163        let (event_tx, _) = broadcast::channel(256);
164        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
165
166        if self.config.service_mode != ServiceMode::EmbeddedOnly {
167            let client = Arc::new(KoiClient::new(&self.config.service_endpoint));
168            match self.config.service_mode {
169                ServiceMode::ClientOnly => {
170                    tokio::task::spawn_blocking({
171                        let client = Arc::clone(&client);
172                        move || client.health()
173                    })
174                    .await
175                    .map_err(map_join_error)??;
176                    return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
177                }
178                ServiceMode::Auto => {
179                    let health = tokio::task::spawn_blocking({
180                        let client = Arc::clone(&client);
181                        move || client.health()
182                    })
183                    .await;
184                    if matches!(health, Ok(Ok(()))) {
185                        return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
186                    }
187                }
188                ServiceMode::EmbeddedOnly => {}
189            }
190        }
191
192        let mdns = if self.config.mdns_enabled {
193            Some(Arc::new(koi_mdns::MdnsCore::with_cancel(cancel.clone())?))
194        } else {
195            None
196        };
197
198        let certmesh = if self.config.certmesh_enabled {
199            init_certmesh_core()
200        } else {
201            None
202        };
203
204        let dns = if self.config.dns_enabled {
205            let core = koi_dns::DnsCore::new(
206                self.config.dns_config.clone(),
207                mdns.clone(),
208                certmesh.clone(),
209            )
210            .await?;
211            Some(Arc::new(koi_dns::DnsRuntime::new(core)))
212        } else {
213            None
214        };
215
216        let health = if self.config.health_enabled {
217            let core = koi_health::HealthCore::new(mdns.clone(), dns.clone()).await;
218            Some(Arc::new(koi_health::HealthRuntime::new(Arc::new(core))))
219        } else {
220            None
221        };
222
223        let proxy = if self.config.proxy_enabled {
224            let core = Arc::new(koi_proxy::ProxyCore::new()?);
225            Some(Arc::new(koi_proxy::ProxyRuntime::new(core)))
226        } else {
227            None
228        };
229
230        if let Some(runtime) = &dns {
231            if self.config.dns_auto_start {
232                let _ = runtime.start().await?;
233            }
234        }
235
236        if let Some(runtime) = &health {
237            if self.config.health_auto_start {
238                let _ = runtime.start().await?;
239            }
240        }
241
242        if let Some(runtime) = &proxy {
243            if self.config.proxy_auto_start {
244                runtime.start_all().await?;
245            }
246        }
247
248        if let Some(core) = &mdns {
249            let mut rx = core.subscribe();
250            let tx = event_tx.clone();
251            let token = cancel.clone();
252            let handler = self.event_handler.clone();
253            tasks.push(tokio::spawn(async move {
254                loop {
255                    tokio::select! {
256                        _ = token.cancelled() => break,
257                        msg = rx.recv() => {
258                            let Ok(event) = msg else { continue; };
259                            let mapped = map_mdns_event(event);
260                            if let Some(mapped) = mapped {
261                                emit_event(&tx, handler.as_ref(), mapped);
262                            }
263                        }
264                    }
265                }
266            }));
267        }
268
269        if self.config.health_enabled {
270            if let Some(runtime) = &health {
271                let mut rx = runtime.core().subscribe();
272                let tx = event_tx.clone();
273                let token = cancel.clone();
274                let handler = self.event_handler.clone();
275                tasks.push(tokio::spawn(async move {
276                    loop {
277                        tokio::select! {
278                            _ = token.cancelled() => break,
279                            msg = rx.recv() => {
280                                let Ok(event) = msg else { continue; };
281                                let mapped = map_health_event(event);
282                                emit_event(&tx, handler.as_ref(), mapped);
283                            }
284                        }
285                    }
286                }));
287            }
288        }
289
290        if self.config.dns_enabled {
291            if let Some(runtime) = &dns {
292                let mut rx = runtime.core().subscribe();
293                let tx = event_tx.clone();
294                let token = cancel.clone();
295                let handler = self.event_handler.clone();
296                tasks.push(tokio::spawn(async move {
297                    loop {
298                        tokio::select! {
299                            _ = token.cancelled() => break,
300                            msg = rx.recv() => {
301                                let Ok(event) = msg else { continue; };
302                                let mapped = map_dns_event(event);
303                                emit_event(&tx, handler.as_ref(), mapped);
304                            }
305                        }
306                    }
307                }));
308            }
309        }
310
311        if self.config.certmesh_enabled {
312            if let Some(core) = &certmesh {
313                let mut rx = core.subscribe();
314                let tx = event_tx.clone();
315                let token = cancel.clone();
316                let handler = self.event_handler.clone();
317                tasks.push(tokio::spawn(async move {
318                    loop {
319                        tokio::select! {
320                            _ = token.cancelled() => break,
321                            msg = rx.recv() => {
322                                let Ok(event) = msg else { continue; };
323                                let mapped = map_certmesh_event(event);
324                                emit_event(&tx, handler.as_ref(), mapped);
325                            }
326                        }
327                    }
328                }));
329            }
330        }
331
332        if self.config.proxy_enabled {
333            if let Some(runtime) = &proxy {
334                let mut rx = runtime.core().subscribe();
335                let tx = event_tx.clone();
336                let token = cancel.clone();
337                let handler = self.event_handler.clone();
338                tasks.push(tokio::spawn(async move {
339                    loop {
340                        tokio::select! {
341                            _ = token.cancelled() => break,
342                            msg = rx.recv() => {
343                                let Ok(event) = msg else { continue; };
344                                let mapped = map_proxy_event(event);
345                                emit_event(&tx, handler.as_ref(), mapped);
346                            }
347                        }
348                    }
349                }));
350            }
351        }
352
353        Ok(KoiHandle::new_embedded(
354            mdns, dns, health, certmesh, proxy, event_tx, cancel, tasks,
355        ))
356    }
357}
358
359fn init_certmesh_core() -> Option<Arc<koi_certmesh::CertmeshCore>> {
360    if !koi_certmesh::ca::is_ca_initialized() {
361        return Some(Arc::new(koi_certmesh::CertmeshCore::uninitialized()));
362    }
363
364    let roster_path = koi_certmesh::ca::roster_path();
365    let roster = match koi_certmesh::roster::load_roster(&roster_path) {
366        Ok(r) => r,
367        Err(_) => {
368            return Some(Arc::new(koi_certmesh::CertmeshCore::uninitialized()));
369        }
370    };
371
372    let profile = roster.metadata.trust_profile;
373
374    // ── Auto-unlock at init: single source of truth ─────────────
375    // If the auto-unlock key file exists, boot the core already
376    // unlocked.  This collapses the "create locked → read key →
377    // unlock" three-step into a single atomic construction.
378    let auto_key_path = koi_common::paths::koi_data_dir().join("auto-unlock-key");
379    if let Ok(pp) = std::fs::read_to_string(&auto_key_path) {
380        if !pp.is_empty() {
381            match koi_certmesh::ca::load_ca(&pp) {
382                Ok(ca_state) => {
383                    // Reload roster (fresh copy for the new Arc)
384                    if let Ok(fresh_roster) = koi_certmesh::roster::load_roster(&roster_path) {
385                        let auth_path = koi_certmesh::ca::auth_path();
386                        let auth = if auth_path.exists() {
387                            std::fs::read_to_string(&auth_path)
388                                .ok()
389                                .and_then(|json| {
390                                    serde_json::from_str::<koi_crypto::auth::StoredAuth>(&json).ok()
391                                })
392                                .and_then(|stored| stored.unlock(&pp).ok())
393                        } else {
394                            None
395                        };
396
397                        tracing::info!("Certmesh CA auto-unlocked at init");
398                        return Some(Arc::new(koi_certmesh::CertmeshCore::new(
399                            ca_state,
400                            fresh_roster,
401                            auth,
402                            profile,
403                        )));
404                    }
405                }
406                Err(e) => {
407                    tracing::warn!(
408                        error = %e,
409                        "Auto-unlock key exists but decryption failed"
410                    );
411                }
412            }
413        }
414    }
415
416    // No auto-unlock key — boot locked
417    let core = koi_certmesh::CertmeshCore::locked(roster, profile);
418    Some(Arc::new(core))
419}
420
421fn map_mdns_event(event: MdnsEvent) -> Option<KoiEvent> {
422    match event {
423        MdnsEvent::Found(record) => Some(KoiEvent::MdnsFound(record)),
424        MdnsEvent::Resolved(record) => Some(KoiEvent::MdnsResolved(record)),
425        MdnsEvent::Removed { name, service_type } => {
426            Some(KoiEvent::MdnsRemoved { name, service_type })
427        }
428    }
429}
430
431fn map_health_event(event: koi_health::HealthEvent) -> KoiEvent {
432    match event {
433        koi_health::HealthEvent::StatusChanged { name, status } => {
434            KoiEvent::HealthChanged { name, status }
435        }
436    }
437}
438
439fn map_dns_event(event: koi_dns::DnsEvent) -> KoiEvent {
440    match event {
441        koi_dns::DnsEvent::EntryUpdated { name, ip } => KoiEvent::DnsEntryUpdated { name, ip },
442        koi_dns::DnsEvent::EntryRemoved { name } => KoiEvent::DnsEntryRemoved { name },
443    }
444}
445
446fn map_certmesh_event(event: koi_certmesh::CertmeshEvent) -> KoiEvent {
447    match event {
448        koi_certmesh::CertmeshEvent::MemberJoined {
449            hostname,
450            fingerprint,
451        } => KoiEvent::CertmeshMemberJoined {
452            hostname,
453            fingerprint,
454        },
455        koi_certmesh::CertmeshEvent::MemberRevoked { hostname } => {
456            KoiEvent::CertmeshMemberRevoked { hostname }
457        }
458        koi_certmesh::CertmeshEvent::Destroyed => KoiEvent::CertmeshDestroyed,
459    }
460}
461
462fn map_proxy_event(event: koi_proxy::ProxyEvent) -> KoiEvent {
463    match event {
464        koi_proxy::ProxyEvent::EntryUpdated { entry } => KoiEvent::ProxyEntryUpdated { entry },
465        koi_proxy::ProxyEvent::EntryRemoved { name } => KoiEvent::ProxyEntryRemoved { name },
466    }
467}
468
469fn emit_event(
470    tx: &broadcast::Sender<KoiEvent>,
471    handler: Option<&Arc<dyn Fn(KoiEvent) + Send + Sync>>,
472    event: KoiEvent,
473) {
474    if let Some(handler) = handler {
475        handler(event.clone());
476    }
477    let _ = tx.send(event);
478}
479
480pub(crate) fn map_join_error(err: tokio::task::JoinError) -> KoiError {
481    KoiError::Io(std::io::Error::other(err.to_string()))
482}