Skip to main content

embedded_integration/
embedded-integration.rs

1use std::collections::HashMap;
2use std::net::{IpAddr, SocketAddr};
3use std::path::PathBuf;
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use axum::routing::get;
7use axum::Router;
8use tokio::net::TcpListener;
9use tokio_util::sync::CancellationToken;
10
11use koi_config::state::DnsEntry;
12use koi_embedded::{Builder, KoiEvent, ServiceMode};
13use koi_health::{HealthCheck, ServiceCheckKind, ServiceStatus};
14use koi_mdns::protocol::{RegisterPayload, Request as MdnsRequest};
15use koi_proxy::ProxyEntry;
16
/// Tallies integration-check outcomes and prints one line per recorded result.
struct Harness {
    /// Number of checks recorded through `pass`.
    passed: usize,
    /// Number of checks recorded through `fail`.
    failed: usize,
    /// Number of checks recorded through `skip`.
    skipped: usize,
    /// When true, `log` prints its messages; otherwise `log` prints nothing.
    verbose: bool,
}

impl Harness {
    /// Creates a harness with every counter at zero.
    fn new(verbose: bool) -> Self {
        Self {
            verbose,
            passed: 0,
            failed: 0,
            skipped: 0,
        }
    }

    /// Prints `msg` indented by two spaces, but only in verbose mode.
    fn log(&self, msg: impl AsRef<str>) {
        if !self.verbose {
            return;
        }
        let text = msg.as_ref();
        println!("  {text}");
    }

    /// Records a passed check and prints a `[PASS]` line for it.
    fn pass(&mut self, name: &str) {
        println!("[PASS] {name}");
        self.passed += 1;
    }

    /// Records a failed check and prints a `[FAIL]` line with the reason.
    fn fail(&mut self, name: &str, reason: &str) {
        println!("[FAIL] {name} - {reason}");
        self.failed += 1;
    }

    /// Records a skipped check and prints a `[SKIP]` line with the reason.
    // Only called from some `cfg` variants of this file, so it may be unused in a given build.
    #[allow(dead_code)]
    fn skip(&mut self, name: &str, reason: &str) {
        println!("[SKIP] {name} - {reason}");
        self.skipped += 1;
    }

