1use crate::error::Result;
11use std::collections::{HashMap, HashSet};
12use std::net::{IpAddr, Ipv4Addr, SocketAddr};
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::RwLock;
17use tracing::{debug, info, warn};
18use zlayer_proxy::{
19 endpoint_lb_key, load_existing_certs_into_resolver, tls_acceptor_from_resolver, Activator,
20 CertManager, LbStrategy, LoadBalancer, NetworkPolicyChecker, ProxyConfig, ProxyServer,
21 RouteEntry, RpsRegistry, ServiceRegistry, SniCertResolver, StreamHealthProbe,
22 StreamProxyConfig, StreamRegistry, StreamService, TcpStreamService, UdpStreamService,
23};
24use zlayer_scheduler::scalers::RpsProvider;
25use zlayer_spec::{
26 ExposeType, PortMapping, PortProtocol, Protocol, ServiceSpec, StreamEndpointConfig,
27 StreamHealthCheck,
28};
29
/// Replica count that [`ServiceActivator::new`] installs as its scale-up target.
const DEFAULT_ACTIVATION_FLOOR: u32 = 1;
33
34fn bind_v6_only(port: u16) -> std::io::Result<tokio::net::TcpListener> {
42 use socket2::{Domain, Protocol, Socket, Type};
43 let sock = Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))?;
44 sock.set_only_v6(true)?;
45 sock.set_reuse_address(true)?;
46 sock.set_nonblocking(true)?;
47 let addr: std::net::SocketAddr = (std::net::Ipv6Addr::UNSPECIFIED, port).into();
48 sock.bind(&addr.into())?;
49 sock.listen(1024)?;
50 tokio::net::TcpListener::from_std(sock.into())
51}
52
53fn translate_stream_config(stream: Option<&StreamEndpointConfig>) -> StreamProxyConfig {
65 let Some(stream) = stream else {
66 return StreamProxyConfig::default();
67 };
68
69 let health_check = stream.health_check.as_ref().map(|hc| match hc {
70 StreamHealthCheck::TcpConnect => StreamHealthProbe::TcpConnect,
71 StreamHealthCheck::UdpProbe { request, expect } => StreamHealthProbe::UdpProbe {
72 request: unescape_hex(request),
73 expect: expect.as_deref().map(unescape_hex),
74 },
75 });
76
77 StreamProxyConfig {
78 tls: stream.tls,
79 proxy_protocol: stream.proxy_protocol,
80 session_timeout: stream.session_timeout_duration(),
81 health_check,
82 }
83}
84
/// Decodes backslash escapes in `s` into raw bytes.
///
/// Recognised sequences:
/// * `\xHH` / `\XHH` (two hex digits, either case) becomes the single byte `0xHH`;
/// * `\\` becomes one backslash.
///
/// Anything else is copied through unchanged, including a backslash that does
/// not start a valid escape (incomplete `\x4`, non-hex `\xZZ`, `\n`, or a
/// trailing `\`).
fn unescape_hex(s: &str) -> Vec<u8> {
    /// Value of one ASCII hex digit, or `None` for any other byte.
    fn nibble(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    let mut rest = s.as_bytes();
    let mut decoded = Vec::with_capacity(rest.len());

    while let Some((&first, tail)) = rest.split_first() {
        if first == b'\\' {
            if let [b'x' | b'X', hi, lo, after @ ..] = tail {
                if let (Some(high), Some(low)) = (nibble(*hi), nibble(*lo)) {
                    decoded.push((high << 4) | low);
                    rest = after;
                    continue;
                }
                // Not two hex digits: fall through and keep the backslash literally.
            } else if let [b'\\', after @ ..] = tail {
                decoded.push(b'\\');
                rest = after;
                continue;
            }
        }

        // Ordinary byte, or a backslash that does not begin a valid escape.
        decoded.push(first);
        rest = tail;
    }

    decoded
}
125
/// Upper bound on how long [`ServiceActivator`]'s `activate` waits, after a
/// successful scale request, for the load balancer to offer a backend.
const ACTIVATOR_READY_DEADLINE: Duration = Duration::from_secs(30);
130
/// Pause between successive load-balancer checks while `activate` waits for a
/// backend to appear.
const ACTIVATOR_POLL_STEP: Duration = Duration::from_millis(200);
133
/// Hook through which [`ServiceActivator`] requests a replica count for a service.
///
/// Implementations must be `Send + Sync` because the trigger is shared behind
/// an `Arc<dyn ScaleTrigger>`.
#[async_trait::async_trait]
pub trait ScaleTrigger: Send + Sync {
    /// Requests that `service` be scaled to `replicas`.
    ///
    /// `ServiceActivator` passes the bare service name (deployment prefix and
    /// endpoint suffix stripped from the load-balancer key).
    ///
    /// # Errors
    ///
    /// Returns a human-readable message on failure; `ServiceActivator`
    /// propagates it unchanged to its caller.
    async fn scale_to(&self, service: &str, replicas: u32) -> std::result::Result<(), String>;
}
153
/// [`Activator`] implementation that scales a service up through a
/// [`ScaleTrigger`] and then waits for the load balancer to offer a backend.
pub struct ServiceActivator {
    /// Callback used to request the scale-up.
    trigger: Arc<dyn ScaleTrigger>,
    /// Load balancer polled (via `select`) to detect that a backend is available.
    load_balancer: Arc<LoadBalancer>,
    /// Replica count requested on activation; never below 1 (see `with_floor`).
    floor: u32,
}
170
171impl ServiceActivator {
172 #[must_use]
175 pub fn new(trigger: Arc<dyn ScaleTrigger>, load_balancer: Arc<LoadBalancer>) -> Self {
176 Self {
177 trigger,
178 load_balancer,
179 floor: DEFAULT_ACTIVATION_FLOOR,
180 }
181 }
182
183 #[must_use]
187 pub fn with_floor(mut self, floor: u32) -> Self {
188 self.floor = floor.max(1);
189 self
190 }
191
192 fn service_name_from_key(key: &str) -> &str {
199 let without_endpoint = key.split('#').next().unwrap_or(key);
200 without_endpoint
201 .rsplit('/')
202 .next()
203 .unwrap_or(without_endpoint)
204 }
205}
206
207#[async_trait::async_trait]
208impl Activator for ServiceActivator {
209 async fn activate(&self, service: &str) -> std::result::Result<(), String> {
210 let bare = Self::service_name_from_key(service);
211 info!(
212 lb_key = %service,
213 service = %bare,
214 floor = self.floor,
215 "Activating scaled-to-zero service on demand"
216 );
217
218 self.trigger.scale_to(bare, self.floor).await?;
221
222 let deadline = Instant::now() + ACTIVATOR_READY_DEADLINE;
224 loop {
225 if self.load_balancer.select(service).is_some() {
226 return Ok(());
227 }
228 if Instant::now() >= deadline {
229 return Err(format!(
230 "service '{bare}' did not become ready within {ACTIVATOR_READY_DEADLINE:?} after scale-up"
231 ));
232 }
233 tokio::time::sleep(ACTIVATOR_POLL_STEP).await;
234 }
235 }
236}
237
/// Adapter exposing a shared [`RpsRegistry`] through the scheduler's
/// [`RpsProvider`] trait.
#[derive(Debug, Clone)]
pub struct RpsRegistryProvider {
    /// Registry that answers the per-service RPS queries.
    registry: Arc<RpsRegistry>,
}
249
250impl RpsRegistryProvider {
251 #[must_use]
253 pub fn new(registry: Arc<RpsRegistry>) -> Self {
254 Self { registry }
255 }
256}
257
258#[async_trait::async_trait]
259impl RpsProvider for RpsRegistryProvider {
260 async fn rps(&self, service: &str) -> f64 {
261 self.registry.rps(service).await
262 }
263}
264
/// HTTP port that `ProxyManager::start_ingress` passes to `start_ingress_on`.
pub const DEFAULT_INGRESS_HTTP_PORT: u16 = 80;
269
/// HTTPS port that `ProxyManager::start_ingress` passes to `start_ingress_on`.
pub const DEFAULT_INGRESS_HTTPS_PORT: u16 = 443;
272
/// Static settings for a `ProxyManager`.
#[derive(Debug, Clone)]
pub struct ProxyManagerConfig {
    /// Address for the plain-HTTP listener.
    pub http_addr: SocketAddr,
    /// Address for the HTTPS listener, if one is configured.
    pub https_addr: Option<SocketAddr>,
    /// Whether HTTP/2 is enabled on the HTTP listeners the manager creates.
    pub http2_enabled: bool,
}

impl Default for ProxyManagerConfig {
    /// HTTP on `0.0.0.0:80`, no HTTPS address, HTTP/2 enabled.
    fn default() -> Self {
        // Built from typed parts rather than by parsing a string, so there is
        // no `unwrap()` and no panic path.
        Self::new(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 80))
    }
}

impl ProxyManagerConfig {
    /// Creates a configuration listening for HTTP on `http_addr`, with no HTTPS
    /// address and HTTP/2 enabled.
    #[must_use]
    pub fn new(http_addr: SocketAddr) -> Self {
        Self {
            http_addr,
            https_addr: None,
            http2_enabled: true,
        }
    }

    /// Sets the HTTPS listener address.
    #[must_use]
    pub fn with_https(self, addr: SocketAddr) -> Self {
        Self {
            https_addr: Some(addr),
            ..self
        }
    }

