1use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6 DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType, ServiceSpec};
24
25pub struct ServiceInstance {
27 pub service_name: String,
28 pub spec: ServiceSpec,
29 runtime: Arc<dyn Runtime + Send + Sync>,
30 containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
31 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
33 proxy_manager: Option<Arc<ProxyManager>>,
35 dns_server: Option<Arc<DnsServer>>,
37 health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
39 last_pulled_digest: tokio::sync::RwLock<Option<String>>,
44 node_id: u64,
49}
50
51impl ServiceInstance {
52 pub fn new(
54 service_name: String,
55 spec: ServiceSpec,
56 runtime: Arc<dyn Runtime + Send + Sync>,
57 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
58 ) -> Self {
59 Self {
60 service_name,
61 spec,
62 runtime,
63 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
64 overlay_manager,
65 proxy_manager: None,
66 dns_server: None,
67 health_states: None,
68 last_pulled_digest: tokio::sync::RwLock::new(None),
69 node_id: 0,
70 }
71 }
72
73 pub fn with_proxy(
75 service_name: String,
76 spec: ServiceSpec,
77 runtime: Arc<dyn Runtime + Send + Sync>,
78 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
79 proxy_manager: Arc<ProxyManager>,
80 ) -> Self {
81 Self {
82 service_name,
83 spec,
84 runtime,
85 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
86 overlay_manager,
87 proxy_manager: Some(proxy_manager),
88 dns_server: None,
89 health_states: None,
90 last_pulled_digest: tokio::sync::RwLock::new(None),
91 node_id: 0,
92 }
93 }
94
    /// Sets the node identifier embedded in container IDs built by `scale_to`.
    pub fn set_node_id(&mut self, node_id: u64) {
        self.node_id = node_id;
    }
102
103 #[must_use]
112 pub fn role_for_replica(&self, replica_idx: u32) -> String {
113 let Some(groups) = self.spec.replica_groups.as_ref() else {
114 return "default".to_string();
115 };
116 let mut cumulative = 0u32;
117 for group in groups {
118 cumulative = cumulative.saturating_add(group.count);
119 if replica_idx <= cumulative {
120 return group.role.clone();
121 }
122 }
123 "default".to_string()
124 }
125
    /// Builder-style variant of [`ServiceInstance::set_dns_server`]: attaches the DNS
    /// server and returns the instance.
    #[must_use]
    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns_server);
        self
    }
132
    /// Attaches (or replaces) the DNS server used to register replica hostnames.
    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
        self.dns_server = Some(dns_server);
    }
137
    /// Attaches (or replaces) the proxy manager that receives backend registrations.
    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
        self.proxy_manager = Some(proxy_manager);
    }
142
    /// Attaches (or replaces) the shared health map that the health callback writes
    /// to, keyed by service name.
    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
        self.health_states = Some(states);
    }
147
148 pub async fn last_pulled_digest(&self) -> Option<String> {
152 self.last_pulled_digest.read().await.clone()
153 }
154
155 async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
166 let image_str = self.spec.image.name.to_string();
167 let policy = self.spec.image.pull_policy;
168
169 self.runtime
170 .pull_image_with_policy(
171 &image_str,
172 policy,
173 None,
174 self.spec.image.source_policy.unwrap_or_default(),
175 )
176 .await
177 .map_err(|e| AgentError::PullFailed {
178 image: self.spec.image.name.to_string(),
179 reason: e.to_string(),
180 })?;
181
182 let new_digest = match self.runtime.list_images().await {
188 Ok(images) => images
189 .into_iter()
190 .find(|info| info.reference == image_str)
191 .and_then(|info| info.digest),
192 Err(e) => {
193 tracing::debug!(
194 image = %image_str,
195 error = %e,
196 "list_images unavailable; cannot record post-pull digest"
197 );
198 None
199 }
200 };
201
202 if let Some(ref digest) = new_digest {
203 *self.last_pulled_digest.write().await = Some(digest.clone());
204 }
205
206 Ok(new_digest)
207 }
208
209 #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
218 pub async fn scale_to(&self, replicas: u32) -> Result<()> {
219 let current_replicas = { self.containers.read().await.len() as u32 }; if replicas >= current_replicas {
231 let _ = self.pull_and_refresh_digest().await?;
232 }
233
234 let local_node_id = self.node_id;
243 if replicas > current_replicas {
244 let replica_specs: Vec<(String, u32)> =
245 if let Some(groups) = self.spec.replica_groups.as_ref() {
246 let mut specs: Vec<(String, u32)> = Vec::new();
247 for group in groups {
248 for idx in 0..group.count {
249 specs.push((group.role.clone(), idx + 1));
250 }
251 }
252 specs
253 .into_iter()
254 .skip(current_replicas as usize)
255 .take((replicas - current_replicas) as usize)
256 .collect()
257 } else {
258 (current_replicas..replicas)
259 .map(|i| ("default".to_string(), i + 1))
260 .collect()
261 };
262
263 for (role, replica_idx) in replica_specs {
264 let id = ContainerId::with_role_and_node(
265 self.service_name.clone(),
266 replica_idx,
267 role,
268 local_node_id,
269 );
270
271 self.runtime
279 .create_container(&id, &self.spec)
280 .await
281 .map_err(|e| match e {
282 AgentError::RouteToPeer { .. } => e,
283 other => AgentError::CreateFailed {
284 id: id.to_string(),
285 reason: other.to_string(),
286 },
287 })?;
288
289 let init_orchestrator = InitOrchestrator::with_error_policy(
291 id.clone(),
292 self.spec.init.clone(),
293 self.spec.errors.clone(),
294 );
295 init_orchestrator.run().await?;
296
297 self.runtime
299 .start_container(&id)
300 .await
301 .map_err(|e| AgentError::StartFailed {
302 id: id.to_string(),
303 reason: e.to_string(),
304 })?;
305
306 let mut container_pid = None;
308 for attempt in 1..=5u32 {
309 match self.runtime.get_container_pid(&id).await {
310 Ok(Some(pid)) => {
311 container_pid = Some(pid);
312 break;
313 }
314 Ok(None) if attempt < 5 => {
315 tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
316 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
317 }
318 Ok(None) => {
319 tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
320 }
321 Err(e) => {
322 tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
323 if attempt < 5 {
324 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
325 }
326 }
327 }
328 }
329
330 if container_pid.is_some() {
337 let alive = match self.runtime.container_state(&id).await {
338 Ok(
339 ContainerState::Running
340 | ContainerState::Pending
341 | ContainerState::Initializing,
342 ) => true,
343 Ok(state) => {
344 tracing::warn!(
345 container = %id,
346 ?state,
347 "container exited before overlay attach could run"
348 );
349 false
350 }
351 Err(e) => {
352 tracing::warn!(
356 container = %id,
357 error = %e,
358 "container state query failed before overlay attach, proceeding"
359 );
360 true
361 }
362 };
363 if !alive {
364 let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
365 || " <log read failed>".to_string(),
366 |entries| {
367 if entries.is_empty() {
368 " <no log output>".to_string()
369 } else {
370 entries
371 .into_iter()
372 .map(|e| format!(" {}", e.message))
373 .collect::<Vec<_>>()
374 .join("\n")
375 }
376 },
377 );
378 return Err(AgentError::StartFailed {
379 id: id.to_string(),
380 reason: format!("container exited during startup:\n{log_tail}"),
381 });
382 }
383 }
384
385 let overlay_ip = if let Some(overlay) = &self.overlay_manager {
395 let overlay_guard = overlay.read().await;
396 #[cfg(target_os = "windows")]
397 let attach_result: Option<std::net::IpAddr> = {
398 let _ = (container_pid, &overlay_guard); match self.runtime.get_container_ip(&id).await {
404 Ok(Some(ip)) => Some(ip),
405 Ok(None) => {
406 tracing::debug!(
407 container = %id,
408 "no overlay IP recorded for container (overlay attach skipped at create time)"
409 );
410 None
411 }
412 Err(e) => {
413 tracing::warn!(
414 container = %id,
415 error = %e,
416 "failed to fetch container overlay IP"
417 );
418 None
419 }
420 }
421 };
422 #[cfg(not(target_os = "windows"))]
423 let attach_result: Option<std::net::IpAddr> = {
424 match self.runtime.overlay_attach_kind() {
425 crate::runtime::OverlayAttachKind::InGuestVsock => {
430 let cid = id.to_string();
431 match overlay_guard
432 .attach_container_guest(&cid, &self.service_name, true)
433 .await
434 {
435 Ok(cfg) => {
436 let ip = cfg.overlay_ip;
437 match self.runtime.push_overlay_config(&id, &cfg).await {
438 Ok(()) => Some(ip),
439 Err(e) => {
440 tracing::warn!(
441 container = %id,
442 error = %e,
443 "failed to push overlay config into guest; rolling back allocation"
444 );
445 if let Err(de) =
447 overlay_guard.detach_container_guest(&cid).await
448 {
449 tracing::warn!(
450 container = %id,
451 error = %de,
452 "failed to roll back guest overlay allocation"
453 );
454 }
455 None
456 }
457 }
458 }
459 Err(e) => {
460 tracing::warn!(
461 container = %id,
462 error = %e,
463 "failed to allocate guest overlay config from overlayd"
464 );
465 None
466 }
467 }
468 }
469 _ => {
472 if let Some(pid) = container_pid {
473 match overlay_guard
474 .attach_container(pid, &self.service_name, true)
475 .await
476 {
477 Ok(ip) => Some(ip),
478 Err(e) => {
479 tracing::warn!(
480 container = %id,
481 error = %e,
482 "failed to attach container to overlay network"
483 );
484 None
485 }
486 }
487 } else {
488 tracing::debug!(
490 container = %id,
491 "skipping overlay attachment - no PID available"
492 );
493 None
494 }
495 }
496 }
497 };
498
499 if let Some(ip) = attach_result {
500 tracing::info!(
501 container = %id,
502 overlay_ip = %ip,
503 "attached container to overlay network"
504 );
505
506 if let Some(dns) = &self.dns_server {
508 match dns.add_record(&self.service_name, ip).await {
517 Ok(()) => tracing::debug!(
518 hostname = %self.service_name,
519 ip = %ip,
520 "registered bare service-name DNS (compose discovery)"
521 ),
522 Err(e) => tracing::warn!(
523 hostname = %self.service_name,
524 error = %e,
525 "failed to register bare service-name DNS"
526 ),
527 }
528
529 let service_hostname = format!("{}.service.local", self.service_name);
531
532 let replica_hostname =
534 format!("{}.{}.service.local", id.replica, self.service_name);
535
536 match dns.add_record(&service_hostname, ip).await {
537 Ok(()) => tracing::debug!(
538 hostname = %service_hostname,
539 ip = %ip,
540 "registered DNS for service"
541 ),
542 Err(e) => tracing::warn!(
543 hostname = %service_hostname,
544 error = %e,
545 "failed to register DNS for service"
546 ),
547 }
548
549 if let Err(e) = dns.add_record(&replica_hostname, ip).await {
551 tracing::warn!(
552 hostname = %replica_hostname,
553 error = %e,
554 "failed to register replica DNS"
555 );
556 } else {
557 tracing::debug!(
558 hostname = %replica_hostname,
559 ip = %ip,
560 "registered DNS for replica"
561 );
562 }
563
564 if id.role != "default" {
570 let role_hostname =
571 format!("{}.{}.service.local", id.role, self.service_name);
572 match dns.add_record(&role_hostname, ip).await {
573 Ok(()) => tracing::debug!(
574 hostname = %role_hostname,
575 ip = %ip,
576 role = %id.role,
577 "registered DNS for replica group role"
578 ),
579 Err(e) => tracing::warn!(
580 hostname = %role_hostname,
581 error = %e,
582 "failed to register role DNS"
583 ),
584 }
585 }
586 }
587
588 Some(ip)
589 } else {
590 None
591 }
592 } else {
593 None
594 };
595
596 let effective_ip = if overlay_ip.is_none() {
598 match self.runtime.get_container_ip(&id).await {
599 Ok(Some(ip)) => {
600 tracing::info!(
601 container = %id,
602 ip = %ip,
603 "using runtime container IP for proxy (overlay unavailable)"
604 );
605 Some(ip)
606 }
607 Ok(None) => {
608 tracing::warn!(
609 container = %id,
610 "no container IP available from runtime, proxy routing will be unavailable"
611 );
612 None
613 }
614 Err(e) => {
615 tracing::warn!(
616 container = %id,
617 error = %e,
618 "failed to get container IP from runtime"
619 );
620 None
621 }
622 }
623 } else {
624 overlay_ip
625 };
626
627 tracing::info!(
628 container = %id,
629 service = %self.service_name,
630 overlay_ip = ?overlay_ip,
631 effective_ip = ?effective_ip,
632 "Container IP resolution complete"
633 );
634
635 let port_override = match self.runtime.get_container_port_override(&id).await {
640 Ok(Some(port)) => {
641 tracing::info!(
642 container = %id,
643 port = port,
644 "runtime assigned dynamic port override for this container"
645 );
646 Some(port)
647 }
648 Ok(None) => None,
649 Err(e) => {
650 tracing::warn!(
651 container = %id,
652 error = %e,
653 "failed to query port override from runtime, using spec port"
654 );
655 None
656 }
657 };
658
659 let health_monitor_handle = {
661 let mut check = self.spec.health.check.clone();
662
663 if let HealthCheck::Tcp { ref mut port } = check {
667 if *port == 0 {
668 *port = port_override.unwrap_or_else(|| {
669 self.spec
670 .endpoints
671 .iter()
672 .find(|ep| {
673 matches!(
674 ep.protocol,
675 Protocol::Http | Protocol::Https | Protocol::Websocket
676 )
677 })
678 .map_or(8080, zlayer_spec::EndpointSpec::target_port)
679 });
680 }
681 }
682
683 let start_grace = self
684 .spec
685 .health
686 .start_grace
687 .unwrap_or(Duration::from_secs(5));
688 let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
689 let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
690 let retries = self.spec.health.retries;
691
692 let checker = HealthChecker::new(check, effective_ip);
693 let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
694 .with_start_grace(start_grace)
695 .with_check_timeout(check_timeout);
696
697 let proxy_backend: Option<(Arc<ProxyManager>, SocketAddr)> =
703 if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
704 let proxy = Arc::clone(proxy);
705 let port = port_override.unwrap_or_else(|| {
710 self.spec
711 .endpoints
712 .iter()
713 .find(|ep| {
714 matches!(
715 ep.protocol,
716 Protocol::Http | Protocol::Https | Protocol::Websocket
717 )
718 })
719 .map_or(8080, zlayer_spec::EndpointSpec::target_port)
720 });
721
722 let backend_addr = SocketAddr::new(ip, port);
723
724 proxy.add_backend(&self.service_name, backend_addr).await;
728
729 if self.spec.publish_to_node_loopback() {
739 proxy
740 .publish_loopback_for_container(
741 &self.service_name,
742 &self.spec,
743 ip,
744 port_override,
745 )
746 .await;
747 }
748
749 Some((proxy, backend_addr))
750 } else {
751 None
752 };
753
754 let health_states_opt = self.health_states.clone();
761 let svc_name_for_states = self.service_name.clone();
762 let svc_name_for_proxy = self.service_name.clone();
763 let svc_name_for_log = self.service_name.clone();
764
765 let health_callback: HealthCallback =
766 Arc::new(move |container_id: ContainerId, is_healthy: bool| {
767 tracing::info!(
768 container = %container_id,
769 service = %svc_name_for_log,
770 healthy = is_healthy,
771 has_proxy_backend = proxy_backend.is_some(),
772 "health status changed"
773 );
774
775 if let Some(ref health_states) = health_states_opt {
778 let states = Arc::clone(health_states);
779 let svc = svc_name_for_states.clone();
780 tokio::spawn(async move {
781 let state = if is_healthy {
782 HealthState::Healthy
783 } else {
784 HealthState::Unhealthy {
785 failures: 0,
786 reason: "health check failed".into(),
787 }
788 };
789 states.write().await.insert(svc, state);
790 });
791 }
792
793 if let Some((proxy, backend_addr)) = proxy_backend.clone() {
796 let svc = svc_name_for_proxy.clone();
797 tokio::spawn(async move {
798 proxy
799 .update_backend_health(&svc, backend_addr, is_healthy)
800 .await;
801 });
802 }
803 });
804
805 monitor = monitor.with_callback(health_callback);
806
807 monitor.start()
808 };
809
810 {
812 let mut containers = self.containers.write().await;
813 containers.insert(
814 id.clone(),
815 Container {
816 id: id.clone(),
817 image: self.spec.image.name.to_string(),
818 state: ContainerState::Running,
819 pid: None,
820 task: None,
821 overlay_ip: effective_ip,
822 health_monitor: Some(health_monitor_handle),
823 port_override,
824 },
825 );
826 } }
828 }
829
830 if replicas < current_replicas {
838 for i in replicas..current_replicas {
839 let replica_idx = i + 1;
840 let id = ContainerId::with_role_and_node(
841 self.service_name.clone(),
842 replica_idx,
843 self.role_for_replica(replica_idx),
844 local_node_id,
845 );
846
847 let removed_container = {
849 let mut containers = self.containers.write().await;
850 containers.remove(&id)
851 }; if let Some(container) = removed_container {
855 if let Some(handle) = container.health_monitor {
857 handle.abort();
858 }
859
860 if self.spec.publish_to_node_loopback() {
866 if let (Some(proxy), Some(ip)) = (&self.proxy_manager, container.overlay_ip)
867 {
868 proxy
869 .unpublish_loopback_for_container(
870 &self.spec,
871 ip,
872 container.port_override,
873 )
874 .await;
875 }
876 }
877
878 if let Some(dns) = &self.dns_server {
880 let replica_hostname =
882 format!("{}.{}.service.local", id.replica, self.service_name);
883 if let Err(e) = dns.remove_record(&replica_hostname).await {
884 tracing::warn!(
885 hostname = %replica_hostname,
886 error = %e,
887 "failed to remove replica DNS record"
888 );
889 } else {
890 tracing::debug!(
891 hostname = %replica_hostname,
892 "removed replica DNS record"
893 );
894 }
895
896 if id.role != "default" {
906 let role_hostname =
907 format!("{}.{}.service.local", id.role, self.service_name);
908 if let Err(e) = dns.remove_record(&role_hostname).await {
909 tracing::warn!(
910 hostname = %role_hostname,
911 error = %e,
912 "failed to remove role DNS record"
913 );
914 } else {
915 tracing::debug!(
916 hostname = %role_hostname,
917 "removed role DNS record"
918 );
919 }
920 }
921
922 }
926
927 if let Some(overlay) = &self.overlay_manager {
943 if self.runtime.overlay_attach_kind()
947 == crate::runtime::OverlayAttachKind::InGuestVsock
948 {
949 let overlay_guard = overlay.read().await;
950 if let Err(e) =
951 overlay_guard.detach_container_guest(&id.to_string()).await
952 {
953 tracing::warn!(
954 container = %id,
955 error = %e,
956 "overlay detach_container_guest failed; relying on orphan sweep"
957 );
958 }
959 } else {
960 match self.runtime.get_container_pid(&id).await {
961 Ok(Some(pid)) => {
962 let overlay_guard = overlay.read().await;
963 if let Err(e) = overlay_guard.detach_container(pid).await {
964 tracing::warn!(
965 container = %id,
966 pid,
967 error = %e,
968 "overlay detach_container failed; relying on orphan sweep"
969 );
970 }
971 }
972 Ok(None) => {
973 tracing::debug!(
974 container = %id,
975 "no PID available for overlay detach (already exited or non-Linux runtime)"
976 );
977 }
978 Err(e) => {
979 tracing::warn!(
980 container = %id,
981 error = %e,
982 "failed to query container PID for overlay detach"
983 );
984 }
985 }
986 }
987 }
988
989 self.runtime
991 .stop_container(&id, Duration::from_secs(30))
992 .await?;
993
994 if let Err(e) = self.runtime.sync_container_volumes(&id).await {
996 tracing::warn!(
997 container = %id,
998 error = %e,
999 "failed to sync volumes before removal"
1000 );
1001 }
1002
1003 self.runtime.remove_container(&id).await?;
1005 }
1006 }
1007 }
1008
1009 Ok(())
1010 }
1011
    /// Returns the number of containers currently tracked for this service.
    pub async fn replica_count(&self) -> usize {
        self.containers.read().await.len()
    }
