1use std::collections::HashMap;
2use std::net::{IpAddr, SocketAddr};
3use std::path::PathBuf;
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use axum::routing::get;
7use axum::Router;
8use tokio::net::TcpListener;
9use tokio_util::sync::CancellationToken;
10
11use koi_config::state::DnsEntry;
12use koi_embedded::{Builder, KoiEvent, ServiceMode};
13use koi_health::{HealthCheck, ServiceCheckKind, ServiceStatus};
14use koi_mdns::protocol::{RegisterPayload, Request as MdnsRequest};
15use koi_proxy::ProxyEntry;
16
/// Tally of check outcomes for one run of the integration example.
struct Harness {
    /// Number of checks reported through [`Harness::pass`].
    passed: usize,
    /// Number of checks reported through [`Harness::fail`].
    failed: usize,
    /// Number of checks reported through [`Harness::skip`].
    skipped: usize,
    /// When set, [`Harness::log`] prints its messages; otherwise it is silent.
    verbose: bool,
}

impl Harness {
    /// Creates a harness with every counter at zero.
    fn new(verbose: bool) -> Self {
        Self {
            verbose,
            passed: 0,
            failed: 0,
            skipped: 0,
        }
    }

    /// Prints `msg` to stdout, but only in verbose mode.
    fn log(&self, msg: impl AsRef<str>) {
        if !self.verbose {
            return;
        }
        let text = msg.as_ref();
        println!(" {}", text);
    }

    /// Prints a `[PASS]` line for `name` and counts it as passed.
    fn pass(&mut self, name: &str) {
        println!("[PASS] {name}");
        self.passed += 1;
    }

    /// Prints a `[FAIL]` line for `name` with `reason` and counts it as failed.
    fn fail(&mut self, name: &str, reason: &str) {
        println!("[FAIL] {name} - {reason}");
        self.failed += 1;
    }

    /// Prints a `[SKIP]` line for `name` with `reason` and counts it as skipped.
    // Only the non-Windows IPC stub calls this, so it is unused on Windows builds.
    #[allow(dead_code)]
    fn skip(&mut self, name: &str, reason: &str) {
        println!("[SKIP] {name} - {reason}");
        self.skipped += 1;
    }