    /// Enables or disables HTTP/2.
    #[must_use]
    pub fn with_http2(self, enabled: bool) -> Self {
        Self {
            http2_enabled: enabled,
            ..self
        }
    }
}
319
/// Per-service bookkeeping record stored in `ProxyManager::services`.
///
/// NOTE(review): the code that populates and reads these fields is not in this
/// part of the file; the field notes below are inferred from names and types
/// and should be confirmed against that code.
#[derive(Debug, Clone)]
struct ServiceTracking {
    /// Deployment the service belongs to, if any (presumably).
    deployment: Option<String>,
    /// Names of the service's endpoints (presumably).
    endpoint_names: Vec<String>,
    /// TCP ports associated with the service (presumably).
    tcp_ports: Vec<u16>,
    /// UDP ports associated with the service (presumably).
    udp_ports: Vec<u16>,
    /// HTTP ports associated with the service (presumably).
    http_ports: Vec<u16>,
}
339
/// Owns the node's proxy listeners: HTTP/HTTPS proxy servers, L4 TCP/UDP stream
/// listeners, loopback publications and host port mappings.
pub struct ProxyManager {
    /// Static settings; `http2_enabled` is applied to HTTP listeners.
    config: ProxyManagerConfig,
    /// Service registry handed to every `ProxyServer` this manager creates.
    registry: Arc<ServiceRegistry>,
    /// Load balancer shared by all proxy servers and by the service activator.
    load_balancer: Arc<LoadBalancer>,
    /// Proxy servers keyed by listening port.
    servers: RwLock<HashMap<u16, Arc<ProxyServer>>>,
    /// Per-service tracking records keyed by name.
    /// NOTE(review): not read or written in this part of the file — confirm usage.
    services: RwLock<HashMap<String, ServiceTracking>>,
    /// Registry backing the L4 stream listeners; TCP/UDP stream listeners are
    /// not started while this is `None`.
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Certificate manager; TLS listeners are not started while this is `None`.
    cert_manager: Option<Arc<CertManager>>,
    /// SNI certificate resolver shared by every TLS listener.
    sni_resolver: Arc<SniCertResolver>,
    /// NOTE(review): presumably names for which certificate provisioning was
    /// already requested; not used in this part of the file — confirm.
    provisioning_requested: RwLock<HashSet<String>>,
    /// Ports that already have a TCP stream listener.
    tcp_listeners: RwLock<HashSet<u16>>,
    /// Ports that already have a UDP stream listener.
    udp_listeners: RwLock<HashSet<u16>>,
    /// Connection counter that `stop` waits on before returning.
    active_connections: Arc<AtomicU64>,
    /// Optional network-policy checker attached to each proxy server.
    network_policy_checker: Option<NetworkPolicyChecker>,
    /// Backend registry used only by the loopback (127.0.0.1) listeners.
    loopback_registry: Arc<StreamRegistry>,
    /// Serve tasks of loopback TCP listeners, keyed by port; aborted on unpublish.
    loopback_tcp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
    /// Serve tasks of loopback UDP listeners, keyed by port; aborted on unpublish.
    loopback_udp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
    /// Backend registry used only by host port-mapping listeners.
    port_map_registry: Arc<StreamRegistry>,
    /// Serve tasks of port-mapping TCP listeners, keyed by host port.
    port_map_tcp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
    /// Serve tasks of port-mapping UDP listeners, keyed by host port.
    port_map_udp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
    /// Owner of each published host port as `(deployment, service name)`; used
    /// to refuse a port already claimed by a different owner.
    published_ports: RwLock<HashMap<u16, (Option<String>, String)>>,
    /// Background load-balancer health-check task; aborted when the manager drops.
    lb_health_checker: tokio::task::JoinHandle<()>,
    /// Set once ingress has been started so later `start_ingress_on` calls no-op.
    ingress_started: AtomicBool,
    /// Requests-per-second registry shared with every proxy server.
    rps_registry: Arc<RpsRegistry>,
    /// Activator attached to proxy servers created after it is installed.
    activator: RwLock<Option<Arc<dyn Activator>>>,
}
461
462impl Drop for ProxyManager {
463 fn drop(&mut self) {
464 self.lb_health_checker.abort();
465 }
466}
467
468impl ProxyManager {
469 pub fn new(
472 config: ProxyManagerConfig,
473 registry: Arc<ServiceRegistry>,
474 cert_manager: Option<Arc<CertManager>>,
475 ) -> Self {
476 let load_balancer = Arc::new(LoadBalancer::new());
477
478 let lb_health_checker =
484 load_balancer.spawn_health_checker(Duration::from_secs(5), Duration::from_secs(2));
485
486 Self {
487 config,
488 registry,
489 load_balancer,
490 servers: RwLock::new(HashMap::new()),
491 services: RwLock::new(HashMap::new()),
492 stream_registry: None,
493 cert_manager,
494 sni_resolver: Arc::new(SniCertResolver::new()),
495 provisioning_requested: RwLock::new(HashSet::new()),
496 tcp_listeners: RwLock::new(HashSet::new()),
497 udp_listeners: RwLock::new(HashSet::new()),
498 active_connections: Arc::new(AtomicU64::new(0)),
499 network_policy_checker: None,
500 loopback_registry: Arc::new(StreamRegistry::new()),
501 loopback_tcp: RwLock::new(HashMap::new()),
502 loopback_udp: RwLock::new(HashMap::new()),
503 port_map_registry: Arc::new(StreamRegistry::new()),
504 port_map_tcp: RwLock::new(HashMap::new()),
505 port_map_udp: RwLock::new(HashMap::new()),
506 published_ports: RwLock::new(HashMap::new()),
507 lb_health_checker,
508 ingress_started: AtomicBool::new(false),
509 rps_registry: Arc::new(RpsRegistry::new()),
510 activator: RwLock::new(None),
511 }
512 }
513
514 pub fn registry(&self) -> Arc<ServiceRegistry> {
516 self.registry.clone()
517 }
518
519 pub fn load_balancer(&self) -> Arc<LoadBalancer> {
521 self.load_balancer.clone()
522 }
523
524 #[must_use]
530 pub fn rps_registry(&self) -> Arc<RpsRegistry> {
531 Arc::clone(&self.rps_registry)
532 }
533
534 #[must_use]
540 pub fn rps_provider(&self) -> RpsRegistryProvider {
541 RpsRegistryProvider::new(self.rps_registry())
542 }
543
544 pub async fn set_activator(&self, activator: Arc<dyn Activator>) {
552 *self.activator.write().await = Some(activator);
553 }
554
555 pub async fn install_service_activator(&self, trigger: Arc<dyn ScaleTrigger>) {
561 let activator = Arc::new(ServiceActivator::new(trigger, self.load_balancer.clone()));
562 self.set_activator(activator).await;
563 }
564
565 async fn current_activator(&self) -> Option<Arc<dyn Activator>> {
569 self.activator.read().await.clone()
570 }
571
572 pub fn active_connections(&self) -> u64 {
574 self.active_connections.load(Ordering::Relaxed)
575 }
576
577 pub fn cert_manager(&self) -> Option<&Arc<CertManager>> {
579 self.cert_manager.as_ref()
580 }
581
582 #[must_use]
589 pub fn sni_resolver(&self) -> Arc<SniCertResolver> {
590 Arc::clone(&self.sni_resolver)
591 }
592
593 pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
595 self.stream_registry = Some(registry);
596 }
597
598 #[must_use]
600 pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
601 self.stream_registry = Some(registry);
602 self
603 }
604
605 pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
607 self.stream_registry.as_ref()
608 }
609
610 pub fn set_network_policy_checker(&mut self, checker: NetworkPolicyChecker) {
612 self.network_policy_checker = Some(checker);
613 }
614
615 #[must_use]
617 pub fn with_network_policy_checker(mut self, checker: NetworkPolicyChecker) -> Self {
618 self.network_policy_checker = Some(checker);
619 self
620 }
621
622 pub async fn listen_on(&self, port: u16, bind_ip: IpAddr) -> Result<()> {
630 let mut servers = self.servers.write().await;
631
632 if servers.contains_key(&port) {
633 debug!(port = port, "Already listening on port");
634 return Ok(());
635 }
636
637 let addr = SocketAddr::new(bind_ip, port);
638 let mut proxy_config = ProxyConfig::default();
639 proxy_config.server.http_addr = addr;
640 proxy_config.server.http2_enabled = self.config.http2_enabled;
641
642 let mut server = ProxyServer::with_registry(
643 proxy_config,
644 self.registry.clone(),
645 self.load_balancer.clone(),
646 )
647 .with_rps_registry(self.rps_registry());
648 if let Some(ref checker) = self.network_policy_checker {
649 server = server.with_network_policy_checker(checker.clone());
650 }
651 if let Some(activator) = self.current_activator().await {
652 server = server.with_activator(activator);
653 }
654 let server = Arc::new(server);
655
656 info!(port = port, bind = %addr, "Proxy listening on port");
657
658 let server_clone = server.clone();
659 tokio::spawn(async move {
660 if let Err(e) = server_clone.run().await {
661 tracing::error!(port = port, error = %e, "Proxy server error on port");
662 }
663 });
664
665 servers.insert(port, server);
666 Ok(())
667 }
668
669 pub async fn listen_on_tls(&self, port: u16, bind_ip: IpAddr) -> Result<()> {
677 let mut servers = self.servers.write().await;
678
679 if servers.contains_key(&port) {
680 debug!(port = port, "Already listening on port (TLS)");
681 return Ok(());
682 }
683
684 let Some(cert_manager) = &self.cert_manager else {
685 warn!(
686 port = port,
687 "Cannot start TLS listener: no CertManager configured"
688 );
689 return Ok(());
690 };
691
692 let sni_resolver = self.sni_resolver();
695
696 let _ = load_existing_certs_into_resolver(cert_manager, &sni_resolver).await;
698
699 let addr = SocketAddr::new(bind_ip, port);
700 let mut proxy_config = ProxyConfig::default();
701 proxy_config.server.https_addr = addr;
702
703 let mut server = ProxyServer::with_tls_resolver(
704 proxy_config,
705 self.registry.clone(),
706 self.load_balancer.clone(),
707 sni_resolver,
708 )
709 .with_cert_manager(Arc::clone(cert_manager))
710 .with_rps_registry(self.rps_registry());
711 if let Some(ref checker) = self.network_policy_checker {
712 server = server.with_network_policy_checker(checker.clone());
713 }
714 if let Some(activator) = self.current_activator().await {
715 server = server.with_activator(activator);
716 }
717 let server = Arc::new(server);
718
719 info!(port = port, bind = %addr, "HTTPS proxy listening on port");
720
721 let server_clone = server.clone();
722 tokio::spawn(async move {
723 if let Err(e) = server_clone.run_https().await {
724 tracing::error!(port = port, error = %e, "HTTPS proxy server error");
725 }
726 });
727
728 servers.insert(port, server);
729 Ok(())
730 }
731
732 pub async fn start_ingress(&self) {
750 self.start_ingress_on(DEFAULT_INGRESS_HTTP_PORT, DEFAULT_INGRESS_HTTPS_PORT)
751 .await;
752 }
753
754 fn build_http_ingress_server(
768 &self,
769 http_addr: SocketAddr,
770 activator: Option<Arc<dyn Activator>>,
771 ) -> ProxyServer {
772 let mut http_proxy_config = ProxyConfig::default();
773 http_proxy_config.server.http_addr = http_addr;
774 http_proxy_config.server.http2_enabled = self.config.http2_enabled;
775
776 let mut http_server = ProxyServer::with_registry(
777 http_proxy_config,
778 self.registry.clone(),
779 self.load_balancer.clone(),
780 )
781 .with_rps_registry(self.rps_registry());
782 if let Some(ref checker) = self.network_policy_checker {
783 http_server = http_server.with_network_policy_checker(checker.clone());
784 }
785 if let Some(activator) = activator {
786 http_server = http_server.with_activator(activator);
787 }
788 if let Some(ref cm) = self.cert_manager {
793 http_server = http_server.with_cert_manager(Arc::clone(cm));
794 }
795 http_server
796 }
797
    /// Starts the node's HTTP and (when possible) HTTPS ingress on the given
    /// ports. Only the first call has any effect.
    ///
    /// For each protocol two tasks are spawned: one serving the IPv4 wildcard
    /// address through the server's retrying entry point, and one serving a
    /// v6-only socket from [`bind_v6_only`]. Failures in either task are logged
    /// as warnings and never propagated. HTTPS is skipped, with a warning, when
    /// no `CertManager` is configured.
    // `http_*` / `https_*` names are intentionally parallel.
    #[allow(clippy::similar_names)]
    pub async fn start_ingress_on(&self, http_port: u16, https_port: u16) {
        // Atomically flip the "started" flag; whoever loses the race returns.
        if self
            .ingress_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            debug!("Ingress already started; skipping");
            return;
        }

        let bind_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let http_addr = SocketAddr::new(bind_ip, http_port);
        let activator = self.current_activator().await;
        let http_server = Arc::new(self.build_http_ingress_server(http_addr, activator));
        info!(port = http_port, bind = %http_addr, "Starting HTTP ingress (retry-never-error)");
        {
            // IPv4 HTTP accept loop.
            let server = http_server.clone();
            tokio::spawn(async move {
                if let Err(e) = server.run_with_retry(http_addr).await {
                    warn!(port = http_port, error = %e, "HTTP ingress accept loop exited");
                }
            });
        }
        {
            // IPv6 HTTP accept loop on a separate v6-only socket; a bind
            // failure only costs IPv6 reachability.
            let server = http_server.clone();
            tokio::spawn(async move {
                match bind_v6_only(http_port) {
                    Ok(l) => {
                        if let Err(e) = server.run_on_listener(l).await {
                            warn!(port = http_port, error = %e, "HTTP v6 ingress exited");
                        }
                    }
                    Err(e) => {
                        warn!(port = http_port, error = %e, "HTTP v6 ingress bind failed (non-fatal)");
                    }
                }
            });
        }
        // Track the server so `stop`/`unbind` can shut it down.
        self.servers.write().await.insert(http_port, http_server);

        let Some(cert_manager) = &self.cert_manager else {
            warn!(
                port = https_port,
                "Cannot start HTTPS ingress: no CertManager configured (HTTP ingress is up)"
            );
            return;
        };

        let sni_resolver = self.sni_resolver();
        // Best effort: the result of loading existing certificates is ignored.
        let _ = load_existing_certs_into_resolver(cert_manager, &sni_resolver).await;

        let https_addr = SocketAddr::new(bind_ip, https_port);
        let mut https_proxy_config = ProxyConfig::default();
        https_proxy_config.server.https_addr = https_addr;

        let mut https_server = ProxyServer::with_tls_resolver(
            https_proxy_config,
            self.registry.clone(),
            self.load_balancer.clone(),
            sni_resolver,
        )
        .with_cert_manager(Arc::clone(cert_manager))
        .with_rps_registry(self.rps_registry());
        if let Some(ref checker) = self.network_policy_checker {
            https_server = https_server.with_network_policy_checker(checker.clone());
        }
        // The activator is re-read here rather than reused from the HTTP setup.
        if let Some(activator) = self.current_activator().await {
            https_server = https_server.with_activator(activator);
        }
        let https_server = Arc::new(https_server);
        info!(port = https_port, bind = %https_addr, "Starting HTTPS ingress (retry-never-error)");
        {
            // IPv4 HTTPS accept loop.
            let server = https_server.clone();
            tokio::spawn(async move {
                if let Err(e) = server.run_https_with_retry(https_addr).await {
                    warn!(port = https_port, error = %e, "HTTPS ingress accept loop exited");
                }
            });
        }
        {
            // IPv6 HTTPS accept loop on a separate v6-only socket.
            let server = https_server.clone();
            tokio::spawn(async move {
                match bind_v6_only(https_port) {
                    Ok(l) => {
                        if let Err(e) = server.run_https_on_listener(l).await {
                            warn!(port = https_port, error = %e, "HTTPS v6 ingress exited");
                        }
                    }
                    Err(e) => {
                        warn!(port = https_port, error = %e, "HTTPS v6 ingress bind failed (non-fatal)");
                    }
                }
            });
        }
        self.servers.write().await.insert(https_port, https_server);
    }