    /// Prints the final passed / failed / skipped totals.
    fn summary(&self) {
        let Self {
            passed,
            failed,
            skipped,
            ..
        } = self;
        println!("\nSummary: {passed} passed, {failed} failed, {skipped} skipped");
    }
}
63
/// Opens the client end of the named pipe `pipe_name`, retrying while it cannot be opened.
///
/// Makes up to 20 attempts and sleeps 100 ms after every failed attempt.
///
/// # Errors
///
/// Returns the error produced by the last failed attempt.
#[cfg(windows)]
async fn open_pipe(
    pipe_name: &str,
) -> Result<tokio::net::windows::named_pipe::NamedPipeClient, Box<dyn std::error::Error>> {
    use tokio::net::windows::named_pipe::ClientOptions;

    const ATTEMPTS: u32 = 20;
    const RETRY_DELAY: Duration = Duration::from_millis(100);

    let mut failure: Option<std::io::Error> = None;
    for _attempt in 0..ATTEMPTS {
        let err = match ClientOptions::new().open(pipe_name) {
            Ok(pipe) => return Ok(pipe),
            Err(err) => err,
        };
        failure = Some(err);
        tokio::time::sleep(RETRY_DELAY).await;
    }

    // `failure` is always set after a full loop; the fallback only guards a zero-attempt loop.
    let err = failure.unwrap_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::NotFound, "pipe not available")
    });
    Err(Box::new(err))
}
85
/// Sends `value` as one line of JSON on `writer` and parses the next line from `reader`
/// as the JSON reply.
///
/// # Errors
///
/// Fails when serialization, either write, the read, or parsing the reply fails, and with
/// `"empty response"` when the reader reaches end of input before yielding a line.
#[cfg(windows)]
async fn ipc_send<R, W>(
    reader: &mut tokio::io::Lines<tokio::io::BufReader<R>>,
    writer: &mut W,
    value: serde_json::Value,
) -> Result<serde_json::Value, Box<dyn std::error::Error>>
where
    R: tokio::io::AsyncRead + Unpin,
    W: tokio::io::AsyncWrite + Unpin,
{
    use tokio::io::AsyncWriteExt;

    let request = serde_json::to_string(&value)?;
    writer.write_all(request.as_bytes()).await?;
    writer.write_all(b"\n").await?;

    match reader.next_line().await? {
        Some(reply) => Ok(serde_json::from_str(&reply)?),
        None => Err("empty response".into()),
    }
}
106
/// Creates and returns a directory under the system temp dir named
/// `koi-embedded-integration-<nanoseconds since the Unix epoch>`.
///
/// # Panics
///
/// Panics if the directory cannot be created.
fn temp_data_dir() -> PathBuf {
    // A clock set before the epoch yields a zero duration instead of an error.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    let mut dir = std::env::temp_dir();
    dir.push(format!(
        "koi-embedded-integration-{}",
        since_epoch.as_nanos()
    ));
    std::fs::create_dir_all(&dir).expect("create temp dir");
    dir
}
116
/// Returns true when any element of `args` is exactly equal to `flag`.
fn has_flag(args: &[String], flag: &str) -> bool {
    args.iter()
        .map(String::as_str)
        .any(|candidate| candidate == flag)
}
120
/// Returns a copy of the argument that directly follows the first occurrence of `flag`.
///
/// Yields `None` when `flag` is absent or is the last element of `args`.
fn read_arg_value(args: &[String], flag: &str) -> Option<String> {
    let mut remaining = args.iter();
    // `find` consumes the iterator through the first match, so `next` is the flag's value.
    remaining.find(|arg| arg.as_str() == flag)?;
    remaining.next().cloned()
}
127
128async fn wait_for_event<F>(
129    rx: &mut tokio::sync::broadcast::Receiver<KoiEvent>,
130    timeout: Duration,
131    predicate: F,
132) -> Option<KoiEvent>
133where
134    F: Fn(&KoiEvent) -> bool,
135{
136    let deadline = tokio::time::Instant::now() + timeout;
137    loop {
138        let now = tokio::time::Instant::now();
139        if now >= deadline {
140            return None;
141        }
142        let remaining = deadline - now;
143        match tokio::time::timeout(remaining, rx.recv()).await {
144            Ok(Ok(event)) => {
145                if predicate(&event) {
146                    return Some(event);
147                }
148            }
149            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
150            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => return None,
151            Err(_) => return None,
152        }
153    }
154}
155
156async fn start_http_server(
157    mdns: std::sync::Arc<koi_mdns::MdnsCore>,
158    dns: std::sync::Arc<koi_dns::DnsRuntime>,
159    health: std::sync::Arc<koi_health::HealthCore>,
160    certmesh: std::sync::Arc<koi_certmesh::CertmeshCore>,
161    proxy: std::sync::Arc<koi_proxy::ProxyRuntime>,
162) -> Result<(SocketAddr, CancellationToken), Box<dyn std::error::Error>> {
163    let app = Router::new()
164        .route(
165            "/healthz",
166            get(|| async { axum::Json(serde_json::json!({"ok": true})) }),
167        )
168        .nest("/v1/mdns", koi_mdns::http::routes(mdns))
169        .nest("/v1/dns", koi_dns::http::routes(dns))
170        .nest("/v1/health", koi_health::http::routes(health))
171        .nest("/v1/certmesh", certmesh.http_routes())
172        .nest("/v1/proxy", koi_proxy::http::routes(proxy));
173
174    let listener = TcpListener::bind("127.0.0.1:0").await?;
175    let addr = listener.local_addr()?;
176    let cancel = CancellationToken::new();
177    let token = cancel.clone();
178    tokio::spawn(async move {
179        let _ = axum::serve(listener, app)
180            .with_graceful_shutdown(token.cancelled_owned())
181            .await;
182    });
183
184    Ok((addr, cancel))
185}
186
187async fn read_sse_body(
188    client: &reqwest::Client,
189    url: &str,
190    timeout: Duration,
191) -> Result<String, Box<dyn std::error::Error>> {
192    let response = client.get(url).send().await?;
193    let body = tokio::time::timeout(timeout, response.text()).await??;
194    Ok(body)
195}
196
197async fn run_http_tests(
198    base_url: &str,
199    client: &reqwest::Client,
200    harness: &mut Harness,
201) -> Result<(), Box<dyn std::error::Error>> {
202    let health: serde_json::Value = client
203        .get(format!("{base_url}/healthz"))
204        .send()
205        .await?
206        .json()
207        .await?;
208    if health.get("ok") == Some(&serde_json::Value::Bool(true)) {
209        harness.pass("http: healthz ok");
210    } else {
211        harness.fail("http: healthz ok", "unexpected response");
212    }
213
214    let mdns_payload = serde_json::json!({
215        "name": "koi-http-test",
216        "type": "_koi._tcp",
217        "port": 51516,
218        "ip": "127.0.0.1",
219        "lease_secs": 30,
220        "txt": {"source": "http"}
221    });
222    let register_resp: serde_json::Value = client
223        .post(format!("{base_url}/v1/mdns/announce"))
224        .json(&mdns_payload)
225        .send()
226        .await?
227        .json()
228        .await?;
229    let mdns_id = register_resp
230        .get("registered")
231        .and_then(|v| v.get("id"))
232        .and_then(|v| v.as_str())
233        .map(|s| s.to_string());
234    if mdns_id.is_some() {
235        harness.pass("http: mdns register");
236    } else {
237        harness.fail("http: mdns register", "missing registered id");
238    }
239
240    let resolve_resp: serde_json::Value = client
241        .get(format!(
242            "{base_url}/v1/mdns/resolve?name=koi-http-test._koi._tcp.local."
243        ))
244        .send()
245        .await?
246        .json()
247        .await?;
248    if resolve_resp.get("resolved").is_some() {
249        harness.pass("http: mdns resolve");
250    } else {
251        harness.fail("http: mdns resolve", "missing resolved response");
252    }
253
254    let events_url = format!("{base_url}/v1/mdns/events?type=_koi._tcp&idle_for=1");
255    let sse_payload = serde_json::json!({
256        "name": "koi-http-sse",
257        "type": "_koi._tcp",
258        "port": 51518,
259        "ip": "127.0.0.1",
260        "lease_secs": 30,
261        "txt": {"source": "http-sse"}
262    });
263    let sse_future = read_sse_body(client, &events_url, Duration::from_secs(3));
264    let register_future = client
265        .post(format!("{base_url}/v1/mdns/announce"))
266        .json(&sse_payload)
267        .send();
268    let (events_body, register_result) = tokio::join!(sse_future, register_future);
269    let _ = register_result?;
270    let events_body = events_body?;
271    if events_body.contains("data:") {
272        harness.pass("http: mdns events sse");
273    } else {
274        harness.fail("http: mdns events sse", "no sse data received");
275    }
276
277    if let Some(id) = mdns_id {
278        let unregister_resp: serde_json::Value = client
279            .delete(format!("{base_url}/v1/mdns/unregister/{id}"))
280            .send()
281            .await?
282            .json()
283            .await?;
284        if unregister_resp.get("unregistered").is_some() {
285            harness.pass("http: mdns unregister");
286        } else {
287            harness.fail("http: mdns unregister", "missing unregistered response");
288        }
289    }
290
291    let dns_entry = serde_json::json!({
292        "name": "http-test",
293        "ip": "127.0.0.1",
294        "ttl": null
295    });
296    let add_resp: serde_json::Value = client
297        .post(format!("{base_url}/v1/dns/add"))
298        .json(&dns_entry)
299        .send()
300        .await?
301        .json()
302        .await?;
303    if add_resp.get("entries").is_some() {
304        harness.pass("http: dns add entry");
305    } else {
306        harness.fail("http: dns add entry", "missing entries response");
307    }
308
309    let lookup_resp: serde_json::Value = client
310        .get(format!(
311            "{base_url}/v1/dns/lookup?name=http-test.lan&type=A"
312        ))
313        .send()
314        .await?
315        .json()
316        .await?;
317    if lookup_resp.get("ips").is_some() {
318        harness.pass("http: dns lookup");
319    } else {
320        harness.fail("http: dns lookup", "missing lookup response");
321    }
322
323    let list_resp: serde_json::Value = client
324        .get(format!("{base_url}/v1/dns/list"))
325        .send()
326        .await?
327        .json()
328        .await?;
329    if list_resp
330        .get("names")
331        .and_then(|v| v.as_array())
332        .map(|arr| arr.iter().any(|name| name == "http-test.lan."))
333        .unwrap_or(false)
334    {
335        harness.pass("http: dns list names");
336    } else {
337        harness.fail("http: dns list names", "name missing from list");
338    }
339
340    let start_resp: serde_json::Value = client
341        .post(format!("{base_url}/v1/dns/serve"))
342        .send()
343        .await?
344        .json()
345        .await?;
346    if start_resp.get("started").is_some() {
347        harness.pass("http: dns start");
348    } else {
349        harness.fail("http: dns start", "missing started response");
350    }
351
352    let stop_resp: serde_json::Value = client
353        .post(format!("{base_url}/v1/dns/stop"))
354        .send()
355        .await?
356        .json()
357        .await?;
358    if stop_resp.get("stopped").is_some() {
359        harness.pass("http: dns stop");
360    } else {
361        harness.fail("http: dns stop", "missing stopped response");
362    }
363
364    let remove_resp: serde_json::Value = client
365        .delete(format!("{base_url}/v1/dns/remove/http-test"))
366        .send()
367        .await?
368        .json()
369        .await?;
370    if remove_resp.get("entries").is_some() {
371        harness.pass("http: dns remove entry");
372    } else {
373        harness.fail("http: dns remove entry", "missing entries response");
374    }
375
376    let health_add = serde_json::json!({
377        "name": "http-tcp",
378        "kind": "tcp",
379        "target": "127.0.0.1:9",
380        "interval_secs": 1,
381        "timeout_secs": 1
382    });
383    let health_resp: serde_json::Value = client
384        .post(format!("{base_url}/v1/health/add"))
385        .json(&health_add)
386        .send()
387        .await?
388        .json()
389        .await?;
390    if health_resp.get("status") == Some(&serde_json::Value::String("ok".to_string())) {
391        harness.pass("http: health add check");
392    } else {
393        harness.fail("http: health add check", "unexpected response");
394    }
395
396    let checks_resp: serde_json::Value = client
397        .get(format!("{base_url}/v1/health/list"))
398        .send()
399        .await?
400        .json()
401        .await?;
402    if checks_resp.get("checks").is_some() {
403        harness.pass("http: health list checks");
404    } else {
405        harness.fail("http: health list checks", "missing checks response");
406    }
407
408    let remove_health: serde_json::Value = client
409        .delete(format!("{base_url}/v1/health/remove/http-tcp"))
410        .send()
411        .await?
412        .json()
413        .await?;
414    if remove_health.get("status") == Some(&serde_json::Value::String("ok".to_string())) {
415        harness.pass("http: health remove check");
416    } else {
417        harness.fail("http: health remove check", "unexpected response");
418    }
419
420    let proxy_payload = serde_json::json!({
421        "name": "http-proxy",
422        "listen_port": 18090,
423        "backend": "http://127.0.0.1:18091",
424        "allow_remote": false
425    });
426    let proxy_resp: serde_json::Value = client
427        .post(format!("{base_url}/v1/proxy/add"))
428        .json(&proxy_payload)
429        .send()
430        .await?
431        .json()
432        .await?;
433    if proxy_resp.get("status") == Some(&serde_json::Value::String("ok".to_string())) {
434        harness.pass("http: proxy add entry");
435    } else {
436        harness.fail("http: proxy add entry", "unexpected response");
437    }
438
439    let proxy_entries: serde_json::Value = client
440        .get(format!("{base_url}/v1/proxy/list"))
441        .send()
442        .await?
443        .json()
444        .await?;
445    if proxy_entries.get("entries").is_some() {
446        harness.pass("http: proxy list entries");
447    } else {
448        harness.fail("http: proxy list entries", "missing entries response");
449    }
450
451    let proxy_remove: serde_json::Value = client
452        .delete(format!("{base_url}/v1/proxy/remove/http-proxy"))
453        .send()
454        .await?
455        .json()
456        .await?;
457    if proxy_remove.get("status") == Some(&serde_json::Value::String("ok".to_string())) {
458        harness.pass("http: proxy remove entry");
459    } else {
460        harness.fail("http: proxy remove entry", "unexpected response");
461    }
462
463    let entropy_hex = koi_common::encoding::hex_encode(&[42u8; 32]);
464    let certmesh_payload = serde_json::json!({
465        "passphrase": "http-test-pass",
466        "entropy_hex": entropy_hex,
467        "profile": "just_me"
468    });
469    let certmesh_create: serde_json::Value = client
470        .post(format!("{base_url}/v1/certmesh/create"))
471        .json(&certmesh_payload)
472        .send()
473        .await?
474        .json()
475        .await?;
476    if certmesh_create.get("totp_uri").is_some() {
477        harness.pass("http: certmesh create");
478    } else {
479        harness.fail("http: certmesh create", "missing totp_uri");
480    }
481
482    let status: serde_json::Value = client
483        .get(format!("{base_url}/v1/certmesh/status"))
484        .send()
485        .await?
486        .json()
487        .await?;
488    if status.get("ca_initialized") == Some(&serde_json::Value::Bool(true)) {
489        harness.pass("http: certmesh status");
490    } else {
491        harness.fail("http: certmesh status", "unexpected status");
492    }
493
494    let _ = client
495        .post(format!("{base_url}/v1/certmesh/open-enrollment"))
496        .json(&serde_json::json!({"deadline": null}))
497        .send()
498        .await?;
499    let _ = client
500        .post(format!("{base_url}/v1/certmesh/close-enrollment"))
501        .send()
502        .await?;
503    harness.pass("http: certmesh enrollment open/close");
504
505    let _ = client
506        .put(format!("{base_url}/v1/certmesh/policy"))
507        .json(&serde_json::json!({"allowed_domain": "example.com", "allowed_subnet": null}))
508        .send()
509        .await?;
510    harness.pass("http: certmesh set policy");
511
512    let rotate: serde_json::Value = client
513        .post(format!("{base_url}/v1/certmesh/rotate-totp"))
514        .json(&serde_json::json!({"passphrase": "http-test-pass"}))
515        .send()
516        .await?
517        .json()
518        .await?;
519    if rotate.get("totp_uri").is_some() {
520        harness.pass("http: certmesh rotate totp");
521    } else {
522        harness.fail("http: certmesh rotate totp", "missing totp_uri");
523    }
524
525    let destroy: serde_json::Value = client
526        .post(format!("{base_url}/v1/certmesh/destroy"))
527        .send()
528        .await?
529        .json()
530        .await?;
531    if destroy.get("destroyed") == Some(&serde_json::Value::Bool(true)) {
532        harness.pass("http: certmesh destroy");
533    } else {
534        harness.fail("http: certmesh destroy", "unexpected response");
535    }
536
537    Ok(())
538}
539
/// Runs the mDNS IPC checks over a Windows named pipe and records the results on `harness`.
///
/// Spawns an in-process pipe server backed by `mdns`, then connects as a client and sends
/// register, resolve, heartbeat and unregister requests as newline-delimited JSON. The
/// heartbeat and unregister requests are only sent when the register reply carried an id.
/// The server accept loop is cancelled and awaited before returning, whether or not the
/// client exchange succeeded.
///
/// # Errors
///
/// Returns the first error from opening the pipe or from a request/response round trip.
#[cfg(windows)]
async fn run_ipc_tests(
    mdns: std::sync::Arc<koi_mdns::MdnsCore>,
    harness: &mut Harness,
) -> Result<(), Box<dyn std::error::Error>> {
    use tokio::net::windows::named_pipe::ServerOptions;

    /// Writes `line` followed by a newline; returns `false` when either write fails.
    async fn write_line<W>(writer: &mut W, line: &str) -> bool
    where
        W: tokio::io::AsyncWrite + Unpin,
    {
        use tokio::io::AsyncWriteExt;

        writer.write_all(line.as_bytes()).await.is_ok()
            && writer.write_all(b"\n").await.is_ok()
    }

    /// Answers newline-delimited JSON mDNS requests on one connected pipe instance until
    /// the client stops sending lines or a reply can no longer be written.
    async fn serve_connection(
        core: std::sync::Arc<koi_mdns::MdnsCore>,
        pipe: tokio::net::windows::named_pipe::NamedPipeServer,
    ) {
        use koi_mdns::protocol::{self, MdnsPipelineResponse, Response};
        use tokio::io::{AsyncBufReadExt, BufReader};

        const SERIALIZABLE: &str = "pipeline response serializes to JSON";

        let (reader, mut writer) = tokio::io::split(pipe);
        let mut lines = BufReader::new(reader).lines();

        while let Ok(Some(line)) = lines.next_line().await {
            let line = line.trim();
            if line.is_empty() {
                continue;
            }

            let response = match serde_json::from_str::<MdnsRequest>(line) {
                Ok(MdnsRequest::Register(payload)) => {
                    let policy = koi_mdns::LeasePolicy::Session {
                        grace: Duration::from_secs(30),
                    };
                    match core.register_with_policy(payload, policy, None) {
                        Ok(result) => MdnsPipelineResponse::clean(Response::Registered(result)),
                        Err(err) => protocol::error_to_pipeline(&err),
                    }
                }
                Ok(MdnsRequest::Resolve(name)) => match core.resolve(&name).await {
                    Ok(record) => MdnsPipelineResponse::clean(Response::Resolved(record)),
                    Err(err) => protocol::error_to_pipeline(&err),
                },
                Ok(MdnsRequest::Heartbeat(id)) => match core.heartbeat(&id) {
                    Ok(lease_secs) => MdnsPipelineResponse::clean(Response::Renewed(
                        protocol::RenewalResult { id, lease_secs },
                    )),
                    Err(err) => protocol::error_to_pipeline(&err),
                },
                Ok(MdnsRequest::Unregister(id)) => match core.unregister(&id) {
                    Ok(()) => MdnsPipelineResponse::clean(Response::Unregistered(id)),
                    Err(err) => protocol::error_to_pipeline(&err),
                },
                Ok(MdnsRequest::Browse(service_type)) => {
                    match core.browse(&service_type).await {
                        Ok(handle) => {
                            // Streaming request: forward events until the browse handle
                            // ends, and stop as soon as the client can't be written to.
                            while let Some(event) = handle.recv().await {
                                let resp = protocol::browse_event_to_pipeline(event);
                                let encoded = serde_json::to_string(&resp).expect(SERIALIZABLE);
                                if !write_line(&mut writer, &encoded).await {
                                    return;
                                }
                            }
                            continue;
                        }
                        Err(err) => protocol::error_to_pipeline(&err),
                    }
                }
                Ok(MdnsRequest::Subscribe(service_type)) => {
                    match core.browse(&service_type).await {
                        Ok(handle) => {
                            while let Some(event) = handle.recv().await {
                                let resp = protocol::subscribe_event_to_pipeline(event);
                                let encoded = serde_json::to_string(&resp).expect(SERIALIZABLE);
                                if !write_line(&mut writer, &encoded).await {
                                    return;
                                }
                            }
                            continue;
                        }
                        Err(err) => protocol::error_to_pipeline(&err),
                    }
                }
                Err(_) => MdnsPipelineResponse::clean(Response::Error(
                    koi_common::api::error_body(
                        koi_common::error::ErrorCode::ParseError,
                        "invalid_json",
                    ),
                )),
            };

            let encoded = serde_json::to_string(&response).expect(SERIALIZABLE);
            if !write_line(&mut writer, &encoded).await {
                return;
            }
        }
    }

    /// Connects to `pipe_name` and runs the register, resolve, heartbeat and unregister
    /// exchange, recording one harness result per step.
    async fn exercise_client(
        pipe_name: &str,
        harness: &mut Harness,
    ) -> Result<(), Box<dyn std::error::Error>> {
        use tokio::io::{AsyncBufReadExt, BufReader};

        let client = open_pipe(pipe_name).await?;
        let (reader, mut writer) = tokio::io::split(client);
        let mut reader = BufReader::new(reader).lines();

        let register_value = serde_json::json!({
            "register": {
                "name": "koi-ipc-test",
                "type": "_koi._tcp",
                "port": 51517,
                "ip": "127.0.0.1",
                "lease_secs": 30,
                "txt": {"source": "ipc"}
            }
        });
        let register_resp = ipc_send(&mut reader, &mut writer, register_value).await?;
        let id = register_resp
            .get("registered")
            .and_then(|v| v.get("id"))
            .and_then(|v| v.as_str())
            .map(str::to_owned);
        if id.is_some() {
            harness.pass("ipc: mdns register");
        } else {
            harness.fail("ipc: mdns register", "missing registered id");
        }

        let resolve_resp = ipc_send(
            &mut reader,
            &mut writer,
            serde_json::json!({
                "resolve": "koi-ipc-test._koi._tcp.local."
            }),
        )
        .await?;
        if resolve_resp.get("resolved").is_some() {
            harness.pass("ipc: mdns resolve");
        } else {
            harness.fail("ipc: mdns resolve", "missing resolved response");
        }

        if let Some(id) = id {
            let heartbeat_resp = ipc_send(
                &mut reader,
                &mut writer,
                serde_json::json!({
                    "heartbeat": id
                }),
            )
            .await?;
            if heartbeat_resp.get("renewed").is_some() {
                harness.pass("ipc: mdns heartbeat");
            } else {
                harness.fail("ipc: mdns heartbeat", "missing renewed response");
            }

            let unregister_resp = ipc_send(
                &mut reader,
                &mut writer,
                serde_json::json!({
                    "unregister": id
                }),
            )
            .await?;
            if unregister_resp.get("unregistered").is_some() {
                harness.pass("ipc: mdns unregister");
            } else {
                harness.fail("ipc: mdns unregister", "missing unregistered response");
            }
        }

        Ok(())
    }

    let pipe_name = format!(
        "\\\\.\\pipe\\koi-embedded-ipc-{}",
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos()
    );
    let cancel = CancellationToken::new();

    let server = {
        let core = mdns;
        let cancel = cancel.clone();
        let pipe_name = pipe_name.clone();
        tokio::spawn(async move {
            loop {
                // A fresh pipe instance is created for every accepted connection.
                let pipe = match ServerOptions::new()
                    .first_pipe_instance(false)
                    .create(pipe_name.as_str())
                {
                    Ok(pipe) => pipe,
                    Err(_) => break,
                };

                tokio::select! {
                    result = pipe.connect() => {
                        if result.is_err() {
                            continue;
                        }
                        tokio::spawn(serve_connection(core.clone(), pipe));
                    }
                    _ = cancel.cancelled() => break,
                }
            }
        })
    };

    // Run the client first and tear the server down afterwards in every case, so a failed
    // round trip does not leave the accept loop running.
    let outcome = exercise_client(&pipe_name, harness).await;
    cancel.cancel();
    let _ = server.await;
    outcome
}
758
759#[cfg(not(windows))]
760async fn run_ipc_tests(
761    _mdns: std::sync::Arc<koi_mdns::MdnsCore>,
762    harness: &mut Harness,
763) -> Result<(), Box<dyn std::error::Error>> {
764    harness.skip("ipc: mdns", "named pipes unsupported on this platform");
765    Ok(())
766}
767
768#[tokio::main]
769async fn main() -> Result<(), Box<dyn std::error::Error>> {
770    let args: Vec<String> = std::env::args().collect();
771    if has_flag(&args, "--help") {
772        println!(
773            "Usage: cargo run -p koi-embedded --example embedded-integration -- [options]\n\nOptions:\n  --timeout N   Overall timeout in seconds (default: 30)\n  --verbose     Verbose logging"
774        );
775        return Ok(());
776    }
777
778    let verbose = has_flag(&args, "--verbose");
779    let with_certmesh = true;
780    let skip_mdns = false;
781    let timeout_secs = read_arg_value(&args, "--timeout")
782        .and_then(|value| value.parse::<u64>().ok())
783        .unwrap_or(30);
784    let total_deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
785
786    let data_dir = temp_data_dir();
787    let mut harness = Harness::new(verbose);
788
789    harness.log(format!("data dir: {}", data_dir.display()));
790    harness.log(format!("mdns: {}", !skip_mdns));
791    harness.log(format!("certmesh: {}", with_certmesh));
792
793    let koi = Builder::new()
794        .data_dir(&data_dir)
795        .service_mode(ServiceMode::EmbeddedOnly)
796        .mdns(!skip_mdns)
797        .dns_enabled(true)
798        .dns(|cfg| cfg.port(15353))
799        .health(true)
800        .certmesh(with_certmesh)
801        .proxy(true)
802        .build()?;
803    let handle = koi.start().await?;
804
805    let mdns = match handle.mdns() {
806        Ok(mdns) => mdns,
807        Err(err) => {
808            harness.fail("mdns: setup", &format!("{err}"));
809            handle.shutdown().await?;
810            harness.summary();
811            std::process::exit(1);
812        }
813    };
814    let dns = match handle.dns() {
815        Ok(dns) => dns,
816        Err(err) => {
817            harness.fail("dns: setup", &format!("{err}"));
818            handle.shutdown().await?;
819            harness.summary();
820            std::process::exit(1);
821        }
822    };
823    let health = match handle.health() {
824        Ok(health) => health,
825        Err(err) => {
826            harness.fail("health: setup", &format!("{err}"));
827            handle.shutdown().await?;
828            harness.summary();
829            std::process::exit(1);
830        }
831    };
832    let proxy = match handle.proxy() {
833        Ok(proxy) => proxy,
834        Err(err) => {
835            harness.fail("proxy: setup", &format!("{err}"));
836            handle.shutdown().await?;
837            harness.summary();
838            std::process::exit(1);
839        }
840    };
841    let certmesh = match handle.certmesh() {
842        Ok(certmesh) => certmesh,
843        Err(err) => {
844            harness.fail("certmesh: setup", &format!("{err}"));
845            handle.shutdown().await?;
846            harness.summary();
847            std::process::exit(1);
848        }
849    };
850
851    let (http_addr, http_cancel) = start_http_server(
852        mdns.core()?,
853        dns.runtime()?,
854        health.core()?,
855        certmesh.core()?,
856        proxy.runtime()?,
857    )
858    .await?;
859    let http_base = format!("http://{}", http_addr);
860    harness.log(format!("http base: {http_base}"));
861
862    // DNS: add entry, lookup, and event.
863    let mut rx = handle.subscribe();
864    let entry = DnsEntry {
865        name: "embedded-test.lan".to_string(),
866        ip: "127.0.0.1".to_string(),
867        ttl: None,
868    };
869    let _ = dns.add_entry(entry)?;
870    let event = wait_for_event(
871        &mut rx,
872        Duration::from_secs(2),
873        |event| matches!(event, KoiEvent::DnsEntryUpdated { name, .. } if name == "embedded-test.lan"),
874    )
875    .await;
876    if event.is_some() {
877        harness.pass("dns: event emitted");
878    } else {
879        harness.fail("dns: event emitted", "no DnsEntryUpdated event received");
880    }
881
882    let result = dns
883        .lookup("embedded-test.lan", hickory_proto::rr::RecordType::A)
884        .await;
885    match result {
886        Some(result) => {
887            if result.ips.contains(&IpAddr::from([127, 0, 0, 1])) && result.source == "static" {
888                harness.pass("dns: lookup static entry");
889            } else {
890                harness.fail("dns: lookup static entry", "unexpected lookup result");
891            }
892        }
893        None => harness.fail("dns: lookup static entry", "lookup returned none"),
894    }
895
896    let names = dns.list_names();
897    if names.iter().any(|name| name == "embedded-test.lan.") {
898        harness.pass("dns: list names includes entry");
899    } else {
900        harness.fail("dns: list names includes entry", "name missing from list");
901    }
902
903    let _ = dns.remove_entry("embedded-test.lan");
904    let removed_event = wait_for_event(
905        &mut rx,
906        Duration::from_secs(2),
907        |event| matches!(event, KoiEvent::DnsEntryRemoved { name } if name == "embedded-test.lan"),
908    )
909    .await;
910    if removed_event.is_some() {
911        harness.pass("dns: remove emits removal event");
912    } else {
913        harness.fail(
914            "dns: remove emits removal event",
915            "no removal event received",
916        );
917    }
918
919    // Health: run a TCP check against a local listener.
920    let mut rx = handle.subscribe();
921    let listener = TcpListener::bind("127.0.0.1:0").await?;
922    let addr = listener.local_addr()?;
923    tokio::spawn(async move {
924        loop {
925            let _ = listener.accept().await;
926        }
927    });
928
929    let check = HealthCheck {
930        name: "tcp-local".to_string(),
931        kind: ServiceCheckKind::Tcp,
932        target: format!("127.0.0.1:{}", addr.port()),
933        interval_secs: 1,
934        timeout_secs: 1,
935    };
936    health.add_check(check).await?;
937    health.core()?.run_checks_once().await;
938    let snapshot = health.status().await;
939    let status = snapshot
940        .services
941        .iter()
942        .find(|svc| svc.name == "tcp-local")
943        .map(|svc| svc.status);
944    match status {
945        Some(ServiceStatus::Up) => harness.pass("health: tcp check up"),
946        Some(other) => harness.fail(
947            "health: tcp check up",
948            &format!("unexpected status: {other:?}"),
949        ),
950        None => harness.fail("health: tcp check up", "service missing"),
951    }
952
953    let event = wait_for_event(
954        &mut rx,
955        Duration::from_secs(3),
956        |event| matches!(event, KoiEvent::HealthChanged { name, .. } if name == "tcp-local"),
957    )
958    .await;
959    if event.is_some() {
960        harness.pass("health: event emitted");
961    } else {
962        harness.fail("health: event emitted", "no HealthChanged event received");
963    }
964
965    let _ = health.remove_check("tcp-local").await;
966    let snapshot = health.status().await;
967    if snapshot.services.iter().any(|svc| svc.name == "tcp-local") {
968        harness.fail("health: remove check", "check still present after removal");
969    } else {
970        harness.pass("health: remove check");
971    }
972
973    // mDNS: register + browse.
974    let browse = mdns.browse("_koi._tcp").await;
975    if let Ok(browse) = browse {
976        let mut txt = HashMap::new();
977        txt.insert("source".to_string(), "embedded".to_string());
978        let payload = RegisterPayload {
979            name: "koi-embedded-test".to_string(),
980            service_type: "_koi._tcp".to_string(),
981            port: 51515,
982            ip: Some("127.0.0.1".to_string()),
983            lease_secs: Some(30),
984            txt,
985        };
986        let reg = mdns.register(payload);
987        if let Ok(reg) = reg {
988            let found = tokio::time::timeout(Duration::from_secs(5), browse.recv()).await;
989            match found {
990                Ok(Some(_event)) => harness.pass("mdns: register + browse"),
991                Ok(None) => harness.fail("mdns: register + browse", "browse stream ended"),
992                Err(_) => harness.fail("mdns: register + browse", "no events within timeout"),
993            }
994
995            match mdns.resolve("koi-embedded-test._koi._tcp.local.").await {
996                Ok(record) if record.port == Some(51515) => {
997                    harness.pass("mdns: resolve registered service");
998                }
999                Ok(_) => harness.fail("mdns: resolve registered service", "unexpected record"),
1000                Err(err) => harness.fail("mdns: resolve registered service", &format!("{err}")),
1001            }
1002
1003            match mdns.unregister(&reg.id) {
1004                Ok(()) => harness.pass("mdns: unregister"),
1005                Err(err) => harness.fail("mdns: unregister", &format!("{err}")),
1006            }
1007        } else {
1008            harness.fail("mdns: register + browse", "register failed");
1009        }
1010    } else {
1011        harness.fail("mdns: register + browse", "browse failed");
1012    }
1013
1014    // Proxy: upsert and read entries.
1015    let mut rx = handle.subscribe();
1016    let entry = ProxyEntry {
1017        name: "embedded-proxy".to_string(),
1018        listen_port: 18080,
1019        backend: "http://127.0.0.1:18081".to_string(),
1020        allow_remote: false,
1021    };
1022    let result = proxy.upsert(entry.clone()).await;
1023    if result.is_ok() {
1024        let entries = proxy.entries().await;
1025        if entries.iter().any(|item| item.name == entry.name) {
1026            harness.pass("proxy: upsert entry");
1027        } else {
1028            harness.fail("proxy: upsert entry", "entry missing after upsert");
1029        }
1030        let event = wait_for_event(&mut rx, Duration::from_secs(3), |event| {
1031            matches!(event, KoiEvent::ProxyEntryUpdated { entry } if entry.name == "embedded-proxy")
1032        })
1033        .await;
1034        if event.is_some() {
1035            harness.pass("proxy: event emitted");
1036        } else {
1037            harness.fail(
1038                "proxy: event emitted",
1039                "no ProxyEntryUpdated event received",
1040            );
1041        }
1042        let _ = proxy.remove("embedded-proxy").await;
1043        let entries = proxy.entries().await;
1044        if entries.iter().any(|item| item.name == "embedded-proxy") {
1045            harness.fail("proxy: remove entry", "entry still present after removal");
1046        } else {
1047            harness.pass("proxy: remove entry");
1048        }
1049    } else {
1050        harness.fail("proxy: upsert entry", "upsert failed");
1051    }
1052
1053    let client = reqwest::Client::builder()
1054        .timeout(Duration::from_secs(10))
1055        .build()?;
1056    if let Err(err) = run_http_tests(&http_base, &client, &mut harness).await {
1057        harness.fail("http: suite", &format!("{err}"));
1058    }
1059
1060    if let Err(err) = run_ipc_tests(mdns.core()?, &mut harness).await {
1061        harness.fail("ipc: suite", &format!("{err}"));
1062    }
1063
1064    http_cancel.cancel();
1065    handle.shutdown().await?;
1066    if tokio::time::Instant::now() > total_deadline {
1067        harness.fail("runtime", "overall timeout exceeded");
1068    }
1069    harness.summary();
1070    if harness.failed > 0 {
1071        std::process::exit(1);
1072    }
1073    Ok(())
1074}