1016
    /// Returns the IDs of all tracked containers, in the map's iteration order
    /// (unspecified for a `HashMap`).
    pub async fn container_ids(&self) -> Vec<ContainerId> {
        self.containers.read().await.keys().cloned().collect()
    }
1021
1022 pub async fn container_infos(&self) -> Vec<ContainerInfo> {
1029 self.containers
1030 .read()
1031 .await
1032 .values()
1033 .map(|c| ContainerInfo {
1034 id: c.id.clone(),
1035 image: c.image.clone(),
1036 state: c.state.as_str().to_string(),
1037 pid: c.pid,
1038 overlay_ip: c.overlay_ip.map(|ip| ip.to_string()),
1039 })
1040 .collect()
1041 }
1042
    /// Returns the lock guarding the tracked-container map, giving callers direct
    /// read/write access to it.
    pub fn containers(
        &self,
    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
        &self.containers
    }
1052
    /// Returns `true` when an overlay manager is attached.
    pub fn has_overlay_manager(&self) -> bool {
        self.overlay_manager.is_some()
    }
1057
    /// Returns `true` when a proxy manager is attached.
    pub fn has_proxy_manager(&self) -> bool {
        self.proxy_manager.is_some()
    }
1062
    /// Returns the attached proxy manager, if any.
    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
        self.proxy_manager.as_ref()
    }
1067
    /// Returns `true` when a DNS server is attached.
    pub fn has_dns_server(&self) -> bool {
        self.dns_server.is_some()
    }