914
915 pub async fn stop(&self) {
920 let mut servers = self.servers.write().await;
921 for (port, server) in servers.drain() {
922 info!(port = port, "Stopping proxy on port");
923 server.shutdown();
924 }
925
926 let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
928 while self.active_connections.load(Ordering::Relaxed) > 0 {
929 if tokio::time::Instant::now() >= deadline {
930 let remaining = self.active_connections.load(Ordering::Relaxed);
931 warn!(
932 remaining = remaining,
933 "Drain timeout reached, forcing shutdown"
934 );
935 break;
936 }
937 tokio::time::sleep(Duration::from_millis(100)).await;
938 }
939
940 info!("All proxy servers stopped");
941 }
942
943 pub async fn unbind(&self, port: u16) {
945 let mut servers = self.servers.write().await;
946 if let Some(server) = servers.remove(&port) {
947 info!(port = port, "Unbinding proxy from port");
948 server.shutdown();
949 }
950 }
951
952 pub async fn ensure_ports_for_service(
968 &self,
969 spec: &ServiceSpec,
970 overlay_ip: Option<IpAddr>,
971 ) -> Result<()> {
972 for endpoint in &spec.endpoints {
973 let bind_ip = match endpoint.expose {
974 ExposeType::Public => IpAddr::V4(Ipv4Addr::UNSPECIFIED), ExposeType::Internal => {
976 let ip = overlay_ip.unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST));
978 if overlay_ip.is_none() {
979 warn!(
980 endpoint = %endpoint.name,
981 port = endpoint.port,
982 "No overlay IP available for internal endpoint; binding to 127.0.0.1"
983 );
984 }
985 ip
986 }
987 };
988
989 match endpoint.protocol {
990 Protocol::Https => {
991 self.listen_on_tls(endpoint.port, bind_ip).await?;
993 }
994 Protocol::Http | Protocol::Websocket => {
995 self.listen_on(endpoint.port, bind_ip).await?;
997 }
998 Protocol::Tcp => {
999 let cfg = translate_stream_config(endpoint.stream.as_ref());
1003 self.ensure_tcp_listener(endpoint.port, bind_ip, &cfg).await;
1004 }
1005 Protocol::Udp => {
1006 let cfg = translate_stream_config(endpoint.stream.as_ref());
1008 self.ensure_udp_listener(endpoint.port, bind_ip, &cfg).await;
1009 }
1010 }
1011 }
1012 Ok(())
1013 }
1014
1015 async fn ensure_tcp_listener(&self, port: u16, bind_ip: IpAddr, cfg: &StreamProxyConfig) {
1027 self.apply_stream_health_check_tcp(port, cfg);
1030
1031 {
1033 let listeners = self.tcp_listeners.read().await;
1034 if listeners.contains(&port) {
1035 debug!(port = port, "TCP stream listener already active");
1036 return;
1037 }
1038 }
1039
1040 let registry = if let Some(r) = &self.stream_registry {
1041 Arc::clone(r)
1042 } else {
1043 warn!(
1044 port = port,
1045 "Cannot start TCP listener: StreamRegistry not configured"
1046 );
1047 return;
1048 };
1049
1050 let addr = SocketAddr::new(bind_ip, port);
1051 let listener = match tokio::net::TcpListener::bind(addr).await {
1052 Ok(l) => l,
1053 Err(e) => {
1054 warn!(
1055 port = port,
1056 bind = %addr,
1057 error = %e,
1058 "Failed to bind TCP stream listener, continuing"
1059 );
1060 return;
1061 }
1062 };
1063
1064 {
1066 let mut listeners = self.tcp_listeners.write().await;
1067 listeners.insert(port);
1068 }
1069
1070 let mut tcp_service = TcpStreamService::new(registry, port);
1071 if cfg.proxy_protocol {
1072 tcp_service = tcp_service.with_proxy_protocol(true);
1073 }
1074 if cfg.tls {
1077 if self.cert_manager.is_some() {
1078 let resolver = self.sni_resolver();
1079 if let Some(cert_manager) = &self.cert_manager {
1080 let _ = load_existing_certs_into_resolver(cert_manager, &resolver).await;
1081 }
1082 let acceptor = tls_acceptor_from_resolver(resolver);
1083 tcp_service = tcp_service.with_tls_acceptor(acceptor);
1084 info!(port = port, "L4 TCP TLS termination enabled");
1085 } else {
1086 warn!(
1087 port = port,
1088 "stream.tls requested but no CertManager configured; serving TCP without TLS"
1089 );
1090 }
1091 }
1092
1093 let tcp_service = Arc::new(tcp_service);
1094 tokio::spawn(async move {
1095 tcp_service.serve(listener).await;
1096 });
1097
1098 info!(port = port, bind = %addr, "TCP stream proxy listening");
1099 }
1100
1101 fn apply_stream_health_check_tcp(&self, port: u16, cfg: &StreamProxyConfig) {
1105 if let Some(registry) = &self.stream_registry {
1106 registry.set_tcp_config(port, cfg.clone());
1107 }
1108 }
1109
1110 fn apply_stream_health_check_udp(&self, port: u16, cfg: &StreamProxyConfig) {
1114 if let Some(registry) = &self.stream_registry {
1115 registry.set_udp_config(port, cfg.clone());
1116 }
1117 }
1118
1119 async fn ensure_udp_listener(&self, port: u16, bind_ip: IpAddr, cfg: &StreamProxyConfig) {
1128 self.apply_stream_health_check_udp(port, cfg);
1131
1132 {
1134 let listeners = self.udp_listeners.read().await;
1135 if listeners.contains(&port) {
1136 debug!(port = port, "UDP stream listener already active");
1137 return;
1138 }
1139 }
1140
1141 let registry = if let Some(r) = &self.stream_registry {
1142 Arc::clone(r)
1143 } else {
1144 warn!(
1145 port = port,
1146 "Cannot start UDP listener: StreamRegistry not configured"
1147 );
1148 return;
1149 };
1150
1151 let addr = SocketAddr::new(bind_ip, port);
1152 let socket = match tokio::net::UdpSocket::bind(addr).await {
1153 Ok(s) => s,
1154 Err(e) => {
1155 warn!(
1156 port = port,
1157 bind = %addr,
1158 error = %e,
1159 "Failed to bind UDP stream listener, continuing"
1160 );
1161 return;
1162 }
1163 };
1164
1165 {
1167 let mut listeners = self.udp_listeners.write().await;
1168 listeners.insert(port);
1169 }
1170
1171 let udp_service = Arc::new(UdpStreamService::new(registry, port, cfg.session_timeout));
1172 tokio::spawn(async move {
1173 if let Err(e) = udp_service.serve(socket).await {
1174 tracing::error!(
1175 port = port,
1176 error = %e,
1177 "UDP stream proxy service failed"
1178 );
1179 }
1180 });
1181
1182 info!(port = port, bind = %addr, "UDP stream proxy listening");
1183 }
1184
1185 pub async fn publish_loopback_for_container(
1233 &self,
1234 deployment: Option<&str>,
1235 service_name: &str,
1236 spec: &ServiceSpec,
1237 container_ip: IpAddr,
1238 port_override: Option<u16>,
1239 ) -> Result<()> {
1240 for endpoint in &spec.endpoints {
1241 if matches!(endpoint.expose, ExposeType::Public) {
1243 continue;
1244 }
1245
1246 let backend = SocketAddr::new(
1247 container_ip,
1248 port_override.unwrap_or_else(|| endpoint.target_port()),
1249 );
1250 let publish_port = endpoint.port;
1251
1252 self.claim_published_port(deployment, service_name, publish_port)
1254 .await?;
1255
1256 match endpoint.protocol {
1257 Protocol::Tcp | Protocol::Http | Protocol::Https | Protocol::Websocket => {
1258 self.publish_loopback_tcp(service_name, publish_port, backend)
1261 .await;
1262 }
1263 Protocol::Udp => {
1264 self.publish_loopback_udp(service_name, publish_port, backend)
1265 .await;
1266 }
1267 }
1268 }
1269 Ok(())
1270 }
1271
1272 async fn claim_published_port(
1279 &self,
1280 deployment: Option<&str>,
1281 service_name: &str,
1282 publish_port: u16,
1283 ) -> Result<()> {
1284 let mut owners = self.published_ports.write().await;
1285 if let Some((owner_dep, owner_svc)) = owners.get(&publish_port) {
1286 if owner_dep.as_deref() == deployment && owner_svc == service_name {
1287 return Ok(());
1289 }
1290 let owner = format!("{}/{}", owner_dep.as_deref().unwrap_or("_"), owner_svc);
1291 let requester = format!("{}/{}", deployment.unwrap_or("_"), service_name);
1292 warn!(
1293 port = publish_port,
1294 owner = %owner,
1295 requester = %requester,
1296 "Refusing to publish host port already owned by a different deployment/service (would cross-wire backends)"
1297 );
1298 return Err(crate::error::AgentError::PortConflict {
1299 port: publish_port,
1300 owner,
1301 requester,
1302 });
1303 }
1304 owners.insert(
1305 publish_port,
1306 (deployment.map(str::to_string), service_name.to_string()),
1307 );
1308 Ok(())
1309 }
1310
1311 async fn publish_loopback_tcp(
1314 &self,
1315 service_name: &str,
1316 publish_port: u16,
1317 backend: SocketAddr,
1318 ) {
1319 if let Some(existing) = self.loopback_registry.resolve_tcp(publish_port) {
1321 let mut backends = existing.backends;
1322 if !backends.contains(&backend) {
1323 backends.push(backend);
1324 }
1325 self.loopback_registry
1326 .update_tcp_backends(publish_port, backends);
1327 } else {
1328 self.loopback_registry.register_tcp(
1329 publish_port,
1330 StreamService::new(service_name.to_string(), vec![backend]),
1331 );
1332 }
1333
1334 let mut listeners = self.loopback_tcp.write().await;
1336 if listeners.contains_key(&publish_port) {
1337 debug!(port = publish_port, "Loopback TCP listener already active");
1338 return;
1339 }
1340
1341 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), publish_port);
1342 let listener = match tokio::net::TcpListener::bind(addr).await {
1343 Ok(l) => l,
1344 Err(e) => {
1345 warn!(
1346 port = publish_port,
1347 bind = %addr,
1348 error = %e,
1349 "Failed to bind loopback TCP listener, continuing"
1350 );
1351 return;
1352 }
1353 };
1354
1355 let tcp_service = Arc::new(TcpStreamService::new(
1356 Arc::clone(&self.loopback_registry),
1357 publish_port,
1358 ));
1359 let handle = tokio::spawn(async move {
1360 tcp_service.serve(listener).await;
1361 });
1362 listeners.insert(publish_port, handle);
1363 drop(listeners);
1364
1365 info!(
1366 service = service_name,
1367 port = publish_port,
1368 bind = %addr,
1369 backend = %backend,
1370 "Published service port on node loopback (TCP)"
1371 );
1372 }
1373
1374 async fn publish_loopback_udp(
1377 &self,
1378 service_name: &str,
1379 publish_port: u16,
1380 backend: SocketAddr,
1381 ) {
1382 if let Some(existing) = self.loopback_registry.resolve_udp(publish_port) {
1383 let mut backends = existing.backends;
1384 if !backends.contains(&backend) {
1385 backends.push(backend);
1386 }
1387 self.loopback_registry
1388 .update_udp_backends(publish_port, backends);
1389 } else {
1390 self.loopback_registry.register_udp(
1391 publish_port,
1392 StreamService::new(service_name.to_string(), vec![backend]),
1393 );
1394 }
1395
1396 let mut listeners = self.loopback_udp.write().await;
1397 if listeners.contains_key(&publish_port) {
1398 debug!(port = publish_port, "Loopback UDP listener already active");
1399 return;
1400 }
1401
1402 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), publish_port);
1403 let socket = match tokio::net::UdpSocket::bind(addr).await {
1404 Ok(s) => s,
1405 Err(e) => {
1406 warn!(
1407 port = publish_port,
1408 bind = %addr,
1409 error = %e,
1410 "Failed to bind loopback UDP listener, continuing"
1411 );
1412 return;
1413 }
1414 };
1415
1416 let udp_service = Arc::new(UdpStreamService::new(
1417 Arc::clone(&self.loopback_registry),
1418 publish_port,
1419 None,
1420 ));
1421 let handle = tokio::spawn(async move {
1422 if let Err(e) = udp_service.serve(socket).await {
1423 tracing::error!(
1424 port = publish_port,
1425 error = %e,
1426 "Loopback UDP stream proxy service failed"
1427 );
1428 }
1429 });
1430 listeners.insert(publish_port, handle);
1431 drop(listeners);
1432
1433 info!(
1434 service = service_name,
1435 port = publish_port,
1436 bind = %addr,
1437 backend = %backend,
1438 "Published service port on node loopback (UDP)"
1439 );
1440 }
1441
1442 pub async fn unpublish_loopback_for_container(
1452 &self,
1453 spec: &ServiceSpec,
1454 container_ip: IpAddr,
1455 port_override: Option<u16>,
1456 ) {
1457 for endpoint in &spec.endpoints {
1458 if matches!(endpoint.expose, ExposeType::Public) {
1459 continue;
1460 }
1461
1462 let backend = SocketAddr::new(
1463 container_ip,
1464 port_override.unwrap_or_else(|| endpoint.target_port()),
1465 );
1466 let publish_port = endpoint.port;
1467
1468 match endpoint.protocol {
1469 Protocol::Tcp | Protocol::Http | Protocol::Https | Protocol::Websocket => {
1470 self.unpublish_loopback_tcp(publish_port, backend).await;
1471 }
1472 Protocol::Udp => {
1473 self.unpublish_loopback_udp(publish_port, backend).await;
1474 }
1475 }
1476 }
1477 }
1478
1479 async fn unpublish_loopback_tcp(&self, publish_port: u16, backend: SocketAddr) {
1482 let Some(existing) = self.loopback_registry.resolve_tcp(publish_port) else {
1483 return;
1484 };
1485 let remaining: Vec<SocketAddr> = existing
1486 .backends
1487 .into_iter()
1488 .filter(|b| *b != backend)
1489 .collect();
1490
1491 if remaining.is_empty() {
1492 let _ = self.loopback_registry.unregister_tcp(publish_port);
1493 let mut listeners = self.loopback_tcp.write().await;
1494 if let Some(handle) = listeners.remove(&publish_port) {
1495 handle.abort();
1496 }
1497 self.published_ports.write().await.remove(&publish_port);
1500 debug!(
1501 port = publish_port,
1502 "Freed loopback TCP listener (no backends remain)"
1503 );
1504 } else {
1505 self.loopback_registry
1506 .update_tcp_backends(publish_port, remaining);
1507 }
1508 }
1509
1510 async fn unpublish_loopback_udp(&self, publish_port: u16, backend: SocketAddr) {
1513 let Some(existing) = self.loopback_registry.resolve_udp(publish_port) else {
1514 return;
1515 };
1516 let remaining: Vec<SocketAddr> = existing
1517 .backends
1518 .into_iter()
1519 .filter(|b| *b != backend)
1520 .collect();
1521
1522 if remaining.is_empty() {
1523 let _ = self.loopback_registry.unregister_udp(publish_port);
1524 let mut listeners = self.loopback_udp.write().await;
1525 if let Some(handle) = listeners.remove(&publish_port) {
1526 handle.abort();
1527 }
1528 self.published_ports.write().await.remove(&publish_port);
1531 debug!(
1532 port = publish_port,
1533 "Freed loopback UDP listener (no backends remain)"
1534 );
1535 } else {
1536 self.loopback_registry
1537 .update_udp_backends(publish_port, remaining);
1538 }
1539 }
1540
1541 pub async fn publish_port_mapping(
1562 &self,
1563 deployment: Option<&str>,
1564 owner_name: &str,
1565 mapping: &PortMapping,
1566 backend: SocketAddr,
1567 ) -> Result<u16> {
1568 let bind_ip: IpAddr = mapping
1569 .host_ip
1570 .parse()
1571 .unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
1572 let requested = mapping.host_port.filter(|p| *p != 0);
1573 let is_udp = matches!(mapping.protocol, PortProtocol::Udp);
1574 let port = match mapping.protocol {
1575 PortProtocol::Udp => {
1576 self.publish_port_mapping_udp(deployment, owner_name, bind_ip, requested, backend)
1577 .await?
1578 }
1579 PortProtocol::Tcp => {
1581 self.publish_port_mapping_tcp(deployment, owner_name, bind_ip, requested, backend)
1582 .await?
1583 }
1584 };
1585
1586 if let Err(e) = zlayer_overlay::firewall::ensure_published_port(port, is_udp) {
1593 warn!(
1594 error = %e,
1595 port,
1596 udp = is_udp,
1597 "could not open published port on host firewall (non-fatal)"
1598 );
1599 }
1600 Ok(port)
1601 }
1602
1603 async fn publish_port_mapping_tcp(
1606 &self,
1607 deployment: Option<&str>,
1608 owner_name: &str,
1609 bind_ip: IpAddr,
1610 requested: Option<u16>,
1611 backend: SocketAddr,
1612 ) -> Result<u16> {
1613 if let Some(port) = requested {
1614 self.claim_published_port(deployment, owner_name, port)
1616 .await?;
1617 self.accumulate_port_map_tcp_backend(port, owner_name, backend);
1618
1619 let mut listeners = self.port_map_tcp.write().await;
1620 if listeners.contains_key(&port) {
1621 debug!(port = port, "port-mapping TCP listener already active");
1622 return Ok(port);
1623 }
1624 let addr = SocketAddr::new(bind_ip, port);
1625 match tokio::net::TcpListener::bind(addr).await {
1626 Ok(listener) => {
1627 let svc = Arc::new(TcpStreamService::new(
1628 Arc::clone(&self.port_map_registry),
1629 port,
1630 ));
1631 let handle = tokio::spawn(async move {
1632 svc.serve(listener).await;
1633 });
1634 listeners.insert(port, handle);
1635 drop(listeners);
1636 info!(service = owner_name, port = port, bind = %addr, backend = %backend,
1637 "Published port mapping (TCP)");
1638 Ok(port)
1639 }
1640 Err(e) => {
1641 warn!(port = port, bind = %addr, error = %e,
1642 "Failed to bind port-mapping TCP listener, continuing");
1643 Ok(port)
1644 }
1645 }
1646 } else {
1647 let addr = SocketAddr::new(bind_ip, 0);
1649 let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
1650 crate::error::AgentError::Network(format!(
1651 "failed to bind ephemeral port-mapping TCP listener on {bind_ip}: {e}"
1652 ))
1653 })?;
1654 let port = listener.local_addr().map(|a| a.port()).map_err(|e| {
1655 crate::error::AgentError::Network(format!(
1656 "failed to read ephemeral port-mapping TCP local addr: {e}"
1657 ))
1658 })?;
1659 self.claim_published_port(deployment, owner_name, port)
1660 .await?;
1661 self.accumulate_port_map_tcp_backend(port, owner_name, backend);
1662 let svc = Arc::new(TcpStreamService::new(
1663 Arc::clone(&self.port_map_registry),
1664 port,
1665 ));
1666 let handle = tokio::spawn(async move {
1667 svc.serve(listener).await;
1668 });
1669 self.port_map_tcp.write().await.insert(port, handle);
1670 info!(service = owner_name, port = port, bind = %bind_ip, backend = %backend,
1671 "Published port mapping on ephemeral host port (TCP)");
1672 Ok(port)
1673 }
1674 }
1675
1676 fn accumulate_port_map_tcp_backend(&self, port: u16, owner_name: &str, backend: SocketAddr) {
1678 if let Some(existing) = self.port_map_registry.resolve_tcp(port) {
1679 let mut backends = existing.backends;
1680 if !backends.contains(&backend) {
1681 backends.push(backend);
1682 }
1683 self.port_map_registry.update_tcp_backends(port, backends);
1684 } else {
1685 self.port_map_registry.register_tcp(
1686 port,
1687 StreamService::new(owner_name.to_string(), vec![backend]),
1688 );
1689 }
1690 }
1691
1692 async fn publish_port_mapping_udp(
1695 &self,
1696 deployment: Option<&str>,
1697 owner_name: &str,
1698 bind_ip: IpAddr,
1699 requested: Option<u16>,
1700 backend: SocketAddr,
1701 ) -> Result<u16> {
1702 if let Some(port) = requested {
1703 self.claim_published_port(deployment, owner_name, port)
1705 .await?;
1706 self.accumulate_port_map_udp_backend(port, owner_name, backend);
1707
1708 let mut listeners = self.port_map_udp.write().await;
1709 if listeners.contains_key(&port) {
1710 debug!(port = port, "port-mapping UDP listener already active");
1711 return Ok(port);
1712 }
1713 let addr = SocketAddr::new(bind_ip, port);
1714 match tokio::net::UdpSocket::bind(addr).await {
1715 Ok(socket) => {
1716 let svc = Arc::new(UdpStreamService::new(
1717 Arc::clone(&self.port_map_registry),
1718 port,
1719 None,
1720 ));
1721 let handle = tokio::spawn(async move {
1722 if let Err(e) = svc.serve(socket).await {
1723 tracing::error!(
1724 port = port,
1725 error = %e,
1726 "port-mapping UDP stream proxy service failed"
1727 );
1728 }
1729 });
1730 listeners.insert(port, handle);
1731 drop(listeners);
1732 info!(service = owner_name, port = port, bind = %addr, backend = %backend,
1733 "Published port mapping (UDP)");
1734 Ok(port)
1735 }
1736 Err(e) => {
1737 warn!(port = port, bind = %addr, error = %e,
1738 "Failed to bind port-mapping UDP listener, continuing");
1739 Ok(port)
1740 }
1741 }
1742 } else {
1743 let addr = SocketAddr::new(bind_ip, 0);
1745 let socket = tokio::net::UdpSocket::bind(addr).await.map_err(|e| {
1746 crate::error::AgentError::Network(format!(
1747 "failed to bind ephemeral port-mapping UDP listener on {bind_ip}: {e}"
1748 ))
1749 })?;
1750 let port = socket.local_addr().map(|a| a.port()).map_err(|e| {
1751 crate::error::AgentError::Network(format!(
1752 "failed to read ephemeral port-mapping UDP local addr: {e}"
1753 ))
1754 })?;
1755 self.claim_published_port(deployment, owner_name, port)
1756 .await?;
1757 self.accumulate_port_map_udp_backend(port, owner_name, backend);
1758 let svc = Arc::new(UdpStreamService::new(
1759 Arc::clone(&self.port_map_registry),
1760 port,
1761 None,
1762 ));
1763 let handle = tokio::spawn(async move {
1764 if let Err(e) = svc.serve(socket).await {
1765 tracing::error!(
1766 port = port,
1767 error = %e,
1768 "port-mapping UDP stream proxy service failed"
1769 );
1770 }
1771 });
1772 self.port_map_udp.write().await.insert(port, handle);
1773 info!(service = owner_name, port = port, bind = %bind_ip, backend = %backend,
1774 "Published port mapping on ephemeral host port (UDP)");
1775 Ok(port)
1776 }
1777 }
1778
1779 fn accumulate_port_map_udp_backend(&self, port: u16, owner_name: &str, backend: SocketAddr) {
1781 if let Some(existing) = self.port_map_registry.resolve_udp(port) {
1782 let mut backends = existing.backends;
1783 if !backends.contains(&backend) {
1784 backends.push(backend);
1785 }
1786 self.port_map_registry.update_udp_backends(port, backends);
1787 } else {
1788 self.port_map_registry.register_udp(
1789 port,
1790 StreamService::new(owner_name.to_string(), vec![backend]),
1791 );
1792 }
1793 }
1794
1795 pub async fn unpublish_port_mapping(
1799 &self,
1800 port: u16,
1801 protocol: PortProtocol,
1802 backend: SocketAddr,
1803 ) {
1804 match protocol {
1805 PortProtocol::Udp => self.unpublish_port_mapping_udp(port, backend).await,
1806 PortProtocol::Tcp => self.unpublish_port_mapping_tcp(port, backend).await,
1807 }
1808 }
1809
1810 async fn unpublish_port_mapping_tcp(&self, port: u16, backend: SocketAddr) {
1813 let Some(existing) = self.port_map_registry.resolve_tcp(port) else {
1814 return;
1815 };
1816 let remaining: Vec<SocketAddr> = existing
1817 .backends
1818 .into_iter()
1819 .filter(|b| *b != backend)
1820 .collect();
1821
1822 if remaining.is_empty() {
1823 let _ = self.port_map_registry.unregister_tcp(port);
1824 let mut listeners = self.port_map_tcp.write().await;
1825 if let Some(handle) = listeners.remove(&port) {
1826 handle.abort();
1827 }
1828 self.published_ports.write().await.remove(&port);
1831 zlayer_overlay::firewall::remove_published_port(port, false);
1835 debug!(
1836 port = port,
1837 "Freed port-mapping TCP listener (no backends remain)"
1838 );
1839 } else {
1840 self.port_map_registry.update_tcp_backends(port, remaining);
1841 }
1842 }
1843
1844 async fn unpublish_port_mapping_udp(&self, port: u16, backend: SocketAddr) {
1847 let Some(existing) = self.port_map_registry.resolve_udp(port) else {
1848 return;
1849 };
1850 let remaining: Vec<SocketAddr> = existing
1851 .backends
1852 .into_iter()
1853 .filter(|b| *b != backend)
1854 .collect();
1855
1856 if remaining.is_empty() {
1857 let _ = self.port_map_registry.unregister_udp(port);
1858 let mut listeners = self.port_map_udp.write().await;
1859 if let Some(handle) = listeners.remove(&port) {
1860 handle.abort();
1861 }
1862 self.published_ports.write().await.remove(&port);
1865 zlayer_overlay::firewall::remove_published_port(port, true);
1869 debug!(
1870 port = port,
1871 "Freed port-mapping UDP listener (no backends remain)"
1872 );
1873 } else {
1874 self.port_map_registry.update_udp_backends(port, remaining);
1875 }
1876 }
1877
1878 #[must_use]
1895 pub fn endpoint_wants_managed_cert(endpoint: &zlayer_spec::EndpointSpec) -> Option<&str> {
1896 if endpoint.protocol != Protocol::Https {
1897 return None;
1898 }
1899 if endpoint.expose != ExposeType::Public {
1900 return None;
1901 }
1902 let host = endpoint.host.as_deref()?.trim();
1903 if host.is_empty() || host.contains('*') {
1906 return None;
1907 }
1908 Some(host)
1909 }
1910
1911 #[allow(clippy::too_many_lines)]
1923 pub async fn add_service(&self, name: &str, spec: &ServiceSpec) {
1924 let deployment = spec.deployment.as_deref();
1925 let mut services = self.services.write().await;
1926
1927 let mut endpoint_names = Vec::new();
1929 let mut tcp_ports = Vec::new();
1930 let mut udp_ports = Vec::new();
1931 let mut http_ports = Vec::new();
1932
1933 for endpoint in &spec.endpoints {
1934 match endpoint.protocol {
1935 Protocol::Http | Protocol::Https | Protocol::Websocket => {
1936 let entry = RouteEntry::from_endpoint(deployment, name, endpoint);
1938 self.registry.register(entry).await;
1939 http_ports.push(endpoint.port);
1940
1941 let lb_key = endpoint_lb_key(deployment, name, &endpoint.name);
1951 self.load_balancer
1952 .register(&lb_key, vec![], LbStrategy::RoundRobin);
1953
1954 info!(
1955 service = name,
1956 endpoint = %endpoint.name,
1957 protocol = ?endpoint.protocol,
1958 path = ?endpoint.path,
1959 expose = ?endpoint.expose,
1960 "Added HTTP proxy route for service"
1961 );
1962 }
1963 Protocol::Tcp => {
1964 tcp_ports.push(endpoint.port);
1965 info!(
1966 service = name,
1967 endpoint = %endpoint.name,
1968 protocol = ?endpoint.protocol,
1969 port = endpoint.port,
1970 expose = ?endpoint.expose,
1971 "Tracking TCP stream endpoint for service"
1972 );
1973 }
1974 Protocol::Udp => {
1975 udp_ports.push(endpoint.port);
1976 info!(
1977 service = name,
1978 endpoint = %endpoint.name,
1979 protocol = ?endpoint.protocol,
1980 port = endpoint.port,
1981 expose = ?endpoint.expose,
1982 "Tracking UDP stream endpoint for service"
1983 );
1984 }
1985 }
1986
1987 endpoint_names.push(endpoint.name.clone());
1988 }
1989
1990 self.load_balancer
1997 .register(name, vec![], LbStrategy::RoundRobin);
1998
1999 services.insert(
2000 name.to_string(),
2001 ServiceTracking {
2002 deployment: deployment.map(str::to_string),
2003 endpoint_names,
2004 tcp_ports,
2005 udp_ports,
2006 http_ports,
2007 },
2008 );
2009
2010 drop(services);
2012
2013 if let Some(cert_manager) = &self.cert_manager {
2028 for endpoint in &spec.endpoints {
2029 let Some(host) = Self::endpoint_wants_managed_cert(endpoint) else {
2030 continue;
2031 };
2032 let host = host.to_string();
2033
2034 {
2036 let mut requested = self.provisioning_requested.write().await;
2037 if !requested.insert(host.clone()) {
2038 continue;
2039 }
2040 }
2041
2042 let cert_manager = Arc::clone(cert_manager);
2043 let sni_resolver = self.sni_resolver();
2044 let service_name = name.to_string();
2045 tokio::spawn(async move {
2046 info!(
2047 service = %service_name,
2048 host = %host,
2049 "Provisioning managed TLS certificate for public HTTPS vhost"
2050 );
2051 match cert_manager.get_cert(&host).await {
2052 Ok((cert_pem, key_pem)) => {
2053 if let Err(e) = sni_resolver.refresh_cert(&host, &cert_pem, &key_pem) {
2056 warn!(
2057 host = %host,
2058 error = %e,
2059 "Provisioned certificate but failed to load it into the SNI resolver"
2060 );
2061 } else {
2062 info!(
2063 host = %host,
2064 "Loaded managed TLS certificate into live HTTPS listener"
2065 );
2066 }
2067 }
2068 Err(e) => {
2069 warn!(
2070 service = %service_name,
2071 host = %host,
2072 error = %e,
2073 "Failed to provision managed TLS certificate (best-effort; route is still registered)"
2074 );
2075 }
2076 }
2077 });
2078 }
2079 }
2080 }
2081
2082 pub async fn remove_service(&self, name: &str) {
2092 let mut services = self.services.write().await;
2093
2094 if let Some(tracking) = services.remove(name) {
2095 self.registry.unregister_service(name).await;
2097
2098 self.load_balancer.unregister(name);
2103 let deployment = tracking.deployment.as_deref();
2104 for endpoint_name in &tracking.endpoint_names {
2105 let lb_key = endpoint_lb_key(deployment, name, endpoint_name);
2106 self.load_balancer.unregister(&lb_key);
2107 }
2108
2109 if !tracking.tcp_ports.is_empty() {
2111 let mut tcp_set = self.tcp_listeners.write().await;
2112 for port in &tracking.tcp_ports {
2113 if let Some(registry) = &self.stream_registry {
2114 let _ = registry.unregister_tcp(*port);
2115 }
2116 tcp_set.remove(port);
2117 debug!(service = name, port = port, "Removed TCP listener tracking");
2118 }
2119 }
2120
2121 if !tracking.udp_ports.is_empty() {
2123 let mut udp_set = self.udp_listeners.write().await;
2124 for port in &tracking.udp_ports {
2125 if let Some(registry) = &self.stream_registry {
2126 let _ = registry.unregister_udp(*port);
2127 }
2128 udp_set.remove(port);
2129 debug!(service = name, port = port, "Removed UDP listener tracking");
2130 }
2131 }
2132
2133 if !tracking.http_ports.is_empty() {
2136 let ports_still_in_use: HashSet<u16> = services
2137 .values()
2138 .flat_map(|t| t.http_ports.iter().copied())
2139 .collect();
2140
2141 let mut servers = self.servers.write().await;
2142 for port in &tracking.http_ports {
2143 if !ports_still_in_use.contains(port) {
2144 if let Some(server) = servers.remove(port) {
2145 server.shutdown();
2146 info!(
2147 service = name,
2148 port = port,
2149 "Shut down HTTP proxy server (no remaining services on port)"
2150 );
2151 }
2152 }
2153 }
2154 }
2155
2156 info!(service = name, "Removed all proxy resources for service");
2157 }
2158 }
2159
2160 pub async fn add_backend(&self, service: &str, addr: SocketAddr) {
2167 self.registry.add_backend(service, addr).await;
2168 self.load_balancer.add_backend(service, addr);
2169 let services = self.services.read().await;
2171 if let Some(tracking) = services.get(service) {
2172 let deployment = tracking.deployment.as_deref();
2173 for endpoint_name in &tracking.endpoint_names {
2174 let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
2175 self.load_balancer.add_backend(&lb_key, addr);
2176 }
2177 }
2178 info!(service = service, backend = %addr, "Registered backend with proxy");
2179 }
2180
2181 pub async fn remove_backend(&self, service: &str, addr: SocketAddr) {
2186 self.registry.remove_backend(service, addr).await;
2187 self.load_balancer.remove_backend(service, &addr);
2188 let services = self.services.read().await;
2189 if let Some(tracking) = services.get(service) {
2190 let deployment = tracking.deployment.as_deref();
2191 for endpoint_name in &tracking.endpoint_names {
2192 let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
2193 self.load_balancer.remove_backend(&lb_key, &addr);
2194 }
2195 }
2196 debug!(service = service, backend = %addr, "Removed backend from service");
2197 }
2198
2199 #[allow(clippy::unused_async)]
2206 pub async fn update_backend_health(&self, service: &str, addr: SocketAddr, healthy: bool) {
2207 self.load_balancer.mark_health(service, &addr, healthy);
2208 let services = self.services.read().await;
2209 if let Some(tracking) = services.get(service) {
2210 let deployment = tracking.deployment.as_deref();
2211 for endpoint_name in &tracking.endpoint_names {
2212 let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
2213 self.load_balancer.mark_health(&lb_key, &addr, healthy);
2214 }
2215 }
2216 debug!(
2217 service = service,
2218 backend = %addr,
2219 healthy = healthy,
2220 "Updated backend health in load balancer"
2221 );
2222 }
2223
2224 pub async fn update_backends(&self, service: &str, addrs: Vec<SocketAddr>) {
2232 self.registry.update_backends(service, addrs.clone()).await;
2233 self.load_balancer.update_backends(service, addrs.clone());
2235 let services = self.services.read().await;
2236 if let Some(tracking) = services.get(service) {
2237 let deployment = tracking.deployment.as_deref();
2238 for endpoint_name in &tracking.endpoint_names {
2239 let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
2240 self.load_balancer.update_backends(&lb_key, addrs.clone());
2241 }
2242 }
2243 debug!(service = service, "Updated backends for service");
2244 }
2245
2246 pub async fn update_endpoint_backends(
2253 &self,
2254 service: &str,
2255 endpoint_name: &str,
2256 addrs: Vec<SocketAddr>,
2257 ) {
2258 self.registry
2259 .update_backends_for_endpoint(service, endpoint_name, addrs.clone())
2260 .await;
2261 let deployment = {
2264 let services = self.services.read().await;
2265 services.get(service).and_then(|t| t.deployment.clone())
2266 };
2267 let lb_key = endpoint_lb_key(deployment.as_deref(), service, endpoint_name);
2268 self.load_balancer.update_backends(&lb_key, addrs);
2269 debug!(
2270 service = service,
2271 endpoint = endpoint_name,
2272 "Updated backends for service endpoint"
2273 );
2274 }
2275
2276 pub async fn route_count(&self) -> usize {
2278 self.registry.route_count().await
2279 }
2280
2281 pub async fn list_services(&self) -> Vec<String> {
2283 self.services.read().await.keys().cloned().collect()
2284 }
2285
2286 pub async fn has_service(&self, name: &str) -> bool {
2288 self.services.read().await.contains_key(name)
2289 }
2290}
2291
2292#[cfg(test)]
2293mod tests {
2294 use super::*;
2295
/// Test fixture: a service spec with two HTTP-family endpoints.
///
/// Parses the inline YAML deployment below and returns its `test` service:
/// an `http` endpoint (port 8080, path `/api`, public) and a `websocket`
/// endpoint (port 8081, path `/ws`, internal).
///
/// Panics if the YAML does not parse or the `test` entry is missing.
2296 fn mock_service_spec_with_endpoints() -> ServiceSpec {
2297 use zlayer_spec::*;
2298 serde_yaml::from_str::<DeploymentSpec>(
2299 r"
2300version: v1
2301deployment: test
2302services:
2303 test:
2304 rtype: service
2305 image:
2306 name: test:latest
2307 endpoints:
2308 - name: http
2309 protocol: http
2310 port: 8080
2311 path: /api
2312 expose: public
2313 - name: websocket
2314 protocol: websocket
2315 port: 8081
2316 path: /ws
2317 expose: internal
2318",
2319 )
2320 .unwrap()
2321 .services
// Take ownership of the single service entry out of the parsed deployment.
2322 .remove("test")
2323 .unwrap()
2324 }
2325
2326 fn mock_service_spec_tcp_only() -> ServiceSpec {
2327 mock_service_spec_tcp_only_port(9000)
2328 }
2329
/// Test fixture: a service spec whose only endpoint is TCP on `port`.
///
/// Formats `port` into the inline YAML deployment below and returns its
/// `test` service, which declares a single `grpc` endpoint with protocol
/// `tcp`.
///
/// Panics if the YAML does not parse or the `test` entry is missing.
2330 fn mock_service_spec_tcp_only_port(port: u16) -> ServiceSpec {
2331 use zlayer_spec::*;
// `{port}` in the template is filled in by `format!`.
2332 let yaml = format!(
2333 "
2334version: v1
2335deployment: test
2336services:
2337 test:
2338 rtype: service
2339 image:
2340 name: test:latest
2341 endpoints:
2342 - name: grpc
2343 protocol: tcp
2344 port: {port}
2345"
2346 );
2347 serde_yaml::from_str::<DeploymentSpec>(&yaml)
2348 .unwrap()
2349 .services
// Take ownership of the single service entry out of the parsed deployment.
2350 .remove("test")
2351 .unwrap()
2352 }
2353
/// Returns a TCP port the OS considered free on `127.0.0.1` a moment ago.
///
/// A listener is bound to port 0, its assigned port is read, and the
/// listener is released again before returning, so the port is not held
/// and another process may take it in the meantime.
fn reserve_free_tcp_port() -> u16 {
    std::net::TcpListener::bind("127.0.0.1:0")
        .expect("failed to bind ephemeral test port")
        .local_addr()
        .unwrap()
        .port()
}
2366
2367 #[tokio::test]
2368 async fn test_proxy_manager_new() {
2369 let config = ProxyManagerConfig::default();
2370 let registry = Arc::new(ServiceRegistry::new());
2371 let manager = ProxyManager::new(config, registry, None);
2372
2373 assert_eq!(manager.route_count().await, 0);
2374 assert!(manager.list_services().await.is_empty());
2375 }
2376
2377 #[tokio::test]
2378 async fn test_add_service_with_http_endpoints() {
2379 let config = ProxyManagerConfig::default();
2380 let registry = Arc::new(ServiceRegistry::new());
2381 let manager = ProxyManager::new(config, registry, None);
2382
2383 let spec = mock_service_spec_with_endpoints();
2384 manager.add_service("api", &spec).await;
2385
2386 assert_eq!(manager.route_count().await, 2);
2388 assert!(manager.has_service("api").await);
2389 }
2390
2391 #[tokio::test]
2392 async fn test_tcp_endpoints_tracked_not_routed() {
2393 let config = ProxyManagerConfig::default();
2394 let registry = Arc::new(ServiceRegistry::new());
2395 let manager = ProxyManager::new(config, registry, None);
2396
2397 let spec = mock_service_spec_tcp_only();
2398 manager.add_service("grpc-service", &spec).await;
2399
2400 assert_eq!(manager.route_count().await, 0);
2402 assert!(manager.has_service("grpc-service").await);
2404 }
2405
2406 #[tokio::test]
2407 async fn test_remove_service() {
2408 let config = ProxyManagerConfig::default();
2409 let registry = Arc::new(ServiceRegistry::new());
2410 let manager = ProxyManager::new(config, registry, None);
2411
2412 let spec = mock_service_spec_with_endpoints();
2413 manager.add_service("api", &spec).await;
2414 assert_eq!(manager.route_count().await, 2);
2415
2416 manager.remove_service("api").await;
2417 assert_eq!(manager.route_count().await, 0);
2418 assert!(!manager.has_service("api").await);
2419 }
2420
2421 #[tokio::test]
2422 async fn test_backend_management() {
2423 let config = ProxyManagerConfig::default();
2424 let registry = Arc::new(ServiceRegistry::new());
2425 let manager = ProxyManager::new(config, registry.clone(), None);
2426
2427 let spec = mock_service_spec_with_endpoints();
2428 manager.add_service("api", &spec).await;
2429
2430 let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
2432 let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap();
2433
2434 manager.add_backend("api", addr1).await;
2435 manager.add_backend("api", addr2).await;
2436
2437 let resolved = registry.resolve(None, "/api").await.unwrap();
2439 assert_eq!(resolved.backends.len(), 2);
2440
2441 manager.remove_backend("api", addr1).await;
2443 let resolved = registry.resolve(None, "/api").await.unwrap();
2444 assert_eq!(resolved.backends.len(), 1);
2445 }
2446
2447 #[tokio::test]
2448 async fn test_update_backends_replaces_all() {
2449 let config = ProxyManagerConfig::default();
2450 let registry = Arc::new(ServiceRegistry::new());
2451 let manager = ProxyManager::new(config, registry.clone(), None);
2452
2453 let spec = mock_service_spec_with_endpoints();
2454 manager.add_service("api", &spec).await;
2455
2456 let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
2458 manager.add_backend("api", addr1).await;
2459
2460 let new_backends: Vec<SocketAddr> = vec![
2462 "127.0.0.1:9000".parse().unwrap(),
2463 "127.0.0.1:9001".parse().unwrap(),
2464 "127.0.0.1:9002".parse().unwrap(),
2465 ];
2466 manager.update_backends("api", new_backends).await;
2467
2468 let resolved = registry.resolve(None, "/api").await.unwrap();
2469 assert_eq!(resolved.backends.len(), 3);
2470 }
2471
2472 #[tokio::test]
2473 async fn test_config_builder() {
2474 let config = ProxyManagerConfig::new("0.0.0.0:8080".parse().unwrap())
2475 .with_https("0.0.0.0:8443".parse().unwrap())
2476 .with_http2(false);
2477
2478 assert_eq!(
2479 config.http_addr,
2480 "0.0.0.0:8080".parse::<SocketAddr>().unwrap()
2481 );
2482 assert_eq!(
2483 config.https_addr,
2484 Some("0.0.0.0:8443".parse::<SocketAddr>().unwrap())
2485 );
2486 assert!(!config.http2_enabled);
2487 }
2488
2489 #[tokio::test]
2494 async fn test_ensure_ports_differentiates_public_and_internal() {
2495 let config = ProxyManagerConfig::default();
2496 let registry = Arc::new(ServiceRegistry::new());
2497 let manager = ProxyManager::new(config, registry, None);
2498
2499 let spec = mock_service_spec_with_endpoints();
2500 let result = manager.ensure_ports_for_service(&spec, None).await;
2502 let _ = result;
2505 }
2506
2507 #[tokio::test]
2508 async fn test_ensure_ports_with_overlay_ip() {
2509 let config = ProxyManagerConfig::default();
2510 let registry = Arc::new(ServiceRegistry::new());
2511 let manager = ProxyManager::new(config, registry, None);
2512
2513 let spec = mock_service_spec_with_endpoints();
2514 let overlay_ip: IpAddr = "10.200.0.5".parse().unwrap();
2516 let result = manager
2517 .ensure_ports_for_service(&spec, Some(overlay_ip))
2518 .await;
2519 let _ = result;
2520 }
2521
/// Test fixture: a service spec mixing HTTP, TCP and UDP endpoints.
///
/// Parses the inline YAML deployment below and returns its `mixed` service:
/// `http` (port 8080, path `/api`), `grpc` (tcp, port 9000) and `game`
/// (udp, port 27015), all exposed publicly.
///
/// Panics if the YAML does not parse or the `mixed` entry is missing.
2522 fn mock_mixed_service_spec() -> ServiceSpec {
2523 use zlayer_spec::*;
2524 serde_yaml::from_str::<DeploymentSpec>(
2525 r"
2526version: v1
2527deployment: test
2528services:
2529 mixed:
2530 rtype: service
2531 image:
2532 name: test:latest
2533 endpoints:
2534 - name: http
2535 protocol: http
2536 port: 8080
2537 path: /api
2538 expose: public
2539 - name: grpc
2540 protocol: tcp
2541 port: 9000
2542 expose: public
2543 - name: game
2544 protocol: udp
2545 port: 27015
2546 expose: public
2547",
2548 )
2549 .unwrap()
2550 .services
// Take ownership of the single service entry out of the parsed deployment.
2551 .remove("mixed")
2552 .unwrap()
2553 }
2554
2555 #[tokio::test]
2556 async fn test_add_mixed_service_tracks_all_endpoints() {
2557 let config = ProxyManagerConfig::default();
2558 let registry = Arc::new(ServiceRegistry::new());
2559 let manager = ProxyManager::new(config, registry, None);
2560
2561 let spec = mock_mixed_service_spec();
2562 manager.add_service("mixed", &spec).await;
2563
2564 assert_eq!(manager.route_count().await, 1);
2566 assert!(manager.has_service("mixed").await);
2568 }
2569
2570 #[tokio::test]
2571 async fn test_ensure_ports_tcp_with_stream_registry() {
2572 use zlayer_proxy::StreamService;
2573
2574 let mut bound = false;
2581 for _ in 0..16 {
2582 let stream_registry = Arc::new(StreamRegistry::new());
2583 let config = ProxyManagerConfig::default();
2584 let registry = Arc::new(ServiceRegistry::new());
2585 let mut manager = ProxyManager::new(config, registry, None);
2586 manager.set_stream_registry(stream_registry.clone());
2587
2588 let port = reserve_free_tcp_port();
2589 let spec = mock_service_spec_tcp_only_port(port);
2590
2591 stream_registry
2593 .register_tcp(port, StreamService::new("grpc-service".to_string(), vec![]));
2594
2595 let result = manager.ensure_ports_for_service(&spec, None).await;
2597 assert!(result.is_ok());
2598
2599 if manager.tcp_listeners.read().await.contains(&port) {
2601 bound = true;
2602 break;
2603 }
2604 }
2605 assert!(
2606 bound,
2607 "ensure_ports_for_service never tracked an OS-assigned TCP port across 16 attempts"
2608 );
2609 }
2610
2611 #[tokio::test]
2612 async fn test_ensure_ports_tcp_without_stream_registry() {
2613 let config = ProxyManagerConfig::default();
2614 let registry = Arc::new(ServiceRegistry::new());
2615 let manager = ProxyManager::new(config, registry, None);
2616
2617 let spec = mock_service_spec_tcp_only();
2618
2619 let result = manager.ensure_ports_for_service(&spec, None).await;
2621 assert!(result.is_ok());
2622
2623 let tcp_ports = manager.tcp_listeners.read().await;
2625 assert!(tcp_ports.is_empty());
2626 }
2627
2628 #[tokio::test]
2629 async fn test_stream_registry_setter() {
2630 let stream_registry = Arc::new(StreamRegistry::new());
2631 let config = ProxyManagerConfig::default();
2632 let registry = Arc::new(ServiceRegistry::new());
2633 let mut manager = ProxyManager::new(config, registry, None);
2634
2635 assert!(manager.stream_registry().is_none());
2636 manager.set_stream_registry(stream_registry.clone());
2637 assert!(manager.stream_registry().is_some());
2638 }
2639
/// Test fixture: a fixed-scale, single-replica service with one internal
/// TCP endpoint on `port`.
///
/// Formats `port` into the inline YAML deployment below and returns its
/// `test` service.
///
/// Panics if the YAML does not parse or the `test` entry is missing.
2640 fn mock_internal_tcp_spec(port: u16) -> ServiceSpec {
2643 use zlayer_spec::*;
// `{port}` in the template is filled in by `format!`.
2644 let yaml = format!(
2645 "
2646version: v1
2647deployment: test
2648services:
2649 test:
2650 rtype: service
2651 image:
2652 name: test:latest
2653 scale:
2654 mode: fixed
2655 replicas: 1
2656 endpoints:
2657 - name: tcp
2658 protocol: tcp
2659 port: {port}
2660 expose: internal
2661"
2662 );
2663 serde_yaml::from_str::<DeploymentSpec>(&yaml)
2664 .unwrap()
2665 .services
// Take ownership of the single service entry out of the parsed deployment.
2666 .remove("test")
2667 .unwrap()
2668 }
2669
2670 #[tokio::test]
2675 async fn test_publish_loopback_round_trips_then_frees_port() {
2676 use tokio::io::{AsyncReadExt, AsyncWriteExt};
2677
2678 let backend = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2680 let backend_addr = backend.local_addr().unwrap();
2681 let backend_ip = backend_addr.ip();
2682 let backend_port = backend_addr.port();
2683 tokio::spawn(async move {
2684 if let Ok((mut sock, _)) = backend.accept().await {
2685 let mut buf = [0u8; 16];
2686 let n = sock.read(&mut buf).await.unwrap_or(0);
2687 let _ = sock.write_all(b"pong:").await;
2689 let _ = sock.write_all(&buf[..n]).await;
2690 let _ = sock.flush().await;
2691 }
2692 });
2693
2694 let config = ProxyManagerConfig::default();
2695 let registry = Arc::new(ServiceRegistry::new());
2696 let manager = ProxyManager::new(config, registry, None);
2697
2698 let publish_port = reserve_free_tcp_port();
2700 let spec = mock_internal_tcp_spec(publish_port);
2701 assert!(
2702 spec.publish_to_node_loopback(),
2703 "single-member internal spec should publish to loopback"
2704 );
2705
2706 manager
2709 .publish_loopback_for_container(
2710 Some("dep-a"),
2711 "test",
2712 &spec,
2713 backend_ip,
2714 Some(backend_port),
2715 )
2716 .await
2717 .expect("publish should succeed on a free port");
2718
2719 let mut client = tokio::net::TcpStream::connect((Ipv4Addr::LOCALHOST, publish_port))
2721 .await
2722 .expect("connect to published loopback port");
2723 client.write_all(b"ping").await.unwrap();
2724 client.flush().await.unwrap();
2725 let mut reply = Vec::new();
2726 client.read_to_end(&mut reply).await.unwrap();
2727 assert_eq!(&reply, b"pong:ping");
2728 drop(client);
2729
2730 manager
2732 .unpublish_loopback_for_container(&spec, backend_ip, Some(backend_port))
2733 .await;
2734
2735 let mut bound = None;
2738 for _ in 0..50 {
2739 match std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, publish_port)) {
2740 Ok(l) => {
2741 bound = Some(l);
2742 break;
2743 }
2744 Err(_) => tokio::time::sleep(Duration::from_millis(20)).await,
2745 }
2746 }
2747 assert!(
2748 bound.is_some(),
2749 "loopback port {publish_port} should be freed after unpublish"
2750 );
2751 }
2752
2753 #[tokio::test]
2754 async fn test_publish_loopback_skips_public_endpoints() {
2755 let config = ProxyManagerConfig::default();
2759 let registry = Arc::new(ServiceRegistry::new());
2760 let manager = ProxyManager::new(config, registry, None);
2761
2762 let spec = mock_mixed_service_spec();
2763 let backend_ip: IpAddr = "127.0.0.1".parse().unwrap();
2764 manager
2765 .publish_loopback_for_container(Some("dep-a"), "mixed", &spec, backend_ip, None)
2766 .await
2767 .expect("public-only spec publishes nothing and must not error");
2768
2769 assert!(manager.loopback_tcp.read().await.is_empty());
2771 assert!(manager.loopback_udp.read().await.is_empty());
2772 }
2773
2774 #[tokio::test]
2775 async fn test_registry_accessor() {
2776 let config = ProxyManagerConfig::default();
2777 let registry = Arc::new(ServiceRegistry::new());
2778 let manager = ProxyManager::new(config, registry.clone(), None);
2779
2780 assert_eq!(Arc::as_ptr(&manager.registry()), Arc::as_ptr(®istry));
2782 }
2783
2784 #[tokio::test]
2789 async fn test_published_port_ownership_rejects_cross_deployment() {
2790 let config = ProxyManagerConfig::default();
2791 let registry = Arc::new(ServiceRegistry::new());
2792 let manager = ProxyManager::new(config, registry, None);
2793
2794 let publish_port = reserve_free_tcp_port();
2796 let spec = mock_internal_tcp_spec(publish_port);
2797
2798 let backend_a: IpAddr = "10.0.0.1".parse().unwrap();
2800 let tgt_a = 5001u16;
2801 let backend_b: IpAddr = "10.0.0.2".parse().unwrap();
2802 let tgt_b = 5002u16;
2803
2804 manager
2806 .publish_loopback_for_container(Some("dep-a"), "svc", &spec, backend_a, Some(tgt_a))
2807 .await
2808 .expect("deployment A should claim the free port");
2809
2810 let err = manager
2812 .publish_loopback_for_container(Some("dep-b"), "svc", &spec, backend_b, Some(tgt_b))
2813 .await
2814 .expect_err("deployment B must be refused on an owned port");
2815 match err {
2816 crate::error::AgentError::PortConflict { port, .. } => {
2817 assert_eq!(port, publish_port);
2818 }
2819 other => panic!("expected PortConflict, got {other:?}"),
2820 }
2821
2822 let svc = manager
2825 .loopback_registry
2826 .resolve_tcp(publish_port)
2827 .expect("port should still be registered to deployment A");
2828 let expected_a = SocketAddr::new(backend_a, tgt_a);
2829 let foreign_b = SocketAddr::new(backend_b, tgt_b);
2830 assert_eq!(svc.backends, vec![expected_a]);
2831 assert!(
2832 !svc.backends.contains(&foreign_b),
2833 "deployment B's backend must NOT be cross-wired into the pool"
2834 );
2835 }
2836
2837 #[tokio::test]
2840 async fn test_published_port_same_owner_appends_replica() {
2841 let config = ProxyManagerConfig::default();
2842 let registry = Arc::new(ServiceRegistry::new());
2843 let manager = ProxyManager::new(config, registry, None);
2844
2845 let publish_port = reserve_free_tcp_port();
2846 let spec = mock_internal_tcp_spec(publish_port);
2847
2848 let replica1: IpAddr = "10.0.0.1".parse().unwrap();
2849 let replica2: IpAddr = "10.0.0.2".parse().unwrap();
2850 let target_port = 6000u16;
2851
2852 manager
2854 .publish_loopback_for_container(
2855 Some("dep-a"),
2856 "svc",
2857 &spec,
2858 replica1,
2859 Some(target_port),
2860 )
2861 .await
2862 .expect("first replica claims the port");
2863
2864 manager
2866 .publish_loopback_for_container(
2867 Some("dep-a"),
2868 "svc",
2869 &spec,
2870 replica2,
2871 Some(target_port),
2872 )
2873 .await
2874 .expect("same-owner second replica should be accepted");
2875
2876 let svc = manager
2877 .loopback_registry
2878 .resolve_tcp(publish_port)
2879 .expect("port should be registered");
2880 let b1 = SocketAddr::new(replica1, target_port);
2881 let b2 = SocketAddr::new(replica2, target_port);
2882 assert_eq!(svc.backends.len(), 2, "both replicas should be in the pool");
2883 assert!(svc.backends.contains(&b1));
2884 assert!(svc.backends.contains(&b2));
2885 }
2886
2887 #[tokio::test]
2891 async fn test_published_port_freed_on_unpublish() {
2892 let config = ProxyManagerConfig::default();
2893 let registry = Arc::new(ServiceRegistry::new());
2894 let manager = ProxyManager::new(config, registry, None);
2895
2896 let publish_port = reserve_free_tcp_port();
2897 let spec = mock_internal_tcp_spec(publish_port);
2898 let backend_a: IpAddr = "10.0.0.1".parse().unwrap();
2899 let target_port = 7000u16;
2900
2901 manager
2902 .publish_loopback_for_container(
2903 Some("dep-a"),
2904 "svc",
2905 &spec,
2906 backend_a,
2907 Some(target_port),
2908 )
2909 .await
2910 .expect("deployment A claims the port");
2911 assert!(manager
2912 .published_ports
2913 .read()
2914 .await
2915 .contains_key(&publish_port));
2916
2917 manager
2919 .unpublish_loopback_for_container(&spec, backend_a, Some(target_port))
2920 .await;
2921 assert!(
2922 !manager
2923 .published_ports
2924 .read()
2925 .await
2926 .contains_key(&publish_port),
2927 "ownership entry should be cleared once the last backend is gone"
2928 );
2929
2930 let backend_b: IpAddr = "10.0.0.2".parse().unwrap();
2932 manager
2933 .publish_loopback_for_container(
2934 Some("dep-b"),
2935 "svc",
2936 &spec,
2937 backend_b,
2938 Some(target_port),
2939 )
2940 .await
2941 .expect("freed port should be claimable by another deployment");
2942 }
2943
2944 #[tokio::test]
2945 #[allow(clippy::similar_names)]
2946 async fn test_start_ingress_is_idempotent() {
2947 let config = ProxyManagerConfig::default();
2948 let registry = Arc::new(ServiceRegistry::new());
2949 let manager = ProxyManager::new(config, registry, None);
2950
2951 let http_port = reserve_free_tcp_port();
2955 let https_port = reserve_free_tcp_port();
2956
2957 manager.start_ingress_on(http_port, https_port).await;
2958 assert!(
2960 manager.servers.read().await.contains_key(&http_port),
2961 "HTTP ingress should be registered"
2962 );
2963 assert!(
2964 manager.ingress_started.load(Ordering::SeqCst),
2965 "ingress_started flag should be set"
2966 );
2967 let count_after_first = manager.servers.read().await.len();
2968
2969 manager.start_ingress_on(http_port, https_port).await;
2972 assert_eq!(
2973 manager.servers.read().await.len(),
2974 count_after_first,
2975 "second start_ingress call must not register additional servers"
2976 );
2977 }
2978
2979 #[tokio::test]
2993 async fn test_http_ingress_carries_cert_manager_for_acme() {
2994 let tmp = tempfile::tempdir().unwrap();
2995 let cm = Arc::new(
2996 CertManager::new(tmp.path().to_string_lossy().into_owned(), None)
2997 .await
2998 .unwrap(),
2999 );
3000
3001 let config = ProxyManagerConfig::default();
3002 let registry = Arc::new(ServiceRegistry::new());
3003 let manager = ProxyManager::new(config, registry, Some(Arc::clone(&cm)));
3004
3005 let http_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 80);
3006 let http_server = manager.build_http_ingress_server(http_addr, None);
3007
3008 assert!(
3009 http_server.cert_manager().is_some(),
3010 "the :80 HTTP ingress server must carry the CertManager so ACME \
3011 HTTP-01 challenges (which always arrive on :80) are served, not 403'd"
3012 );
3013 }
3014
3015 #[tokio::test]
3019 async fn test_http_ingress_built_without_cert_manager() {
3020 let config = ProxyManagerConfig::default();
3021 let registry = Arc::new(ServiceRegistry::new());
3022 let manager = ProxyManager::new(config, registry, None);
3023
3024 let http_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 80);
3025 let http_server = manager.build_http_ingress_server(http_addr, None);
3026
3027 assert!(
3028 http_server.cert_manager().is_none(),
3029 "without a configured CertManager the :80 server carries none"
3030 );
3031 }
3032
3033 #[tokio::test]
3034 async fn publish_port_mapping_binds_explicit_host_port() {
3035 let config = ProxyManagerConfig::default();
3036 let registry = Arc::new(ServiceRegistry::new());
3037 let manager = ProxyManager::new(config, registry, None);
3038
3039 let port = {
3041 let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
3042 l.local_addr().unwrap().port()
3043 };
3044
3045 let mapping = PortMapping {
3046 host_port: Some(port),
3047 container_port: 8080,
3048 protocol: PortProtocol::Tcp,
3049 host_ip: "127.0.0.1".to_string(),
3050 };
3051 let backend: SocketAddr = "127.0.0.1:1".parse().unwrap();
3053
3054 let bound = manager
3055 .publish_port_mapping(Some("dep"), "svc", &mapping, backend)
3056 .await
3057 .expect("publish should succeed");
3058 assert_eq!(bound, port, "explicit host port must be honored");
3059
3060 let conn = tokio::net::TcpStream::connect(("127.0.0.1", bound)).await;
3062 assert!(conn.is_ok(), "expected a live host listener on {bound}");
3063 }
3064
3065 #[tokio::test]
3066 async fn publish_port_mapping_rejects_cross_owner_explicit_port() {
3067 let config = ProxyManagerConfig::default();
3068 let registry = Arc::new(ServiceRegistry::new());
3069 let manager = ProxyManager::new(config, registry, None);
3070
3071 let port = {
3073 let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
3074 l.local_addr().unwrap().port()
3075 };
3076
3077 let mapping = PortMapping {
3078 host_port: Some(port),
3079 container_port: 8080,
3080 protocol: PortProtocol::Tcp,
3081 host_ip: "127.0.0.1".to_string(),
3082 };
3083 let backend_a: SocketAddr = "127.0.0.1:1".parse().unwrap();
3084 let backend_b: SocketAddr = "127.0.0.1:2".parse().unwrap();
3085
3086 manager
3088 .publish_port_mapping(Some("depA"), "ownerA", &mapping, backend_a)
3089 .await
3090 .expect("owner A publish should succeed");
3091
3092 let err = manager
3094 .publish_port_mapping(Some("depB"), "ownerB", &mapping, backend_b)
3095 .await
3096 .expect_err("cross-owner explicit port must be refused");
3097 assert!(
3098 matches!(err, crate::error::AgentError::PortConflict { .. }),
3099 "expected PortConflict, got {err:?}"
3100 );
3101 }
3102
3103 #[tokio::test]
3104 async fn publish_port_mapping_ephemeral_returns_nonzero() {
3105 let config = ProxyManagerConfig::default();
3106 let registry = Arc::new(ServiceRegistry::new());
3107 let manager = ProxyManager::new(config, registry, None);
3108
3109 let mapping = PortMapping {
3110 host_port: None,
3111 container_port: 8080,
3112 protocol: PortProtocol::Tcp,
3113 host_ip: "127.0.0.1".to_string(),
3114 };
3115 let backend: SocketAddr = "127.0.0.1:1".parse().unwrap();
3116
3117 let bound = manager
3118 .publish_port_mapping(Some("dep"), "svc", &mapping, backend)
3119 .await
3120 .expect("ephemeral publish should succeed");
3121 assert_ne!(bound, 0, "ephemeral publish must resolve to a real port");
3122 }
3123
    /// Test double for `ScaleTrigger`: its `scale_to` records every call and
    /// registers a single backend on `lb` under `lb_key`.
    struct FakeScaleTrigger {
        /// Load balancer on which `scale_to` registers the fake backend.
        lb: Arc<LoadBalancer>,
        /// Key passed to `lb.register` on every `scale_to` call.
        lb_key: String,
        /// Log of the `(service, replicas)` pairs passed to `scale_to`, in
        /// call order; shared so the test can inspect it afterwards.
        calls: Arc<std::sync::Mutex<Vec<(String, u32)>>>,
    }
