Skip to main content

ProxyHandle

Struct ProxyHandle 

Source
pub struct ProxyHandle { /* private fields */ }

Implementations§

Source§

impl ProxyHandle

Source

pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError>

Examples found in repository?
examples/embedded-integration.rs (line 856)
769 async fn main() -> Result<(), Box<dyn std::error::Error>> {
770    let args: Vec<String> = std::env::args().collect();
771    if has_flag(&args, "--help") {
772        println!(
773            "Usage: cargo run -p koi-embedded --example embedded-integration -- [options]\n\nOptions:\n  --timeout N   Overall timeout in seconds (default: 30)\n  --verbose     Verbose logging"
774        );
775        return Ok(());
776    }
777
778    let verbose = has_flag(&args, "--verbose");
779    let with_certmesh = true;
780    let skip_mdns = false;
781    let timeout_secs = read_arg_value(&args, "--timeout")
782        .and_then(|value| value.parse::<u64>().ok())
783        .unwrap_or(30);
784    let total_deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
785
786    let data_dir = temp_data_dir();
787    let mut harness = Harness::new(verbose);
788
789    harness.log(format!("data dir: {}", data_dir.display()));
790    harness.log(format!("mdns: {}", !skip_mdns));
791    harness.log(format!("certmesh: {}", with_certmesh));
792
793    let koi = Builder::new()
794        .data_dir(&data_dir)
795        .service_mode(ServiceMode::EmbeddedOnly)
796        .mdns(!skip_mdns)
797        .dns_enabled(true)
798        .dns(|cfg| cfg.port(15353))
799        .health(true)
800        .certmesh(with_certmesh)
801        .proxy(true)
802        .build()?;
803    let handle = koi.start().await?;
804
805    let mdns = match handle.mdns() {
806        Ok(mdns) => mdns,
807        Err(err) => {
808            harness.fail("mdns: setup", &format!("{err}"));
809            handle.shutdown().await?;
810            harness.summary();
811            std::process::exit(1);
812        }
813    };
814    let dns = match handle.dns() {
815        Ok(dns) => dns,
816        Err(err) => {
817            harness.fail("dns: setup", &format!("{err}"));
818            handle.shutdown().await?;
819            harness.summary();
820            std::process::exit(1);
821        }
822    };
823    let health = match handle.health() {
824        Ok(health) => health,
825        Err(err) => {
826            harness.fail("health: setup", &format!("{err}"));
827            handle.shutdown().await?;
828            harness.summary();
829            std::process::exit(1);
830        }
831    };
832    let proxy = match handle.proxy() {
833        Ok(proxy) => proxy,
834        Err(err) => {
835            harness.fail("proxy: setup", &format!("{err}"));
836            handle.shutdown().await?;
837            harness.summary();
838            std::process::exit(1);
839        }
840    };
841    let certmesh = match handle.certmesh() {
842        Ok(certmesh) => certmesh,
843        Err(err) => {
844            harness.fail("certmesh: setup", &format!("{err}"));
845            handle.shutdown().await?;
846            harness.summary();
847            std::process::exit(1);
848        }
849    };
850
851    let (http_addr, http_cancel) = start_http_server(
852        mdns.core()?,
853        dns.runtime()?,
854        health.core()?,
855        certmesh.core()?,
856        proxy.runtime()?,
857    )
858    .await?;
859    let http_base = format!("http://{}", http_addr);
860    harness.log(format!("http base: {http_base}"));
861
862    // DNS: add entry, lookup, and event.
863    let mut rx = handle.subscribe();
864    let entry = DnsEntry {
865        name: "embedded-test.lan".to_string(),
866        ip: "127.0.0.1".to_string(),
867        ttl: None,
868    };
869    let _ = dns.add_entry(entry)?;
870    let event = wait_for_event(
871        &mut rx,
872        Duration::from_secs(2),
873        |event| matches!(event, KoiEvent::DnsEntryUpdated { name, .. } if name == "embedded-test.lan"),
874    )
875    .await;
876    if event.is_some() {
877        harness.pass("dns: event emitted");
878    } else {
879        harness.fail("dns: event emitted", "no DnsEntryUpdated event received");
880    }
881
882    let result = dns
883        .lookup("embedded-test.lan", hickory_proto::rr::RecordType::A)
884        .await;
885    match result {
886        Some(result) => {
887            if result.ips.contains(&IpAddr::from([127, 0, 0, 1])) && result.source == "static" {
888                harness.pass("dns: lookup static entry");
889            } else {
890                harness.fail("dns: lookup static entry", "unexpected lookup result");
891            }
892        }
893        None => harness.fail("dns: lookup static entry", "lookup returned none"),
894    }
895
896    let names = dns.list_names();
897    if names.iter().any(|name| name == "embedded-test.lan.") {
898        harness.pass("dns: list names includes entry");
899    } else {
900        harness.fail("dns: list names includes entry", "name missing from list");
901    }
902
903    let _ = dns.remove_entry("embedded-test.lan");
904    let removed_event = wait_for_event(
905        &mut rx,
906        Duration::from_secs(2),
907        |event| matches!(event, KoiEvent::DnsEntryRemoved { name } if name == "embedded-test.lan"),
908    )
909    .await;
910    if removed_event.is_some() {
911        harness.pass("dns: remove emits removal event");
912    } else {
913        harness.fail(
914            "dns: remove emits removal event",
915            "no removal event received",
916        );
917    }
918
919    // Health: run a TCP check against a local listener.
920    let mut rx = handle.subscribe();
921    let listener = TcpListener::bind("127.0.0.1:0").await?;
922    let addr = listener.local_addr()?;
923    tokio::spawn(async move {
924        loop {
925            let _ = listener.accept().await;
926        }
927    });
928
929    let check = HealthCheck {
930        name: "tcp-local".to_string(),
931        kind: ServiceCheckKind::Tcp,
932        target: format!("127.0.0.1:{}", addr.port()),
933        interval_secs: 1,
934        timeout_secs: 1,
935    };
936    health.add_check(check).await?;
937    health.core()?.run_checks_once().await;
938    let snapshot = health.status().await;
939    let status = snapshot
940        .services
941        .iter()
942        .find(|svc| svc.name == "tcp-local")
943        .map(|svc| svc.status);
944    match status {
945        Some(ServiceStatus::Up) => harness.pass("health: tcp check up"),
946        Some(other) => harness.fail(
947            "health: tcp check up",
948            &format!("unexpected status: {other:?}"),
949        ),
950        None => harness.fail("health: tcp check up", "service missing"),
951    }
952
953    let event = wait_for_event(
954        &mut rx,
955        Duration::from_secs(3),
956        |event| matches!(event, KoiEvent::HealthChanged { name, .. } if name == "tcp-local"),
957    )
958    .await;
959    if event.is_some() {
960        harness.pass("health: event emitted");
961    } else {
962        harness.fail("health: event emitted", "no HealthChanged event received");
963    }
964
965    let _ = health.remove_check("tcp-local").await;
966    let snapshot = health.status().await;
967    if snapshot.services.iter().any(|svc| svc.name == "tcp-local") {
968        harness.fail("health: remove check", "check still present after removal");
969    } else {
970        harness.pass("health: remove check");
971    }
972
973    // mDNS: register + browse.
974    let browse = mdns.browse("_koi._tcp").await;
975    if let Ok(browse) = browse {
976        let mut txt = HashMap::new();
977        txt.insert("source".to_string(), "embedded".to_string());
978        let payload = RegisterPayload {
979            name: "koi-embedded-test".to_string(),
980            service_type: "_koi._tcp".to_string(),
981            port: 51515,
982            ip: Some("127.0.0.1".to_string()),
983            lease_secs: Some(30),
984            txt,
985        };
986        let reg = mdns.register(payload);
987        if let Ok(reg) = reg {
988            let found = tokio::time::timeout(Duration::from_secs(5), browse.recv()).await;
989            match found {
990                Ok(Some(_event)) => harness.pass("mdns: register + browse"),
991                Ok(None) => harness.fail("mdns: register + browse", "browse stream ended"),
992                Err(_) => harness.fail("mdns: register + browse", "no events within timeout"),
993            }
994
995            match mdns.resolve("koi-embedded-test._koi._tcp.local.").await {
996                Ok(record) if record.port == Some(51515) => {
997                    harness.pass("mdns: resolve registered service");
998                }
999                Ok(_) => harness.fail("mdns: resolve registered service", "unexpected record"),
1000                Err(err) => harness.fail("mdns: resolve registered service", &format!("{err}")),
1001            }
1002
1003            match mdns.unregister(&reg.id) {
1004                Ok(()) => harness.pass("mdns: unregister"),
1005                Err(err) => harness.fail("mdns: unregister", &format!("{err}")),
1006            }
1007        } else {
1008            harness.fail("mdns: register + browse", "register failed");
1009        }
1010    } else {
1011        harness.fail("mdns: register + browse", "browse failed");
1012    }
1013
1014    // Proxy: upsert and read entries.
1015    let mut rx = handle.subscribe();
1016    let entry = ProxyEntry {
1017        name: "embedded-proxy".to_string(),
1018        listen_port: 18080,
1019        backend: "http://127.0.0.1:18081".to_string(),
1020        allow_remote: false,
1021    };
1022    let result = proxy.upsert(entry.clone()).await;
1023    if result.is_ok() {
1024        let entries = proxy.entries().await;
1025        if entries.iter().any(|item| item.name == entry.name) {
1026            harness.pass("proxy: upsert entry");
1027        } else {
1028            harness.fail("proxy: upsert entry", "entry missing after upsert");
1029        }
1030        let event = wait_for_event(&mut rx, Duration::from_secs(3), |event| {
1031            matches!(event, KoiEvent::ProxyEntryUpdated { entry } if entry.name == "embedded-proxy")
1032        })
1033        .await;
1034        if event.is_some() {
1035            harness.pass("proxy: event emitted");
1036        } else {
1037            harness.fail(
1038                "proxy: event emitted",
1039                "no ProxyEntryUpdated event received",
1040            );
1041        }
1042        let _ = proxy.remove("embedded-proxy").await;
1043        let entries = proxy.entries().await;
1044        if entries.iter().any(|item| item.name == "embedded-proxy") {
1045            harness.fail("proxy: remove entry", "entry still present after removal");
1046        } else {
1047            harness.pass("proxy: remove entry");
1048        }
1049    } else {
1050        harness.fail("proxy: upsert entry", "upsert failed");
1051    }
1052
1053    let client = reqwest::Client::builder()
1054        .timeout(Duration::from_secs(10))
1055        .build()?;
1056    if let Err(err) = run_http_tests(&http_base, &client, &mut harness).await {
1057        harness.fail("http: suite", &format!("{err}"));
1058    }
1059
1060    if let Err(err) = run_ipc_tests(mdns.core()?, &mut harness).await {
1061        harness.fail("ipc: suite", &format!("{err}"));
1062    }
1063
1064    http_cancel.cancel();
1065    handle.shutdown().await?;
1066    if tokio::time::Instant::now() > total_deadline {
1067        harness.fail("runtime", "overall timeout exceeded");
1068    }
1069    harness.summary();
1070    if harness.failed > 0 {
1071        std::process::exit(1);
1072    }
1073    Ok(())
1074}
Source