1072}
1073
/// Plain-data snapshot of one tracked container, as produced by
/// `ServiceInstance::container_infos`.
#[derive(Debug, Clone)]
pub struct ContainerInfo {
    /// Container identifier.
    pub id: ContainerId,
    /// Image name the container was recorded with.
    pub image: String,
    /// Container state rendered through the state's `as_str()`.
    pub state: String,
    /// Process ID as recorded on the tracked container, if any.
    pub pid: Option<u32>,
    /// The tracked container's `overlay_ip`, rendered as a string.
    pub overlay_ip: Option<String>,
}
1092
/// Flattened per-container view that carries deployment and service names next to
/// the container's own details.
///
/// NOTE(review): nothing in the visible part of this file constructs this type, so
/// the field notes below are read off the names and types only; confirm against
/// the producer.
#[derive(Debug, Clone)]
pub struct DeploymentContainerView {
    /// Presumably the owning deployment's name, when one is known.
    pub deployment: Option<String>,
    /// Presumably the owning service's name.
    pub service: String,
    /// Container identifier.
    pub container_id: ContainerId,
    /// Optional container name (source not visible here).
    pub container_name: Option<String>,
    /// Image name.
    pub image: String,
    /// Container state as a string.
    pub state: String,
    /// Process ID, if known.
    pub pid: Option<u32>,
    /// Port mappings associated with the container.
    pub ports: Vec<zlayer_spec::PortMapping>,
}
1114
1115pub struct ServiceManager {
1117 runtime: Arc<dyn Runtime + Send + Sync>,
1118 services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
1119 scale_semaphore: Arc<Semaphore>,
1120 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1122 stream_registry: Option<Arc<StreamRegistry>>,
1124 proxy_manager: Option<Arc<ProxyManager>>,
1126 dns_server: Option<Arc<DnsServer>>,
1128 deployment_name: Option<String>,
1130 health_states: Arc<RwLock<HashMap<String, HealthState>>>,
1132 job_executor: Option<Arc<JobExecutor>>,
1134 cron_scheduler: Option<Arc<CronScheduler>>,
1136 container_supervisor: Option<Arc<ContainerSupervisor>>,
1138 cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1143}
1144
1145pub struct ServiceManagerBuilder {
1163 runtime: Arc<dyn Runtime + Send + Sync>,
1164 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1165 proxy_manager: Option<Arc<ProxyManager>>,
1166 stream_registry: Option<Arc<StreamRegistry>>,
1167 dns_server: Option<Arc<DnsServer>>,
1168 deployment_name: Option<String>,
1169 job_executor: Option<Arc<JobExecutor>>,
1170 cron_scheduler: Option<Arc<CronScheduler>>,
1171 container_supervisor: Option<Arc<ContainerSupervisor>>,
1172 cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1173}
1174
1175impl ServiceManagerBuilder {
1176 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1178 Self {
1179 runtime,
1180 overlay_manager: None,
1181 proxy_manager: None,
1182 stream_registry: None,
1183 dns_server: None,
1184 deployment_name: None,
1185 job_executor: None,
1186 cron_scheduler: None,
1187 container_supervisor: None,
1188 cluster: None,
1189 }
1190 }
1191
1192 #[must_use]
1194 pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
1195 self.overlay_manager = Some(om);
1196 self
1197 }
1198
1199 #[must_use]
1201 pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
1202 self.proxy_manager = Some(pm);
1203 self
1204 }
1205
1206 #[must_use]
1208 pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
1209 self.stream_registry = Some(sr);
1210 self
1211 }
1212
1213 #[must_use]
1215 pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1216 self.dns_server = Some(dns);
1217 self
1218 }
1219
1220 #[must_use]
1222 pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
1223 self.deployment_name = Some(name.into());
1224 self
1225 }
1226
1227 #[must_use]
1229 pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
1230 self.job_executor = Some(je);
1231 self
1232 }
1233
1234 #[must_use]
1236 pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
1237 self.cron_scheduler = Some(cs);
1238 self
1239 }
1240
1241 #[must_use]
1243 pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
1244 self.container_supervisor = Some(cs);
1245 self
1246 }
1247
1248 #[must_use]
1253 pub fn cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1254 self.cluster = Some(cluster);
1255 self
1256 }
1257
1258 pub fn build(self) -> ServiceManager {
1263 if self.proxy_manager.is_none() {
1264 tracing::warn!("ServiceManager built without proxy_manager");
1265 }
1266 if self.stream_registry.is_none() {
1267 tracing::warn!("ServiceManager built without stream_registry");
1268 }
1269 if self.container_supervisor.is_none() {
1270 tracing::warn!("ServiceManager built without container_supervisor");
1271 }
1272 if self.deployment_name.is_none() {
1273 tracing::warn!("ServiceManager built without deployment_name");
1274 }
1275
1276 ServiceManager {
1277 runtime: self.runtime,
1278 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1279 scale_semaphore: Arc::new(Semaphore::new(10)),
1280 overlay_manager: self.overlay_manager,
1281 stream_registry: self.stream_registry,
1282 proxy_manager: self.proxy_manager,
1283 dns_server: self.dns_server,
1284 deployment_name: self.deployment_name,
1285 health_states: Arc::new(RwLock::new(HashMap::new())),
1286 job_executor: self.job_executor,
1287 cron_scheduler: self.cron_scheduler,
1288 container_supervisor: self.container_supervisor,
1289 cluster: self.cluster,
1290 }
1291 }
1292}
1293
1294impl ServiceManager {
    /// Returns a [`ServiceManagerBuilder`] seeded with `runtime`.
    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
        ServiceManagerBuilder::new(runtime)
    }
1310
1311 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1313 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1314 Self {
1315 runtime,
1316 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1317 scale_semaphore: Arc::new(Semaphore::new(10)), overlay_manager: None,
1319 stream_registry: None,
1320 proxy_manager: None,
1321 dns_server: None,
1322 deployment_name: None,
1323 health_states: Arc::new(RwLock::new(HashMap::new())),
1324 job_executor: None,
1325 cron_scheduler: None,
1326 container_supervisor: None,
1327 cluster: None,
1328 }
1329 }
1330
1331 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1333 pub fn with_overlay(
1334 runtime: Arc<dyn Runtime + Send + Sync>,
1335 overlay_manager: Arc<RwLock<OverlayManager>>,
1336 ) -> Self {
1337 Self {
1338 runtime,
1339 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1340 scale_semaphore: Arc::new(Semaphore::new(10)),
1341 overlay_manager: Some(overlay_manager),
1342 stream_registry: None,
1343 proxy_manager: None,
1344 dns_server: None,
1345 deployment_name: None,
1346 health_states: Arc::new(RwLock::new(HashMap::new())),
1347 job_executor: None,
1348 cron_scheduler: None,
1349 container_supervisor: None,
1350 cluster: None,
1351 }
1352 }
1353
1354 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1356 pub fn with_full_config(
1357 runtime: Arc<dyn Runtime + Send + Sync>,
1358 overlay_manager: Arc<RwLock<OverlayManager>>,
1359 deployment_name: String,
1360 ) -> Self {
1361 Self {
1362 runtime,
1363 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1364 scale_semaphore: Arc::new(Semaphore::new(10)),
1365 overlay_manager: Some(overlay_manager),
1366 stream_registry: None,
1367 proxy_manager: None,
1368 dns_server: None,
1369 deployment_name: Some(deployment_name),
1370 health_states: Arc::new(RwLock::new(HashMap::new())),
1371 job_executor: None,
1372 cron_scheduler: None,
1373 container_supervisor: None,
1374 cluster: None,
1375 }
1376 }
1377
    /// Returns another handle to the shared per-service health map.
    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
        Arc::clone(&self.health_states)
    }
1382
1383 pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
1385 let mut states = self.health_states.write().await;
1386 states.insert(service_name.to_string(), state);
1387 }
1388
    /// Sets (or replaces) the deployment name.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_deployment_name(&mut self, name: String) {
        self.deployment_name = Some(name);
    }
1394
    /// Sets (or replaces) the stream registry.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
        self.stream_registry = Some(registry);
    }
1400
    /// Chaining variant of `set_stream_registry`: sets the registry and returns the
    /// manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
        self.stream_registry = Some(registry);
        self
    }
1408
    /// Returns the configured stream registry, if any.
    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
        self.stream_registry.as_ref()
    }
1413
    /// Sets (or replaces) the overlay network manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
        self.overlay_manager = Some(manager);
    }
1419
    /// Sets (or replaces) the proxy manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
        self.proxy_manager = Some(proxy);
    }
1425
    /// Chaining variant of `set_proxy_manager`: sets the proxy manager and returns
    /// the manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
        self.proxy_manager = Some(proxy);
        self
    }
1433
    /// Returns the configured proxy manager, if any.
    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
        self.proxy_manager.as_ref()
    }
1438
    /// Sets (or replaces) the DNS server.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
        self.dns_server = Some(dns);
    }
1444
    /// Chaining variant of `set_dns_server`: sets the DNS server and returns the
    /// manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns);
        self
    }
1452
    /// Returns the configured DNS server, if any.
    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
        self.dns_server.as_ref()
    }
1457
    /// Sets (or replaces) the job executor.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
        self.job_executor = Some(executor);
    }
1463
    /// Sets (or replaces) the cron scheduler.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
        self.cron_scheduler = Some(scheduler);
    }
1469
    /// Chaining variant of `set_job_executor`: sets the job executor and returns the
    /// manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
        self.job_executor = Some(executor);
        self
    }
1477
    /// Chaining variant of `set_cron_scheduler`: sets the cron scheduler and returns
    /// the manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
        self.cron_scheduler = Some(scheduler);
        self
    }
1485
    /// Sets (or replaces) the cluster handle.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
        self.cluster = Some(cluster);
    }
1491
    /// Chaining variant of `set_cluster`: sets the cluster handle and returns the
    /// manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
        self.cluster = Some(cluster);
        self
    }
1499
    /// Returns the configured cluster handle, if any.
    pub fn cluster(&self) -> Option<&Arc<dyn zlayer_scheduler::cluster::Cluster>> {
        self.cluster.as_ref()
    }