3134
3135 #[async_trait::async_trait]
3136 impl ScaleTrigger for FakeScaleTrigger {
3137 async fn scale_to(&self, service: &str, replicas: u32) -> std::result::Result<(), String> {
3138 self.calls
3139 .lock()
3140 .unwrap()
3141 .push((service.to_string(), replicas));
3142 self.lb.register(
3144 &self.lb_key,
3145 vec!["127.0.0.1:9".parse().unwrap()],
3146 LbStrategy::RoundRobin,
3147 );
3148 Ok(())
3149 }
3150 }
3151
3152 #[test]
3153 fn service_name_from_key_strips_deployment_and_endpoint() {
3154 assert_eq!(
3155 ServiceActivator::service_name_from_key("dep/api#http"),
3156 "api"
3157 );
3158 assert_eq!(ServiceActivator::service_name_from_key("api#http"), "api");
3159 assert_eq!(ServiceActivator::service_name_from_key("dep/api"), "api");
3160 assert_eq!(ServiceActivator::service_name_from_key("api"), "api");
3161 }
3162
3163 #[tokio::test]
3164 async fn service_activator_scales_and_waits_for_backend() {
3165 let lb = Arc::new(LoadBalancer::new());
3166 let lb_key = "dep/api#http".to_string();
3167 lb.register(&lb_key, vec![], LbStrategy::RoundRobin);
3169
3170 let calls = Arc::new(std::sync::Mutex::new(Vec::new()));
3171 let trigger = Arc::new(FakeScaleTrigger {
3172 lb: Arc::clone(&lb),
3173 lb_key: lb_key.clone(),
3174 calls: Arc::clone(&calls),
3175 });
3176 let activator = ServiceActivator::new(trigger, Arc::clone(&lb));
3177
3178 activator
3179 .activate(&lb_key)
3180 .await
3181 .expect("activation should succeed once a backend is registered");
3182
3183 let recorded = calls.lock().unwrap().clone();
3184 assert_eq!(
3185 recorded,
3186 vec![("api".to_string(), DEFAULT_ACTIVATION_FLOOR)],
3187 "scale_to should be called with the bare service name and the floor"
3188 );
3189 }
3190
3191 #[tokio::test]
3192 async fn rps_registry_provider_reflects_recorded_requests() {
3193 let reg = Arc::new(RpsRegistry::new());
3194 let provider = RpsRegistryProvider::new(Arc::clone(®));
3195
3196 assert!((provider.rps("svc").await - 0.0).abs() < f64::EPSILON);
3197
3198 reg.record("svc").await;
3199 reg.record("svc").await;
3200 assert!(
3201 provider.rps("svc").await > 0.0,
3202 "provider must reflect recorded requests"
3203 );
3204 }
3205
3206 #[tokio::test]
3207 async fn rps_registry_accessor_is_shared() {
3208 let config = ProxyManagerConfig::default();
3209 let registry = Arc::new(ServiceRegistry::new());
3210 let manager = ProxyManager::new(config, registry, None);
3211
3212 let reg = manager.rps_registry();
3213 reg.record("svc").await;
3214 let provider = manager.rps_provider();
3216 assert!(provider.rps("svc").await > 0.0);
3217 }
3218
3219 #[tokio::test]
3220 async fn set_activator_is_installed() {
3221 let config = ProxyManagerConfig::default();
3222 let registry = Arc::new(ServiceRegistry::new());
3223 let manager = ProxyManager::new(config, registry, None);
3224
3225 assert!(manager.current_activator().await.is_none());
3226
3227 let lb = manager.load_balancer();
3228 let trigger = Arc::new(FakeScaleTrigger {
3229 lb: Arc::clone(&lb),
3230 lb_key: "dep/api#http".to_string(),
3231 calls: Arc::new(std::sync::Mutex::new(Vec::new())),
3232 });
3233 manager.install_service_activator(trigger).await;
3234
3235 assert!(
3236 manager.current_activator().await.is_some(),
3237 "activator should be installed after install_service_activator"
3238 );
3239 }
3240
3241 fn cert_test_endpoint(
3244 protocol: Protocol,
3245 expose: ExposeType,
3246 host: Option<&str>,
3247 ) -> zlayer_spec::EndpointSpec {
3248 zlayer_spec::EndpointSpec {
3249 name: "web".to_string(),
3250 protocol,
3251 port: 443,
3252 target_port: None,
3253 path: None,
3254 host: host.map(str::to_string),
3255 expose,
3256 stream: None,
3257 target_role: None,
3258 tunnel: None,
3259 }
3260 }
3261
3262 #[test]
3263 fn endpoint_wants_managed_cert_gates_correctly() {
3264 assert_eq!(
3266 ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3267 Protocol::Https,
3268 ExposeType::Public,
3269 Some("console.zatabase.io"),
3270 )),
3271 Some("console.zatabase.io")
3272 );
3273
3274 assert_eq!(
3276 ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3277 Protocol::Https,
3278 ExposeType::Public,
3279 Some(" console.zatabase.io "),
3280 )),
3281 Some("console.zatabase.io")
3282 );
3283
3284 assert!(
3286 ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3287 Protocol::Https,
3288 ExposeType::Internal,
3289 Some("console.zatabase.io"),
3290 ))
3291 .is_none()
3292 );
3293
3294 for proto in [
3296 Protocol::Http,
3297 Protocol::Websocket,
3298 Protocol::Tcp,
3299 Protocol::Udp,
3300 ] {
3301 assert!(
3302 ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3303 proto,
3304 ExposeType::Public,
3305 Some("console.zatabase.io"),
3306 ))
3307 .is_none()
3308 );
3309 }
3310
3311 assert!(
3313 ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3314 Protocol::Https,
3315 ExposeType::Public,
3316 None,
3317 ))
3318 .is_none()
3319 );
3320
3321 assert!(
3323 ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3324 Protocol::Https,
3325 ExposeType::Public,
3326 Some(" "),
3327 ))
3328 .is_none()
3329 );
3330
3331 for wildcard in ["*", "*.zatabase.io", "console.*.io"] {
3333 assert!(
3334 ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3335 Protocol::Https,
3336 ExposeType::Public,
3337 Some(wildcard),
3338 ))
3339 .is_none()
3340 );
3341 }
3342 }
3343
3344 #[tokio::test]
3345 async fn sni_resolver_accessor_is_shared_and_stable() {
3346 let config = ProxyManagerConfig::default();
3347 let registry = Arc::new(ServiceRegistry::new());
3348 let manager = ProxyManager::new(config, registry, None);
3349
3350 let a = manager.sni_resolver();
3353 let b = manager.sni_resolver();
3354 assert!(
3355 Arc::ptr_eq(&a, &b),
3356 "sni_resolver() must return clones of one shared resolver"
3357 );
3358 assert_eq!(a.cert_count(), 0);
3359 }
3360
    /// Adding a public HTTPS service with a host to a manager that was built
    /// without a cert manager must still register the service and must leave
    /// the SNI resolver without any certificates.
    #[tokio::test]
    async fn add_service_without_cert_manager_does_not_panic_or_provision() {
        let config = ProxyManagerConfig::default();
        let registry = Arc::new(ServiceRegistry::new());
        // Third argument `None`: no cert manager is configured.
        let manager = ProxyManager::new(config, registry, None);

        // Parse a one-service deployment and pull out the `web` service spec.
        let spec = serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  web:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: web
        protocol: https
        port: 443
        host: console.zatabase.io
        expose: public
",
        )
        .unwrap()
        .services
        .remove("web")
        .unwrap();

        manager.add_service("web", &spec).await;
        // The service is registered ...
        assert!(manager.has_service("web").await);
        // ... and no certificate was loaded into the resolver.
        assert_eq!(manager.sni_resolver().cert_count(), 0);
    }
3397}