    /// Prints the three counters on one line, preceded by a blank line.
    fn summary(&self) {
        let Self {
            passed,
            failed,
            skipped,
            ..
        } = self;
        println!(
            "\nSummary: {} passed, {} failed, {} skipped",
            passed, failed, skipped
        );
    }
}
63
/// Opens a client connection to the named pipe `pipe_name`, retrying on failure.
///
/// Makes up to 20 attempts and sleeps 100 ms after every failed attempt.
///
/// # Errors
///
/// Returns the error of the last failed attempt once all attempts are used up.
#[cfg(windows)]
async fn open_pipe(
    pipe_name: &str,
) -> Result<tokio::net::windows::named_pipe::NamedPipeClient, Box<dyn std::error::Error>> {
    use tokio::net::windows::named_pipe::ClientOptions;

    const ATTEMPTS: usize = 20;
    const RETRY_DELAY: Duration = Duration::from_millis(100);

    let mut most_recent = None;
    let mut attempt = 0;
    while attempt < ATTEMPTS {
        let failure = match ClientOptions::new().open(pipe_name) {
            Ok(client) => return Ok(client),
            Err(failure) => failure,
        };
        most_recent = Some(failure);
        tokio::time::sleep(RETRY_DELAY).await;
        attempt += 1;
    }

    // Fallback error for the case where no attempt recorded a failure.
    let failure = most_recent.unwrap_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::NotFound, "pipe not available")
    });
    Err(Box::new(failure))
}
85
/// Performs one request/response round trip over a line-delimited JSON channel.
///
/// Serializes `value` to JSON, writes it to `writer` followed by a newline,
/// then reads the next line from `reader` and parses it as JSON.
///
/// # Errors
///
/// Fails when serialization, writing, reading or parsing fails, and with
/// `"empty response"` when the reader reaches end of input before a line arrives.
#[cfg(windows)]
async fn ipc_send<R, W>(
    reader: &mut tokio::io::Lines<tokio::io::BufReader<R>>,
    writer: &mut W,
    value: serde_json::Value,
) -> Result<serde_json::Value, Box<dyn std::error::Error>>
where
    R: tokio::io::AsyncRead + Unpin,
    W: tokio::io::AsyncWrite + Unpin,
{
    use tokio::io::AsyncWriteExt;

    let request = serde_json::to_string(&value)?;
    writer.write_all(request.as_bytes()).await?;
    writer.write_all(b"\n").await?;

    let reply = match reader.next_line().await? {
        Some(line) => line,
        None => return Err("empty response".into()),
    };
    Ok(serde_json::from_str(&reply)?)
}
106
/// Creates a fresh directory under the system temp directory and returns its path.
///
/// The directory name starts with `koi-embedded-integration-` and combines the
/// current time in nanoseconds, the process id and a per-process counter. The
/// clock alone is not a reliable source of uniqueness: `create_dir_all` succeeds
/// on an existing directory, so a repeated timestamp would silently hand two
/// runs the same data directory.
///
/// # Panics
///
/// Panics when the directory cannot be created.
fn temp_data_dir() -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Distinguishes directories created by the same process within one clock tick.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let pid = std::process::id();
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let dir = std::env::temp_dir().join(format!("koi-embedded-integration-{nanos}-{pid}-{seq}"));
    std::fs::create_dir_all(&dir).expect("create temp dir");
    dir
}
116
/// Returns `true` when some element of `args` is exactly equal to `flag`.
fn has_flag(args: &[String], flag: &str) -> bool {
    args.iter()
        .map(String::as_str)
        .any(|candidate| candidate == flag)
}
120
/// Returns a copy of the argument that directly follows the first occurrence of `flag`.
///
/// Yields `None` when `flag` is absent or when its first occurrence is the last argument.
fn read_arg_value(args: &[String], flag: &str) -> Option<String> {
    let mut remaining = args.iter();
    // Advance past the first match; the iterator then points at the value.
    remaining.find(|arg| arg.as_str() == flag)?;
    remaining.next().cloned()
}
127
128async fn wait_for_event<F>(
129 rx: &mut tokio::sync::broadcast::Receiver<KoiEvent>,
130 timeout: Duration,
131 predicate: F,
132) -> Option<KoiEvent>
133where
134 F: Fn(&KoiEvent) -> bool,
135{
136 let deadline = tokio::time::Instant::now() + timeout;
137 loop {
138 let now = tokio::time::Instant::now();
139 if now >= deadline {
140 return None;
141 }
142 let remaining = deadline - now;
143 match tokio::time::timeout(remaining, rx.recv()).await {
144 Ok(Ok(event)) => {
145 if predicate(&event) {
146 return Some(event);
147 }
148 }
149 Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
150 Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => return None,
151 Err(_) => return None,
152 }
153 }
154}
155
156async fn start_http_server(
157 mdns: std::sync::Arc<koi_mdns::MdnsCore>,
158 dns: std::sync::Arc<koi_dns::DnsRuntime>,
159 health: std::sync::Arc<koi_health::HealthCore>,
160 certmesh: std::sync::Arc<koi_certmesh::CertmeshCore>,
161 proxy: std::sync::Arc<koi_proxy::ProxyRuntime>,
162) -> Result<(SocketAddr, CancellationToken), Box<dyn std::error::Error>> {
163 let app = Router::new()
164 .route(
165 "/healthz",
166 get(|| async { axum::Json(serde_json::json!({"ok": true})) }),
167 )
168 .nest("/v1/mdns", koi_mdns::http::routes(mdns))
169 .nest("/v1/dns", koi_dns::http::routes(dns))
170 .nest("/v1/health", koi_health::http::routes(health))
171 .nest("/v1/certmesh", certmesh.http_routes())
172 .nest("/v1/proxy", koi_proxy::http::routes(proxy));
173
174 let listener = TcpListener::bind("127.0.0.1:0").await?;
175 let addr = listener.local_addr()?;
176 let cancel = CancellationToken::new();
177 let token = cancel.clone();
178 tokio::spawn(async move {
179 let _ = axum::serve(listener, app)
180 .with_graceful_shutdown(token.cancelled_owned())
181 .await;
182 });
183
184 Ok((addr, cancel))
185}
186
187async fn read_sse_body(
188 client: &reqwest::Client,
189 url: &str,
190 timeout: Duration,
191) -> Result<String, Box<dyn std::error::Error>> {
192 let response = client.get(url).send().await?;
193 let body = tokio::time::timeout(timeout, response.text()).await??;
194 Ok(body)
195}
196
197async fn run_http_tests(
198 base_url: &str,
199 client: &reqwest::Client,
200 harness: &mut Harness,
201) -> Result<(), Box<dyn std::error::Error>> {
202 let health: serde_json::Value = client
203 .get(format!("{base_url}/healthz"))
204 .send()
205 .await?
206 .json()
207 .await?;
208 if health.get("ok") == Some(&serde_json::Value::Bool(true)) {
209 harness.pass("http: healthz ok");
210 } else {
211 harness.fail("http: healthz ok", "unexpected response");
212 }
213
214 let mdns_payload = serde_json::json!({
215 "name": "koi-http-test",
216 "type": "_koi._tcp",
217 "port": 51516,
218 "ip": "127.0.0.1",
219 "lease_secs": 30,
220 "txt": {"source": "http"}
221 });
222 let register_resp: serde_json::Value = client
223 .post(format!("{base_url}/v1/mdns/announce"))
224 .json(&mdns_payload)
225 .send()
226 .await?
227 .json()
228 .await?;
229 let mdns_id = register_resp
230 .get("registered")
231 .and_then(|v| v.get("id"))
232 .and_then(|v| v.as_str())
233 .map(|s| s.to_string());
234 if mdns_id.is_some() {
235 harness.pass("http: mdns register");
236 } else {
237 harness.fail("http: mdns register", "missing registered id");
238 }
239
240 let resolve_resp: serde_json::Value = client
241 .get(format!(
242 "{base_url}/v1/mdns/resolve?name=koi-http-test._koi._tcp.local."
243 ))
244 .send()
245 .await?
246 .json()
247 .await?;
248 if resolve_resp.get("resolved").is_some() {
249 harness.pass("http: mdns resolve");
250 } else {
251 harness.fail("http: mdns resolve", "missing resolved response");
252 }
253
254 let events_url = format!("{base_url}/v1/mdns/events?type=_koi._tcp&idle_for=1");
255 let sse_payload = serde_json::json!({
256 "name": "koi-http-sse",
257 "type": "_koi._tcp",
258 "port": 51518,
259 "ip": "127.0.0.1",
260 "lease_secs": 30,
261 "txt": {"source": "http-sse"}
262 });
263 let sse_future = read_sse_body(client, &events_url, Duration::from_secs(3));
264 let register_future = client
265 .post(format!("{base_url}/v1/mdns/announce"))
266 .json(&sse_payload)
267 .send();
268 let (events_body, register_result) = tokio::join!(sse_future, register_future);
269 let _ = register_result?;
270 let events_body = events_body?;
271 if events_body.contains("data:") {
272 harness.pass("http: mdns events sse");
273 } else {
274 harness.fail("http: mdns events sse", "no sse data received");
275 }
276
277 if let Some(id) = mdns_id {
278 let unregister_resp: serde_json::Value = client
279 .delete(format!("{base_url}/v1/mdns/unregister/{id}"))
280 .send()
281 .await?
282 .json()
283 .await?;
284 if unregister_resp.get("unregistered").is_some() {
285 harness.pass("http: mdns unregister");
286 } else {
287 harness.fail("http: mdns unregister", "missing unregistered response");
288 }
289 }
290
291 let dns_entry = serde_json::json!({
292 "name": "http-test",
293 "ip": "127.0.0.1",
294 "ttl": null
295 });
296 let add_resp: serde_json::Value = client
297 .post(format!("{base_url}/v1/dns/add"))
298 .json(&dns_entry)
299 .send()
300 .await?
301 .json()
302 .await?;
303 if add_resp.get("entries").is_some() {
304 harness.pass("http: dns add entry");
305 } else {
306 harness.fail("http: dns add entry", "missing entries response");
307 }
308
309 let lookup_resp: serde_json::Value = client
310 .get(format!(
311 "{base_url}/v1/dns/lookup?name=http-test.lan&type=A"
312 ))
313 .send()
314 .await?
315 .json()
316 .await?;
317 if lookup_resp.get("ips").is_some() {
318 harness.pass("http: dns lookup");
319 } else {
320 harness.fail("http: dns lookup", "missing lookup response");
321 }
322
323 let list_resp: serde_json::Value = client
324 .get(format!("{base_url}/v1/dns/list"))
325 .send()
326 .await?
327 .json()
328 .await?;
329 if list_resp
330 .get("names")
331 .and_then(|v| v.as_array())
332 .map(|arr| arr.iter().any(|name| name == "http-test.lan."))
333 .unwrap_or(false)
334 {
335 harness.pass("http: dns list names");
336 } else {
337 harness.fail("http: dns list names", "name missing from list");
338 }
339
340 let start_resp: serde_json::Value = client
341 .post(format!("{base_url}/v1/dns/serve"))
342 .send()
343 .await?
344 .json()
345 .await?;
346 if start_resp.get("started").is_some() {
347 harness.pass("http: dns start");
348 } else {
349 harness.fail("http: dns start", "missing started response");
350 }
351
352 let stop_resp: serde_json::Value = client
353 .post(format!("{base_url}/v1/dns/stop"))
354 .send()
355 .await?
356 .json()
357 .await?;
358 if stop_resp.get("stopped").is_some() {
359 harness.pass("http: dns stop");
360 } else {
361 harness.fail("http: dns stop", "missing stopped response");
362 }
363
364 let remove_resp: serde_json::Value = client
365 .delete(format!("{base_url}/v1/dns/remove/http-test"))
366 .send()
367 .await?
368 .json()
369 .await?;
370 if remove_resp.get("entries").is_some() {
371 harness.pass("http: dns remove entry");
372 } else {
373 harness.fail("http: dns remove entry", "missing entries response");
374 }
375
376 let health_add = serde_json::json!({
377 "name": "http-tcp",
378 "kind": "tcp",
379 "target": "127.0.0.1:9",
380 "interval_secs": 1,
381 "timeout_secs": 1
382 });
383 let health_resp: serde_json::Value = client
384 .post(format!("{base_url}/v1/health/add"))
385 .json(&health_add)
386 .send()
387 .await?
388 .json()
389 .await?;
390 if health_resp.get("status") == Some(&serde_json::Value::String("ok".to_string())) {
391 harness.pass("http: health add check");
392 } else {
393 harness.fail("http: health add check", "unexpected response");
394 }
395
396 let checks_resp: serde_json::Value = client
397 .get(format!("{base_url}/v1/health/list"))
398 .send()
399 .await?
400 .json()
401 .await?;
402 if checks_resp.get("checks").is_some() {
403 harness.pass("http: health list checks");
404 } else {
405 harness.fail("http: health list checks", "missing checks response");
406 }
407
408 let remove_health: serde_json::Value = client
409 .delete(format!("{base_url}/v1/health/remove/http-tcp"))
410 .send()
411 .await?
412 .json()
413 .await?;
414 if remove_health.get("status") == Some(&serde_json::Value::String("ok".to_string())) {
415 harness.pass("http: health remove check");
416 } else {
417 harness.fail("http: health remove check", "unexpected response");
418 }
419
420 let proxy_payload = serde_json::json!({
421 "name": "http-proxy",
422 "listen_port": 18090,
423 "backend": "http://127.0.0.1:18091",
424 "allow_remote": false
425 });
426 let proxy_resp: serde_json::Value = client
427 .post(format!("{base_url}/v1/proxy/add"))
428 .json(&proxy_payload)
429 .send()
430 .await?
431 .json()
432 .await?;
433 if proxy_resp.get("status") == Some(&serde_json::Value::String("ok".to_string())) {
434 harness.pass("http: proxy add entry");
435 } else {
436 harness.fail("http: proxy add entry", "unexpected response");
437 }
438
439 let proxy_entries: serde_json::Value = client
440 .get(format!("{base_url}/v1/proxy/list"))
441 .send()
442 .await?
443 .json()
444 .await?;
445 if proxy_entries.get("entries").is_some() {
446 harness.pass("http: proxy list entries");
447 } else {
448 harness.fail("http: proxy list entries", "missing entries response");
449 }
450
451 let proxy_remove: serde_json::Value = client
452 .delete(format!("{base_url}/v1/proxy/remove/http-proxy"))
453 .send()
454 .await?
455 .json()
456 .await?;
457 if proxy_remove.get("status") == Some(&serde_json::Value::String("ok".to_string())) {
458 harness.pass("http: proxy remove entry");
459 } else {
460 harness.fail("http: proxy remove entry", "unexpected response");
461 }
462
463 let entropy_hex = koi_common::encoding::hex_encode(&[42u8; 32]);
464 let certmesh_payload = serde_json::json!({
465 "passphrase": "http-test-pass",
466 "entropy_hex": entropy_hex,
467 "profile": "just_me"
468 });
469 let certmesh_create: serde_json::Value = client
470 .post(format!("{base_url}/v1/certmesh/create"))
471 .json(&certmesh_payload)
472 .send()
473 .await?
474 .json()
475 .await?;
476 if certmesh_create.get("totp_uri").is_some() {
477 harness.pass("http: certmesh create");
478 } else {
479 harness.fail("http: certmesh create", "missing totp_uri");
480 }
481
482 let status: serde_json::Value = client
483 .get(format!("{base_url}/v1/certmesh/status"))
484 .send()
485 .await?
486 .json()
487 .await?;
488 if status.get("ca_initialized") == Some(&serde_json::Value::Bool(true)) {
489 harness.pass("http: certmesh status");
490 } else {
491 harness.fail("http: certmesh status", "unexpected status");
492 }
493
494 let _ = client
495 .post(format!("{base_url}/v1/certmesh/open-enrollment"))
496 .json(&serde_json::json!({"deadline": null}))
497 .send()
498 .await?;
499 let _ = client
500 .post(format!("{base_url}/v1/certmesh/close-enrollment"))
501 .send()
502 .await?;
503 harness.pass("http: certmesh enrollment open/close");
504
505 let _ = client
506 .put(format!("{base_url}/v1/certmesh/policy"))
507 .json(&serde_json::json!({"allowed_domain": "example.com", "allowed_subnet": null}))
508 .send()
509 .await?;
510 harness.pass("http: certmesh set policy");
511
512 let rotate: serde_json::Value = client
513 .post(format!("{base_url}/v1/certmesh/rotate-totp"))
514 .json(&serde_json::json!({"passphrase": "http-test-pass"}))
515 .send()
516 .await?
517 .json()
518 .await?;
519 if rotate.get("totp_uri").is_some() {
520 harness.pass("http: certmesh rotate totp");
521 } else {
522 harness.fail("http: certmesh rotate totp", "missing totp_uri");
523 }
524
525 let destroy: serde_json::Value = client
526 .post(format!("{base_url}/v1/certmesh/destroy"))
527 .send()
528 .await?
529 .json()
530 .await?;
531 if destroy.get("destroyed") == Some(&serde_json::Value::Bool(true)) {
532 harness.pass("http: certmesh destroy");
533 } else {
534 harness.fail("http: certmesh destroy", "unexpected response");
535 }
536
537 Ok(())
538}
539
540#[cfg(windows)]
541async fn run_ipc_tests(
542 mdns: std::sync::Arc<koi_mdns::MdnsCore>,
543 harness: &mut Harness,
544) -> Result<(), Box<dyn std::error::Error>> {
545 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
546 use tokio::net::windows::named_pipe::ServerOptions;
547
548 let pipe_name = format!(
549 "\\\\.\\pipe\\koi-embedded-ipc-{}",
550 SystemTime::now()
551 .duration_since(UNIX_EPOCH)
552 .unwrap_or_default()
553 .as_nanos()
554 );
555 let cancel = CancellationToken::new();
556 let server_core = mdns.clone();
557 let server_cancel = cancel.clone();
558 let pipe_name_clone = pipe_name.clone();
559
560 let server = tokio::spawn(async move {
561 loop {
562 let server = ServerOptions::new()
563 .first_pipe_instance(false)
564 .create(pipe_name_clone.as_str());
565 let server = match server {
566 Ok(server) => server,
567 Err(_) => break,
568 };
569
570 tokio::select! {
571 result = server.connect() => {
572 if result.is_err() {
573 continue;
574 }
575 let core = server_core.clone();
576 tokio::spawn(async move {
577 let (reader, mut writer) = tokio::io::split(server);
578 let reader = BufReader::new(reader);
579 let mut lines = reader.lines();
580 while let Ok(Some(line)) = lines.next_line().await {
581 let line = line.trim();
582 if line.is_empty() {
583 continue;
584 }
585 let request: serde_json::Result<MdnsRequest> = serde_json::from_str(line);
586 let response = match request {
587 Ok(MdnsRequest::Register(payload)) => {
588 let policy = koi_mdns::LeasePolicy::Session {
589 grace: Duration::from_secs(30),
590 };
591 match core.register_with_policy(payload, policy, None) {
592 Ok(result) => koi_mdns::protocol::MdnsPipelineResponse::clean(
593 koi_mdns::protocol::Response::Registered(result),
594 ),
595 Err(err) => koi_mdns::protocol::error_to_pipeline(&err),
596 }
597 }
598 Ok(MdnsRequest::Resolve(name)) => match core.resolve(&name).await {
599 Ok(record) => koi_mdns::protocol::MdnsPipelineResponse::clean(
600 koi_mdns::protocol::Response::Resolved(record),
601 ),
602 Err(err) => koi_mdns::protocol::error_to_pipeline(&err),
603 },
604 Ok(MdnsRequest::Heartbeat(id)) => match core.heartbeat(&id) {
605 Ok(lease_secs) => koi_mdns::protocol::MdnsPipelineResponse::clean(
606 koi_mdns::protocol::Response::Renewed(
607 koi_mdns::protocol::RenewalResult { id, lease_secs },
608 ),
609 ),
610 Err(err) => koi_mdns::protocol::error_to_pipeline(&err),
611 },
612 Ok(MdnsRequest::Unregister(id)) => match core.unregister(&id) {
613 Ok(()) => koi_mdns::protocol::MdnsPipelineResponse::clean(
614 koi_mdns::protocol::Response::Unregistered(id),
615 ),
616 Err(err) => koi_mdns::protocol::error_to_pipeline(&err),
617 },
618 Ok(MdnsRequest::Browse(service_type)) => {
619 let handle = match core.browse(&service_type).await {
620 Ok(handle) => handle,
621 Err(err) => {
622 let resp = koi_mdns::protocol::error_to_pipeline(&err);
623 let _ = writer
624 .write_all(serde_json::to_string(&resp).unwrap().as_bytes())
625 .await;
626 let _ = writer.write_all(b"\n").await;
627 continue;
628 }
629 };
630 let handle = handle;
631 while let Some(event) = handle.recv().await {
632 let resp = koi_mdns::protocol::browse_event_to_pipeline(event);
633 let _ = writer
634 .write_all(serde_json::to_string(&resp).unwrap().as_bytes())
635 .await;
636 let _ = writer.write_all(b"\n").await;
637 }
638 continue;
639 }
640 Ok(MdnsRequest::Subscribe(service_type)) => {
641 let handle = match core.browse(&service_type).await {
642 Ok(handle) => handle,
643 Err(err) => {
644 let resp = koi_mdns::protocol::error_to_pipeline(&err);
645 let _ = writer
646 .write_all(serde_json::to_string(&resp).unwrap().as_bytes())
647 .await;
648 let _ = writer.write_all(b"\n").await;
649 continue;
650 }
651 };
652 let handle = handle;
653 while let Some(event) = handle.recv().await {
654 let resp = koi_mdns::protocol::subscribe_event_to_pipeline(event);
655 let _ = writer
656 .write_all(serde_json::to_string(&resp).unwrap().as_bytes())
657 .await;
658 let _ = writer.write_all(b"\n").await;
659 }
660 continue;
661 }
662 Err(_) => koi_mdns::protocol::MdnsPipelineResponse::clean(
663 koi_mdns::protocol::Response::Error(koi_common::api::error_body(
664 koi_common::error::ErrorCode::ParseError,
665 "invalid_json",
666 )),
667 ),
668 };
669
670 let _ = writer
671 .write_all(serde_json::to_string(&response).unwrap().as_bytes())
672 .await;
673 let _ = writer.write_all(b"\n").await;
674 }
675 });
676 }
677 _ = server_cancel.cancelled() => break,
678 }
679 }
680 });
681
682 let client = open_pipe(&pipe_name).await?;
683 let (reader, mut writer) = tokio::io::split(client);
684 let mut reader = BufReader::new(reader).lines();
685
686 let register_value = serde_json::json!({
687 "register": {
688 "name": "koi-ipc-test",
689 "type": "_koi._tcp",
690 "port": 51517,
691 "ip": "127.0.0.1",
692 "lease_secs": 30,
693 "txt": {"source": "ipc"}
694 }
695 });
696 let register_resp = ipc_send(&mut reader, &mut writer, register_value).await?;
697 let id = register_resp
698 .get("registered")
699 .and_then(|v| v.get("id"))
700 .and_then(|v| v.as_str())
701 .map(|s| s.to_string());
702 if id.is_some() {
703 harness.pass("ipc: mdns register");
704 } else {
705 harness.fail("ipc: mdns register", "missing registered id");
706 }
707
708 let resolve_resp = ipc_send(
709 &mut reader,
710 &mut writer,
711 serde_json::json!({
712 "resolve": "koi-ipc-test._koi._tcp.local."
713 }),
714 )
715 .await?;
716 if resolve_resp.get("resolved").is_some() {
717 harness.pass("ipc: mdns resolve");
718 } else {
719 harness.fail("ipc: mdns resolve", "missing resolved response");
720 }
721
722 if let Some(id) = id.clone() {
723 let heartbeat_resp = ipc_send(
724 &mut reader,
725 &mut writer,
726 serde_json::json!({
727 "heartbeat": id
728 }),
729 )
730 .await?;
731 if heartbeat_resp.get("renewed").is_some() {
732 harness.pass("ipc: mdns heartbeat");
733 } else {
734 harness.fail("ipc: mdns heartbeat", "missing renewed response");
735 }
736 }
737
738 if let Some(id) = id {
739 let unregister_resp = ipc_send(
740 &mut reader,
741 &mut writer,
742 serde_json::json!({
743 "unregister": id
744 }),
745 )
746 .await?;
747 if unregister_resp.get("unregistered").is_some() {
748 harness.pass("ipc: mdns unregister");
749 } else {
750 harness.fail("ipc: mdns unregister", "missing unregistered response");
751 }
752 }
753
754 cancel.cancel();
755 let _ = server.await;
756 Ok(())
757}
758
759#[cfg(not(windows))]
760async fn run_ipc_tests(
761 _mdns: std::sync::Arc<koi_mdns::MdnsCore>,
762 harness: &mut Harness,
763) -> Result<(), Box<dyn std::error::Error>> {
764 harness.skip("ipc: mdns", "named pipes unsupported on this platform");
765 Ok(())
766}
767
768#[tokio::main]
769async fn main() -> Result<(), Box<dyn std::error::Error>> {
770 let args: Vec<String> = std::env::args().collect();
771 if has_flag(&args, "--help") {
772 println!(
773 "Usage: cargo run -p koi-embedded --example embedded-integration -- [options]\n\nOptions:\n --timeout N Overall timeout in seconds (default: 30)\n --verbose Verbose logging"
774 );
775 return Ok(());
776 }
777
778 let verbose = has_flag(&args, "--verbose");
779 let with_certmesh = true;
780 let skip_mdns = false;
781 let timeout_secs = read_arg_value(&args, "--timeout")
782 .and_then(|value| value.parse::<u64>().ok())
783 .unwrap_or(30);
784 let total_deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
785
786 let data_dir = temp_data_dir();
787 let mut harness = Harness::new(verbose);
788
789 harness.log(format!("data dir: {}", data_dir.display()));
790 harness.log(format!("mdns: {}", !skip_mdns));
791 harness.log(format!("certmesh: {}", with_certmesh));
792
793 let koi = Builder::new()
794 .data_dir(&data_dir)
795 .service_mode(ServiceMode::EmbeddedOnly)
796 .mdns(!skip_mdns)
797 .dns_enabled(true)
798 .dns(|cfg| cfg.port(15353))
799 .health(true)
800 .certmesh(with_certmesh)
801 .proxy(true)
802 .build()?;
803 let handle = koi.start().await?;
804
805 let mdns = match handle.mdns() {
806 Ok(mdns) => mdns,
807 Err(err) => {
808 harness.fail("mdns: setup", &format!("{err}"));
809 handle.shutdown().await?;
810 harness.summary();
811 std::process::exit(1);
812 }
813 };
814 let dns = match handle.dns() {
815 Ok(dns) => dns,
816 Err(err) => {
817 harness.fail("dns: setup", &format!("{err}"));
818 handle.shutdown().await?;
819 harness.summary();
820 std::process::exit(1);
821 }
822 };
823 let health = match handle.health() {
824 Ok(health) => health,
825 Err(err) => {
826 harness.fail("health: setup", &format!("{err}"));
827 handle.shutdown().await?;
828 harness.summary();
829 std::process::exit(1);
830 }
831 };
832 let proxy = match handle.proxy() {
833 Ok(proxy) => proxy,
834 Err(err) => {
835 harness.fail("proxy: setup", &format!("{err}"));
836 handle.shutdown().await?;
837 harness.summary();
838 std::process::exit(1);
839 }
840 };
841 let certmesh = match handle.certmesh() {
842 Ok(certmesh) => certmesh,
843 Err(err) => {
844 harness.fail("certmesh: setup", &format!("{err}"));
845 handle.shutdown().await?;
846 harness.summary();
847 std::process::exit(1);
848 }
849 };
850
851 let (http_addr, http_cancel) = start_http_server(
852 mdns.core()?,
853 dns.runtime()?,
854 health.core()?,
855 certmesh.core()?,
856 proxy.runtime()?,
857 )
858 .await?;
859 let http_base = format!("http://{}", http_addr);
860 harness.log(format!("http base: {http_base}"));
861
862 let mut rx = handle.subscribe();
864 let entry = DnsEntry {
865 name: "embedded-test.lan".to_string(),
866 ip: "127.0.0.1".to_string(),
867 ttl: None,
868 };
869 let _ = dns.add_entry(entry)?;
870 let event = wait_for_event(
871 &mut rx,
872 Duration::from_secs(2),
873 |event| matches!(event, KoiEvent::DnsEntryUpdated { name, .. } if name == "embedded-test.lan"),
874 )
875 .await;
876 if event.is_some() {
877 harness.pass("dns: event emitted");
878 } else {
879 harness.fail("dns: event emitted", "no DnsEntryUpdated event received");
880 }
881
882 let result = dns
883 .lookup("embedded-test.lan", hickory_proto::rr::RecordType::A)
884 .await;
885 match result {
886 Some(result) => {
887 if result.ips.contains(&IpAddr::from([127, 0, 0, 1])) && result.source == "static" {
888 harness.pass("dns: lookup static entry");
889 } else {
890 harness.fail("dns: lookup static entry", "unexpected lookup result");
891 }
892 }
893 None => harness.fail("dns: lookup static entry", "lookup returned none"),
894 }
895
896 let names = dns.list_names();
897 if names.iter().any(|name| name == "embedded-test.lan.") {
898 harness.pass("dns: list names includes entry");
899 } else {
900 harness.fail("dns: list names includes entry", "name missing from list");
901 }
902
903 let _ = dns.remove_entry("embedded-test.lan");
904 let removed_event = wait_for_event(
905 &mut rx,
906 Duration::from_secs(2),
907 |event| matches!(event, KoiEvent::DnsEntryRemoved { name } if name == "embedded-test.lan"),
908 )
909 .await;
910 if removed_event.is_some() {
911 harness.pass("dns: remove emits removal event");
912 } else {
913 harness.fail(
914 "dns: remove emits removal event",
915 "no removal event received",
916 );
917 }
918
919 let mut rx = handle.subscribe();
921 let listener = TcpListener::bind("127.0.0.1:0").await?;
922 let addr = listener.local_addr()?;
923 tokio::spawn(async move {
924 loop {
925 let _ = listener.accept().await;
926 }
927 });
928
929 let check = HealthCheck {
930 name: "tcp-local".to_string(),
931 kind: ServiceCheckKind::Tcp,
932 target: format!("127.0.0.1:{}", addr.port()),
933 interval_secs: 1,
934 timeout_secs: 1,
935 };
936 health.add_check(check).await?;
937 health.core()?.run_checks_once().await;
938 let snapshot = health.status().await;
939 let status = snapshot
940 .services
941 .iter()
942 .find(|svc| svc.name == "tcp-local")
943 .map(|svc| svc.status);
944 match status {
945 Some(ServiceStatus::Up) => harness.pass("health: tcp check up"),
946 Some(other) => harness.fail(
947 "health: tcp check up",
948 &format!("unexpected status: {other:?}"),
949 ),
950 None => harness.fail("health: tcp check up", "service missing"),
951 }
952
953 let event = wait_for_event(
954 &mut rx,
955 Duration::from_secs(3),
956 |event| matches!(event, KoiEvent::HealthChanged { name, .. } if name == "tcp-local"),
957 )
958 .await;
959 if event.is_some() {
960 harness.pass("health: event emitted");
961 } else {
962 harness.fail("health: event emitted", "no HealthChanged event received");
963 }
964
965 let _ = health.remove_check("tcp-local").await;
966 let snapshot = health.status().await;
967 if snapshot.services.iter().any(|svc| svc.name == "tcp-local") {
968 harness.fail("health: remove check", "check still present after removal");
969 } else {
970 harness.pass("health: remove check");
971 }
972
973 let browse = mdns.browse("_koi._tcp").await;
975 if let Ok(browse) = browse {
976 let mut txt = HashMap::new();
977 txt.insert("source".to_string(), "embedded".to_string());
978 let payload = RegisterPayload {
979 name: "koi-embedded-test".to_string(),
980 service_type: "_koi._tcp".to_string(),
981 port: 51515,
982 ip: Some("127.0.0.1".to_string()),
983 lease_secs: Some(30),
984 txt,
985 };
986 let reg = mdns.register(payload);
987 if let Ok(reg) = reg {
988 let found = tokio::time::timeout(Duration::from_secs(5), browse.recv()).await;
989 match found {
990 Ok(Some(_event)) => harness.pass("mdns: register + browse"),
991 Ok(None) => harness.fail("mdns: register + browse", "browse stream ended"),
992 Err(_) => harness.fail("mdns: register + browse", "no events within timeout"),
993 }
994
995 match mdns.resolve("koi-embedded-test._koi._tcp.local.").await {
996 Ok(record) if record.port == Some(51515) => {
997 harness.pass("mdns: resolve registered service");
998 }
999 Ok(_) => harness.fail("mdns: resolve registered service", "unexpected record"),
1000 Err(err) => harness.fail("mdns: resolve registered service", &format!("{err}")),
1001 }
1002
1003 match mdns.unregister(®.id) {
1004 Ok(()) => harness.pass("mdns: unregister"),
1005 Err(err) => harness.fail("mdns: unregister", &format!("{err}")),
1006 }
1007 } else {
1008 harness.fail("mdns: register + browse", "register failed");
1009 }
1010 } else {
1011 harness.fail("mdns: register + browse", "browse failed");
1012 }
1013
1014 let mut rx = handle.subscribe();
1016 let entry = ProxyEntry {
1017 name: "embedded-proxy".to_string(),
1018 listen_port: 18080,
1019 backend: "http://127.0.0.1:18081".to_string(),
1020 allow_remote: false,
1021 };
1022 let result = proxy.upsert(entry.clone()).await;
1023 if result.is_ok() {
1024 let entries = proxy.entries().await;
1025 if entries.iter().any(|item| item.name == entry.name) {
1026 harness.pass("proxy: upsert entry");
1027 } else {
1028 harness.fail("proxy: upsert entry", "entry missing after upsert");
1029 }
1030 let event = wait_for_event(&mut rx, Duration::from_secs(3), |event| {
1031 matches!(event, KoiEvent::ProxyEntryUpdated { entry } if entry.name == "embedded-proxy")
1032 })
1033 .await;
1034 if event.is_some() {
1035 harness.pass("proxy: event emitted");
1036 } else {
1037 harness.fail(
1038 "proxy: event emitted",
1039 "no ProxyEntryUpdated event received",
1040 );
1041 }
1042 let _ = proxy.remove("embedded-proxy").await;
1043 let entries = proxy.entries().await;
1044 if entries.iter().any(|item| item.name == "embedded-proxy") {
1045 harness.fail("proxy: remove entry", "entry still present after removal");
1046 } else {
1047 harness.pass("proxy: remove entry");
1048 }
1049 } else {
1050 harness.fail("proxy: upsert entry", "upsert failed");
1051 }
1052
1053 let client = reqwest::Client::builder()
1054 .timeout(Duration::from_secs(10))
1055 .build()?;
1056 if let Err(err) = run_http_tests(&http_base, &client, &mut harness).await {
1057 harness.fail("http: suite", &format!("{err}"));
1058 }
1059
1060 if let Err(err) = run_ipc_tests(mdns.core()?, &mut harness).await {
1061 harness.fail("ipc: suite", &format!("{err}"));
1062 }
1063
1064 http_cancel.cancel();
1065 handle.shutdown().await?;
1066 if tokio::time::Instant::now() > total_deadline {
1067 harness.fail("runtime", "overall timeout exceeded");
1068 }
1069 harness.summary();
1070 if harness.failed > 0 {
1071 std::process::exit(1);
1072 }
1073 Ok(())
1074}