1504
1505 pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
1507 self.job_executor.as_ref()
1508 }
1509
1510 pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
1512 self.cron_scheduler.as_ref()
1513 }
1514
1515 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1517 pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
1518 self.container_supervisor = Some(supervisor);
1519 }
1520
1521 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1523 #[must_use]
1524 pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
1525 self.container_supervisor = Some(supervisor);
1526 self
1527 }
1528
1529 pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
1531 self.container_supervisor.as_ref()
1532 }
1533
1534 pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1545 let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1546 AgentError::Configuration("Container supervisor not configured".to_string())
1547 })?;
1548
1549 let supervisor = Arc::clone(supervisor);
1550 Ok(tokio::spawn(async move {
1551 supervisor.run_loop().await;
1552 }))
1553 }
1554
1555 pub fn shutdown_container_supervisor(&self) {
1557 if let Some(supervisor) = &self.container_supervisor {
1558 supervisor.shutdown();
1559 }
1560 }
1561
1562 pub async fn get_container_supervised_state(
1564 &self,
1565 container_id: &ContainerId,
1566 ) -> Option<SupervisedState> {
1567 if let Some(supervisor) = &self.container_supervisor {
1568 supervisor.get_state(container_id).await
1569 } else {
1570 None
1571 }
1572 }
1573
1574 pub async fn take_supervisor_events(
1578 &self,
1579 ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1580 if let Some(supervisor) = &self.container_supervisor {
1581 supervisor.take_event_receiver().await
1582 } else {
1583 None
1584 }
1585 }
1586
1587 pub async fn deploy_with_dependencies(
1604 &self,
1605 services: HashMap<String, ServiceSpec>,
1606 ) -> Result<()> {
1607 if services.is_empty() {
1608 return Ok(());
1609 }
1610
1611 let graph = DependencyGraph::build(&services)?;
1613
1614 tracing::info!(
1615 service_count = services.len(),
1616 "Starting deployment with dependency ordering"
1617 );
1618
1619 let order = graph.startup_order();
1621 tracing::debug!(order = ?order, "Computed startup order");
1622
1623 for service_name in order {
1625 let service_spec = services
1626 .get(service_name)
1627 .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
1628
1629 if !service_spec.depends.is_empty() {
1631 tracing::info!(
1632 service = %service_name,
1633 dependency_count = service_spec.depends.len(),
1634 "Waiting for dependencies"
1635 );
1636 self.wait_for_dependencies(service_name, &service_spec.depends)
1637 .await?;
1638 }
1639
1640 tracing::info!(service = %service_name, "Starting service");
1642 Box::pin(self.upsert_service(service_name.clone(), service_spec.clone())).await?;
1643
1644 let replicas = match &service_spec.scale {
1646 zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
1647 zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, zlayer_spec::ScaleSpec::Manual => 1, };
1650 self.scale_service(service_name, replicas).await?;
1651
1652 self.update_health_state(service_name, HealthState::Unknown)
1654 .await;
1655
1656 tracing::info!(
1657 service = %service_name,
1658 replicas = replicas,
1659 "Service started"
1660 );
1661 }
1662
1663 tracing::info!(service_count = services.len(), "Deployment complete");
1664
1665 Ok(())
1666 }
1667
1668 async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1677 let condition_checker = DependencyConditionChecker::new(
1678 Arc::clone(&self.runtime),
1679 Arc::clone(&self.health_states),
1680 None,
1681 );
1682
1683 let waiter = DependencyWaiter::new(condition_checker);
1684 let results = waiter.wait_for_all(deps).await?;
1685
1686 for result in results {
1688 match result {
1689 WaitResult::TimedOutFail {
1690 service: dep_service,
1691 condition,
1692 timeout,
1693 } => {
1694 return Err(AgentError::DependencyTimeout {
1695 service: service.to_string(),
1696 dependency: dep_service,
1697 condition: format!("{condition:?}"),
1698 timeout,
1699 });
1700 }
1701 WaitResult::TimedOutWarn {
1702 service: dep_service,
1703 condition,
1704 } => {
1705 tracing::warn!(
1706 service = %service,
1707 dependency = %dep_service,
1708 condition = ?condition,
1709 "Dependency timed out but continuing"
1710 );
1711 }
1712 WaitResult::TimedOutContinue | WaitResult::Satisfied => {
1713 }
1715 }
1716 }
1717
1718 Ok(())
1719 }
1720
1721 pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1728 let condition_checker = DependencyConditionChecker::new(
1729 Arc::clone(&self.runtime),
1730 Arc::clone(&self.health_states),
1731 None,
1732 );
1733
1734 for dep in deps {
1735 if !condition_checker.check(dep).await? {
1736 return Ok(false);
1737 }
1738 }
1739
1740 Ok(true)
1741 }
1742
    /// Creates or updates the resource `name` according to `spec.rtype`.
    ///
    /// - `ResourceType::Service`: when an instance with this name already exists its spec
    ///   is replaced and, depending on the image pull policy, its local replicas may be
    ///   recreated. Otherwise a new `ServiceInstance` is built, wired to the configured
    ///   proxy / DNS / stream registry, and inserted.
    /// - `ResourceType::Job`: the spec is registered with the job executor, or stored as
    ///   a plain service instance when no executor is configured.
    /// - `ResourceType::Cron`: the spec is registered with the cron scheduler.
    ///
    /// # Errors
    ///
    /// Propagates failures from the image pull, from local scaling, and from cron
    /// registration. Returns `AgentError::Configuration` for a cron spec when no cron
    /// scheduler is configured.
    #[allow(clippy::too_many_lines)]
    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
        match spec.rtype {
            ResourceType::Service => {
                let mut services = self.services.write().await;

                if let Some(instance) = services.get_mut(&name) {
                    // Update path. Only the image *name* is compared here; digest drift
                    // is handled separately below via the pull policy.
                    let image_changed = instance.spec.image.name != spec.image.name;
                    instance.spec = spec.clone();
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }

                    // Snapshot everything needed from the instance while the write lock
                    // is still held.
                    let effective = spec.image.pull_policy;
                    let old_digest = instance.last_pulled_digest().await;
                    let current_replicas =
                        u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
                    // Release the write lock: the code below re-acquires `services` for
                    // reading, both directly and through `scale_service_local`.
                    drop(services);
                    let mut should_recreate = image_changed;
                    let mut new_digest = old_digest.clone();

                    match effective {
                        // No pull under these policies; recreate only if the image name changed.
                        PullPolicy::Never | PullPolicy::IfNotPresent => {
                            tracing::debug!(
                                service = %name,
                                policy = ?effective,
                                image_changed,
                                "re-deploy under no-refresh pull policy"
                            );
                        }
                        PullPolicy::Always | PullPolicy::Newer => {
                            // The write lock was dropped above, so the service may have
                            // been removed in the meantime; look it up again.
                            let services_ro = self.services.read().await;
                            new_digest = if let Some(inst) = services_ro.get(&name) {
                                inst.pull_and_refresh_digest().await?
                            } else {
                                tracing::warn!(
                                    service = %name,
                                    "service removed during upsert; skipping drift recreate"
                                );
                                return Ok(());
                            };
                            drop(services_ro);

                            // `Always` recreates unconditionally; `Newer` recreates only
                            // when both digests are known and they differ.
                            should_recreate = should_recreate
                                || match effective {
                                    PullPolicy::Always => true,
                                    PullPolicy::Newer => match (&old_digest, &new_digest) {
                                        (Some(old), Some(new)) => old != new,
                                        _ => false,
                                    },
                                    _ => false,
                                };
                        }
                    }

                    if should_recreate && current_replicas > 0 {
                        tracing::info!(
                            service = %name,
                            policy = ?effective,
                            image_changed,
                            old_digest = ?old_digest,
                            new_digest = ?new_digest,
                            replicas = current_replicas,
                            "image changed; performing local rolling recreate"
                        );
                        // Recreate by scaling to zero and back to the previous count.
                        self.scale_service_local(&name, 0).await?;
                        self.scale_service_local(&name, current_replicas).await?;
                        tracing::info!(
                            service = %name,
                            new_digest = ?new_digest,
                            "service recreated with refreshed image"
                        );
                    } else {
                        tracing::debug!(
                            service = %name,
                            policy = ?effective,
                            old_digest = ?old_digest,
                            new_digest = ?new_digest,
                            "service up to date; no recreate required"
                        );
                    }
                    return Ok(());
                }
                // Create path: no instance with this name exists yet. The write lock
                // stays held until the new instance has been inserted.
                let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                let mut instance = if let Some(proxy) = &self.proxy_manager {
                    ServiceInstance::with_proxy(
                        name.clone(),
                        spec,
                        self.runtime.clone(),
                        overlay,
                        Arc::clone(proxy),
                    )
                } else {
                    ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                };
                // Node id 0 is used when no cluster is configured.
                instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
                if let Some(dns) = &self.dns_server {
                    instance.set_dns_server(Arc::clone(dns));
                }
                instance.set_health_states(Arc::clone(&self.health_states));
                if let Some(proxy) = &self.proxy_manager {
                    proxy.add_service(&name, &instance.spec).await;
                }
                if let Some(stream_registry) = &self.stream_registry {
                    for endpoint in &instance.spec.endpoints {
                        // Routes are registered with an empty backend list here.
                        let svc = StreamService::new(
                            name.clone(),
                            Vec::new(),
                        );
                        match endpoint.protocol {
                            Protocol::Tcp => {
                                stream_registry.register_tcp(endpoint.port, svc);
                                tracing::debug!(
                                    service = %name,
                                    port = endpoint.port,
                                    "Registered TCP stream route"
                                );
                            }
                            Protocol::Udp => {
                                stream_registry.register_udp(endpoint.port, svc);
                                tracing::debug!(
                                    service = %name,
                                    port = endpoint.port,
                                    "Registered UDP stream route"
                                );
                            }
                            // Other protocols get no stream route.
                            _ => {}
                        }
                    }
                }
                services.insert(name, instance);
            }
            ResourceType::Job => {
                if let Some(executor) = &self.job_executor {
                    executor.register_job(&name, spec).await;
                    tracing::info!(job = %name, "Registered job spec");
                } else {
                    // Fallback: without an executor the job spec is kept as a service instance.
                    tracing::warn!(
                        job = %name,
                        "Job executor not configured, storing as service for reference"
                    );
                    let mut services = self.services.write().await;
                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                    let mut instance = if let Some(proxy) = &self.proxy_manager {
                        ServiceInstance::with_proxy(
                            name.clone(),
                            spec,
                            self.runtime.clone(),
                            overlay,
                            Arc::clone(proxy),
                        )
                    } else {
                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                    };
                    instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }
                    services.insert(name, instance);
                }
            }
            ResourceType::Cron => {
                if let Some(scheduler) = &self.cron_scheduler {
                    scheduler.register(&name, &spec).await?;
                    tracing::info!(cron = %name, "Registered cron job with scheduler");
                } else {
                    // Unlike jobs, a cron spec has no fallback storage: this is an error.
                    return Err(AgentError::Configuration(format!(
                        "Cron scheduler not configured for cron job '{name}'"
                    )));
                }
            }
        }

        Ok(())
    }
1974
1975 async fn update_proxy_backends(&self, instance: &ServiceInstance) {
1984 let Some(proxy) = &self.proxy_manager else {
1985 return;
1986 };
1987 for endpoint in &instance.spec.endpoints {
1988 if !matches!(
1990 endpoint.protocol,
1991 Protocol::Http | Protocol::Https | Protocol::Websocket
1992 ) {
1993 continue;
1994 }
1995 let addrs = self.collect_endpoint_backends(instance, endpoint).await;
1996 proxy
1997 .update_endpoint_backends(&instance.service_name, &endpoint.name, addrs)
1998 .await;
1999 }
2000 }
2001
2002 async fn update_stream_backends(&self, instance: &ServiceInstance) {
2011 let Some(stream_registry) = &self.stream_registry else {
2012 return;
2013 };
2014
2015 for endpoint in &instance.spec.endpoints {
2016 match endpoint.protocol {
2017 Protocol::Tcp => {
2018 let tcp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2019 let backend_count = tcp_backends.len();
2020 stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
2021 tracing::debug!(
2022 endpoint = %endpoint.name,
2023 port = endpoint.port,
2024 backend_count = backend_count,
2025 target_role = ?endpoint.target_role,
2026 "Updated TCP stream backends"
2027 );
2028 }
2029 Protocol::Udp => {
2030 let udp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2031 let backend_count = udp_backends.len();
2032 stream_registry.update_udp_backends(endpoint.port, udp_backends);
2033 tracing::debug!(
2034 endpoint = %endpoint.name,
2035 port = endpoint.port,
2036 backend_count = backend_count,
2037 target_role = ?endpoint.target_role,
2038 "Updated UDP stream backends"
2039 );
2040 }
2041 _ => {} }
2043 }
2044 }
2045
2046 #[allow(clippy::cast_possible_truncation)]
2054 pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
2055 use zlayer_scheduler::cluster::InternalScaleRequest;
2056
2057 tracing::info!(
2058 target: "zlayer::scale_distribute",
2059 service = name,
2060 replicas,
2061 has_cluster = self.cluster.is_some(),
2062 "scale_service ENTER"
2063 );
2064
2065 let spec = self
2071 .services
2072 .read()
2073 .await
2074 .get(name)
2075 .map(|inst| inst.spec.clone());
2076 let build_req = |replicas: u32| {
2077 let req = InternalScaleRequest::new(name, replicas);
2078 match spec.clone() {
2079 Some(s) => req.with_spec(s),
2080 None => req,
2081 }
2082 };
2083
2084 if let Some(cluster) = &self.cluster {
2085 let is_leader = cluster.is_leader().await;
2086 tracing::info!(
2087 target: "zlayer::scale_distribute",
2088 service = name,
2089 replicas,
2090 is_leader,
2091 spec_affinity = ?spec.as_ref().and_then(|s| s.affinity.clone()),
2092 "scale_service: cluster path"
2093 );
2094 if !is_leader {
2095 return cluster
2097 .forward_scale(build_req(replicas))
2098 .await
2099 .map_err(|e| AgentError::CreateFailed {
2100 id: name.to_string(),
2101 reason: format!("cluster forward: {e}"),
2102 });
2103 }
2104
2105 return cluster
2115 .dispatch_scale_distributed(build_req(replicas))
2116 .await
2117 .map_err(|e| AgentError::CreateFailed {
2118 id: name.to_string(),
2119 reason: format!("cluster dispatch: {e}"),
2120 });
2121 }
2122
2123 self.scale_service_local(name, replicas).await
2125 }
2126
2127 #[allow(clippy::cast_possible_truncation)]
2139 pub async fn scale_service_local(&self, name: &str, replicas: u32) -> Result<()> {
2140 tracing::info!(
2141 target: "zlayer::scale_distribute",
2142 service = name,
2143 replicas,
2144 "scale_service_local ENTER"
2145 );
2146 let _permit = self.scale_semaphore.acquire().await;
2147
2148 let services = self.services.read().await;
2149 let Some(instance) = services.get(name) else {
2150 if replicas == 0 {
2154 return Ok(());
2155 }
2156 return Err(AgentError::NotFound {
2157 container: name.to_string(),
2158 reason: "service not found".to_string(),
2159 });
2160 };
2161
2162 let current_replicas = instance.replica_count().await as u32;
2164
2165 instance.scale_to(replicas).await?;
2167
2168 if self.proxy_manager.is_some() {
2174 self.update_proxy_backends(instance).await;
2175 }
2176 if self.stream_registry.is_some() {
2177 self.update_stream_backends(instance).await;
2178 }
2179
2180 let local_node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2187 if let Some(supervisor) = &self.container_supervisor {
2188 if replicas > current_replicas {
2190 for i in current_replicas..replicas {
2191 let replica_idx = i + 1;
2192 let container_id = ContainerId::with_role_and_node(
2193 name.to_string(),
2194 replica_idx,
2195 instance.role_for_replica(replica_idx),
2196 local_node_id,
2197 );
2198 supervisor.supervise(&container_id, &instance.spec).await;
2199 }
2200 }
2201 if replicas < current_replicas {
2203 for i in replicas..current_replicas {
2204 let replica_idx = i + 1;
2205 let container_id = ContainerId::with_role_and_node(
2206 name.to_string(),
2207 replica_idx,
2208 instance.role_for_replica(replica_idx),
2209 local_node_id,
2210 );
2211 supervisor.unsupervise(&container_id).await;
2212 }
2213 }
2214 }
2215
2216 Ok(())
2217 }
2218
2219 async fn collect_endpoint_backends(
2237 &self,
2238 instance: &ServiceInstance,
2239 endpoint: &zlayer_spec::EndpointSpec,
2240 ) -> Vec<SocketAddr> {
2241 let mut addrs = Vec::new();
2242 let endpoint_port = endpoint.target_port();
2243 let containers = instance.containers().read().await;
2244
2245 for (container_id, container) in containers.iter() {
2246 if let Some(required_role) = endpoint.target_role.as_ref() {
2248 if container_id.role != *required_role {
2249 continue;
2250 }
2251 }
2252 let Some(ip) = container.overlay_ip else {
2253 continue;
2254 };
2255 let port = container.port_override.unwrap_or(endpoint_port);
2259 addrs.push(SocketAddr::new(ip, port));
2260 }
2261
2262 if addrs.is_empty() && !containers.is_empty() {
2266 tracing::warn!(
2267 service = %instance.service_name,
2268 endpoint = %endpoint.name,
2269 target_role = ?endpoint.target_role,
2270 container_count = containers.len(),
2271 "no backends collected for endpoint - either no matching role, no overlay IPs, or filtering excluded all"
2272 );
2273 }
2274
2275 addrs
2276 }
2277
2278 pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
2283 let services = self.services.read().await;
2284 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
2285 container: name.to_string(),
2286 reason: "service not found".to_string(),
2287 })?;
2288
2289 Ok(instance.replica_count().await)
2290 }
2291
    /// Removes `name` from every registry this manager knows about.
    ///
    /// In order: unregisters it from the cron scheduler and job executor, unregisters its
    /// TCP/UDP stream routes, unpublishes loopback entries for its containers, stops
    /// supervising its containers, removes its DNS records, and finally drops it from the
    /// service map. Each step is skipped when the corresponding component is not
    /// configured or the service is not present.
    ///
    /// As written this always returns `Ok(())`, including when the name is unknown.
    pub async fn remove_service(&self, name: &str) -> Result<()> {
        // The name may refer to a cron job or a job rather than a service, so both
        // registries are asked to forget it unconditionally.
        if let Some(scheduler) = &self.cron_scheduler {
            scheduler.unregister(name).await;
        }

        if let Some(executor) = &self.job_executor {
            executor.unregister_job(name).await;
        }

        if let Some(stream_registry) = &self.stream_registry {
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                for endpoint in &instance.spec.endpoints {
                    match endpoint.protocol {
                        Protocol::Tcp => {
                            // The unregister result is deliberately discarded.
                            let _ = stream_registry.unregister_tcp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered TCP stream route"
                            );
                        }
                        Protocol::Udp => {
                            let _ = stream_registry.unregister_udp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered UDP stream route"
                            );
                        }
                        // Other protocols have no stream route to remove.
                        _ => {}
                    }
                }
            }
            drop(services);
        }

        {
            // Unpublish loopback entries while the instance and its container map are
            // still available; this uses the instance's own proxy manager handle.
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                if instance.spec.publish_to_node_loopback() {
                    if let Some(proxy) = instance.proxy_manager() {
                        let containers = instance.containers().read().await;
                        for container in containers.values() {
                            // Containers without an overlay IP are skipped.
                            if let Some(ip) = container.overlay_ip {
                                proxy
                                    .unpublish_loopback_for_container(
                                        &instance.spec,
                                        ip,
                                        container.port_override,
                                    )
                                    .await;
                            }
                        }
                    }
                }
            }
            drop(services);
        }

        if let Some(supervisor) = &self.container_supervisor {
            let containers = self.get_service_containers(name).await;
            for container_id in containers {
                supervisor.unsupervise(&container_id).await;
            }
            tracing::debug!(service = %name, "Unregistered containers from supervisor");
        }

        // Must run before the instance leaves the map: the DNS cleanup reads the
        // instance's containers to find the per-replica records.
        self.cleanup_service_dns(name).await;

        let mut services = self.services.write().await;
        if services.remove(name).is_some() {
            tracing::debug!(service = %name, "Removed service from manager");
        }

        Ok(())
    }