pub fn core(&self) -> Result<Arc<ProxyCore>, KoiError>

Source

pub async fn entries(&self) -> Vec<ProxyEntry>

Examples found in repository?
examples/embedded-integration.rs (line 1024)
769 async fn main() -> Result<(), Box<dyn std::error::Error>> {
770    let args: Vec<String> = std::env::args().collect();
771    if has_flag(&args, "--help") {
772        println!(
773            "Usage: cargo run -p koi-embedded --example embedded-integration -- [options]\n\nOptions:\n  --timeout N   Overall timeout in seconds (default: 30)\n  --verbose     Verbose logging"
774        );
775        return Ok(());
776    }
777
778    let verbose = has_flag(&args, "--verbose");
779    let with_certmesh = true;
780    let skip_mdns = false;
781    let timeout_secs = read_arg_value(&args, "--timeout")
782        .and_then(|value| value.parse::<u64>().ok())
783        .unwrap_or(30);
784    let total_deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
785
786    let data_dir = temp_data_dir();
787    let mut harness = Harness::new(verbose);
788
789    harness.log(format!("data dir: {}", data_dir.display()));
790    harness.log(format!("mdns: {}", !skip_mdns));
791    harness.log(format!("certmesh: {}", with_certmesh));
792
793    let koi = Builder::new()
794        .data_dir(&data_dir)
795        .service_mode(ServiceMode::EmbeddedOnly)
796        .mdns(!skip_mdns)
797        .dns_enabled(true)
798        .dns(|cfg| cfg.port(15353))
799        .health(true)
800        .certmesh(with_certmesh)
801        .proxy(true)
802        .build()?;
803    let handle = koi.start().await?;
804
805    let mdns = match handle.mdns() {
806        Ok(mdns) => mdns,
807        Err(err) => {
808            harness.fail("mdns: setup", &format!("{err}"));
809            handle.shutdown().await?;
810            harness.summary();
811            std::process::exit(1);
812        }
813    };
814    let dns = match handle.dns() {
815        Ok(dns) => dns,
816        Err(err) => {
817            harness.fail("dns: setup", &format!("{err}"));
818            handle.shutdown().await?;
819            harness.summary();
820            std::process::exit(1);
821        }
822    };
823    let health = match handle.health() {
824        Ok(health) => health,
825        Err(err) => {
826            harness.fail("health: setup", &format!("{err}"));
827            handle.shutdown().await?;
828            harness.summary();
829            std::process::exit(1);
830        }
831    };
832    let proxy = match handle.proxy() {
833        Ok(proxy) => proxy,
834        Err(err) => {
835            harness.fail("proxy: setup", &format!("{err}"));
836            handle.shutdown().await?;
837            harness.summary();
838            std::process::exit(1);
839        }
840    };
841    let certmesh = match handle.certmesh() {
842        Ok(certmesh) => certmesh,
843        Err(err) => {
844            harness.fail("certmesh: setup", &format!("{err}"));
845            handle.shutdown().await?;
846            harness.summary();
847            std::process::exit(1);
848        }
849    };
850
851    let (http_addr, http_cancel) = start_http_server(
852        mdns.core()?,
853        dns.runtime()?,
854        health.core()?,
855        certmesh.core()?,
856        proxy.runtime()?,
857    )
858    .await?;
859    let http_base = format!("http://{}", http_addr);
860    harness.log(format!("http base: {http_base}"));
861
862    // DNS: add entry, lookup, and event.
863    let mut rx = handle.subscribe();
864    let entry = DnsEntry {
865        name: "embedded-test.lan".to_string(),
866        ip: "127.0.0.1".to_string(),
867        ttl: None,
868    };
869    let _ = dns.add_entry(entry)?;
870    let event = wait_for_event(
871        &mut rx,
872        Duration::from_secs(2),
873        |event| matches!(event, KoiEvent::DnsEntryUpdated { name, .. } if name == "embedded-test.lan"),
874    )
875    .await;
876    if event.is_some() {
877        harness.pass("dns: event emitted");
878    } else {
879        harness.fail("dns: event emitted", "no DnsEntryUpdated event received");
880    }
881
882    let result = dns
883        .lookup("embedded-test.lan", hickory_proto::rr::RecordType::A)
884        .await;
885    match result {
886        Some(result) => {
887            if result.ips.contains(&IpAddr::from([127, 0, 0, 1])) && result.source == "static" {
888                harness.pass("dns: lookup static entry");
889            } else {
890                harness.fail("dns: lookup static entry", "unexpected lookup result");
891            }
892        }
893        None => harness.fail("dns: lookup static entry", "lookup returned none"),
894    }
895
896    let names = dns.list_names();
897    if names.iter().any(|name| name == "embedded-test.lan.") {
898        harness.pass("dns: list names includes entry");
899    } else {
900        harness.fail("dns: list names includes entry", "name missing from list");
901    }
902
903    let _ = dns.remove_entry("embedded-test.lan");
904    let removed_event = wait_for_event(
905        &mut rx,
906        Duration::from_secs(2),
907        |event| matches!(event, KoiEvent::DnsEntryRemoved { name } if name == "embedded-test.lan"),
908    )
909    .await;
910    if removed_event.is_some() {
911        harness.pass("dns: remove emits removal event");
912    } else {
913        harness.fail(
914            "dns: remove emits removal event",
915            "no removal event received",
916        );
917    }
918
919    // Health: run a TCP check against a local listener.
920    let mut rx = handle.subscribe();
921    let listener = TcpListener::bind("127.0.0.1:0").await?;
922    let addr = listener.local_addr()?;
923    tokio::spawn(async move {
924        loop {
925            let _ = listener.accept().await;
926        }
927    });
928
929    let check = HealthCheck {
930        name: "tcp-local".to_string(),
931        kind: ServiceCheckKind::Tcp,
932        target: format!("127.0.0.1:{}", addr.port()),
933        interval_secs: 1,
934        timeout_secs: 1,
935    };
936    health.add_check(check).await?;
937    health.core()?.run_checks_once().await;
938    let snapshot = health.status().await;
939    let status = snapshot
940        .services
941        .iter()
942        .find(|svc| svc.name == "tcp-local")
943        .map(|svc| svc.status);
944    match status {
945        Some(ServiceStatus::Up) => harness.pass("health: tcp check up"),
946        Some(other) => harness.fail(
947            "health: tcp check up",
948            &format!("unexpected status: {other:?}"),
949        ),
950        None => harness.fail("health: tcp check up", "service missing"),
951    }
952
953    let event = wait_for_event(
954        &mut rx,
955        Duration::from_secs(3),
956        |event| matches!(event, KoiEvent::HealthChanged { name, .. } if name == "tcp-local"),
957    )
958    .await;
959    if event.is_some() {
960        harness.pass("health: event emitted");
961    } else {
962        harness.fail("health: event emitted", "no HealthChanged event received");
963    }
964
965    let _ = health.remove_check("tcp-local").await;
966    let snapshot = health.status().await;
967    if snapshot.services.iter().any(|svc| svc.name == "tcp-local") {
968        harness.fail("health: remove check", "check still present after removal");
969    } else {
970        harness.pass("health: remove check");
971    }
972
973    // mDNS: register + browse.
974    let browse = mdns.browse("_koi._tcp").await;
975    if let Ok(browse) = browse {
976        let mut txt = HashMap::new();
977        txt.insert("source".to_string(), "embedded".to_string());
978        let payload = RegisterPayload {
979            name: "koi-embedded-test".to_string(),
980            service_type: "_koi._tcp".to_string(),
981            port: 51515,
982            ip: Some("127.0.0.1".to_string()),
983            lease_secs: Some(30),
984            txt,
985        };
986        let reg = mdns.register(payload);
987        if let Ok(reg) = reg {
988            let found = tokio::time::timeout(Duration::from_secs(5), browse.recv()).await;
989            match found {
990                Ok(Some(_event)) => harness.pass("mdns: register + browse"),
991                Ok(None) => harness.fail("mdns: register + browse", "browse stream ended"),
992                Err(_) => harness.fail("mdns: register + browse", "no events within timeout"),
993            }
994
995            match mdns.resolve("koi-embedded-test._koi._tcp.local.").await {
996                Ok(record) if record.port == Some(51515) => {
997                    harness.pass("mdns: resolve registered service");
998                }
999                Ok(_) => harness.fail("mdns: resolve registered service", "unexpected record"),
1000                Err(err) => harness.fail("mdns: resolve registered service", &format!("{err}")),
1001            }
1002
1003            match mdns.unregister(&reg.id) {
1004                Ok(()) => harness.pass("mdns: unregister"),
1005                Err(err) => harness.fail("mdns: unregister", &format!("{err}")),
1006            }
1007        } else {
1008            harness.fail("mdns: register + browse", "register failed");
1009        }
1010    } else {
1011        harness.fail("mdns: register + browse", "browse failed");
1012    }
1013
1014    // Proxy: upsert and read entries.
1015    let mut rx = handle.subscribe();
1016    let entry = ProxyEntry {
1017        name: "embedded-proxy".to_string(),
1018        listen_port: 18080,
1019        backend: "http://127.0.0.1:18081".to_string(),
1020        allow_remote: false,
1021    };
1022    let result = proxy.upsert(entry.clone()).await;
1023    if result.is_ok() {
1024        let entries = proxy.entries().await;
1025        if entries.iter().any(|item| item.name == entry.name) {
1026            harness.pass("proxy: upsert entry");
1027        } else {
1028            harness.fail("proxy: upsert entry", "entry missing after upsert");
1029        }
1030        let event = wait_for_event(&mut rx, Duration::from_secs(3), |event| {
1031            matches!(event, KoiEvent::ProxyEntryUpdated { entry } if entry.name == "embedded-proxy")
1032        })
1033        .await;
1034        if event.is_some() {
1035            harness.pass("proxy: event emitted");
1036        } else {
1037            harness.fail(
1038                "proxy: event emitted",
1039                "no ProxyEntryUpdated event received",
1040            );
1041        }
1042        let _ = proxy.remove("embedded-proxy").await;
1043        let entries = proxy.entries().await;
1044        if entries.iter().any(|item| item.name == "embedded-proxy") {
1045            harness.fail("proxy: remove entry", "entry still present after removal");
1046        } else {
1047            harness.pass("proxy: remove entry");
1048        }
1049    } else {
1050        harness.fail("proxy: upsert entry", "upsert failed");
1051    }
1052
1053    let client = reqwest::Client::builder()
1054        .timeout(Duration::from_secs(10))
1055        .build()?;
1056    if let Err(err) = run_http_tests(&http_base, &client, &mut harness).await {
1057        harness.fail("http: suite", &format!("{err}"));
1058    }
1059
1060    if let Err(err) = run_ipc_tests(mdns.core()?, &mut harness).await {
1061        harness.fail("ipc: suite", &format!("{err}"));
1062    }
1063
1064    http_cancel.cancel();
1065    handle.shutdown().await?;
1066    if tokio::time::Instant::now() > total_deadline {
1067        harness.fail("runtime", "overall timeout exceeded");
1068    }
1069    harness.summary();
1070    if harness.failed > 0 {
1071        std::process::exit(1);
1072    }
1073    Ok(())
1074}
Source

pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError>

Examples found in repository?
examples/embedded-integration.rs (line 1022)
769 async fn main() -> Result<(), Box<dyn std::error::Error>> {
770    let args: Vec<String> = std::env::args().collect();
771    if has_flag(&args, "--help") {
772        println!(
773            "Usage: cargo run -p koi-embedded --example embedded-integration -- [options]\n\nOptions:\n  --timeout N   Overall timeout in seconds (default: 30)\n  --verbose     Verbose logging"
774        );
775        return Ok(());
776    }
777
778    let verbose = has_flag(&args, "--verbose");
779    let with_certmesh = true;
780    let skip_mdns = false;
781    let timeout_secs = read_arg_value(&args, "--timeout")
782        .and_then(|value| value.parse::<u64>().ok())
783        .unwrap_or(30);
784    let total_deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
785
786    let data_dir = temp_data_dir();
787    let mut harness = Harness::new(verbose);
788
789    harness.log(format!("data dir: {}", data_dir.display()));
790    harness.log(format!("mdns: {}", !skip_mdns));
791    harness.log(format!("certmesh: {}", with_certmesh));
792
793    let koi = Builder::new()
794        .data_dir(&data_dir)
795        .service_mode(ServiceMode::EmbeddedOnly)
796        .mdns(!skip_mdns)
797        .dns_enabled(true)
798        .dns(|cfg| cfg.port(15353))
799        .health(true)
800        .certmesh(with_certmesh)
801        .proxy(true)
802        .build()?;
803    let handle = koi.start().await?;
804
805    let mdns = match handle.mdns() {
806        Ok(mdns) => mdns,
807        Err(err) => {
808            harness.fail("mdns: setup", &format!("{err}"));
809            handle.shutdown().await?;
810            harness.summary();
811            std::process::exit(1);
812        }
813    };
814    let dns = match handle.dns() {
815        Ok(dns) => dns,
816        Err(err) => {
817            harness.fail("dns: setup", &format!("{err}"));
818            handle.shutdown().await?;
819            harness.summary();
820            std::process::exit(1);
821        }
822    };
823    let health = match handle.health() {
824        Ok(health) => health,
825        Err(err) => {
826            harness.fail("health: setup", &format!("{err}"));
827            handle.shutdown().await?;
828            harness.summary();
829            std::process::exit(1);
830        }
831    };
832    let proxy = match handle.proxy() {
833        Ok(proxy) => proxy,
834        Err(err) => {
835            harness.fail("proxy: setup", &format!("{err}"));
836            handle.shutdown().await?;
837            harness.summary();
838            std::process::exit(1);
839        }
840    };
841    let certmesh = match handle.certmesh() {
842        Ok(certmesh) => certmesh,
843        Err(err) => {
844            harness.fail("certmesh: setup", &format!("{err}"));
845            handle.shutdown().await?;
846            harness.summary();
847            std::process::exit(1);
848        }
849    };
850
851    let (http_addr, http_cancel) = start_http_server(
852        mdns.core()?,
853        dns.runtime()?,
854        health.core()?,
855        certmesh.core()?,
856        proxy.runtime()?,
857    )
858    .await?;
859    let http_base = format!("http://{}", http_addr);
860    harness.log(format!("http base: {http_base}"));
861
862    // DNS: add entry, lookup, and event.
863    let mut rx = handle.subscribe();
864    let entry = DnsEntry {
865        name: "embedded-test.lan".to_string(),
866        ip: "127.0.0.1".to_string(),
867        ttl: None,
868    };
869    let _ = dns.add_entry(entry)?;
870    let event = wait_for_event(
871        &mut rx,
872        Duration::from_secs(2),
873        |event| matches!(event, KoiEvent::DnsEntryUpdated { name, .. } if name == "embedded-test.lan"),
874    )
875    .await;
876    if event.is_some() {
877        harness.pass("dns: event emitted");
878    } else {
879        harness.fail("dns: event emitted", "no DnsEntryUpdated event received");
880    }
881
882    let result = dns
883        .lookup("embedded-test.lan", hickory_proto::rr::RecordType::A)
884        .await;
885    match result {
886        Some(result) => {
887            if result.ips.contains(&IpAddr::from([127, 0, 0, 1])) && result.source == "static" {
888                harness.pass("dns: lookup static entry");
889            } else {
890                harness.fail("dns: lookup static entry", "unexpected lookup result");
891            }
892        }
893        None => harness.fail("dns: lookup static entry", "lookup returned none"),
894    }
895
896    let names = dns.list_names();
897    if names.iter().any(|name| name == "embedded-test.lan.") {
898        harness.pass("dns: list names includes entry");
899    } else {
900        harness.fail("dns: list names includes entry", "name missing from list");
901    }
902
903    let _ = dns.remove_entry("embedded-test.lan");
904    let removed_event = wait_for_event(
905        &mut rx,
906        Duration::from_secs(2),
907        |event| matches!(event, KoiEvent::DnsEntryRemoved { name } if name == "embedded-test.lan"),
908    )
909    .await;
910    if removed_event.is_some() {
911        harness.pass("dns: remove emits removal event");
912    } else {
913        harness.fail(
914            "dns: remove emits removal event",
915            "no removal event received",
916        );
917    }
918
919    // Health: run a TCP check against a local listener.
920    let mut rx = handle.subscribe();
921    let listener = TcpListener::bind("127.0.0.1:0").await?;
922    let addr = listener.local_addr()?;
923    tokio::spawn(async move {
924        loop {
925            let _ = listener.accept().await;
926        }
927    });
928
929    let check = HealthCheck {
930        name: "tcp-local".to_string(),
931        kind: ServiceCheckKind::Tcp,
932        target: format!("127.0.0.1:{}", addr.port()),
933        interval_secs: 1,
934        timeout_secs: 1,
935    };
936    health.add_check(check).await?;
937    health.core()?.run_checks_once().await;
938    let snapshot = health.status().await;
939    let status = snapshot
940        .services
941        .iter()
942        .find(|svc| svc.name == "tcp-local")
943        .map(|svc| svc.status);
944    match status {
945        Some(ServiceStatus::Up) => harness.pass("health: tcp check up"),
946        Some(other) => harness.fail(
947            "health: tcp check up",
948            &format!("unexpected status: {other:?}"),
949        ),
950        None => harness.fail("health: tcp check up", "service missing"),
951    }
952
953    let event = wait_for_event(
954        &mut rx,
955        Duration::from_secs(3),
956        |event| matches!(event, KoiEvent::HealthChanged { name, .. } if name == "tcp-local"),
957    )
958    .await;
959    if event.is_some() {
960        harness.pass("health: event emitted");
961    } else {
962        harness.fail("health: event emitted", "no HealthChanged event received");
963    }
964
965    let _ = health.remove_check("tcp-local").await;
966    let snapshot = health.status().await;
967    if snapshot.services.iter().any(|svc| svc.name == "tcp-local") {
968        harness.fail("health: remove check", "check still present after removal");
969    } else {
970        harness.pass("health: remove check");
971    }
972
973    // mDNS: register + browse.
974    let browse = mdns.browse("_koi._tcp").await;
975    if let Ok(browse) = browse {
976        let mut txt = HashMap::new();
977        txt.insert("source".to_string(), "embedded".to_string());
978        let payload = RegisterPayload {
979            name: "koi-embedded-test".to_string(),
980            service_type: "_koi._tcp".to_string(),
981            port: 51515,
982            ip: Some("127.0.0.1".to_string()),
983            lease_secs: Some(30),
984            txt,
985        };
986        let reg = mdns.register(payload);
987        if let Ok(reg) = reg {
988            let found = tokio::time::timeout(Duration::from_secs(5), browse.recv()).await;
989            match found {
990                Ok(Some(_event)) => harness.pass("mdns: register + browse"),
991                Ok(None) => harness.fail("mdns: register + browse", "browse stream ended"),
992                Err(_) => harness.fail("mdns: register + browse", "no events within timeout"),
993            }
994
995            match mdns.resolve("koi-embedded-test._koi._tcp.local.").await {
996                Ok(record) if record.port == Some(51515) => {
997                    harness.pass("mdns: resolve registered service");
998                }
999                Ok(_) => harness.fail("mdns: resolve registered service", "unexpected record"),
1000                Err(err) => harness.fail("mdns: resolve registered service", &format!("{err}")),
1001            }
1002
1003            match mdns.unregister(&reg.id) {
1004                Ok(()) => harness.pass("mdns: unregister"),
1005                Err(err) => harness.fail("mdns: unregister", &format!("{err}")),
1006            }
1007        } else {
1008            harness.fail("mdns: register + browse", "register failed");
1009        }
1010    } else {
1011        harness.fail("mdns: register + browse", "browse failed");
1012    }
1013
1014    // Proxy: upsert and read entries.
1015    let mut rx = handle.subscribe();
1016    let entry = ProxyEntry {
1017        name: "embedded-proxy".to_string(),
1018        listen_port: 18080,
1019        backend: "http://127.0.0.1:18081".to_string(),
1020        allow_remote: false,
1021    };
1022    let result = proxy.upsert(entry.clone()).await;
1023    if result.is_ok() {
1024        let entries = proxy.entries().await;
1025        if entries.iter().any(|item| item.name == entry.name) {
1026            harness.pass("proxy: upsert entry");
1027        } else {
1028            harness.fail("proxy: upsert entry", "entry missing after upsert");
1029        }
1030        let event = wait_for_event(&mut rx, Duration::from_secs(3), |event| {
1031            matches!(event, KoiEvent::ProxyEntryUpdated { entry } if entry.name == "embedded-proxy")
1032        })
1033        .await;
1034        if event.is_some() {
1035            harness.pass("proxy: event emitted");
1036        } else {
1037            harness.fail(
1038                "proxy: event emitted",
1039                "no ProxyEntryUpdated event received",
1040            );
1041        }
1042        let _ = proxy.remove("embedded-proxy").await;
1043        let entries = proxy.entries().await;
1044        if entries.iter().any(|item| item.name == "embedded-proxy") {
1045            harness.fail("proxy: remove entry", "entry still present after removal");
1046        } else {
1047            harness.pass("proxy: remove entry");
1048        }
1049    } else {
1050        harness.fail("proxy: upsert entry", "upsert failed");
1051    }
1052
1053    let client = reqwest::Client::builder()
1054        .timeout(Duration::from_secs(10))
1055        .build()?;
1056    if let Err(err) = run_http_tests(&http_base, &client, &mut harness).await {
1057        harness.fail("http: suite", &format!("{err}"));
1058    }
1059
1060    if let Err(err) = run_ipc_tests(mdns.core()?, &mut harness).await {
1061        harness.fail("ipc: suite", &format!("{err}"));
1062    }
1063
1064    http_cancel.cancel();
1065    handle.shutdown().await?;
1066    if tokio::time::Instant::now() > total_deadline {
1067        harness.fail("runtime", "overall timeout exceeded");
1068    }
1069    harness.summary();
1070    if harness.failed > 0 {
1071        std::process::exit(1);
1072    }
1073    Ok(())
1074}
Source

pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError>

Examples found in repository?
examples/embedded-integration.rs (line 1042)
769async fn main() -> Result<(), Box<dyn std::error::Error>> {
770    let args: Vec<String> = std::env::args().collect();
771    if has_flag(&args, "--help") {
772        println!(
773            "Usage: cargo run -p koi-embedded --example embedded-integration -- [options]\n\nOptions:\n  --timeout N   Overall timeout in seconds (default: 30)\n  --verbose     Verbose logging"
774        );
775        return Ok(());
776    }
777
778    let verbose = has_flag(&args, "--verbose");
779    let with_certmesh = true;
780    let skip_mdns = false;
781    let timeout_secs = read_arg_value(&args, "--timeout")
782        .and_then(|value| value.parse::<u64>().ok())
783        .unwrap_or(30);
784    let total_deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
785
786    let data_dir = temp_data_dir();
787    let mut harness = Harness::new(verbose);
788
789    harness.log(format!("data dir: {}", data_dir.display()));
790    harness.log(format!("mdns: {}", !skip_mdns));
791    harness.log(format!("certmesh: {}", with_certmesh));
792
793    let koi = Builder::new()
794        .data_dir(&data_dir)
795        .service_mode(ServiceMode::EmbeddedOnly)
796        .mdns(!skip_mdns)
797        .dns_enabled(true)
798        .dns(|cfg| cfg.port(15353))
799        .health(true)
800        .certmesh(with_certmesh)
801        .proxy(true)
802        .build()?;
803    let handle = koi.start().await?;
804
805    let mdns = match handle.mdns() {
806        Ok(mdns) => mdns,
807        Err(err) => {
808            harness.fail("mdns: setup", &format!("{err}"));
809            handle.shutdown().await?;
810            harness.summary();
811            std::process::exit(1);
812        }
813    };
814    let dns = match handle.dns() {
815        Ok(dns) => dns,
816        Err(err) => {
817            harness.fail("dns: setup", &format!("{err}"));
818            handle.shutdown().await?;
819            harness.summary();
820            std::process::exit(1);
821        }
822    };
823    let health = match handle.health() {
824        Ok(health) => health,
825        Err(err) => {
826            harness.fail("health: setup", &format!("{err}"));
827            handle.shutdown().await?;
828            harness.summary();
829            std::process::exit(1);
830        }
831    };
832    let proxy = match handle.proxy() {
833        Ok(proxy) => proxy,
834        Err(err) => {
835            harness.fail("proxy: setup", &format!("{err}"));
836            handle.shutdown().await?;
837            harness.summary();
838            std::process::exit(1);
839        }
840    };
841    let certmesh = match handle.certmesh() {
842        Ok(certmesh) => certmesh,
843        Err(err) => {
844            harness.fail("certmesh: setup", &format!("{err}"));
845            handle.shutdown().await?;
846            harness.summary();
847            std::process::exit(1);
848        }
849    };
850
851    let (http_addr, http_cancel) = start_http_server(
852        mdns.core()?,
853        dns.runtime()?,
854        health.core()?,
855        certmesh.core()?,
856        proxy.runtime()?,
857    )
858    .await?;
859    let http_base = format!("http://{}", http_addr);
860    harness.log(format!("http base: {http_base}"));
861
862    // DNS: add entry, lookup, and event.
863    let mut rx = handle.subscribe();
864    let entry = DnsEntry {
865        name: "embedded-test.lan".to_string(),
866        ip: "127.0.0.1".to_string(),
867        ttl: None,
868    };
869    let _ = dns.add_entry(entry)?;
870    let event = wait_for_event(
871        &mut rx,
872        Duration::from_secs(2),
873        |event| matches!(event, KoiEvent::DnsEntryUpdated { name, .. } if name == "embedded-test.lan"),
874    )
875    .await;
876    if event.is_some() {
877        harness.pass("dns: event emitted");
878    } else {
879        harness.fail("dns: event emitted", "no DnsEntryUpdated event received");
880    }
881
882    let result = dns
883        .lookup("embedded-test.lan", hickory_proto::rr::RecordType::A)
884        .await;
885    match result {
886        Some(result) => {
887            if result.ips.contains(&IpAddr::from([127, 0, 0, 1])) && result.source == "static" {
888                harness.pass("dns: lookup static entry");
889            } else {
890                harness.fail("dns: lookup static entry", "unexpected lookup result");
891            }
892        }
893        None => harness.fail("dns: lookup static entry", "lookup returned none"),
894    }
895
896    let names = dns.list_names();
897    if names.iter().any(|name| name == "embedded-test.lan.") {
898        harness.pass("dns: list names includes entry");
899    } else {
900        harness.fail("dns: list names includes entry", "name missing from list");
901    }
902
903    let _ = dns.remove_entry("embedded-test.lan");
904    let removed_event = wait_for_event(
905        &mut rx,
906        Duration::from_secs(2),
907        |event| matches!(event, KoiEvent::DnsEntryRemoved { name } if name == "embedded-test.lan"),
908    )
909    .await;
910    if removed_event.is_some() {
911        harness.pass("dns: remove emits removal event");
912    } else {
913        harness.fail(
914            "dns: remove emits removal event",
915            "no removal event received",
916        );
917    }
918
919    // Health: run a TCP check against a local listener.
920    let mut rx = handle.subscribe();
921    let listener = TcpListener::bind("127.0.0.1:0").await?;
922    let addr = listener.local_addr()?;
923    tokio::spawn(async move {
924        loop {
925            let _ = listener.accept().await;
926        }
927    });
928
929    let check = HealthCheck {
930        name: "tcp-local".to_string(),
931        kind: ServiceCheckKind::Tcp,
932        target: format!("127.0.0.1:{}", addr.port()),
933        interval_secs: 1,
934        timeout_secs: 1,
935    };
936    health.add_check(check).await?;
937    health.core()?.run_checks_once().await;
938    let snapshot = health.status().await;
939    let status = snapshot
940        .services
941        .iter()
942        .find(|svc| svc.name == "tcp-local")
943        .map(|svc| svc.status);
944    match status {
945        Some(ServiceStatus::Up) => harness.pass("health: tcp check up"),
946        Some(other) => harness.fail(
947            "health: tcp check up",
948            &format!("unexpected status: {other:?}"),
949        ),
950        None => harness.fail("health: tcp check up", "service missing"),
951    }
952
953    let event = wait_for_event(
954        &mut rx,
955        Duration::from_secs(3),
956        |event| matches!(event, KoiEvent::HealthChanged { name, .. } if name == "tcp-local"),
957    )
958    .await;
959    if event.is_some() {
960        harness.pass("health: event emitted");
961    } else {
962        harness.fail("health: event emitted", "no HealthChanged event received");
963    }
964
965    let _ = health.remove_check("tcp-local").await;
966    let snapshot = health.status().await;
967    if snapshot.services.iter().any(|svc| svc.name == "tcp-local") {
968        harness.fail("health: remove check", "check still present after removal");
969    } else {
970        harness.pass("health: remove check");
971    }
972
973    // mDNS: register + browse.
974    let browse = mdns.browse("_koi._tcp").await;
975    if let Ok(browse) = browse {
976        let mut txt = HashMap::new();
977        txt.insert("source".to_string(), "embedded".to_string());
978        let payload = RegisterPayload {
979            name: "koi-embedded-test".to_string(),
980            service_type: "_koi._tcp".to_string(),
981            port: 51515,
982            ip: Some("127.0.0.1".to_string()),
983            lease_secs: Some(30),
984            txt,
985        };
986        let reg = mdns.register(payload);
987        if let Ok(reg) = reg {
988            let found = tokio::time::timeout(Duration::from_secs(5), browse.recv()).await;
989            match found {
990                Ok(Some(_event)) => harness.pass("mdns: register + browse"),
991                Ok(None) => harness.fail("mdns: register + browse", "browse stream ended"),
992                Err(_) => harness.fail("mdns: register + browse", "no events within timeout"),
993            }
994
995            match mdns.resolve("koi-embedded-test._koi._tcp.local.").await {
996                Ok(record) if record.port == Some(51515) => {
997                    harness.pass("mdns: resolve registered service");
998                }
999                Ok(_) => harness.fail("mdns: resolve registered service", "unexpected record"),
1000                Err(err) => harness.fail("mdns: resolve registered service", &format!("{err}")),
1001            }
1002
1003            match mdns.unregister(&reg.id) {
1004                Ok(()) => harness.pass("mdns: unregister"),
1005                Err(err) => harness.fail("mdns: unregister", &format!("{err}")),
1006            }
1007        } else {
1008            harness.fail("mdns: register + browse", "register failed");
1009        }
1010    } else {
1011        harness.fail("mdns: register + browse", "browse failed");
1012    }
1013
1014    // Proxy: upsert and read entries.
1015    let mut rx = handle.subscribe();
1016    let entry = ProxyEntry {
1017        name: "embedded-proxy".to_string(),
1018        listen_port: 18080,
1019        backend: "http://127.0.0.1:18081".to_string(),
1020        allow_remote: false,
1021    };
1022    let result = proxy.upsert(entry.clone()).await;
1023    if result.is_ok() {
1024        let entries = proxy.entries().await;
1025        if entries.iter().any(|item| item.name == entry.name) {
1026            harness.pass("proxy: upsert entry");
1027        } else {
1028            harness.fail("proxy: upsert entry", "entry missing after upsert");
1029        }
1030        let event = wait_for_event(&mut rx, Duration::from_secs(3), |event| {
1031            matches!(event, KoiEvent::ProxyEntryUpdated { entry } if entry.name == "embedded-proxy")
1032        })
1033        .await;
1034        if event.is_some() {
1035            harness.pass("proxy: event emitted");
1036        } else {
1037            harness.fail(
1038                "proxy: event emitted",
1039                "no ProxyEntryUpdated event received",
1040            );
1041        }
1042        let _ = proxy.remove("embedded-proxy").await;
1043        let entries = proxy.entries().await;
1044        if entries.iter().any(|item| item.name == "embedded-proxy") {
1045            harness.fail("proxy: remove entry", "entry still present after removal");
1046        } else {
1047            harness.pass("proxy: remove entry");
1048        }
1049    } else {
1050        harness.fail("proxy: upsert entry", "upsert failed");
1051    }
1052
1053    let client = reqwest::Client::builder()
1054        .timeout(Duration::from_secs(10))
1055        .build()?;
1056    if let Err(err) = run_http_tests(&http_base, &client, &mut harness).await {
1057        harness.fail("http: suite", &format!("{err}"));
1058    }
1059
1060    if let Err(err) = run_ipc_tests(mdns.core()?, &mut harness).await {
1061        harness.fail("ipc: suite", &format!("{err}"));
1062    }
1063
1064    http_cancel.cancel();
1065    handle.shutdown().await?;
1066    if tokio::time::Instant::now() > total_deadline {
1067        harness.fail("runtime", "overall timeout exceeded");
1068    }
1069    harness.summary();
1070    if harness.failed > 0 {
1071        std::process::exit(1);
1072    }
1073    Ok(())
1074}
Source

pub async fn start_all(&self) -> Result<(), KoiError>

Source

pub async fn stop_all(&self)

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of the pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a value with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more