pub struct ProxyHandle { /* private fields */ }
Implementations§
Source§impl ProxyHandle
impl ProxyHandle
Source
pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError>
pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError>
Examples found in repository?
examples/embedded-integration.rs (line 856)
769 async fn main() -> Result<(), Box<dyn std::error::Error>> {
770 let args: Vec<String> = std::env::args().collect();
771 if has_flag(&args, "--help") {
772 println!(
773 "Usage: cargo run -p koi-embedded --example embedded-integration -- [options]\n\nOptions:\n --timeout N Overall timeout in seconds (default: 30)\n --verbose Verbose logging"
774 );
775 return Ok(());
776 }
777
778 let verbose = has_flag(&args, "--verbose");
779 let with_certmesh = true;
780 let skip_mdns = false;
781 let timeout_secs = read_arg_value(&args, "--timeout")
782 .and_then(|value| value.parse::<u64>().ok())
783 .unwrap_or(30);
784 let total_deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
785
786 let data_dir = temp_data_dir();
787 let mut harness = Harness::new(verbose);
788
789 harness.log(format!("data dir: {}", data_dir.display()));
790 harness.log(format!("mdns: {}", !skip_mdns));
791 harness.log(format!("certmesh: {}", with_certmesh));
792
793 let koi = Builder::new()
794 .data_dir(&data_dir)
795 .service_mode(ServiceMode::EmbeddedOnly)
796 .mdns(!skip_mdns)
797 .dns_enabled(true)
798 .dns(|cfg| cfg.port(15353))
799 .health(true)
800 .certmesh(with_certmesh)
801 .proxy(true)
802 .build()?;
803 let handle = koi.start().await?;
804
805 let mdns = match handle.mdns() {
806 Ok(mdns) => mdns,
807 Err(err) => {
808 harness.fail("mdns: setup", &format!("{err}"));
809 handle.shutdown().await?;
810 harness.summary();
811 std::process::exit(1);
812 }
813 };
814 let dns = match handle.dns() {
815 Ok(dns) => dns,
816 Err(err) => {
817 harness.fail("dns: setup", &format!("{err}"));
818 handle.shutdown().await?;
819 harness.summary();
820 std::process::exit(1);
821 }
822 };
823 let health = match handle.health() {
824 Ok(health) => health,
825 Err(err) => {
826 harness.fail("health: setup", &format!("{err}"));
827 handle.shutdown().await?;
828 harness.summary();
829 std::process::exit(1);
830 }
831 };
832 let proxy = match handle.proxy() {
833 Ok(proxy) => proxy,
834 Err(err) => {
835 harness.fail("proxy: setup", &format!("{err}"));
836 handle.shutdown().await?;
837 harness.summary();
838 std::process::exit(1);
839 }
840 };
841 let certmesh = match handle.certmesh() {
842 Ok(certmesh) => certmesh,
843 Err(err) => {
844 harness.fail("certmesh: setup", &format!("{err}"));
845 handle.shutdown().await?;
846 harness.summary();
847 std::process::exit(1);
848 }
849 };
850
851 let (http_addr, http_cancel) = start_http_server(
852 mdns.core()?,
853 dns.runtime()?,
854 health.core()?,
855 certmesh.core()?,
856 proxy.runtime()?,
857 )
858 .await?;
859 let http_base = format!("http://{}", http_addr);
860 harness.log(format!("http base: {http_base}"));
861
862 // DNS: add entry, lookup, and event.
863 let mut rx = handle.subscribe();
864 let entry = DnsEntry {
865 name: "embedded-test.lan".to_string(),
866 ip: "127.0.0.1".to_string(),
867 ttl: None,
868 };
869 let _ = dns.add_entry(entry)?;
870 let event = wait_for_event(
871 &mut rx,
872 Duration::from_secs(2),
873 |event| matches!(event, KoiEvent::DnsEntryUpdated { name, .. } if name == "embedded-test.lan"),
874 )
875 .await;
876 if event.is_some() {
877 harness.pass("dns: event emitted");
878 } else {
879 harness.fail("dns: event emitted", "no DnsEntryUpdated event received");
880 }
881
882 let result = dns
883 .lookup("embedded-test.lan", hickory_proto::rr::RecordType::A)
884 .await;
885 match result {
886 Some(result) => {
887 if result.ips.contains(&IpAddr::from([127, 0, 0, 1])) && result.source == "static" {
888 harness.pass("dns: lookup static entry");
889 } else {
890 harness.fail("dns: lookup static entry", "unexpected lookup result");
891 }
892 }
893 None => harness.fail("dns: lookup static entry", "lookup returned none"),
894 }
895
896 let names = dns.list_names();
897 if names.iter().any(|name| name == "embedded-test.lan.") {
898 harness.pass("dns: list names includes entry");
899 } else {
900 harness.fail("dns: list names includes entry", "name missing from list");
901 }
902
903 let _ = dns.remove_entry("embedded-test.lan");
904 let removed_event = wait_for_event(
905 &mut rx,
906 Duration::from_secs(2),
907 |event| matches!(event, KoiEvent::DnsEntryRemoved { name } if name == "embedded-test.lan"),
908 )
909 .await;
910 if removed_event.is_some() {
911 harness.pass("dns: remove emits removal event");
912 } else {
913 harness.fail(
914 "dns: remove emits removal event",
915 "no removal event received",
916 );
917 }
918
919 // Health: run a TCP check against a local listener.
920 let mut rx = handle.subscribe();
921 let listener = TcpListener::bind("127.0.0.1:0").await?;
922 let addr = listener.local_addr()?;
923 tokio::spawn(async move {
924 loop {
925 let _ = listener.accept().await;
926 }
927 });
928
929 let check = HealthCheck {
930 name: "tcp-local".to_string(),
931 kind: ServiceCheckKind::Tcp,
932 target: format!("127.0.0.1:{}", addr.port()),
933 interval_secs: 1,
934 timeout_secs: 1,
935 };
936 health.add_check(check).await?;
937 health.core()?.run_checks_once().await;
938 let snapshot = health.status().await;
939 let status = snapshot
940 .services
941 .iter()
942 .find(|svc| svc.name == "tcp-local")
943 .map(|svc| svc.status);
944 match status {
945 Some(ServiceStatus::Up) => harness.pass("health: tcp check up"),
946 Some(other) => harness.fail(
947 "health: tcp check up",
948 &format!("unexpected status: {other:?}"),
949 ),
950 None => harness.fail("health: tcp check up", "service missing"),
951 }
952
953 let event = wait_for_event(
954 &mut rx,
955 Duration::from_secs(3),
956 |event| matches!(event, KoiEvent::HealthChanged { name, .. } if name == "tcp-local"),
957 )
958 .await;
959 if event.is_some() {
960 harness.pass("health: event emitted");
961 } else {
962 harness.fail("health: event emitted", "no HealthChanged event received");
963 }
964
965 let _ = health.remove_check("tcp-local").await;
966 let snapshot = health.status().await;
967 if snapshot.services.iter().any(|svc| svc.name == "tcp-local") {
968 harness.fail("health: remove check", "check still present after removal");
969 } else {
970 harness.pass("health: remove check");
971 }
972
973 // mDNS: register + browse.
974 let browse = mdns.browse("_koi._tcp").await;
975 if let Ok(browse) = browse {
976 let mut txt = HashMap::new();
977 txt.insert("source".to_string(), "embedded".to_string());
978 let payload = RegisterPayload {
979 name: "koi-embedded-test".to_string(),
980 service_type: "_koi._tcp".to_string(),
981 port: 51515,
982 ip: Some("127.0.0.1".to_string()),
983 lease_secs: Some(30),
984 txt,
985 };
986 let reg = mdns.register(payload);
987 if let Ok(reg) = reg {
988 let found = tokio::time::timeout(Duration::from_secs(5), browse.recv()).await;
989 match found {
990 Ok(Some(_event)) => harness.pass("mdns: register + browse"),
991 Ok(None) => harness.fail("mdns: register + browse", "browse stream ended"),
992 Err(_) => harness.fail("mdns: register + browse", "no events within timeout"),
993 }
994
995 match mdns.resolve("koi-embedded-test._koi._tcp.local.").await {
996 Ok(record) if record.port == Some(51515) => {
997 harness.pass("mdns: resolve registered service");
998 }
999 Ok(_) => harness.fail("mdns: resolve registered service", "unexpected record"),
1000 Err(err) => harness.fail("mdns: resolve registered service", &format!("{err}")),
1001 }
1002
1003 match mdns.unregister(&reg.id) {
1004 Ok(()) => harness.pass("mdns: unregister"),
1005 Err(err) => harness.fail("mdns: unregister", &format!("{err}")),
1006 }
1007 } else {
1008 harness.fail("mdns: register + browse", "register failed");
1009 }
1010 } else {
1011 harness.fail("mdns: register + browse", "browse failed");
1012 }
1013
1014 // Proxy: upsert and read entries.
1015 let mut rx = handle.subscribe();
1016 let entry = ProxyEntry {
1017 name: "embedded-proxy".to_string(),
1018 listen_port: 18080,
1019 backend: "http://127.0.0.1:18081".to_string(),
1020 allow_remote: false,
1021 };
1022 let result = proxy.upsert(entry.clone()).await;
1023 if result.is_ok() {
1024 let entries = proxy.entries().await;
1025 if entries.iter().any(|item| item.name == entry.name) {
1026 harness.pass("proxy: upsert entry");
1027 } else {
1028 harness.fail("proxy: upsert entry", "entry missing after upsert");
1029 }
1030 let event = wait_for_event(&mut rx, Duration::from_secs(3), |event| {
1031 matches!(event, KoiEvent::ProxyEntryUpdated { entry } if entry.name == "embedded-proxy")
1032 })
1033 .await;
1034 if event.is_some() {
1035 harness.pass("proxy: event emitted");
1036 } else {
1037 harness.fail(
1038 "proxy: event emitted",
1039 "no ProxyEntryUpdated event received",
1040 );
1041 }
1042 let _ = proxy.remove("embedded-proxy").await;
1043 let entries = proxy.entries().await;
1044 if entries.iter().any(|item| item.name == "embedded-proxy") {
1045 harness.fail("proxy: remove entry", "entry still present after removal");
1046 } else {
1047 harness.pass("proxy: remove entry");
1048 }
1049 } else {
1050 harness.fail("proxy: upsert entry", "upsert failed");
1051 }
1052
1053 let client = reqwest::Client::builder()
1054 .timeout(Duration::from_secs(10))
1055 .build()?;
1056 if let Err(err) = run_http_tests(&http_base, &client, &mut harness).await {
1057 harness.fail("http: suite", &format!("{err}"));
1058 }
1059
1060 if let Err(err) = run_ipc_tests(mdns.core()?, &mut harness).await {
1061 harness.fail("ipc: suite", &format!("{err}"));
1062 }
1063
1064 http_cancel.cancel();
1065 handle.shutdown().await?;
1066 if tokio::time::Instant::now() > total_deadline {
1067 harness.fail("runtime", "overall timeout exceeded");
1068 }
1069 harness.summary();
1070 if harness.failed > 0 {
1071 std::process::exit(1);
1072 }
1073 Ok(())
1074 }
pub fn core(&self) -> Result<Arc<ProxyCore>, KoiError>
Source
pub async fn entries(&self) -> Vec<ProxyEntry>
pub async fn entries(&self) -> Vec<ProxyEntry>
Examples found in repository?
examples/embedded-integration.rs (line 1024)
769 async fn main() -> Result<(), Box<dyn std::error::Error>> {
770 let args: Vec<String> = std::env::args().collect();
771 if has_flag(&args, "--help") {
772 println!(
773 "Usage: cargo run -p koi-embedded --example embedded-integration -- [options]\n\nOptions:\n --timeout N Overall timeout in seconds (default: 30)\n --verbose Verbose logging"
774 );
775 return Ok(());
776 }
777
778 let verbose = has_flag(&args, "--verbose");
779 let with_certmesh = true;
780 let skip_mdns = false;
781 let timeout_secs = read_arg_value(&args, "--timeout")
782 .and_then(|value| value.parse::<u64>().ok())
783 .unwrap_or(30);
784 let total_deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
785
786 let data_dir = temp_data_dir();
787 let mut harness = Harness::new(verbose);
788
789 harness.log(format!("data dir: {}", data_dir.display()));
790 harness.log(format!("mdns: {}", !skip_mdns));
791 harness.log(format!("certmesh: {}", with_certmesh));
792
793 let koi = Builder::new()
794 .data_dir(&data_dir)
795 .service_mode(ServiceMode::EmbeddedOnly)
796 .mdns(!skip_mdns)
797 .dns_enabled(true)
798 .dns(|cfg| cfg.port(15353))
799 .health(true)
800 .certmesh(with_certmesh)
801 .proxy(true)
802 .build()?;
803 let handle = koi.start().await?;
804
805 let mdns = match handle.mdns() {
806 Ok(mdns) => mdns,
807 Err(err) => {
808 harness.fail("mdns: setup", &format!("{err}"));
809 handle.shutdown().await?;
810 harness.summary();
811 std::process::exit(1);
812 }
813 };
814 let dns = match handle.dns() {
815 Ok(dns) => dns,
816 Err(err) => {
817 harness.fail("dns: setup", &format!("{err}"));
818 handle.shutdown().await?;
819 harness.summary();
820 std::process::exit(1);
821 }
822 };
823 let health = match handle.health() {
824 Ok(health) => health,
825 Err(err) => {
826 harness.fail("health: setup", &format!("{err}"));
827 handle.shutdown().await?;
828 harness.summary();
829 std::process::exit(1);
830 }
831 };
832 let proxy = match handle.proxy() {
833 Ok(proxy) => proxy,
834 Err(err) => {
835 harness.fail("proxy: setup", &format!("{err}"));
836 handle.shutdown().await?;
837 harness.summary();
838 std::process::exit(1);
839 }
840 };
841 let certmesh = match handle.certmesh() {
842 Ok(certmesh) => certmesh,
843 Err(err) => {
844 harness.fail("certmesh: setup", &format!("{err}"));
845 handle.shutdown().await?;
846 harness.summary();
847 std::process::exit(1);
848 }
849 };
850
851 let (http_addr, http_cancel) = start_http_server(
852 mdns.core()?,
853 dns.runtime()?,
854 health.core()?,
855 certmesh.core()?,
856 proxy.runtime()?,
857 )
858 .await?;
859 let http_base = format!("http://{}", http_addr);
860 harness.log(format!("http base: {http_base}"));
861
862 // DNS: add entry, lookup, and event.
863 let mut rx = handle.subscribe();
864 let entry = DnsEntry {
865 name: "embedded-test.lan".to_string(),
866 ip: "127.0.0.1".to_string(),
867 ttl: None,
868 };
869 let _ = dns.add_entry(entry)?;
870 let event = wait_for_event(
871 &mut rx,
872 Duration::from_secs(2),
873 |event| matches!(event, KoiEvent::DnsEntryUpdated { name, .. } if name == "embedded-test.lan"),
874 )
875 .await;
876 if event.is_some() {
877 harness.pass("dns: event emitted");
878 } else {
879 harness.fail("dns: event emitted", "no DnsEntryUpdated event received");
880 }
881
882 let result = dns
883 .lookup("embedded-test.lan", hickory_proto::rr::RecordType::A)
884 .await;
885 match result {
886 Some(result) => {
887 if result.ips.contains(&IpAddr::from([127, 0, 0, 1])) && result.source == "static" {
888 harness.pass("dns: lookup static entry");
889 } else {
890 harness.fail("dns: lookup static entry", "unexpected lookup result");
891 }
892 }
893 None => harness.fail("dns: lookup static entry", "lookup returned none"),
894 }
895
896 let names = dns.list_names();
897 if names.iter().any(|name| name == "embedded-test.lan.") {
898 harness.pass("dns: list names includes entry");
899 } else {
900 harness.fail("dns: list names includes entry", "name missing from list");
901 }
902
903 let _ = dns.remove_entry("embedded-test.lan");
904 let removed_event = wait_for_event(
905 &mut rx,
906 Duration::from_secs(2),
907 |event| matches!(event, KoiEvent::DnsEntryRemoved { name } if name == "embedded-test.lan"),
908 )
909 .await;
910 if removed_event.is_some() {
911 harness.pass("dns: remove emits removal event");
912 } else {
913 harness.fail(
914 "dns: remove emits removal event",
915 "no removal event received",
916 );
917 }
918
919 // Health: run a TCP check against a local listener.
920 let mut rx = handle.subscribe();
921 let listener = TcpListener::bind("127.0.0.1:0").await?;
922 let addr = listener.local_addr()?;
923 tokio::spawn(async move {
924 loop {
925 let _ = listener.accept().await;
926 }
927 });
928
929 let check = HealthCheck {
930 name: "tcp-local".to_string(),
931 kind: ServiceCheckKind::Tcp,
932 target: format!("127.0.0.1:{}", addr.port()),
933 interval_secs: 1,
934 timeout_secs: 1,
935 };
936 health.add_check(check).await?;
937 health.core()?.run_checks_once().await;
938 let snapshot = health.status().await;
939 let status = snapshot
940 .services
941 .iter()
942 .find(|svc| svc.name == "tcp-local")
943 .map(|svc| svc.status);
944 match status {
945 Some(ServiceStatus::Up) => harness.pass("health: tcp check up"),
946 Some(other) => harness.fail(
947 "health: tcp check up",
948 &format!("unexpected status: {other:?}"),
949 ),
950 None => harness.fail("health: tcp check up", "service missing"),
951 }
952
953 let event = wait_for_event(
954 &mut rx,
955 Duration::from_secs(3),
956 |event| matches!(event, KoiEvent::HealthChanged { name, .. } if name == "tcp-local"),
957 )
958 .await;
959 if event.is_some() {
960 harness.pass("health: event emitted");
961 } else {
962 harness.fail("health: event emitted", "no HealthChanged event received");
963 }
964
965 let _ = health.remove_check("tcp-local").await;
966 let snapshot = health.status().await;
967 if snapshot.services.iter().any(|svc| svc.name == "tcp-local") {
968 harness.fail("health: remove check", "check still present after removal");
969 } else {
970 harness.pass("health: remove check");
971 }
972
973 // mDNS: register + browse.
974 let browse = mdns.browse("_koi._tcp").await;
975 if let Ok(browse) = browse {
976 let mut txt = HashMap::new();
977 txt.insert("source".to_string(), "embedded".to_string());
978 let payload = RegisterPayload {
979 name: "koi-embedded-test".to_string(),
980 service_type: "_koi._tcp".to_string(),
981 port: 51515,
982 ip: Some("127.0.0.1".to_string()),
983 lease_secs: Some(30),
984 txt,
985 };
986 let reg = mdns.register(payload);
987 if let Ok(reg) = reg {
988 let found = tokio::time::timeout(Duration::from_secs(5), browse.recv()).await;
989 match found {
990 Ok(Some(_event)) => harness.pass("mdns: register + browse"),
991 Ok(None) => harness.fail("mdns: register + browse", "browse stream ended"),
992 Err(_) => harness.fail("mdns: register + browse", "no events within timeout"),
993 }
994
995 match mdns.resolve("koi-embedded-test._koi._tcp.local.").await {
996 Ok(record) if record.port == Some(51515) => {
997 harness.pass("mdns: resolve registered service");
998 }
999 Ok(_) => harness.fail("mdns: resolve registered service", "unexpected record"),
1000 Err(err) => harness.fail("mdns: resolve registered service", &format!("{err}")),
1001 }
1002
1003 match mdns.unregister(&reg.id) {
1004 Ok(()) => harness.pass("mdns: unregister"),
1005 Err(err) => harness.fail("mdns: unregister", &format!("{err}")),
1006 }
1007 } else {
1008 harness.fail("mdns: register + browse", "register failed");
1009 }
1010 } else {
1011 harness.fail("mdns: register + browse", "browse failed");
1012 }
1013
1014 // Proxy: upsert and read entries.
1015 let mut rx = handle.subscribe();
1016 let entry = ProxyEntry {
1017 name: "embedded-proxy".to_string(),
1018 listen_port: 18080,
1019 backend: "http://127.0.0.1:18081".to_string(),
1020 allow_remote: false,
1021 };
1022 let result = proxy.upsert(entry.clone()).await;
1023 if result.is_ok() {
1024 let entries = proxy.entries().await;
1025 if entries.iter().any(|item| item.name == entry.name) {
1026 harness.pass("proxy: upsert entry");
1027 } else {
1028 harness.fail("proxy: upsert entry", "entry missing after upsert");
1029 }
1030 let event = wait_for_event(&mut rx, Duration::from_secs(3), |event| {
1031 matches!(event, KoiEvent::ProxyEntryUpdated { entry } if entry.name == "embedded-proxy")
1032 })
1033 .await;
1034 if event.is_some() {
1035 harness.pass("proxy: event emitted");
1036 } else {
1037 harness.fail(
1038 "proxy: event emitted",
1039 "no ProxyEntryUpdated event received",
1040 );
1041 }
1042 let _ = proxy.remove("embedded-proxy").await;
1043 let entries = proxy.entries().await;
1044 if entries.iter().any(|item| item.name == "embedded-proxy") {
1045 harness.fail("proxy: remove entry", "entry still present after removal");
1046 } else {
1047 harness.pass("proxy: remove entry");
1048 }
1049 } else {
1050 harness.fail("proxy: upsert entry", "upsert failed");
1051 }
1052
1053 let client = reqwest::Client::builder()
1054 .timeout(Duration::from_secs(10))
1055 .build()?;
1056 if let Err(err) = run_http_tests(&http_base, &client, &mut harness).await {
1057 harness.fail("http: suite", &format!("{err}"));
1058 }
1059
1060 if let Err(err) = run_ipc_tests(mdns.core()?, &mut harness).await {
1061 harness.fail("ipc: suite", &format!("{err}"));
1062 }
1063
1064 http_cancel.cancel();
1065 handle.shutdown().await?;
1066 if tokio::time::Instant::now() > total_deadline {
1067 harness.fail("runtime", "overall timeout exceeded");
1068 }
1069 harness.summary();
1070 if harness.failed > 0 {
1071 std::process::exit(1);
1072 }
1073 Ok(())
1074 }
Source
pub async fn upsert(
&self,
entry: ProxyEntry,
) -> Result<Vec<ProxyEntry>, KoiError>
pub async fn upsert( &self, entry: ProxyEntry, ) -> Result<Vec<ProxyEntry>, KoiError>
Examples found in repository?
examples/embedded-integration.rs (line 1022)
769 async fn main() -> Result<(), Box<dyn std::error::Error>> {
770 let args: Vec<String> = std::env::args().collect();
771 if has_flag(&args, "--help") {
772 println!(
773 "Usage: cargo run -p koi-embedded --example embedded-integration -- [options]\n\nOptions:\n --timeout N Overall timeout in seconds (default: 30)\n --verbose Verbose logging"
774 );
775 return Ok(());
776 }
777
778 let verbose = has_flag(&args, "--verbose");
779 let with_certmesh = true;
780 let skip_mdns = false;
781 let timeout_secs = read_arg_value(&args, "--timeout")
782 .and_then(|value| value.parse::<u64>().ok())
783 .unwrap_or(30);
784 let total_deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
785
786 let data_dir = temp_data_dir();
787 let mut harness = Harness::new(verbose);
788
789 harness.log(format!("data dir: {}", data_dir.display()));
790 harness.log(format!("mdns: {}", !skip_mdns));
791 harness.log(format!("certmesh: {}", with_certmesh));
792
793 let koi = Builder::new()
794 .data_dir(&data_dir)
795 .service_mode(ServiceMode::EmbeddedOnly)
796 .mdns(!skip_mdns)
797 .dns_enabled(true)
798 .dns(|cfg| cfg.port(15353))
799 .health(true)
800 .certmesh(with_certmesh)
801 .proxy(true)
802 .build()?;
803 let handle = koi.start().await?;
804
805 let mdns = match handle.mdns() {
806 Ok(mdns) => mdns,
807 Err(err) => {
808 harness.fail("mdns: setup", &format!("{err}"));
809 handle.shutdown().await?;
810 harness.summary();
811 std::process::exit(1);
812 }
813 };
814 let dns = match handle.dns() {
815 Ok(dns) => dns,
816 Err(err) => {
817 harness.fail("dns: setup", &format!("{err}"));
818 handle.shutdown().await?;
819 harness.summary();
820 std::process::exit(1);
821 }
822 };
823 let health = match handle.health() {
824 Ok(health) => health,
825 Err(err) => {
826 harness.fail("health: setup", &format!("{err}"));
827 handle.shutdown().await?;
828 harness.summary();
829 std::process::exit(1);
830 }
831 };
832 let proxy = match handle.proxy() {
833 Ok(proxy) => proxy,
834 Err(err) => {
835 harness.fail("proxy: setup", &format!("{err}"));
836 handle.shutdown().await?;
837 harness.summary();
838 std::process::exit(1);
839 }
840 };
841 let certmesh = match handle.certmesh() {
842 Ok(certmesh) => certmesh,
843 Err(err) => {
844 harness.fail("certmesh: setup", &format!("{err}"));
845 handle.shutdown().await?;
846 harness.summary();
847 std::process::exit(1);
848 }
849 };
850
851 let (http_addr, http_cancel) = start_http_server(
852 mdns.core()?,
853 dns.runtime()?,
854 health.core()?,
855 certmesh.core()?,
856 proxy.runtime()?,
857 )
858 .await?;
859 let http_base = format!("http://{}", http_addr);
860 harness.log(format!("http base: {http_base}"));
861
862 // DNS: add entry, lookup, and event.
863 let mut rx = handle.subscribe();
864 let entry = DnsEntry {
865 name: "embedded-test.lan".to_string(),
866 ip: "127.0.0.1".to_string(),
867 ttl: None,
868 };
869 let _ = dns.add_entry(entry)?;
870 let event = wait_for_event(
871 &mut rx,
872 Duration::from_secs(2),
873 |event| matches!(event, KoiEvent::DnsEntryUpdated { name, .. } if name == "embedded-test.lan"),
874 )
875 .await;
876 if event.is_some() {
877 harness.pass("dns: event emitted");
878 } else {
879 harness.fail("dns: event emitted", "no DnsEntryUpdated event received");
880 }
881
882 let result = dns
883 .lookup("embedded-test.lan", hickory_proto::rr::RecordType::A)
884 .await;
885 match result {
886 Some(result) => {
887 if result.ips.contains(&IpAddr::from([127, 0, 0, 1])) && result.source == "static" {
888 harness.pass("dns: lookup static entry");
889 } else {
890 harness.fail("dns: lookup static entry", "unexpected lookup result");
891 }
892 }
893 None => harness.fail("dns: lookup static entry", "lookup returned none"),
894 }
895
896 let names = dns.list_names();
897 if names.iter().any(|name| name == "embedded-test.lan.") {
898 harness.pass("dns: list names includes entry");
899 } else {
900 harness.fail("dns: list names includes entry", "name missing from list");
901 }
902
903 let _ = dns.remove_entry("embedded-test.lan");
904 let removed_event = wait_for_event(
905 &mut rx,
906 Duration::from_secs(2),
907 |event| matches!(event, KoiEvent::DnsEntryRemoved { name } if name == "embedded-test.lan"),
908 )
909 .await;
910 if removed_event.is_some() {
911 harness.pass("dns: remove emits removal event");
912 } else {
913 harness.fail(
914 "dns: remove emits removal event",
915 "no removal event received",
916 );
917 }
918
919 // Health: run a TCP check against a local listener.
920 let mut rx = handle.subscribe();
921 let listener = TcpListener::bind("127.0.0.1:0").await?;
922 let addr = listener.local_addr()?;
923 tokio::spawn(async move {
924 loop {
925 let _ = listener.accept().await;
926 }
927 });
928
929 let check = HealthCheck {
930 name: "tcp-local".to_string(),
931 kind: ServiceCheckKind::Tcp,
932 target: format!("127.0.0.1:{}", addr.port()),
933 interval_secs: 1,
934 timeout_secs: 1,
935 };
936 health.add_check(check).await?;
937 health.core()?.run_checks_once().await;
938 let snapshot = health.status().await;
939 let status = snapshot
940 .services
941 .iter()
942 .find(|svc| svc.name == "tcp-local")
943 .map(|svc| svc.status);
944 match status {
945 Some(ServiceStatus::Up) => harness.pass("health: tcp check up"),
946 Some(other) => harness.fail(
947 "health: tcp check up",
948 &format!("unexpected status: {other:?}"),
949 ),
950 None => harness.fail("health: tcp check up", "service missing"),
951 }
952
953 let event = wait_for_event(
954 &mut rx,
955 Duration::from_secs(3),
956 |event| matches!(event, KoiEvent::HealthChanged { name, .. } if name == "tcp-local"),
957 )
958 .await;
959 if event.is_some() {
960 harness.pass("health: event emitted");
961 } else {
962 harness.fail("health: event emitted", "no HealthChanged event received");
963 }
964
965 let _ = health.remove_check("tcp-local").await;
966 let snapshot = health.status().await;
967 if snapshot.services.iter().any(|svc| svc.name == "tcp-local") {
968 harness.fail("health: remove check", "check still present after removal");
969 } else {
970 harness.pass("health: remove check");
971 }
972
973 // mDNS: register + browse.
974 let browse = mdns.browse("_koi._tcp").await;
975 if let Ok(browse) = browse {
976 let mut txt = HashMap::new();
977 txt.insert("source".to_string(), "embedded".to_string());
978 let payload = RegisterPayload {
979 name: "koi-embedded-test".to_string(),
980 service_type: "_koi._tcp".to_string(),
981 port: 51515,
982 ip: Some("127.0.0.1".to_string()),
983 lease_secs: Some(30),
984 txt,
985 };
986 let reg = mdns.register(payload);
987 if let Ok(reg) = reg {
988 let found = tokio::time::timeout(Duration::from_secs(5), browse.recv()).await;
989 match found {
990 Ok(Some(_event)) => harness.pass("mdns: register + browse"),
991 Ok(None) => harness.fail("mdns: register + browse", "browse stream ended"),
992 Err(_) => harness.fail("mdns: register + browse", "no events within timeout"),
993 }
994
995 match mdns.resolve("koi-embedded-test._koi._tcp.local.").await {
996 Ok(record) if record.port == Some(51515) => {
997 harness.pass("mdns: resolve registered service");
998 }
999 Ok(_) => harness.fail("mdns: resolve registered service", "unexpected record"),
1000 Err(err) => harness.fail("mdns: resolve registered service", &format!("{err}")),
1001 }
1002
1003 match mdns.unregister(&reg.id) {
1004 Ok(()) => harness.pass("mdns: unregister"),
1005 Err(err) => harness.fail("mdns: unregister", &format!("{err}")),
1006 }
1007 } else {
1008 harness.fail("mdns: register + browse", "register failed");
1009 }
1010 } else {
1011 harness.fail("mdns: register + browse", "browse failed");
1012 }
1013
1014 // Proxy: upsert and read entries.
1015 let mut rx = handle.subscribe();
1016 let entry = ProxyEntry {
1017 name: "embedded-proxy".to_string(),
1018 listen_port: 18080,
1019 backend: "http://127.0.0.1:18081".to_string(),
1020 allow_remote: false,
1021 };
1022 let result = proxy.upsert(entry.clone()).await;
1023 if result.is_ok() {
1024 let entries = proxy.entries().await;
1025 if entries.iter().any(|item| item.name == entry.name) {
1026 harness.pass("proxy: upsert entry");
1027 } else {
1028 harness.fail("proxy: upsert entry", "entry missing after upsert");
1029 }
1030 let event = wait_for_event(&mut rx, Duration::from_secs(3), |event| {
1031 matches!(event, KoiEvent::ProxyEntryUpdated { entry } if entry.name == "embedded-proxy")
1032 })
1033 .await;
1034 if event.is_some() {
1035 harness.pass("proxy: event emitted");
1036 } else {
1037 harness.fail(
1038 "proxy: event emitted",
1039 "no ProxyEntryUpdated event received",
1040 );
1041 }
1042 let _ = proxy.remove("embedded-proxy").await;
1043 let entries = proxy.entries().await;
1044 if entries.iter().any(|item| item.name == "embedded-proxy") {
1045 harness.fail("proxy: remove entry", "entry still present after removal");
1046 } else {
1047 harness.pass("proxy: remove entry");
1048 }
1049 } else {
1050 harness.fail("proxy: upsert entry", "upsert failed");
1051 }
1052
1053 let client = reqwest::Client::builder()
1054 .timeout(Duration::from_secs(10))
1055 .build()?;
1056 if let Err(err) = run_http_tests(&http_base, &client, &mut harness).await {
1057 harness.fail("http: suite", &format!("{err}"));
1058 }
1059
1060 if let Err(err) = run_ipc_tests(mdns.core()?, &mut harness).await {
1061 harness.fail("ipc: suite", &format!("{err}"));
1062 }
1063
1064 http_cancel.cancel();
1065 handle.shutdown().await?;
1066 if tokio::time::Instant::now() > total_deadline {
1067 harness.fail("runtime", "overall timeout exceeded");
1068 }
1069 harness.summary();
1070 if harness.failed > 0 {
1071 std::process::exit(1);
1072 }
1073 Ok(())
1074 }
Source
pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError>
pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError>
Examples found in repository?
examples/embedded-integration.rs (line 1042)
769 async fn main() -> Result<(), Box<dyn std::error::Error>> {
770 let args: Vec<String> = std::env::args().collect();
771 if has_flag(&args, "--help") {
772 println!(
773 "Usage: cargo run -p koi-embedded --example embedded-integration -- [options]\n\nOptions:\n --timeout N Overall timeout in seconds (default: 30)\n --verbose Verbose logging"
774 );
775 return Ok(());
776 }
777
778 let verbose = has_flag(&args, "--verbose");
779 let with_certmesh = true;
780 let skip_mdns = false;
781 let timeout_secs = read_arg_value(&args, "--timeout")
782 .and_then(|value| value.parse::<u64>().ok())
783 .unwrap_or(30);
784 let total_deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs);
785
786 let data_dir = temp_data_dir();
787 let mut harness = Harness::new(verbose);
788
789 harness.log(format!("data dir: {}", data_dir.display()));
790 harness.log(format!("mdns: {}", !skip_mdns));
791 harness.log(format!("certmesh: {}", with_certmesh));
792
793 let koi = Builder::new()
794 .data_dir(&data_dir)
795 .service_mode(ServiceMode::EmbeddedOnly)
796 .mdns(!skip_mdns)
797 .dns_enabled(true)
798 .dns(|cfg| cfg.port(15353))
799 .health(true)
800 .certmesh(with_certmesh)
801 .proxy(true)
802 .build()?;
803 let handle = koi.start().await?;
804
805 let mdns = match handle.mdns() {
806 Ok(mdns) => mdns,
807 Err(err) => {
808 harness.fail("mdns: setup", &format!("{err}"));
809 handle.shutdown().await?;
810 harness.summary();
811 std::process::exit(1);
812 }
813 };
814 let dns = match handle.dns() {
815 Ok(dns) => dns,
816 Err(err) => {
817 harness.fail("dns: setup", &format!("{err}"));
818 handle.shutdown().await?;
819 harness.summary();
820 std::process::exit(1);
821 }
822 };
823 let health = match handle.health() {
824 Ok(health) => health,
825 Err(err) => {
826 harness.fail("health: setup", &format!("{err}"));
827 handle.shutdown().await?;
828 harness.summary();
829 std::process::exit(1);
830 }
831 };
832 let proxy = match handle.proxy() {
833 Ok(proxy) => proxy,
834 Err(err) => {
835 harness.fail("proxy: setup", &format!("{err}"));
836 handle.shutdown().await?;
837 harness.summary();
838 std::process::exit(1);
839 }
840 };
841 let certmesh = match handle.certmesh() {
842 Ok(certmesh) => certmesh,
843 Err(err) => {
844 harness.fail("certmesh: setup", &format!("{err}"));
845 handle.shutdown().await?;
846 harness.summary();
847 std::process::exit(1);
848 }
849 };
850
851 let (http_addr, http_cancel) = start_http_server(
852 mdns.core()?,
853 dns.runtime()?,
854 health.core()?,
855 certmesh.core()?,
856 proxy.runtime()?,
857 )
858 .await?;
859 let http_base = format!("http://{}", http_addr);
860 harness.log(format!("http base: {http_base}"));
861
862 // DNS: add entry, lookup, and event.
863 let mut rx = handle.subscribe();
864 let entry = DnsEntry {
865 name: "embedded-test.lan".to_string(),
866 ip: "127.0.0.1".to_string(),
867 ttl: None,
868 };
869 let _ = dns.add_entry(entry)?;
870 let event = wait_for_event(
871 &mut rx,
872 Duration::from_secs(2),
873 |event| matches!(event, KoiEvent::DnsEntryUpdated { name, .. } if name == "embedded-test.lan"),
874 )
875 .await;
876 if event.is_some() {
877 harness.pass("dns: event emitted");
878 } else {
879 harness.fail("dns: event emitted", "no DnsEntryUpdated event received");
880 }
881
882 let result = dns
883 .lookup("embedded-test.lan", hickory_proto::rr::RecordType::A)
884 .await;
885 match result {
886 Some(result) => {
887 if result.ips.contains(&IpAddr::from([127, 0, 0, 1])) && result.source == "static" {
888 harness.pass("dns: lookup static entry");
889 } else {
890 harness.fail("dns: lookup static entry", "unexpected lookup result");
891 }
892 }
893 None => harness.fail("dns: lookup static entry", "lookup returned none"),
894 }
895
896 let names = dns.list_names();
897 if names.iter().any(|name| name == "embedded-test.lan.") {
898 harness.pass("dns: list names includes entry");
899 } else {
900 harness.fail("dns: list names includes entry", "name missing from list");
901 }
902
903 let _ = dns.remove_entry("embedded-test.lan");
904 let removed_event = wait_for_event(
905 &mut rx,
906 Duration::from_secs(2),
907 |event| matches!(event, KoiEvent::DnsEntryRemoved { name } if name == "embedded-test.lan"),
908 )
909 .await;
910 if removed_event.is_some() {
911 harness.pass("dns: remove emits removal event");
912 } else {
913 harness.fail(
914 "dns: remove emits removal event",
915 "no removal event received",
916 );
917 }
918
919 // Health: run a TCP check against a local listener.
920 let mut rx = handle.subscribe();
921 let listener = TcpListener::bind("127.0.0.1:0").await?;
922 let addr = listener.local_addr()?;
923 tokio::spawn(async move {
924 loop {
925 let _ = listener.accept().await;
926 }
927 });
928
929 let check = HealthCheck {
930 name: "tcp-local".to_string(),
931 kind: ServiceCheckKind::Tcp,
932 target: format!("127.0.0.1:{}", addr.port()),
933 interval_secs: 1,
934 timeout_secs: 1,
935 };
936 health.add_check(check).await?;
937 health.core()?.run_checks_once().await;
938 let snapshot = health.status().await;
939 let status = snapshot
940 .services
941 .iter()
942 .find(|svc| svc.name == "tcp-local")
943 .map(|svc| svc.status);
944 match status {
945 Some(ServiceStatus::Up) => harness.pass("health: tcp check up"),
946 Some(other) => harness.fail(
947 "health: tcp check up",
948 &format!("unexpected status: {other:?}"),
949 ),
950 None => harness.fail("health: tcp check up", "service missing"),
951 }
952
953 let event = wait_for_event(
954 &mut rx,
955 Duration::from_secs(3),
956 |event| matches!(event, KoiEvent::HealthChanged { name, .. } if name == "tcp-local"),
957 )
958 .await;
959 if event.is_some() {
960 harness.pass("health: event emitted");
961 } else {
962 harness.fail("health: event emitted", "no HealthChanged event received");
963 }
964
965 let _ = health.remove_check("tcp-local").await;
966 let snapshot = health.status().await;
967 if snapshot.services.iter().any(|svc| svc.name == "tcp-local") {
968 harness.fail("health: remove check", "check still present after removal");
969 } else {
970 harness.pass("health: remove check");
971 }
972
973 // mDNS: register + browse.
974 let browse = mdns.browse("_koi._tcp").await;
975 if let Ok(browse) = browse {
976 let mut txt = HashMap::new();
977 txt.insert("source".to_string(), "embedded".to_string());
978 let payload = RegisterPayload {
979 name: "koi-embedded-test".to_string(),
980 service_type: "_koi._tcp".to_string(),
981 port: 51515,
982 ip: Some("127.0.0.1".to_string()),
983 lease_secs: Some(30),
984 txt,
985 };
986 let reg = mdns.register(payload);
987 if let Ok(reg) = reg {
988 let found = tokio::time::timeout(Duration::from_secs(5), browse.recv()).await;
989 match found {
990 Ok(Some(_event)) => harness.pass("mdns: register + browse"),
991 Ok(None) => harness.fail("mdns: register + browse", "browse stream ended"),
992 Err(_) => harness.fail("mdns: register + browse", "no events within timeout"),
993 }
994
995 match mdns.resolve("koi-embedded-test._koi._tcp.local.").await {
996 Ok(record) if record.port == Some(51515) => {
997 harness.pass("mdns: resolve registered service");
998 }
999 Ok(_) => harness.fail("mdns: resolve registered service", "unexpected record"),
1000 Err(err) => harness.fail("mdns: resolve registered service", &format!("{err}")),
1001 }
1002
1003 match mdns.unregister(&reg.id) {
1004 Ok(()) => harness.pass("mdns: unregister"),
1005 Err(err) => harness.fail("mdns: unregister", &format!("{err}")),
1006 }
1007 } else {
1008 harness.fail("mdns: register + browse", "register failed");
1009 }
1010 } else {
1011 harness.fail("mdns: register + browse", "browse failed");
1012 }
1013
1014 // Proxy: upsert and read entries.
1015 let mut rx = handle.subscribe();
1016 let entry = ProxyEntry {
1017 name: "embedded-proxy".to_string(),
1018 listen_port: 18080,
1019 backend: "http://127.0.0.1:18081".to_string(),
1020 allow_remote: false,
1021 };
1022 let result = proxy.upsert(entry.clone()).await;
1023 if result.is_ok() {
1024 let entries = proxy.entries().await;
1025 if entries.iter().any(|item| item.name == entry.name) {
1026 harness.pass("proxy: upsert entry");
1027 } else {
1028 harness.fail("proxy: upsert entry", "entry missing after upsert");
1029 }
1030 let event = wait_for_event(&mut rx, Duration::from_secs(3), |event| {
1031 matches!(event, KoiEvent::ProxyEntryUpdated { entry } if entry.name == "embedded-proxy")
1032 })
1033 .await;
1034 if event.is_some() {
1035 harness.pass("proxy: event emitted");
1036 } else {
1037 harness.fail(
1038 "proxy: event emitted",
1039 "no ProxyEntryUpdated event received",
1040 );
1041 }
1042 let _ = proxy.remove("embedded-proxy").await;
1043 let entries = proxy.entries().await;
1044 if entries.iter().any(|item| item.name == "embedded-proxy") {
1045 harness.fail("proxy: remove entry", "entry still present after removal");
1046 } else {
1047 harness.pass("proxy: remove entry");
1048 }
1049 } else {
1050 harness.fail("proxy: upsert entry", "upsert failed");
1051 }
1052
1053 let client = reqwest::Client::builder()
1054 .timeout(Duration::from_secs(10))
1055 .build()?;
1056 if let Err(err) = run_http_tests(&http_base, &client, &mut harness).await {
1057 harness.fail("http: suite", &format!("{err}"));
1058 }
1059
1060 if let Err(err) = run_ipc_tests(mdns.core()?, &mut harness).await {
1061 harness.fail("ipc: suite", &format!("{err}"));
1062 }
1063
1064 http_cancel.cancel();
1065 handle.shutdown().await?;
1066 if tokio::time::Instant::now() > total_deadline {
1067 harness.fail("runtime", "overall timeout exceeded");
1068 }
1069 harness.summary();
1070 if harness.failed > 0 {
1071 std::process::exit(1);
1072 }
1073 Ok(())
1074}
pub async fn start_all(&self) -> Result<(), KoiError>
pub async fn stop_all(&self)
Auto Trait Implementations§
impl Freeze for ProxyHandle
impl !RefUnwindSafe for ProxyHandle
impl Send for ProxyHandle
impl Sync for ProxyHandle
impl Unpin for ProxyHandle
impl UnsafeUnpin for ProxyHandle
impl !UnwindSafe for ProxyHandle
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self> otherwise. Read more