2390
2391 async fn cleanup_service_dns(&self, name: &str) {
2396 let Some(dns) = &self.dns_server else {
2397 return;
2398 };
2399
2400 if let Err(e) = dns.remove_record(name).await {
2402 tracing::warn!(
2403 hostname = %name,
2404 error = %e,
2405 "failed to remove bare service-name DNS record"
2406 );
2407 }
2408
2409 let service_hostname = format!("{name}.service.local");
2411 if let Err(e) = dns.remove_record(&service_hostname).await {
2412 tracing::warn!(
2413 hostname = %service_hostname,
2414 error = %e,
2415 "failed to remove service DNS record"
2416 );
2417 } else {
2418 tracing::debug!(hostname = %service_hostname, "removed service DNS record");
2419 }
2420
2421 let services = self.services.read().await;
2423 if let Some(instance) = services.get(name) {
2424 let containers = instance.containers().read().await;
2425 for (id, _) in containers.iter() {
2426 let replica_hostname = format!("{}.{}.service.local", id.replica, name);
2427 if let Err(e) = dns.remove_record(&replica_hostname).await {
2428 tracing::warn!(
2429 hostname = %replica_hostname,
2430 error = %e,
2431 "failed to remove replica DNS record during service removal"
2432 );
2433 }
2434 }
2435 }
2436 }
2437
2438 pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
2441 let services = self.services.read().await;
2442 services.get(name).map(|i| {
2443 (
2444 i.has_overlay_manager(),
2445 i.has_proxy_manager(),
2446 i.has_dns_server(),
2447 )
2448 })
2449 }
2450
2451 pub async fn list_services(&self) -> Vec<String> {
2453 self.services.read().await.keys().cloned().collect()
2454 }
2455
2456 pub async fn get_service_logs(
2470 &self,
2471 service_name: &str,
2472 tail: usize,
2473 instance: Option<&str>,
2474 ) -> Result<Vec<LogEntry>> {
2475 let container_ids = self.get_service_containers(service_name).await;
2476
2477 if container_ids.is_empty() {
2478 return Err(AgentError::NotFound {
2479 container: service_name.to_string(),
2480 reason: "no containers found for service".to_string(),
2481 });
2482 }
2483
2484 let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
2486 if let Ok(replica_num) = inst.parse::<u32>() {
2487 container_ids
2488 .iter()
2489 .filter(|id| id.replica == replica_num)
2490 .collect()
2491 } else {
2492 container_ids
2494 .iter()
2495 .filter(|id| id.to_string().contains(inst))
2496 .collect()
2497 }
2498 } else {
2499 container_ids.iter().collect()
2500 };
2501
2502 if target_ids.is_empty() {
2503 return Err(AgentError::NotFound {
2504 container: format!("{}/{}", service_name, instance.unwrap_or("?")),
2505 reason: "instance not found".to_string(),
2506 });
2507 }
2508
2509 let mut all_entries: Vec<LogEntry> = Vec::new();
2510
2511 for id in &target_ids {
2512 match self.runtime.container_logs(id, tail).await {
2513 Ok(mut entries) => {
2514 for entry in &mut entries {
2516 if entry.service.is_none() {
2517 entry.service = Some(service_name.to_string());
2518 }
2519 if entry.deployment.is_none() {
2520 entry.deployment.clone_from(&self.deployment_name);
2521 }
2522 }
2523 all_entries.extend(entries);
2524 }
2525 Err(e) => {
2526 tracing::warn!(
2527 service = service_name,
2528 container = %id,
2529 error = %e,
2530 "Failed to read container logs"
2531 );
2532 }
2533 }
2534 }
2535
2536 Ok(all_entries)
2537 }
2538
2539 pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
2549 let services = self.services.read().await;
2550 if let Some(instance) = services.get(service_name) {
2551 instance.container_ids().await
2552 } else {
2553 Vec::new()
2554 }
2555 }
2556
2557 pub async fn get_service_container_infos(&self, service_name: &str) -> Vec<ContainerInfo> {
2566 let services = self.services.read().await;
2567 if let Some(instance) = services.get(service_name) {
2568 instance.container_infos().await
2569 } else {
2570 Vec::new()
2571 }
2572 }
2573
2574 pub async fn local_service_state(
2579 &self,
2580 service: &str,
2581 ) -> zlayer_types::cluster::NodeServiceState {
2582 use zlayer_types::cluster::{ClusterContainerSummary, NodeServiceState};
2583 let node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2584 let infos = self.get_service_container_infos(service).await;
2585 #[allow(clippy::cast_possible_truncation)]
2586 let running = infos
2587 .iter()
2588 .filter(|i| i.state.eq_ignore_ascii_case("running"))
2589 .count() as u32;
2590 let healthy = if running == 0 {
2593 true
2594 } else {
2595 let states = self.health_states();
2596 let guard = states.read().await;
2597 matches!(guard.get(service), Some(HealthState::Healthy))
2598 };
2599 let containers = infos
2600 .into_iter()
2601 .map(|i| ClusterContainerSummary {
2602 node_id,
2603 id: i.id.to_string(),
2604 service: i.id.service.clone(),
2605 replica: i.id.replica,
2606 image: i.image,
2607 state: i.state,
2608 pid: i.pid,
2609 overlay_ip: i.overlay_ip,
2610 })
2611 .collect();
2612 NodeServiceState {
2613 node_id,
2614 running,
2615 healthy,
2616 containers,
2617 }
2618 }
2619
2620 pub async fn cluster_service_states(
2626 &self,
2627 service: &str,
2628 ) -> Vec<zlayer_types::cluster::NodeServiceState> {
2629 let mut states = vec![self.local_service_state(service).await];
2630 if let Some(cluster) = &self.cluster {
2631 states.extend(cluster.fetch_remote_service_states(service).await);
2632 }
2633 states
2634 }
2635
2636 pub async fn exec_in_container(
2655 &self,
2656 service_name: &str,
2657 replica: Option<u32>,
2658 cmd: &[String],
2659 ) -> Result<(i32, String, String)> {
2660 let container_ids = self.get_service_containers(service_name).await;
2661
2662 if container_ids.is_empty() {
2663 return Err(AgentError::NotFound {
2664 container: service_name.to_string(),
2665 reason: "no containers found for service".to_string(),
2666 });
2667 }
2668
2669 let target = if let Some(rep) = replica {
2671 container_ids
2672 .into_iter()
2673 .find(|cid| cid.replica == rep)
2674 .ok_or_else(|| AgentError::NotFound {
2675 container: format!("{service_name}-rep-{rep}"),
2676 reason: format!("replica {rep} not found for service"),
2677 })?
2678 } else {
2679 container_ids.into_iter().next().unwrap()
2681 };
2682
2683 self.runtime.exec(&target, cmd).await
2684 }
2685
2686 pub async fn list_container_views(&self) -> Vec<DeploymentContainerView> {
2696 let deployment = self.deployment_name.clone();
2697 let services = self.services.read().await;
2698 let mut out = Vec::new();
2699 for (service_name, instance) in services.iter() {
2700 let container_name = instance
2701 .spec
2702 .labels
2703 .get("com.docker.compose.container_name")
2704 .cloned();
2705 let ports = instance.spec.port_mappings.clone();
2706 for info in instance.container_infos().await {
2707 out.push(DeploymentContainerView {
2708 deployment: deployment.clone(),
2709 service: service_name.clone(),
2710 container_id: info.id,
2711 container_name: container_name.clone(),
2712 image: info.image,
2713 state: info.state,
2714 pid: info.pid,
2715 ports: ports.clone(),
2716 });
2717 }
2718 }
2719 out
2720 }
2721
2722 pub async fn resolve_container_name(&self, name: &str) -> Option<ContainerId> {
2735 let views = self.list_container_views().await;
2736 if let Some(v) = views
2738 .iter()
2739 .find(|v| v.container_name.as_deref() == Some(name))
2740 {
2741 return Some(v.container_id.clone());
2742 }
2743 for v in &views {
2745 let dep = v.deployment.as_deref().unwrap_or("");
2746 let svc = &v.service;
2747 let rep1 = v.container_id.replica + 1;
2748 let candidates = [
2749 format!("{dep}-{svc}-{rep1}"),
2750 format!("{dep}_{svc}_{rep1}"),
2751 svc.clone(),
2752 ];
2753 if candidates.iter().any(|c| c == name) {
2754 return Some(v.container_id.clone());
2755 }
2756 }
2757 for v in &views {
2759 if v.container_id.to_string() == name {
2760 return Some(v.container_id.clone());
2761 }
2762 }
2763 None
2764 }
2765
2766 pub async fn exec_in_container_id_with_opts(
2776 &self,
2777 id: &ContainerId,
2778 opts: &crate::runtime::ExecOptions,
2779 ) -> Result<(i32, String, String)> {
2780 self.runtime.exec_with_opts(id, opts).await
2781 }
2782
2783 pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
2798 let executor = self
2799 .job_executor
2800 .as_ref()
2801 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2802
2803 let spec = executor
2804 .get_job_spec(name)
2805 .await
2806 .ok_or_else(|| AgentError::NotFound {
2807 container: name.to_string(),
2808 reason: "job not registered".to_string(),
2809 })?;
2810
2811 executor.trigger(name, &spec, trigger).await
2812 }
2813
2814 pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
2822 if let Some(executor) = &self.job_executor {
2823 executor.get_execution(id).await
2824 } else {
2825 None
2826 }
2827 }
2828
2829 pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
2837 if let Some(executor) = &self.job_executor {
2838 executor.list_executions(name).await
2839 } else {
2840 Vec::new()
2841 }
2842 }
2843
2844 pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
2852 let executor = self
2853 .job_executor
2854 .as_ref()
2855 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2856
2857 executor.cancel(id).await
2858 }
2859
2860 pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
2873 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2874 AgentError::Configuration("Cron scheduler not configured".to_string())
2875 })?;
2876
2877 scheduler.trigger_now(name).await
2878 }
2879
2880 pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
2886 if let Some(scheduler) = &self.cron_scheduler {
2887 scheduler.set_enabled(name, enabled).await;
2888 }
2889 }
2890
2891 pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2893 if let Some(scheduler) = &self.cron_scheduler {
2894 scheduler.list_jobs().await
2895 } else {
2896 Vec::new()
2897 }
2898 }
2899
2900 pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2908 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2909 AgentError::Configuration("Cron scheduler not configured".to_string())
2910 })?;
2911
2912 let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2913 Ok(tokio::spawn(async move {
2914 scheduler.run_loop().await;
2915 }))
2916 }
2917
2918 pub fn shutdown_cron(&self) {
2920 if let Some(scheduler) = &self.cron_scheduler {
2921 scheduler.shutdown();
2922 }
2923 }
2924}
2925
2926#[cfg(test)]
2927#[allow(deprecated)]
2928mod tests {
2929 use super::*;
2930 use crate::runtime::MockRuntime;
2931
2932 #[tokio::test]
2933 async fn test_service_manager() {
2934 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2935 let manager = ServiceManager::new(runtime);
2936
2937 let spec = mock_spec();
2939 Box::pin(manager.upsert_service("test".to_string(), spec))
2940 .await
2941 .unwrap();
2942
2943 manager.scale_service("test", 3).await.unwrap();
2945
2946 let count = manager.service_replica_count("test").await.unwrap();
2948 assert_eq!(count, 3);
2949
2950 let services = manager.list_services().await;
2952 assert_eq!(services, vec!["test".to_string()]);
2953 }
2954
2955 #[tokio::test]
2956 async fn test_service_manager_basic_lifecycle() {
2957 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2958 let manager = ServiceManager::new(runtime);
2959
2960 let spec = mock_spec();
2962 Box::pin(manager.upsert_service("api".to_string(), spec))
2963 .await
2964 .unwrap();
2965
2966 manager.scale_service("api", 2).await.unwrap();
2968
2969 let count = manager.service_replica_count("api").await.unwrap();
2971 assert_eq!(count, 2);
2972
2973 manager.remove_service("api").await.unwrap();
2975
2976 let services = manager.list_services().await;
2978 assert!(!services.contains(&"api".to_string()));
2979 }
2980
2981 #[tokio::test]
2982 async fn test_service_manager_with_full_config() {
2983 use tokio::sync::RwLock;
2984
2985 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2986
2987 let overlay_manager = Arc::new(RwLock::new(
2989 OverlayManager::new("test-deployment".to_string(), "test".to_string())
2990 .await
2991 .unwrap(),
2992 ));
2993
2994 let manager =
2995 ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
2996
2997 let spec = mock_spec();
2999 Box::pin(manager.upsert_service("web".to_string(), spec))
3000 .await
3001 .unwrap();
3002
3003 let services = manager.list_services().await;
3005 assert!(services.contains(&"web".to_string()));
3006 }
3007
3008 #[test]
3009 fn test_container_state_as_str() {
3010 use crate::runtime::ContainerState;
3011 assert_eq!(ContainerState::Pending.as_str(), "pending");
3012 assert_eq!(ContainerState::Initializing.as_str(), "initializing");
3013 assert_eq!(ContainerState::Running.as_str(), "running");
3014 assert_eq!(ContainerState::Stopping.as_str(), "stopping");
3015 assert_eq!(ContainerState::Exited { code: 0 }.as_str(), "exited");
3016 assert_eq!(
3017 ContainerState::Failed {
3018 reason: "boom".to_string()
3019 }
3020 .as_str(),
3021 "failed"
3022 );
3023 assert_eq!(ContainerState::Running.to_string(), "running");
3025 }
3026
3027 #[tokio::test]
3031 async fn test_container_infos_surfaces_image_and_state() {
3032 use crate::runtime::{Container, ContainerState};
3033
3034 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3035 let manager = ServiceManager::new(runtime);
3036
3037 let spec = mock_spec(); let image = spec.image.name.to_string();
3039 Box::pin(manager.upsert_service("web".to_string(), spec))
3040 .await
3041 .unwrap();
3042
3043 {
3045 let services = manager.services.read().await;
3046 let instance = services.get("web").unwrap();
3047 let mut containers = instance.containers().write().await;
3048
3049 let running_id = ContainerId::new("web", 0);
3050 containers.insert(
3051 running_id.clone(),
3052 Container {
3053 id: running_id,
3054 image: image.clone(),
3055 state: ContainerState::Running,
3056 pid: Some(4242),
3057 task: None,
3058 overlay_ip: None,
3059 health_monitor: None,
3060 port_override: None,
3061 },
3062 );
3063
3064 let exited_id = ContainerId::new("web", 1);
3065 containers.insert(
3066 exited_id.clone(),
3067 Container {
3068 id: exited_id,
3069 image: image.clone(),
3070 state: ContainerState::Exited { code: 1 },
3071 pid: None,
3072 task: None,
3073 overlay_ip: None,
3074 health_monitor: None,
3075 port_override: None,
3076 },
3077 );
3078 }
3079
3080 let mut infos = manager.get_service_container_infos("web").await;
3081 infos.sort_by_key(|i| i.id.replica);
3082 assert_eq!(infos.len(), 2);
3083
3084 assert!(infos.iter().all(|i| i.image == image));
3086 assert!(infos.iter().all(|i| i.image == "test:latest"));
3087
3088 assert_eq!(infos[0].state, "running");
3090 assert_eq!(infos[0].pid, Some(4242));
3091 assert_eq!(infos[1].state, "exited");
3092 assert_eq!(infos[1].pid, None);
3093
3094 assert!(manager
3096 .get_service_container_infos("missing")
3097 .await
3098 .is_empty());
3099 }
3100
3101 #[tokio::test]
3106 async fn upsert_recreates_local_replicas_on_image_reference_change() {
3107 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3108 let manager = ServiceManager::new(runtime);
3109
3110 let mut spec = mock_spec();
3112 spec.image.name = "docker.io/library/nginx:1.28-alpine".parse().unwrap();
3113 spec.image.pull_policy = zlayer_spec::PullPolicy::IfNotPresent;
3114 Box::pin(manager.upsert_service("web".to_string(), spec.clone()))
3115 .await
3116 .unwrap();
3117 manager.scale_service_local("web", 2).await.unwrap();
3118
3119 let v1: Vec<String> = manager
3120 .get_service_container_infos("web")
3121 .await
3122 .into_iter()
3123 .map(|i| i.image)
3124 .collect();
3125 assert_eq!(v1.len(), 2);
3126 assert!(
3127 v1.iter().all(|img| img.contains("1.28")),
3128 "expected v1 images, got {v1:?}"
3129 );
3130
3131 let mut spec_v2 = spec;
3133 spec_v2.image.name = "docker.io/library/nginx:1.29-alpine".parse().unwrap();
3134 Box::pin(manager.upsert_service("web".to_string(), spec_v2))
3135 .await
3136 .unwrap();
3137
3138 let v2: Vec<String> = manager
3139 .get_service_container_infos("web")
3140 .await
3141 .into_iter()
3142 .map(|i| i.image)
3143 .collect();
3144 assert_eq!(v2.len(), 2, "replica count preserved across upgrade");
3145 assert!(
3146 v2.iter().all(|img| img.contains("1.29")),
3147 "containers must be recreated on the new image, got {v2:?}"
3148 );
3149 }
3150
3151 fn mock_spec() -> ServiceSpec {
3152 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3153 r"
3154version: v1
3155deployment: test
3156services:
3157 test:
3158 rtype: service
3159 image:
3160 name: test:latest
3161 endpoints:
3162 - name: http
3163 protocol: http
3164 port: 8080
3165 scale:
3166 mode: fixed
3167 replicas: 1
3168",
3169 )
3170 .unwrap()
3171 .services
3172 .remove("test")
3173 .unwrap()
3174 }
3175
3176 fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
3178 let mut spec = mock_spec();
3179 spec.depends = deps;
3180 spec
3181 }
3182
3183 fn dep(
3185 service: &str,
3186 condition: zlayer_spec::DependencyCondition,
3187 timeout_ms: u64,
3188 on_timeout: zlayer_spec::TimeoutAction,
3189 ) -> DependsSpec {
3190 DependsSpec {
3191 service: service.to_string(),
3192 condition,
3193 timeout: Some(Duration::from_millis(timeout_ms)),
3194 on_timeout,
3195 }
3196 }
3197
3198 #[tokio::test]
3199 async fn test_deploy_with_dependencies_no_deps() {
3200 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3201 let manager = ServiceManager::new(runtime);
3202
3203 let mut services = HashMap::new();
3205 services.insert("a".to_string(), mock_spec());
3206 services.insert("b".to_string(), mock_spec());
3207
3208 Box::pin(manager.deploy_with_dependencies(services))
3210 .await
3211 .unwrap();
3212
3213 let service_list = manager.list_services().await;
3215 assert_eq!(service_list.len(), 2);
3216 }
3217
3218 #[tokio::test]
3219 async fn test_deploy_with_dependencies_linear() {
3220 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3221 let manager = ServiceManager::new(runtime);
3222
3223 let mut services = HashMap::new();
3226 services.insert("c".to_string(), mock_spec());
3227 services.insert(
3228 "b".to_string(),
3229 mock_spec_with_deps(vec![dep(
3230 "c",
3231 zlayer_spec::DependencyCondition::Started,
3232 5000,
3233 zlayer_spec::TimeoutAction::Fail,
3234 )]),
3235 );
3236 services.insert(
3237 "a".to_string(),
3238 mock_spec_with_deps(vec![dep(
3239 "b",
3240 zlayer_spec::DependencyCondition::Started,
3241 5000,
3242 zlayer_spec::TimeoutAction::Fail,
3243 )]),
3244 );
3245
3246 Box::pin(manager.deploy_with_dependencies(services))
3248 .await
3249 .unwrap();
3250
3251 let service_list = manager.list_services().await;
3253 assert_eq!(service_list.len(), 3);
3254 }
3255
3256 #[tokio::test]
3257 async fn test_deploy_with_dependencies_cycle_detection() {
3258 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3259 let manager = ServiceManager::new(runtime);
3260
3261 let mut services = HashMap::new();
3263 services.insert(
3264 "a".to_string(),
3265 mock_spec_with_deps(vec![dep(
3266 "b",
3267 zlayer_spec::DependencyCondition::Started,
3268 5000,
3269 zlayer_spec::TimeoutAction::Fail,
3270 )]),
3271 );
3272 services.insert(
3273 "b".to_string(),
3274 mock_spec_with_deps(vec![dep(
3275 "a",
3276 zlayer_spec::DependencyCondition::Started,
3277 5000,
3278 zlayer_spec::TimeoutAction::Fail,
3279 )]),
3280 );
3281
3282 let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3284 assert!(result.is_err());
3285 let err = result.unwrap_err().to_string();
3286 assert!(err.contains("Cyclic dependency"));
3287 }
3288
3289 #[tokio::test]
3290 async fn test_deploy_with_dependencies_timeout_continue() {
3291 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3292 let manager = ServiceManager::new(runtime);
3293
3294 let mut services = HashMap::new();
3297 services.insert("b".to_string(), mock_spec());
3298 services.insert(
3299 "a".to_string(),
3300 mock_spec_with_deps(vec![dep(
3301 "b",
3302 zlayer_spec::DependencyCondition::Healthy, 100, zlayer_spec::TimeoutAction::Continue, )]),
3306 );
3307
3308 Box::pin(manager.deploy_with_dependencies(services))
3310 .await
3311 .unwrap();
3312
3313 let service_list = manager.list_services().await;
3314 assert_eq!(service_list.len(), 2);
3315 }
3316
3317 #[tokio::test]
3318 async fn test_deploy_with_dependencies_timeout_warn() {
3319 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3320 let manager = ServiceManager::new(runtime);
3321
3322 let mut services = HashMap::new();
3325 services.insert("b".to_string(), mock_spec());
3326 services.insert(
3327 "a".to_string(),
3328 mock_spec_with_deps(vec![dep(
3329 "b",
3330 zlayer_spec::DependencyCondition::Healthy,
3331 100,
3332 zlayer_spec::TimeoutAction::Warn,
3333 )]),
3334 );
3335
3336 Box::pin(manager.deploy_with_dependencies(services))
3338 .await
3339 .unwrap();
3340
3341 let service_list = manager.list_services().await;
3342 assert_eq!(service_list.len(), 2);
3343 }
3344
3345 #[tokio::test]
3346 async fn test_deploy_with_dependencies_timeout_fail() {
3347 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3348 let manager = ServiceManager::new(runtime);
3349
3350 let mut services = HashMap::new();
3353 services.insert("b".to_string(), mock_spec());
3354 services.insert(
3355 "a".to_string(),
3356 mock_spec_with_deps(vec![dep(
3357 "b",
3358 zlayer_spec::DependencyCondition::Healthy,
3359 100,
3360 zlayer_spec::TimeoutAction::Fail,
3361 )]),
3362 );
3363
3364 let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3366 assert!(result.is_err());
3367
3368 let err = result.unwrap_err().to_string();
3370 assert!(err.contains("Dependency timeout"));
3371 }
3372
3373 #[tokio::test]
3374 async fn test_check_dependencies_all_satisfied() {
3375 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3376 let manager = ServiceManager::new(runtime);
3377
3378 manager
3380 .update_health_state("db", HealthState::Healthy)
3381 .await;
3382
3383 let deps = vec![DependsSpec {
3384 service: "db".to_string(),
3385 condition: zlayer_spec::DependencyCondition::Healthy,
3386 timeout: Some(Duration::from_secs(60)),
3387 on_timeout: zlayer_spec::TimeoutAction::Fail,
3388 }];
3389
3390 let satisfied = manager.check_dependencies(&deps).await.unwrap();
3391 assert!(satisfied);
3392 }
3393
3394 #[tokio::test]
3395 async fn test_check_dependencies_not_satisfied() {
3396 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3397 let manager = ServiceManager::new(runtime);
3398
3399 let deps = vec![DependsSpec {
3401 service: "db".to_string(),
3402 condition: zlayer_spec::DependencyCondition::Healthy,
3403 timeout: Some(Duration::from_secs(60)),
3404 on_timeout: zlayer_spec::TimeoutAction::Fail,
3405 }];
3406
3407 let satisfied = manager.check_dependencies(&deps).await.unwrap();
3408 assert!(!satisfied);
3409 }
3410
3411 #[tokio::test]
3412 async fn test_health_state_tracking() {
3413 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3414 let manager = ServiceManager::new(runtime);
3415
3416 manager
3418 .update_health_state("db", HealthState::Healthy)
3419 .await;
3420 manager
3421 .update_health_state("cache", HealthState::Unknown)
3422 .await;
3423
3424 let states = manager.health_states();
3426 let states_read = states.read().await;
3427
3428 assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
3429 assert!(matches!(
3430 states_read.get("cache"),
3431 Some(HealthState::Unknown)
3432 ));
3433 }
3434
3435 #[cfg(unix)]
3459 #[tokio::test]
3460 async fn test_health_states_bridge_fires_without_proxy() {
3461 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3462
3463 let mut spec = mock_spec();
3466 spec.health = zlayer_spec::HealthSpec {
3467 start_grace: Some(Duration::from_millis(0)),
3468 interval: Some(Duration::from_millis(50)),
3469 timeout: Some(Duration::from_secs(5)),
3470 retries: 1,
3471 check: HealthCheck::Command {
3472 command: "true".to_string(),
3473 },
3474 };
3475
3476 let mut instance =
3479 ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3480 let health_states: Arc<RwLock<HashMap<String, HealthState>>> =
3481 Arc::new(RwLock::new(HashMap::new()));
3482 instance.set_health_states(Arc::clone(&health_states));
3483
3484 instance.scale_to(1).await.unwrap();
3487
3488 let mut bridged = false;
3491 for _ in 0..100 {
3492 if matches!(
3493 health_states.read().await.get("web"),
3494 Some(HealthState::Healthy)
3495 ) {
3496 bridged = true;
3497 break;
3498 }
3499 tokio::time::sleep(Duration::from_millis(50)).await;
3500 }
3501
3502 assert!(
3503 bridged,
3504 "health_states must receive Healthy for the service even without a \
3505 proxy or overlay IP; the bridge regressed and stabilization would time out"
3506 );
3507 }
3508
3509 fn mock_job_spec() -> ServiceSpec {
3512 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3513 r"
3514version: v1
3515deployment: test
3516services:
3517 backup:
3518 rtype: job
3519 image:
3520 name: backup:latest
3521",
3522 )
3523 .unwrap()
3524 .services
3525 .remove("backup")
3526 .unwrap()
3527 }
3528
3529 fn mock_cron_spec() -> ServiceSpec {
3530 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3531 r#"
3532version: v1
3533deployment: test
3534services:
3535 cleanup:
3536 rtype: cron
3537 schedule: "0 0 * * * * *"
3538 image:
3539 name: cleanup:latest
3540"#,
3541 )
3542 .unwrap()
3543 .services
3544 .remove("cleanup")
3545 .unwrap()
3546 }
3547
3548 #[tokio::test]
3549 async fn test_service_manager_with_job_executor() {
3550 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3551 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3552
3553 let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
3554
3555 let job_spec = mock_job_spec();
3557 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3558 .await
3559 .unwrap();
3560
3561 let exec_id = manager
3563 .trigger_job("backup", JobTrigger::Cli)
3564 .await
3565 .unwrap();
3566
3567 tokio::time::sleep(Duration::from_millis(50)).await;
3569
3570 let execution = manager.get_job_execution(&exec_id).await;
3572 assert!(execution.is_some());
3573 assert_eq!(execution.unwrap().job_name, "backup");
3574 }
3575
3576 #[tokio::test]
3577 async fn test_service_manager_with_cron_scheduler() {
3578 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3579 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3580 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3581
3582 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3583
3584 let cron_spec = mock_cron_spec();
3586 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3587 .await
3588 .unwrap();
3589
3590 let cron_jobs = manager.list_cron_jobs().await;
3592 assert_eq!(cron_jobs.len(), 1);
3593 assert_eq!(cron_jobs[0].name, "cleanup");
3594 assert!(cron_jobs[0].enabled);
3595 }
3596
3597 #[tokio::test]
3598 async fn test_service_manager_trigger_cron() {
3599 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3600 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3601 let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
3602
3603 let manager = ServiceManager::new(runtime)
3604 .with_job_executor(job_executor)
3605 .with_cron_scheduler(cron_scheduler);
3606
3607 let cron_spec = mock_cron_spec();
3609 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3610 .await
3611 .unwrap();
3612
3613 let exec_id = manager.trigger_cron("cleanup").await.unwrap();
3615 assert!(!exec_id.0.is_empty());
3616 }
3617
3618 #[tokio::test]
3619 async fn test_service_manager_enable_disable_cron() {
3620 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3621 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3622 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3623
3624 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3625
3626 let cron_spec = mock_cron_spec();
3628 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3629 .await
3630 .unwrap();
3631
3632 let cron_jobs = manager.list_cron_jobs().await;
3634 assert!(cron_jobs[0].enabled);
3635
3636 manager.set_cron_enabled("cleanup", false).await;
3638 let cron_jobs = manager.list_cron_jobs().await;
3639 assert!(!cron_jobs[0].enabled);
3640
3641 manager.set_cron_enabled("cleanup", true).await;
3643 let cron_jobs = manager.list_cron_jobs().await;
3644 assert!(cron_jobs[0].enabled);
3645 }
3646
3647 #[tokio::test]
3648 async fn test_service_manager_remove_cleans_up_job() {
3649 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3650 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3651
3652 let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
3653
3654 let job_spec = mock_job_spec();
3656 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3657 .await
3658 .unwrap();
3659
3660 let spec = job_executor.get_job_spec("backup").await;
3662 assert!(spec.is_some());
3663
3664 manager.remove_service("backup").await.unwrap();
3666
3667 let spec = job_executor.get_job_spec("backup").await;
3669 assert!(spec.is_none());
3670 }
3671
3672 #[tokio::test]
3673 async fn test_service_manager_remove_cleans_up_cron() {
3674 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3675 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3676 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3677
3678 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
3679
3680 let cron_spec = mock_cron_spec();
3682 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3683 .await
3684 .unwrap();
3685
3686 assert_eq!(cron_scheduler.job_count().await, 1);
3688
3689 manager.remove_service("cleanup").await.unwrap();
3691
3692 assert_eq!(cron_scheduler.job_count().await, 0);
3694 }
3695
3696 #[tokio::test]
3697 async fn test_service_manager_job_without_executor() {
3698 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3699 let manager = ServiceManager::new(runtime);
3700
3701 let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
3703 assert!(result.is_err());
3704 assert!(result.unwrap_err().to_string().contains("not configured"));
3705 }
3706
3707 #[tokio::test]
3708 async fn test_service_manager_cron_without_scheduler() {
3709 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3710 let manager = ServiceManager::new(runtime);
3711
3712 let cron_spec = mock_cron_spec();
3714 let result = Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec)).await;
3715 assert!(result.is_err());
3716 assert!(result.unwrap_err().to_string().contains("not configured"));
3717 }
3718
3719 #[tokio::test]
3720 async fn test_service_manager_list_job_executions() {
3721 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3722 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3723
3724 let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
3725
3726 let job_spec = mock_job_spec();
3728 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3729 .await
3730 .unwrap();
3731
3732 manager
3734 .trigger_job("backup", JobTrigger::Cli)
3735 .await
3736 .unwrap();
3737 manager
3738 .trigger_job("backup", JobTrigger::Scheduler)
3739 .await
3740 .unwrap();
3741
3742 tokio::time::sleep(Duration::from_millis(50)).await;
3744
3745 let executions = manager.list_job_executions("backup").await;
3747 assert_eq!(executions.len(), 2);
3748 }
3749
3750 #[tokio::test]
3753 async fn test_service_manager_with_supervisor() {
3754 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3755 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3756
3757 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
3758
3759 let spec = mock_spec();
3761 Box::pin(manager.upsert_service("api".to_string(), spec))
3762 .await
3763 .unwrap();
3764
3765 manager.scale_service("api", 2).await.unwrap();
3767
3768 assert_eq!(supervisor.supervised_count().await, 2);
3770
3771 manager.scale_service("api", 1).await.unwrap();
3773 assert_eq!(supervisor.supervised_count().await, 1);
3774
3775 manager.remove_service("api").await.unwrap();
3777 assert_eq!(supervisor.supervised_count().await, 0);
3778 }
3779
3780 #[tokio::test]
3781 async fn test_service_manager_supervisor_state() {
3782 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3783 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3784
3785 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
3786
3787 let spec = mock_spec();
3789 Box::pin(manager.upsert_service("web".to_string(), spec))
3790 .await
3791 .unwrap();
3792 manager.scale_service("web", 1).await.unwrap();
3793
3794 let container_id = ContainerId::new("web".to_string(), 1);
3796 let state = manager.get_container_supervised_state(&container_id).await;
3797 assert_eq!(state, Some(SupervisedState::Running));
3798 }
3799
3800 #[tokio::test]
3801 async fn test_service_manager_start_supervisor() {
3802 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3803 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3804
3805 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
3806
3807 let handle = manager.start_container_supervisor().unwrap();
3809
3810 tokio::time::sleep(Duration::from_millis(50)).await;
3812 assert!(supervisor.is_running());
3813
3814 manager.shutdown_container_supervisor();
3816
3817 tokio::time::timeout(Duration::from_secs(1), handle)
3819 .await
3820 .unwrap()
3821 .unwrap();
3822
3823 assert!(!supervisor.is_running());
3824 }
3825
3826 #[tokio::test]
3827 async fn test_service_manager_supervisor_not_configured() {
3828 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3829 let manager = ServiceManager::new(runtime);
3830
3831 let result = manager.start_container_supervisor();
3833 assert!(result.is_err());
3834 assert!(result.unwrap_err().to_string().contains("not configured"));
3835 }
3836
3837 fn mock_tcp_spec() -> ServiceSpec {
3840 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3841 r"
3842version: v1
3843deployment: test
3844services:
3845 database:
3846 rtype: service
3847 image:
3848 name: postgres:latest
3849 endpoints:
3850 - name: postgresql
3851 protocol: tcp
3852 port: 5432
3853 scale:
3854 mode: fixed
3855 replicas: 1
3856",
3857 )
3858 .unwrap()
3859 .services
3860 .remove("database")
3861 .unwrap()
3862 }
3863
3864 fn mock_udp_spec() -> ServiceSpec {
3865 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3866 r"
3867version: v1
3868deployment: test
3869services:
3870 dns:
3871 rtype: service
3872 image:
3873 name: dns:latest
3874 endpoints:
3875 - name: dns
3876 protocol: udp
3877 port: 53
3878 scale:
3879 mode: fixed
3880 replicas: 1
3881",
3882 )
3883 .unwrap()
3884 .services
3885 .remove("dns")
3886 .unwrap()
3887 }
3888
3889 fn mock_mixed_spec() -> ServiceSpec {
3890 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3891 r"
3892version: v1
3893deployment: test
3894services:
3895 mixed:
3896 rtype: service
3897 image:
3898 name: mixed:latest
3899 endpoints:
3900 - name: http
3901 protocol: http
3902 port: 8080
3903 - name: grpc
3904 protocol: tcp
3905 port: 9000
3906 - name: metrics
3907 protocol: udp
3908 port: 8125
3909 scale:
3910 mode: fixed
3911 replicas: 1
3912",
3913 )
3914 .unwrap()
3915 .services
3916 .remove("mixed")
3917 .unwrap()
3918 }
3919
3920 #[tokio::test]
3921 async fn test_service_manager_with_stream_registry_tcp() {
3922 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3923 let stream_registry = Arc::new(StreamRegistry::new());
3924
3925 let mut manager = ServiceManager::new(runtime);
3926 manager.set_stream_registry(stream_registry.clone());
3927 manager.set_deployment_name("test".to_string());
3928
3929 let spec = mock_tcp_spec();
3931 Box::pin(manager.upsert_service("database".to_string(), spec))
3932 .await
3933 .unwrap();
3934
3935 assert_eq!(stream_registry.tcp_count(), 1);
3937 assert!(stream_registry.tcp_ports().contains(&5432));
3938
3939 manager.remove_service("database").await.unwrap();
3941 assert_eq!(stream_registry.tcp_count(), 0);
3942 }
3943
3944 #[tokio::test]
3945 async fn test_service_manager_with_stream_registry_udp() {
3946 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3947 let stream_registry = Arc::new(StreamRegistry::new());
3948
3949 let mut manager = ServiceManager::new(runtime);
3950 manager.set_stream_registry(stream_registry.clone());
3951 manager.set_deployment_name("test".to_string());
3952
3953 let spec = mock_udp_spec();
3955 Box::pin(manager.upsert_service("dns".to_string(), spec))
3956 .await
3957 .unwrap();
3958
3959 assert_eq!(stream_registry.udp_count(), 1);
3961 assert!(stream_registry.udp_ports().contains(&53));
3962
3963 manager.remove_service("dns").await.unwrap();
3965 assert_eq!(stream_registry.udp_count(), 0);
3966 }
3967
3968 #[tokio::test]
3969 async fn test_service_manager_with_stream_registry_mixed() {
3970 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3971 let stream_registry = Arc::new(StreamRegistry::new());
3972
3973 let mut manager = ServiceManager::new(runtime);
3974 manager.set_stream_registry(stream_registry.clone());
3975 manager.set_deployment_name("test".to_string());
3976
3977 let spec = mock_mixed_spec();
3979 Box::pin(manager.upsert_service("mixed".to_string(), spec))
3980 .await
3981 .unwrap();
3982
3983 assert_eq!(stream_registry.tcp_count(), 1); assert_eq!(stream_registry.udp_count(), 1); assert!(stream_registry.tcp_ports().contains(&9000));
3988 assert!(stream_registry.udp_ports().contains(&8125));
3989
3990 manager.remove_service("mixed").await.unwrap();
3992 assert_eq!(stream_registry.tcp_count(), 0);
3993 assert_eq!(stream_registry.udp_count(), 0);
3994 }
3995
3996 #[tokio::test]
3997 async fn test_service_manager_stream_registry_builder() {
3998 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3999 let stream_registry = Arc::new(StreamRegistry::new());
4000
4001 let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
4003
4004 assert!(manager.stream_registry().is_some());
4006 }
4007
4008 #[tokio::test]
4009 async fn test_tcp_service_without_stream_registry() {
4010 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4011
4012 let mut manager = ServiceManager::new(runtime);
4014 manager.set_deployment_name("test".to_string());
4015
4016 let spec = mock_tcp_spec();
4018 Box::pin(manager.upsert_service("database".to_string(), spec))
4019 .await
4020 .unwrap();
4021
4022 let services = manager.list_services().await;
4024 assert!(services.contains(&"database".to_string()));
4025 }
4026
4027 #[tokio::test]
4036 #[allow(clippy::too_many_lines)]
4037 async fn test_collect_endpoint_backends_respects_target_role() {
4038 use crate::runtime::Container;
4039 use std::collections::HashMap as StdHashMap;
4040 use std::net::{IpAddr, Ipv4Addr};
4041 use zlayer_spec::{
4042 EndpointSpec, ExposeType, GroupAffinity, Protocol, ReplicaGroup, ScaleSpec,
4043 };
4044
4045 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4046 let manager = ServiceManager::new(runtime.clone());
4047
4048 let mut spec = mock_spec();
4053 spec.replica_groups = Some(vec![
4054 ReplicaGroup {
4055 role: "primary".to_string(),
4056 count: 1,
4057 image: None,
4058 env: StdHashMap::new(),
4059 command: None,
4060 resources: None,
4061 affinity: GroupAffinity::default(),
4062 },
4063 ReplicaGroup {
4064 role: "read".to_string(),
4065 count: 2,
4066 image: None,
4067 env: StdHashMap::new(),
4068 command: None,
4069 resources: None,
4070 affinity: GroupAffinity::default(),
4071 },
4072 ]);
4073 spec.scale = ScaleSpec::Fixed { replicas: 3 };
4074 spec.endpoints = vec![
4075 EndpointSpec {
4076 name: "write".to_string(),
4077 protocol: Protocol::Tcp,
4078 port: 5432,
4079 target_port: Some(5432),
4080 path: None,
4081 host: None,
4082 expose: ExposeType::Internal,
4083 stream: None,
4084 tunnel: None,
4085 target_role: Some("primary".to_string()),
4086 },
4087 EndpointSpec {
4088 name: "read".to_string(),
4089 protocol: Protocol::Tcp,
4090 port: 5433,
4091 target_port: Some(5432),
4092 path: None,
4093 host: None,
4094 expose: ExposeType::Internal,
4095 stream: None,
4096 tunnel: None,
4097 target_role: Some("read".to_string()),
4098 },
4099 EndpointSpec {
4100 name: "any".to_string(),
4101 protocol: Protocol::Tcp,
4102 port: 5434,
4103 target_port: Some(5432),
4104 path: None,
4105 host: None,
4106 expose: ExposeType::Internal,
4107 stream: None,
4108 tunnel: None,
4109 target_role: None,
4110 },
4111 ];
4112
4113 let instance = ServiceInstance::new(
4114 "postgres".to_string(),
4115 spec.clone(),
4116 runtime,
4117 None, );
4119
4120 let cid_primary = ContainerId::with_role_and_node("postgres", 1, "primary", 0);
4122 let cid_first_read = ContainerId::with_role_and_node("postgres", 2, "read", 0);
4123 let cid_second_read = ContainerId::with_role_and_node("postgres", 3, "read", 0);
4124
4125 let ip_primary = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1));
4126 let ip_first_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 2));
4127 let ip_second_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 3));
4128
4129 {
4130 let mut containers = instance.containers().write().await;
4131 containers.insert(
4132 cid_primary.clone(),
4133 Container {
4134 id: cid_primary,
4135 image: spec.image.name.to_string(),
4136 state: crate::runtime::ContainerState::Running,
4137 pid: None,
4138 task: None,
4139 overlay_ip: Some(ip_primary),
4140 health_monitor: None,
4141 port_override: None,
4142 },
4143 );
4144 containers.insert(
4145 cid_first_read.clone(),
4146 Container {
4147 id: cid_first_read,
4148 image: spec.image.name.to_string(),
4149 state: crate::runtime::ContainerState::Running,
4150 pid: None,
4151 task: None,
4152 overlay_ip: Some(ip_first_read),
4153 health_monitor: None,
4154 port_override: None,
4155 },
4156 );
4157 containers.insert(
4158 cid_second_read.clone(),
4159 Container {
4160 id: cid_second_read,
4161 image: spec.image.name.to_string(),
4162 state: crate::runtime::ContainerState::Running,
4163 pid: None,
4164 task: None,
4165 overlay_ip: Some(ip_second_read),
4166 health_monitor: None,
4167 port_override: None,
4168 },
4169 );
4170 }
4171
4172 let write_ep = &spec.endpoints[0];
4173 let read_ep = &spec.endpoints[1];
4174 let any_ep = &spec.endpoints[2];
4175
4176 let write_backends = manager.collect_endpoint_backends(&instance, write_ep).await;
4177 let read_backends = manager.collect_endpoint_backends(&instance, read_ep).await;
4178 let any_backends = manager.collect_endpoint_backends(&instance, any_ep).await;
4179
4180 assert_eq!(write_backends.len(), 1, "write should match only primary");
4182 assert!(
4183 write_backends.iter().any(|a| a.ip() == ip_primary),
4184 "write backends missing primary IP: {write_backends:?}"
4185 );
4186
4187 assert_eq!(
4189 read_backends.len(),
4190 2,
4191 "read should match both read replicas"
4192 );
4193 assert!(read_backends.iter().any(|a| a.ip() == ip_first_read));
4194 assert!(read_backends.iter().any(|a| a.ip() == ip_second_read));
4195 assert!(
4196 !read_backends.iter().any(|a| a.ip() == ip_primary),
4197 "read backends must not contain primary: {read_backends:?}"
4198 );
4199
4200 assert_eq!(
4202 any_backends.len(),
4203 3,
4204 "any-role endpoint should see all containers"
4205 );
4206 }
4207}