1use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6 DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{
24 effective_pull_policy, DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType,
25 ServiceSpec,
26};
27
/// One deployed service: its current spec plus the live containers backing it
/// and the optional shared infrastructure (overlay, proxy, DNS, health map)
/// used as replicas come and go.
pub struct ServiceInstance {
    /// Service name; also embedded in container ids, DNS records and proxy routes.
    pub service_name: String,
    /// Current desired specification for this service.
    pub spec: ServiceSpec,
    // Runtime that actually creates/starts/stops containers.
    runtime: Arc<dyn Runtime + Send + Sync>,
    // Container id -> live container bookkeeping (state, IP, health monitor).
    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
    // Optional overlay networking; new replicas are attached on scale-up.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    // Optional proxy; new replicas are registered as backends on scale-up.
    proxy_manager: Option<Arc<ProxyManager>>,
    // Optional DNS server publishing `<service>.service.local` records.
    dns_server: Option<Arc<DnsServer>>,
    // Shared service-name -> health-state map updated by health callbacks.
    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
    // Digest recorded after the most recent successful image pull, if known.
    last_pulled_digest: tokio::sync::RwLock<Option<String>>,
}
48
49impl ServiceInstance {
50 pub fn new(
52 service_name: String,
53 spec: ServiceSpec,
54 runtime: Arc<dyn Runtime + Send + Sync>,
55 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
56 ) -> Self {
57 Self {
58 service_name,
59 spec,
60 runtime,
61 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
62 overlay_manager,
63 proxy_manager: None,
64 dns_server: None,
65 health_states: None,
66 last_pulled_digest: tokio::sync::RwLock::new(None),
67 }
68 }
69
70 pub fn with_proxy(
72 service_name: String,
73 spec: ServiceSpec,
74 runtime: Arc<dyn Runtime + Send + Sync>,
75 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
76 proxy_manager: Arc<ProxyManager>,
77 ) -> Self {
78 Self {
79 service_name,
80 spec,
81 runtime,
82 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
83 overlay_manager,
84 proxy_manager: Some(proxy_manager),
85 dns_server: None,
86 health_states: None,
87 last_pulled_digest: tokio::sync::RwLock::new(None),
88 }
89 }
90
    /// Builder-style variant: attach the DNS server used to publish
    /// `<service>.service.local` records when replicas come up.
    #[must_use]
    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns_server);
        self
    }

    /// Setter form of [`Self::with_dns`] for an already-constructed instance.
    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
        self.dns_server = Some(dns_server);
    }

    /// Attach the proxy manager used to register per-replica backends.
    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
        self.proxy_manager = Some(proxy_manager);
    }

    /// Share the manager-wide health-state map so this instance's health
    /// callbacks can publish per-service state transitions.
    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
        self.health_states = Some(states);
    }

    /// Digest recorded by the most recent successful image pull, if any.
    pub async fn last_pulled_digest(&self) -> Option<String> {
        self.last_pulled_digest.read().await.clone()
    }
119
    /// Pull the service image according to its effective pull policy, then
    /// try to record the post-pull digest.
    ///
    /// With `PullPolicy::Never` no pull happens and the previously cached
    /// digest is returned unchanged. Otherwise the image is pulled and the
    /// digest is looked up via `list_images`; if the runtime cannot list
    /// images, the cached digest is left as-is and `None` is returned.
    ///
    /// # Errors
    /// Returns [`AgentError::PullFailed`] when the runtime pull fails.
    async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
        let image_str = self.spec.image.name.to_string();
        let effective = effective_pull_policy(&self.spec.image.name, self.spec.image.pull_policy);

        // Never: do not touch the registry; report whatever we recorded before.
        if matches!(effective, PullPolicy::Never) {
            return Ok(self.last_pulled_digest.read().await.clone());
        }

        self.runtime
            .pull_image_with_policy(&image_str, effective, None)
            .await
            .map_err(|e| AgentError::PullFailed {
                image: self.spec.image.name.to_string(),
                reason: e.to_string(),
            })?;

        // Best-effort digest lookup; some runtimes do not support list_images.
        let new_digest = match self.runtime.list_images().await {
            Ok(images) => images
                .into_iter()
                .find(|info| info.reference == image_str)
                .and_then(|info| info.digest),
            Err(e) => {
                tracing::debug!(
                    image = %image_str,
                    error = %e,
                    "list_images unavailable; cannot record post-pull digest"
                );
                None
            }
        };

        // Only overwrite the cache when we actually learned a digest.
        if let Some(ref digest) = new_digest {
            *self.last_pulled_digest.write().await = Some(digest.clone());
        }

        Ok(new_digest)
    }
169
    /// Reconcile this service to exactly `replicas` running containers.
    ///
    /// Replica ids are 1-based. Scale-up creates ids `current+1..=replicas`,
    /// each going through: image pull (policy permitting, once up front),
    /// create, init steps, start, PID resolution, startup-liveness check,
    /// overlay attach, DNS registration, proxy backend registration, and a
    /// spawned health monitor. Scale-down removes ids `replicas+1..=current`:
    /// abort health monitor, remove replica DNS record, stop, sync volumes,
    /// remove.
    ///
    /// # Errors
    /// Fails on image pull, container create/start/stop/remove, or init-step
    /// errors. A container that dies during startup is reported with a tail
    /// of its logs in the error reason.
    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
        // Snapshot the current count in a short-lived read lock.
        let current_replicas = { self.containers.read().await.len() as u32 };
        let effective = effective_pull_policy(&self.spec.image.name, self.spec.image.pull_policy);
        // Refresh the image once up front when we may be adding replicas.
        if replicas >= current_replicas && !matches!(effective, PullPolicy::Never) {
            let _ = self.pull_and_refresh_digest().await?;
        }

        if replicas > current_replicas {
            for i in current_replicas..replicas {
                // Replica numbers are 1-based.
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // RouteToPeer is passed through untouched (placement redirect);
                // everything else is wrapped as CreateFailed.
                self.runtime
                    .create_container(&id, &self.spec)
                    .await
                    .map_err(|e| match e {
                        AgentError::RouteToPeer { .. } => e,
                        other => AgentError::CreateFailed {
                            id: id.to_string(),
                            reason: other.to_string(),
                        },
                    })?;

                // Run declared init steps (under the spec's error policy)
                // before starting the container.
                let init_orchestrator = InitOrchestrator::with_error_policy(
                    id.clone(),
                    self.spec.init.clone(),
                    self.spec.errors.clone(),
                );
                init_orchestrator.run().await?;

                self.runtime
                    .start_container(&id)
                    .await
                    .map_err(|e| AgentError::StartFailed {
                        id: id.to_string(),
                        reason: e.to_string(),
                    })?;

                // The runtime may not report a PID immediately after start;
                // poll up to 5 times with a 200ms sleep between attempts.
                let mut container_pid = None;
                for attempt in 1..=5u32 {
                    match self.runtime.get_container_pid(&id).await {
                        Ok(Some(pid)) => {
                            container_pid = Some(pid);
                            break;
                        }
                        Ok(None) if attempt < 5 => {
                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                        }
                        Ok(None) => {
                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
                        }
                        Err(e) => {
                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
                            if attempt < 5 {
                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                            }
                        }
                    }
                }

                // If we got a PID, double-check the container did not exit
                // during startup before attempting network attachment.
                if container_pid.is_some() {
                    let alive = match self.runtime.container_state(&id).await {
                        Ok(
                            ContainerState::Running
                            | ContainerState::Pending
                            | ContainerState::Initializing,
                        ) => true,
                        Ok(state) => {
                            tracing::warn!(
                                container = %id,
                                ?state,
                                "container exited before overlay attach could run"
                            );
                            false
                        }
                        // A failed state query is treated as "alive" so a
                        // flaky runtime does not block startup.
                        Err(e) => {
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "container state query failed before overlay attach, proceeding"
                            );
                            true
                        }
                    };
                    if !alive {
                        // Surface the last log lines in the error to make
                        // crash-on-boot failures diagnosable.
                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
                            || " <log read failed>".to_string(),
                            |entries| {
                                if entries.is_empty() {
                                    " <no log output>".to_string()
                                } else {
                                    entries
                                        .into_iter()
                                        .map(|e| format!(" {}", e.message))
                                        .collect::<Vec<_>>()
                                        .join("\n")
                                }
                            },
                        );
                        return Err(AgentError::StartFailed {
                            id: id.to_string(),
                            reason: format!("container exited during startup:\n{log_tail}"),
                        });
                    }
                }

                // Attach to the overlay network. On Windows this goes through
                // HCN namespace ids; on other platforms it uses the PID.
                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
                    let overlay_guard = overlay.read().await;
                    #[cfg(target_os = "windows")]
                    let attach_result: Option<std::net::IpAddr> = {
                        // PID is unused on the HCN path.
                        let _ = container_pid;
                        match self.runtime.get_container_namespace_id(&id).await {
                            Ok(Some(ns_id)) => {
                                let ip_override =
                                    self.runtime.get_container_ip(&id).await.ok().flatten();
                                let dns_server = overlay_guard.dns_server_addr().map(|sa| sa.ip());
                                let dns_domain =
                                    overlay_guard.dns_domain().map(ToString::to_string);
                                match overlay_guard
                                    .attach_container_hcn(
                                        ns_id,
                                        &self.service_name,
                                        ip_override,
                                        true,
                                        dns_server,
                                        dns_domain,
                                    )
                                    .await
                                {
                                    Ok(ip) => Some(ip),
                                    Err(e) => {
                                        tracing::warn!(
                                            container = %id,
                                            error = %e,
                                            "HCN overlay attach failed"
                                        );
                                        None
                                    }
                                }
                            }
                            Ok(None) => {
                                tracing::debug!(
                                    container = %id,
                                    "skipping HCN overlay attach - no namespace id available"
                                );
                                None
                            }
                            Err(e) => {
                                tracing::warn!(
                                    container = %id,
                                    error = %e,
                                    "failed to fetch HCN namespace id"
                                );
                                None
                            }
                        }
                    };
                    #[cfg(not(target_os = "windows"))]
                    let attach_result: Option<std::net::IpAddr> = {
                        if let Some(pid) = container_pid {
                            match overlay_guard
                                .attach_container(pid, &self.service_name, true)
                                .await
                            {
                                Ok(ip) => Some(ip),
                                Err(e) => {
                                    tracing::warn!(
                                        container = %id,
                                        error = %e,
                                        "failed to attach container to overlay network"
                                    );
                                    None
                                }
                            }
                        } else {
                            tracing::debug!(
                                container = %id,
                                "skipping overlay attachment - no PID available"
                            );
                            None
                        }
                    };

                    if let Some(ip) = attach_result {
                        tracing::info!(
                            container = %id,
                            overlay_ip = %ip,
                            "attached container to overlay network"
                        );

                        // Publish both service-wide and per-replica DNS names;
                        // failures are logged but never fatal.
                        if let Some(dns) = &self.dns_server {
                            let service_hostname = format!("{}.service.local", self.service_name);

                            let replica_hostname =
                                format!("{}.{}.service.local", id.replica, self.service_name);

                            match dns.add_record(&service_hostname, ip).await {
                                Ok(()) => tracing::debug!(
                                    hostname = %service_hostname,
                                    ip = %ip,
                                    "registered DNS for service"
                                ),
                                Err(e) => tracing::warn!(
                                    hostname = %service_hostname,
                                    error = %e,
                                    "failed to register DNS for service"
                                ),
                            }

                            if let Err(e) = dns.add_record(&replica_hostname, ip).await {
                                tracing::warn!(
                                    hostname = %replica_hostname,
                                    error = %e,
                                    "failed to register replica DNS"
                                );
                            } else {
                                tracing::debug!(
                                    hostname = %replica_hostname,
                                    ip = %ip,
                                    "registered DNS for replica"
                                );
                            }
                        }

                        Some(ip)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // Fall back to the runtime-reported container IP when the
                // overlay did not provide one.
                let effective_ip = if overlay_ip.is_none() {
                    match self.runtime.get_container_ip(&id).await {
                        Ok(Some(ip)) => {
                            tracing::info!(
                                container = %id,
                                ip = %ip,
                                "using runtime container IP for proxy (overlay unavailable)"
                            );
                            Some(ip)
                        }
                        Ok(None) => {
                            tracing::warn!(
                                container = %id,
                                "no container IP available from runtime, proxy routing will be unavailable"
                            );
                            None
                        }
                        Err(e) => {
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "failed to get container IP from runtime"
                            );
                            None
                        }
                    }
                } else {
                    overlay_ip
                };

                tracing::info!(
                    container = %id,
                    service = %self.service_name,
                    overlay_ip = ?overlay_ip,
                    effective_ip = ?effective_ip,
                    "Container IP resolution complete"
                );

                // The runtime may have remapped the container port (dynamic
                // assignment); that override wins over the spec port below.
                let port_override = match self.runtime.get_container_port_override(&id).await {
                    Ok(Some(port)) => {
                        tracing::info!(
                            container = %id,
                            port = port,
                            "runtime assigned dynamic port override for this container"
                        );
                        Some(port)
                    }
                    Ok(None) => None,
                    Err(e) => {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to query port override from runtime, using spec port"
                        );
                        None
                    }
                };

                let health_monitor_handle = {
                    let mut check = self.spec.health.check.clone();

                    // A TCP check with port 0 means "use the service port":
                    // runtime override first, then the first HTTP-ish
                    // endpoint's target port, then 8080.
                    if let HealthCheck::Tcp { ref mut port } = check {
                        if *port == 0 {
                            *port = port_override.unwrap_or_else(|| {
                                self.spec
                                    .endpoints
                                    .iter()
                                    .find(|ep| {
                                        matches!(
                                            ep.protocol,
                                            Protocol::Http | Protocol::Https | Protocol::Websocket
                                        )
                                    })
                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                            });
                        }
                    }

                    // Defaults: 5s start grace, 5s timeout, 10s interval.
                    let start_grace = self
                        .spec
                        .health
                        .start_grace
                        .unwrap_or(Duration::from_secs(5));
                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
                    let retries = self.spec.health.retries;

                    let checker = HealthChecker::new(check, effective_ip);
                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
                        .with_start_grace(start_grace)
                        .with_check_timeout(check_timeout);

                    // With both a proxy and an IP, register the backend and
                    // wire a callback that keeps proxy backend health and the
                    // shared health-state map in sync.
                    if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
                        let proxy = Arc::clone(proxy);
                        let service_name = self.service_name.clone();
                        // Same port-resolution order as the TCP check above.
                        let port = port_override.unwrap_or_else(|| {
                            self.spec
                                .endpoints
                                .iter()
                                .find(|ep| {
                                    matches!(
                                        ep.protocol,
                                        Protocol::Http | Protocol::Https | Protocol::Websocket
                                    )
                                })
                                .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                        });

                        let backend_addr = SocketAddr::new(ip, port);

                        proxy.add_backend(&self.service_name, backend_addr).await;

                        let health_states_opt = self.health_states.clone();
                        let svc_name_for_states = self.service_name.clone();

                        let health_callback: HealthCallback =
                            Arc::new(move |container_id: ContainerId, is_healthy: bool| {
                                let proxy = Arc::clone(&proxy);
                                let service_name = service_name.clone();
                                tracing::info!(
                                    container = %container_id,
                                    service = %service_name,
                                    backend = %backend_addr,
                                    healthy = is_healthy,
                                    "health status changed, updating proxy backend"
                                );
                                // The callback itself is synchronous; async
                                // work is pushed onto spawned tasks.
                                tokio::spawn(async move {
                                    proxy
                                        .update_backend_health(
                                            &service_name,
                                            backend_addr,
                                            is_healthy,
                                        )
                                        .await;
                                });
                                if let Some(ref health_states) = health_states_opt {
                                    let states = Arc::clone(health_states);
                                    let svc = svc_name_for_states.clone();
                                    tokio::spawn(async move {
                                        let state = if is_healthy {
                                            HealthState::Healthy
                                        } else {
                                            HealthState::Unhealthy {
                                                failures: 0,
                                                reason: "health check failed".into(),
                                            }
                                        };
                                        states.write().await.insert(svc, state);
                                    });
                                }
                            });

                        monitor = monitor.with_callback(health_callback);
                    }

                    monitor.start()
                };

                {
                    let mut containers = self.containers.write().await;
                    containers.insert(
                        id.clone(),
                        Container {
                            id: id.clone(),
                            state: ContainerState::Running,
                            // NOTE(review): the PID resolved above is not
                            // stored here — confirm whether this should be
                            // `container_pid` instead of `None`.
                            pid: None,
                            task: None,
                            overlay_ip: effective_ip,
                            health_monitor: Some(health_monitor_handle),
                            port_override,
                        },
                    );
                }
            }
        }

        if replicas < current_replicas {
            for i in replicas..current_replicas {
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // Take the entry out first so the write lock is not held
                // across the stop/remove awaits below.
                let removed_container = {
                    let mut containers = self.containers.write().await;
                    containers.remove(&id)
                };
                if let Some(container) = removed_container {
                    // Stop health checking before tearing the replica down.
                    if let Some(handle) = container.health_monitor {
                        handle.abort();
                    }

                    // Unregister the replica-specific DNS name.
                    // NOTE(review): the service-wide DNS record and the proxy
                    // backend registered on scale-up are not removed here —
                    // confirm that is handled elsewhere.
                    if let Some(dns) = &self.dns_server {
                        let replica_hostname =
                            format!("{}.{}.service.local", id.replica, self.service_name);
                        if let Err(e) = dns.remove_record(&replica_hostname).await {
                            tracing::warn!(
                                hostname = %replica_hostname,
                                error = %e,
                                "failed to remove replica DNS record"
                            );
                        } else {
                            tracing::debug!(
                                hostname = %replica_hostname,
                                "removed replica DNS record"
                            );
                        }
                    }

                    self.runtime
                        .stop_container(&id, Duration::from_secs(30))
                        .await?;

                    // Best-effort volume sync; removal proceeds regardless.
                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to sync volumes before removal"
                        );
                    }

                    self.runtime.remove_container(&id).await?;
                }
            }
        }

        Ok(())
    }
717
    /// Number of containers currently tracked for this service.
    pub async fn replica_count(&self) -> usize {
        self.containers.read().await.len()
    }

    /// Snapshot of the ids of all tracked containers (order unspecified).
    pub async fn container_ids(&self) -> Vec<ContainerId> {
        self.containers.read().await.keys().cloned().collect()
    }

    /// Direct access to the container map lock, for callers that need more
    /// than a point-in-time snapshot.
    pub fn containers(
        &self,
    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
        &self.containers
    }

    /// True when an overlay manager was wired in.
    pub fn has_overlay_manager(&self) -> bool {
        self.overlay_manager.is_some()
    }

    /// True when a proxy manager was wired in.
    pub fn has_proxy_manager(&self) -> bool {
        self.proxy_manager.is_some()
    }

    /// True when a DNS server was wired in.
    pub fn has_dns_server(&self) -> bool {
        self.dns_server.is_some()
    }
752}
753
/// Registry of every [`ServiceInstance`] in a deployment plus the shared
/// subsystems (overlay, proxy, DNS, jobs, cron, container supervision) that
/// instances use.
pub struct ServiceManager {
    // Container runtime shared by all services.
    runtime: Arc<dyn Runtime + Send + Sync>,
    // Service name -> managed instance.
    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
    // Bounds how many scale operations may run concurrently.
    scale_semaphore: Arc<Semaphore>,
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    stream_registry: Option<Arc<StreamRegistry>>,
    proxy_manager: Option<Arc<ProxyManager>>,
    dns_server: Option<Arc<DnsServer>>,
    deployment_name: Option<String>,
    // Shared service-name -> health-state map, also handed to each instance.
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    job_executor: Option<Arc<JobExecutor>>,
    cron_scheduler: Option<Arc<CronScheduler>>,
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}
778
/// Fluent builder for [`ServiceManager`]; only the runtime is mandatory,
/// every other collaborator is optional (missing ones are warned about in
/// `build`).
pub struct ServiceManagerBuilder {
    runtime: Arc<dyn Runtime + Send + Sync>,
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    proxy_manager: Option<Arc<ProxyManager>>,
    stream_registry: Option<Arc<StreamRegistry>>,
    dns_server: Option<Arc<DnsServer>>,
    deployment_name: Option<String>,
    job_executor: Option<Arc<JobExecutor>>,
    cron_scheduler: Option<Arc<CronScheduler>>,
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}
807
808impl ServiceManagerBuilder {
    /// Start a builder around the given container runtime.
    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
        Self {
            runtime,
            overlay_manager: None,
            proxy_manager: None,
            stream_registry: None,
            dns_server: None,
            deployment_name: None,
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Wire in the overlay network manager.
    #[must_use]
    pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
        self.overlay_manager = Some(om);
        self
    }

    /// Wire in the proxy manager.
    #[must_use]
    pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
        self.proxy_manager = Some(pm);
        self
    }

    /// Wire in the TCP/UDP stream registry.
    #[must_use]
    pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
        self.stream_registry = Some(sr);
        self
    }

    /// Wire in the DNS server.
    #[must_use]
    pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns);
        self
    }

    /// Set the deployment name.
    #[must_use]
    pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
        self.deployment_name = Some(name.into());
        self
    }

    /// Wire in the job executor.
    #[must_use]
    pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
        self.job_executor = Some(je);
        self
    }

    /// Wire in the cron scheduler.
    #[must_use]
    pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
        self.cron_scheduler = Some(cs);
        self
    }

    /// Wire in the container supervisor.
    #[must_use]
    pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
        self.container_supervisor = Some(cs);
        self
    }
879
880 pub fn build(self) -> ServiceManager {
885 if self.proxy_manager.is_none() {
886 tracing::warn!("ServiceManager built without proxy_manager");
887 }
888 if self.stream_registry.is_none() {
889 tracing::warn!("ServiceManager built without stream_registry");
890 }
891 if self.container_supervisor.is_none() {
892 tracing::warn!("ServiceManager built without container_supervisor");
893 }
894 if self.deployment_name.is_none() {
895 tracing::warn!("ServiceManager built without deployment_name");
896 }
897
898 ServiceManager {
899 runtime: self.runtime,
900 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
901 scale_semaphore: Arc::new(Semaphore::new(10)),
902 overlay_manager: self.overlay_manager,
903 stream_registry: self.stream_registry,
904 proxy_manager: self.proxy_manager,
905 dns_server: self.dns_server,
906 deployment_name: self.deployment_name,
907 health_states: Arc::new(RwLock::new(HashMap::new())),
908 job_executor: self.job_executor,
909 cron_scheduler: self.cron_scheduler,
910 container_supervisor: self.container_supervisor,
911 }
912 }
913}
914
915impl ServiceManager {
    /// Entry point for the builder API; prefer this over the deprecated
    /// constructors below.
    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
        ServiceManagerBuilder::new(runtime)
    }

    /// Bare manager with no overlay/proxy/DNS/job subsystems wired in.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: None,
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: None,
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Manager with just an overlay manager attached.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn with_overlay(
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Arc<RwLock<OverlayManager>>,
    ) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: Some(overlay_manager),
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: None,
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Manager with an overlay manager and a deployment name.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn with_full_config(
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Arc<RwLock<OverlayManager>>,
        deployment_name: String,
    ) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: Some(overlay_manager),
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: Some(deployment_name),
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Clone of the shared service-name -> health-state map handle.
    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
        Arc::clone(&self.health_states)
    }

    /// Overwrite the recorded health state for one service.
    pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
        let mut states = self.health_states.write().await;
        states.insert(service_name.to_string(), state);
    }

    /// Set the deployment name after construction.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_deployment_name(&mut self, name: String) {
        self.deployment_name = Some(name);
    }
1012
    /// Attach the stream registry after construction.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
        self.stream_registry = Some(registry);
    }

    /// Builder-style variant of [`Self::set_stream_registry`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
        self.stream_registry = Some(registry);
        self
    }

    /// The stream registry, if one was wired in.
    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
        self.stream_registry.as_ref()
    }

    /// Attach the overlay manager after construction.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
        self.overlay_manager = Some(manager);
    }

    /// Attach the proxy manager after construction.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
        self.proxy_manager = Some(proxy);
    }

    /// Builder-style variant of [`Self::set_proxy_manager`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
        self.proxy_manager = Some(proxy);
        self
    }

    /// The proxy manager, if one was wired in.
    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
        self.proxy_manager.as_ref()
    }

    /// Attach the DNS server after construction.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
        self.dns_server = Some(dns);
    }

    /// Builder-style variant of [`Self::set_dns_server`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns);
        self
    }

    /// The DNS server, if one was wired in.
    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
        self.dns_server.as_ref()
    }

    /// Attach the job executor after construction.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
        self.job_executor = Some(executor);
    }

    /// Attach the cron scheduler after construction.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
        self.cron_scheduler = Some(scheduler);
    }

    /// Builder-style variant of [`Self::set_job_executor`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
        self.job_executor = Some(executor);
        self
    }

    /// Builder-style variant of [`Self::set_cron_scheduler`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
        self.cron_scheduler = Some(scheduler);
        self
    }

    /// The job executor, if one was wired in.
    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
        self.job_executor.as_ref()
    }

    /// The cron scheduler, if one was wired in.
    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
        self.cron_scheduler.as_ref()
    }

    /// Attach the container supervisor after construction.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
        self.container_supervisor = Some(supervisor);
    }

    /// Builder-style variant of [`Self::set_container_supervisor`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
        self.container_supervisor = Some(supervisor);
        self
    }

    /// The container supervisor, if one was wired in.
    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
        self.container_supervisor.as_ref()
    }
1132
1133 pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1144 let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1145 AgentError::Configuration("Container supervisor not configured".to_string())
1146 })?;
1147
1148 let supervisor = Arc::clone(supervisor);
1149 Ok(tokio::spawn(async move {
1150 supervisor.run_loop().await;
1151 }))
1152 }
1153
1154 pub fn shutdown_container_supervisor(&self) {
1156 if let Some(supervisor) = &self.container_supervisor {
1157 supervisor.shutdown();
1158 }
1159 }
1160
1161 pub async fn get_container_supervised_state(
1163 &self,
1164 container_id: &ContainerId,
1165 ) -> Option<SupervisedState> {
1166 if let Some(supervisor) = &self.container_supervisor {
1167 supervisor.get_state(container_id).await
1168 } else {
1169 None
1170 }
1171 }
1172
1173 pub async fn take_supervisor_events(
1177 &self,
1178 ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1179 if let Some(supervisor) = &self.container_supervisor {
1180 supervisor.take_event_receiver().await
1181 } else {
1182 None
1183 }
1184 }
1185
    /// Deploy a set of services in dependency order.
    ///
    /// Builds the dependency graph, then for each service in startup order:
    /// waits for its declared dependencies, upserts it, scales it to its
    /// spec-derived replica count, and seeds its health state as `Unknown`.
    ///
    /// Initial replica count mapping: `Fixed { replicas }` -> `replicas`,
    /// `Adaptive { min, .. }` -> `min`, `Manual` -> 1.
    ///
    /// # Errors
    /// Propagates graph-build, dependency-wait, upsert, and scale errors.
    pub async fn deploy_with_dependencies(
        &self,
        services: HashMap<String, ServiceSpec>,
    ) -> Result<()> {
        if services.is_empty() {
            return Ok(());
        }

        let graph = DependencyGraph::build(&services)?;

        tracing::info!(
            service_count = services.len(),
            "Starting deployment with dependency ordering"
        );

        let order = graph.startup_order();
        tracing::debug!(order = ?order, "Computed startup order");

        for service_name in order {
            let service_spec = services
                .get(service_name)
                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;

            // Block until declared dependencies are satisfied (or their
            // timeout policy lets us continue).
            if !service_spec.depends.is_empty() {
                tracing::info!(
                    service = %service_name,
                    dependency_count = service_spec.depends.len(),
                    "Waiting for dependencies"
                );
                self.wait_for_dependencies(service_name, &service_spec.depends)
                    .await?;
            }

            tracing::info!(service = %service_name, "Starting service");
            self.upsert_service(service_name.clone(), service_spec.clone())
                .await?;

            // Initial replica count derived from the scale spec.
            let replicas = match &service_spec.scale {
                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min,
                zlayer_spec::ScaleSpec::Manual => 1,
            };
            self.scale_service(service_name, replicas).await?;

            // Health is Unknown until the first health check reports.
            self.update_health_state(service_name, HealthState::Unknown)
                .await;

            tracing::info!(
                service = %service_name,
                replicas = replicas,
                "Service started"
            );
        }

        tracing::info!(service_count = services.len(), "Deployment complete");

        Ok(())
    }
1267
    /// Wait until every dependency in `deps` is satisfied or its timeout
    /// policy resolves.
    ///
    /// Result handling: `TimedOutFail` aborts with
    /// [`AgentError::DependencyTimeout`]; `TimedOutWarn` logs and continues;
    /// `TimedOutContinue` and `Satisfied` are silently accepted.
    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
        let condition_checker = DependencyConditionChecker::new(
            Arc::clone(&self.runtime),
            Arc::clone(&self.health_states),
            None,
        );

        let waiter = DependencyWaiter::new(condition_checker);
        let results = waiter.wait_for_all(deps).await?;

        for result in results {
            match result {
                WaitResult::TimedOutFail {
                    service: dep_service,
                    condition,
                    timeout,
                } => {
                    return Err(AgentError::DependencyTimeout {
                        service: service.to_string(),
                        dependency: dep_service,
                        condition: format!("{condition:?}"),
                        timeout,
                    });
                }
                WaitResult::TimedOutWarn {
                    service: dep_service,
                    condition,
                } => {
                    tracing::warn!(
                        service = %service,
                        dependency = %dep_service,
                        condition = ?condition,
                        "Dependency timed out but continuing"
                    );
                }
                // Satisfied, or the timeout policy says carry on: no-op.
                WaitResult::TimedOutContinue | WaitResult::Satisfied => {}
            }
        }

        Ok(())
    }
1320
1321 pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1328 let condition_checker = DependencyConditionChecker::new(
1329 Arc::clone(&self.runtime),
1330 Arc::clone(&self.health_states),
1331 None,
1332 );
1333
1334 for dep in deps {
1335 if !condition_checker.check(dep).await? {
1336 return Ok(false);
1337 }
1338 }
1339
1340 Ok(true)
1341 }
1342
    #[allow(clippy::too_many_lines)]
    /// Create or update a resource under `name`, dispatching on `spec.rtype`.
    ///
    /// Existing services are updated in place: the stored spec is replaced,
    /// and depending on the effective pull policy the image digest is
    /// refreshed and the service is recreated (scale to 0, then back to its
    /// previous replica count) when image drift is detected. New services are
    /// constructed, wired to DNS/proxy/stream infrastructure when configured,
    /// and inserted. Jobs are registered with the job executor (or stored as
    /// a plain service when none is configured); cron specs require a
    /// configured scheduler.
    ///
    /// # Errors
    /// Propagates pull/scale failures, and returns a configuration error for
    /// cron resources when no cron scheduler is configured.
    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
        match spec.rtype {
            ResourceType::Service => {
                let mut services = self.services.write().await;

                if let Some(instance) = services.get_mut(&name) {
                    // Re-deploy of an existing service: refresh spec + DNS wiring.
                    instance.spec = spec.clone();
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }

                    let effective = effective_pull_policy(&spec.image.name, spec.image.pull_policy);
                    let old_digest = instance.last_pulled_digest().await;
                    // Saturating conversion; mirrors scale bookkeeping elsewhere.
                    let current_replicas =
                        u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
                    // Release the write lock before the potentially slow
                    // pull/scale work below.
                    drop(services); match effective {
                        PullPolicy::Never | PullPolicy::IfNotPresent => {
                            tracing::debug!(
                                service = %name,
                                policy = ?effective,
                                "service unchanged on re-deploy (effective pull policy skips refresh)"
                            );
                        }
                        PullPolicy::Always | PullPolicy::Newer => {
                            // Re-acquire as a read lock: the service may have
                            // been removed while the write lock was released.
                            let services_ro = self.services.read().await;
                            let new_digest = if let Some(inst) = services_ro.get(&name) {
                                inst.pull_and_refresh_digest().await?
                            } else {
                                tracing::warn!(
                                    service = %name,
                                    "service removed during upsert; skipping drift recreate"
                                );
                                return Ok(());
                            };
                            drop(services_ro);

                            let should_recreate = match effective {
                                PullPolicy::Always => true,
                                // Newer: recreate only when both digests are
                                // known and differ.
                                PullPolicy::Newer => match (&old_digest, &new_digest) {
                                    (Some(old), Some(new)) => old != new,
                                    _ => false,
                                },
                                _ => false,
                            };

                            if should_recreate && current_replicas > 0 {
                                tracing::info!(
                                    service = %name,
                                    policy = ?effective,
                                    old_digest = ?old_digest,
                                    new_digest = ?new_digest,
                                    replicas = current_replicas,
                                    "image drift detected; performing rolling recreate"
                                );
                                // Recreate by scaling to zero, then back up.
                                self.scale_service(&name, 0).await?;
                                self.scale_service(&name, current_replicas).await?;
                                tracing::info!(
                                    service = %name,
                                    new_digest = ?new_digest,
                                    "service recreated with refreshed image"
                                );
                            } else {
                                tracing::debug!(
                                    service = %name,
                                    policy = ?effective,
                                    old_digest = ?old_digest,
                                    new_digest = ?new_digest,
                                    "service up to date; no recreate required"
                                );
                            }
                        }
                    }
                    return Ok(());
                }
                // New service: build an instance with whatever infrastructure
                // this manager has configured.
                let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                let mut instance = if let Some(proxy) = &self.proxy_manager {
                    ServiceInstance::with_proxy(
                        name.clone(),
                        spec,
                        self.runtime.clone(),
                        overlay,
                        Arc::clone(proxy),
                    )
                } else {
                    ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                };
                if let Some(dns) = &self.dns_server {
                    instance.set_dns_server(Arc::clone(dns));
                }
                instance.set_health_states(Arc::clone(&self.health_states));
                if let Some(proxy) = &self.proxy_manager {
                    proxy.add_service(&name, &instance.spec).await;
                }
                if let Some(stream_registry) = &self.stream_registry {
                    // Register TCP/UDP routes now; backends are filled in on scale.
                    for endpoint in &instance.spec.endpoints {
                        let svc = StreamService::new(
                            name.clone(),
                            Vec::new(), );
                        match endpoint.protocol {
                            Protocol::Tcp => {
                                stream_registry.register_tcp(endpoint.port, svc);
                                tracing::debug!(
                                    service = %name,
                                    port = endpoint.port,
                                    "Registered TCP stream route"
                                );
                            }
                            Protocol::Udp => {
                                stream_registry.register_udp(endpoint.port, svc);
                                tracing::debug!(
                                    service = %name,
                                    port = endpoint.port,
                                    "Registered UDP stream route"
                                );
                            }
                            _ => {} }
                    }
                }
                services.insert(name, instance);
            }
            ResourceType::Job => {
                if let Some(executor) = &self.job_executor {
                    executor.register_job(&name, spec).await;
                    tracing::info!(job = %name, "Registered job spec");
                } else {
                    // Fallback: keep the spec around as a service record so it
                    // is at least visible, even though it cannot be triggered.
                    tracing::warn!(
                        job = %name,
                        "Job executor not configured, storing as service for reference"
                    );
                    let mut services = self.services.write().await;
                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                    let mut instance = if let Some(proxy) = &self.proxy_manager {
                        ServiceInstance::with_proxy(
                            name.clone(),
                            spec,
                            self.runtime.clone(),
                            overlay,
                            Arc::clone(proxy),
                        )
                    } else {
                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                    };
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }
                    services.insert(name, instance);
                }
            }
            ResourceType::Cron => {
                if let Some(scheduler) = &self.cron_scheduler {
                    scheduler.register(&name, &spec).await?;
                    tracing::info!(cron = %name, "Registered cron job with scheduler");
                } else {
                    return Err(AgentError::Configuration(format!(
                        "Cron scheduler not configured for cron job '{name}'"
                    )));
                }
            }
        }

        Ok(())
    }
1547
1548 async fn update_proxy_backends(&self, service_name: &str, addrs: Vec<SocketAddr>) {
1550 if let Some(proxy) = &self.proxy_manager {
1551 proxy.update_backends(service_name, addrs).await;
1552 }
1553 }
1554
1555 fn update_stream_backends(&self, spec: &ServiceSpec, addrs: &[SocketAddr]) {
1564 let Some(stream_registry) = &self.stream_registry else {
1565 return;
1566 };
1567
1568 let primary_spec_port = spec
1572 .endpoints
1573 .iter()
1574 .find(|ep| {
1575 matches!(
1576 ep.protocol,
1577 Protocol::Http | Protocol::Https | Protocol::Websocket
1578 )
1579 })
1580 .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1581
1582 let has_port_overrides = addrs.iter().any(|addr| addr.port() != primary_spec_port);
1583
1584 for endpoint in &spec.endpoints {
1585 match endpoint.protocol {
1586 Protocol::Tcp => {
1587 let tcp_backends: Vec<SocketAddr> = if has_port_overrides {
1588 addrs.to_vec()
1591 } else {
1592 addrs
1595 .iter()
1596 .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1597 .collect()
1598 };
1599
1600 stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
1601
1602 tracing::debug!(
1603 endpoint = %endpoint.name,
1604 port = endpoint.port,
1605 backend_count = addrs.len(),
1606 "Updated TCP stream backends"
1607 );
1608 }
1609 Protocol::Udp => {
1610 let udp_backends: Vec<SocketAddr> = if has_port_overrides {
1611 addrs.to_vec()
1612 } else {
1613 addrs
1614 .iter()
1615 .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1616 .collect()
1617 };
1618
1619 stream_registry.update_udp_backends(endpoint.port, udp_backends);
1620
1621 tracing::debug!(
1622 endpoint = %endpoint.name,
1623 port = endpoint.port,
1624 backend_count = addrs.len(),
1625 "Updated UDP stream backends"
1626 );
1627 }
1628 _ => {} }
1630 }
1631 }
1632
1633 #[allow(clippy::cast_possible_truncation)]
1638 pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
1639 let _permit = self.scale_semaphore.acquire().await;
1640
1641 let services = self.services.read().await;
1642 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1643 container: name.to_string(),
1644 reason: "service not found".to_string(),
1645 })?;
1646
1647 let current_replicas = instance.replica_count().await as u32;
1649
1650 instance.scale_to(replicas).await?;
1652
1653 let addrs = self.collect_backend_addrs(instance, replicas).await;
1659
1660 if self.proxy_manager.is_some() && !addrs.is_empty() {
1662 self.update_proxy_backends(name, addrs.clone()).await;
1663 }
1664
1665 if self.stream_registry.is_some() {
1667 self.update_stream_backends(&instance.spec, &addrs);
1668 }
1669
1670 if let Some(supervisor) = &self.container_supervisor {
1672 if replicas > current_replicas {
1674 for i in current_replicas..replicas {
1675 let container_id = ContainerId {
1676 service: name.to_string(),
1677 replica: i + 1,
1678 };
1679 supervisor.supervise(&container_id, &instance.spec).await;
1680 }
1681 }
1682 if replicas < current_replicas {
1684 for i in replicas..current_replicas {
1685 let container_id = ContainerId {
1686 service: name.to_string(),
1687 replica: i + 1,
1688 };
1689 supervisor.unsupervise(&container_id).await;
1690 }
1691 }
1692 }
1693
1694 Ok(())
1695 }
1696
1697 async fn collect_backend_addrs(
1708 &self,
1709 instance: &ServiceInstance,
1710 _replicas: u32, ) -> Vec<SocketAddr> {
1712 let mut addrs = Vec::new();
1713
1714 let spec_port = instance
1716 .spec
1717 .endpoints
1718 .iter()
1719 .find(|ep| {
1720 matches!(
1721 ep.protocol,
1722 Protocol::Http | Protocol::Https | Protocol::Websocket
1723 )
1724 })
1725 .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1726
1727 let containers = instance.containers().read().await;
1729
1730 for container in containers.values() {
1731 if let Some(ip) = container.overlay_ip {
1732 let port = container.port_override.unwrap_or(spec_port);
1735 addrs.push(SocketAddr::new(ip, port));
1736 }
1737 }
1738
1739 if addrs.is_empty() && !containers.is_empty() {
1742 tracing::warn!(
1743 service = %instance.service_name,
1744 container_count = containers.len(),
1745 "no overlay IPs available for backends - containers may not be reachable via proxy"
1746 );
1747 }
1748
1749 addrs
1750 }
1751
1752 pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
1757 let services = self.services.read().await;
1758 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1759 container: name.to_string(),
1760 reason: "service not found".to_string(),
1761 })?;
1762
1763 Ok(instance.replica_count().await)
1764 }
1765
    /// Remove a service and tear down everything attached to it, in order:
    /// cron registration, job registration, TCP/UDP stream routes, supervised
    /// containers, DNS records (service and per-replica), and finally the
    /// service entry itself.
    ///
    /// # Errors
    /// Currently always returns `Ok`; per-step failures (e.g. DNS removal)
    /// are logged and do not abort the teardown.
    pub async fn remove_service(&self, name: &str) -> Result<()> {
        if let Some(scheduler) = &self.cron_scheduler {
            scheduler.unregister(name).await;
        }

        if let Some(executor) = &self.job_executor {
            executor.unregister_job(name).await;
        }

        if let Some(stream_registry) = &self.stream_registry {
            // Read lock only: the entry itself is removed at the very end.
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                for endpoint in &instance.spec.endpoints {
                    match endpoint.protocol {
                        Protocol::Tcp => {
                            let _ = stream_registry.unregister_tcp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered TCP stream route"
                            );
                        }
                        Protocol::Udp => {
                            let _ = stream_registry.unregister_udp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered UDP stream route"
                            );
                        }
                        _ => {} }
                }
            }
            // Release the read lock before the awaits below.
            drop(services); }

        if let Some(supervisor) = &self.container_supervisor {
            let containers = self.get_service_containers(name).await;
            for container_id in containers {
                supervisor.unsupervise(&container_id).await;
            }
            tracing::debug!(service = %name, "Unregistered containers from supervisor");
        }

        if let Some(dns) = &self.dns_server {
            // Service-level record first, then one record per replica.
            let service_hostname = format!("{name}.service.local");
            if let Err(e) = dns.remove_record(&service_hostname).await {
                tracing::warn!(
                    hostname = %service_hostname,
                    error = %e,
                    "failed to remove service DNS record"
                );
            } else {
                tracing::debug!(
                    hostname = %service_hostname,
                    "removed service DNS record"
                );
            }

            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                let containers = instance.containers().read().await;
                for (id, _) in containers.iter() {
                    let replica_hostname = format!("{}.{}.service.local", id.replica, name);
                    if let Err(e) = dns.remove_record(&replica_hostname).await {
                        tracing::warn!(
                            hostname = %replica_hostname,
                            error = %e,
                            "failed to remove replica DNS record during service removal"
                        );
                    }
                }
            }
            // Release the read lock before taking the write lock below.
            drop(services); }

        let mut services = self.services.write().await;
        if services.remove(name).is_some() {
            tracing::debug!(service = %name, "Removed service from manager");
        }

        Ok(())
    }
1868
1869 pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
1872 let services = self.services.read().await;
1873 services.get(name).map(|i| {
1874 (
1875 i.has_overlay_manager(),
1876 i.has_proxy_manager(),
1877 i.has_dns_server(),
1878 )
1879 })
1880 }
1881
1882 pub async fn list_services(&self) -> Vec<String> {
1884 self.services.read().await.keys().cloned().collect()
1885 }
1886
1887 pub async fn get_service_logs(
1901 &self,
1902 service_name: &str,
1903 tail: usize,
1904 instance: Option<&str>,
1905 ) -> Result<Vec<LogEntry>> {
1906 let container_ids = self.get_service_containers(service_name).await;
1907
1908 if container_ids.is_empty() {
1909 return Err(AgentError::NotFound {
1910 container: service_name.to_string(),
1911 reason: "no containers found for service".to_string(),
1912 });
1913 }
1914
1915 let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
1917 if let Ok(replica_num) = inst.parse::<u32>() {
1918 container_ids
1919 .iter()
1920 .filter(|id| id.replica == replica_num)
1921 .collect()
1922 } else {
1923 container_ids
1925 .iter()
1926 .filter(|id| id.to_string().contains(inst))
1927 .collect()
1928 }
1929 } else {
1930 container_ids.iter().collect()
1931 };
1932
1933 if target_ids.is_empty() {
1934 return Err(AgentError::NotFound {
1935 container: format!("{}/{}", service_name, instance.unwrap_or("?")),
1936 reason: "instance not found".to_string(),
1937 });
1938 }
1939
1940 let mut all_entries: Vec<LogEntry> = Vec::new();
1941
1942 for id in &target_ids {
1943 match self.runtime.container_logs(id, tail).await {
1944 Ok(mut entries) => {
1945 for entry in &mut entries {
1947 if entry.service.is_none() {
1948 entry.service = Some(service_name.to_string());
1949 }
1950 if entry.deployment.is_none() {
1951 entry.deployment.clone_from(&self.deployment_name);
1952 }
1953 }
1954 all_entries.extend(entries);
1955 }
1956 Err(e) => {
1957 tracing::warn!(
1958 service = service_name,
1959 container = %id,
1960 error = %e,
1961 "Failed to read container logs"
1962 );
1963 }
1964 }
1965 }
1966
1967 Ok(all_entries)
1968 }
1969
1970 pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
1980 let services = self.services.read().await;
1981 if let Some(instance) = services.get(service_name) {
1982 instance.container_ids().await
1983 } else {
1984 Vec::new()
1985 }
1986 }
1987
1988 pub async fn exec_in_container(
2007 &self,
2008 service_name: &str,
2009 replica: Option<u32>,
2010 cmd: &[String],
2011 ) -> Result<(i32, String, String)> {
2012 let container_ids = self.get_service_containers(service_name).await;
2013
2014 if container_ids.is_empty() {
2015 return Err(AgentError::NotFound {
2016 container: service_name.to_string(),
2017 reason: "no containers found for service".to_string(),
2018 });
2019 }
2020
2021 let target = if let Some(rep) = replica {
2023 container_ids
2024 .into_iter()
2025 .find(|cid| cid.replica == rep)
2026 .ok_or_else(|| AgentError::NotFound {
2027 container: format!("{service_name}-rep-{rep}"),
2028 reason: format!("replica {rep} not found for service"),
2029 })?
2030 } else {
2031 container_ids.into_iter().next().unwrap()
2033 };
2034
2035 self.runtime.exec(&target, cmd).await
2036 }
2037
2038 pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
2053 let executor = self
2054 .job_executor
2055 .as_ref()
2056 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2057
2058 let spec = executor
2059 .get_job_spec(name)
2060 .await
2061 .ok_or_else(|| AgentError::NotFound {
2062 container: name.to_string(),
2063 reason: "job not registered".to_string(),
2064 })?;
2065
2066 executor.trigger(name, &spec, trigger).await
2067 }
2068
2069 pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
2077 if let Some(executor) = &self.job_executor {
2078 executor.get_execution(id).await
2079 } else {
2080 None
2081 }
2082 }
2083
2084 pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
2092 if let Some(executor) = &self.job_executor {
2093 executor.list_executions(name).await
2094 } else {
2095 Vec::new()
2096 }
2097 }
2098
2099 pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
2107 let executor = self
2108 .job_executor
2109 .as_ref()
2110 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2111
2112 executor.cancel(id).await
2113 }
2114
2115 pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
2128 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2129 AgentError::Configuration("Cron scheduler not configured".to_string())
2130 })?;
2131
2132 scheduler.trigger_now(name).await
2133 }
2134
2135 pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
2141 if let Some(scheduler) = &self.cron_scheduler {
2142 scheduler.set_enabled(name, enabled).await;
2143 }
2144 }
2145
2146 pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2148 if let Some(scheduler) = &self.cron_scheduler {
2149 scheduler.list_jobs().await
2150 } else {
2151 Vec::new()
2152 }
2153 }
2154
2155 pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2163 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2164 AgentError::Configuration("Cron scheduler not configured".to_string())
2165 })?;
2166
2167 let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2168 Ok(tokio::spawn(async move {
2169 scheduler.run_loop().await;
2170 }))
2171 }
2172
2173 pub fn shutdown_cron(&self) {
2175 if let Some(scheduler) = &self.cron_scheduler {
2176 scheduler.shutdown();
2177 }
2178 }
2179}
2180
2181#[cfg(test)]
2182#[allow(deprecated)]
2183mod tests {
2184 use super::*;
2185 use crate::runtime::MockRuntime;
2186
    // Smoke test: upsert, scale, replica count, and listing on a fresh manager.
    #[tokio::test]
    async fn test_service_manager() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let spec = mock_spec();
        manager
            .upsert_service("test".to_string(), spec)
            .await
            .unwrap();

        manager.scale_service("test", 3).await.unwrap();

        let count = manager.service_replica_count("test").await.unwrap();
        assert_eq!(count, 3);

        let services = manager.list_services().await;
        assert_eq!(services, vec!["test".to_string()]);
    }

    // Full lifecycle: create, scale, remove — the service must disappear from
    // the listing afterwards.
    #[tokio::test]
    async fn test_service_manager_basic_lifecycle() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let spec = mock_spec();
        manager
            .upsert_service("api".to_string(), spec)
            .await
            .unwrap();

        manager.scale_service("api", 2).await.unwrap();

        let count = manager.service_replica_count("api").await.unwrap();
        assert_eq!(count, 2);

        manager.remove_service("api").await.unwrap();

        let services = manager.list_services().await;
        assert!(!services.contains(&"api".to_string()));
    }

    // A manager built with overlay + deployment name still registers services.
    #[tokio::test]
    async fn test_service_manager_with_full_config() {
        use tokio::sync::RwLock;

        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());

        let overlay_manager = Arc::new(RwLock::new(
            OverlayManager::new("test-deployment".to_string())
                .await
                .unwrap(),
        ));

        let manager =
            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());

        let spec = mock_spec();
        manager
            .upsert_service("web".to_string(), spec)
            .await
            .unwrap();

        let services = manager.list_services().await;
        assert!(services.contains(&"web".to_string()));
    }
2265
    // Minimal one-service spec parsed from YAML; unwraps are acceptable in
    // test helpers (a parse failure is a test bug).
    fn mock_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }

    // `mock_spec` plus the given dependency list.
    fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
        let mut spec = mock_spec();
        spec.depends = deps;
        spec
    }

    // Shorthand for building a `DependsSpec` with a millisecond timeout.
    fn dep(
        service: &str,
        condition: zlayer_spec::DependencyCondition,
        timeout_ms: u64,
        on_timeout: zlayer_spec::TimeoutAction,
    ) -> DependsSpec {
        DependsSpec {
            service: service.to_string(),
            condition,
            timeout: Some(Duration::from_millis(timeout_ms)),
            on_timeout,
        }
    }
2312
    // Services without dependencies deploy unconditionally.
    #[tokio::test]
    async fn test_deploy_with_dependencies_no_deps() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("a".to_string(), mock_spec());
        services.insert("b".to_string(), mock_spec());

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }

    // Linear chain a -> b -> c (Started condition) deploys all three.
    #[tokio::test]
    async fn test_deploy_with_dependencies_linear() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("c".to_string(), mock_spec());
        services.insert(
            "b".to_string(),
            mock_spec_with_deps(vec![dep(
                "c",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 3);
    }

    // A mutual dependency (a <-> b) must be rejected as a cycle.
    #[tokio::test]
    async fn test_deploy_with_dependencies_cycle_detection() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        services.insert(
            "b".to_string(),
            mock_spec_with_deps(vec![dep(
                "a",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );

        let result = manager.deploy_with_dependencies(services).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Cyclic dependency"));
    }

    // Healthy condition never satisfied within 100ms, but TimeoutAction::Continue
    // lets the deploy finish anyway.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_continue() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy, 100, zlayer_spec::TimeoutAction::Continue, )]),
        );

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }

    // Same timeout, but TimeoutAction::Warn: deploy still succeeds.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_warn() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy,
                100,
                zlayer_spec::TimeoutAction::Warn,
            )]),
        );

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }

    // Same timeout with TimeoutAction::Fail: the deploy must error out.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_fail() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy,
                100,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );

        let result = manager.deploy_with_dependencies(services).await;
        assert!(result.is_err());

        let err = result.unwrap_err().to_string();
        assert!(err.contains("Dependency timeout"));
    }
2479
    // A dependency on a service already marked Healthy is satisfied.
    #[tokio::test]
    async fn test_check_dependencies_all_satisfied() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        manager
            .update_health_state("db", HealthState::Healthy)
            .await;

        let deps = vec![DependsSpec {
            service: "db".to_string(),
            condition: zlayer_spec::DependencyCondition::Healthy,
            timeout: Some(Duration::from_secs(60)),
            on_timeout: zlayer_spec::TimeoutAction::Fail,
        }];

        let satisfied = manager.check_dependencies(&deps).await.unwrap();
        assert!(satisfied);
    }

    // Without any recorded health state, a Healthy dependency is unmet.
    #[tokio::test]
    async fn test_check_dependencies_not_satisfied() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let deps = vec![DependsSpec {
            service: "db".to_string(),
            condition: zlayer_spec::DependencyCondition::Healthy,
            timeout: Some(Duration::from_secs(60)),
            on_timeout: zlayer_spec::TimeoutAction::Fail,
        }];

        let satisfied = manager.check_dependencies(&deps).await.unwrap();
        assert!(!satisfied);
    }

    // Health updates are visible through the shared health-state map.
    #[tokio::test]
    async fn test_health_state_tracking() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        manager
            .update_health_state("db", HealthState::Healthy)
            .await;
        manager
            .update_health_state("cache", HealthState::Unknown)
            .await;

        let states = manager.health_states();
        let states_read = states.read().await;

        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
        assert!(matches!(
            states_read.get("cache"),
            Some(HealthState::Unknown)
        ));
    }
2541
    // Minimal job-type spec ("backup") parsed from YAML.
    fn mock_job_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  backup:
    rtype: job
    image:
      name: backup:latest
",
        )
        .unwrap()
        .services
        .remove("backup")
        .unwrap()
    }

    // Minimal cron-type spec ("cleanup") with a 7-field schedule; raw string
    // uses `r#".."#` because the schedule itself contains double quotes.
    fn mock_cron_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r#"
version: v1
deployment: test
services:
  cleanup:
    rtype: cron
    schedule: "0 0 * * * * *"
    image:
      name: cleanup:latest
"#,
        )
        .unwrap()
        .services
        .remove("cleanup")
        .unwrap()
    }
2580
    // A registered job can be triggered and its execution looked up.
    #[tokio::test]
    async fn test_service_manager_with_job_executor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);

        let job_spec = mock_job_spec();
        manager
            .upsert_service("backup".to_string(), job_spec)
            .await
            .unwrap();

        let exec_id = manager
            .trigger_job("backup", JobTrigger::Cli)
            .await
            .unwrap();

        // Give the async execution a moment to be recorded.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let execution = manager.get_job_execution(&exec_id).await;
        assert!(execution.is_some());
        assert_eq!(execution.unwrap().job_name, "backup");
    }

    // Upserting a cron spec registers it with the scheduler, enabled by default.
    #[tokio::test]
    async fn test_service_manager_with_cron_scheduler() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));

        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        let cron_jobs = manager.list_cron_jobs().await;
        assert_eq!(cron_jobs.len(), 1);
        assert_eq!(cron_jobs[0].name, "cleanup");
        assert!(cron_jobs[0].enabled);
    }

    // Manual trigger of a registered cron job yields a non-empty execution id.
    #[tokio::test]
    async fn test_service_manager_trigger_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));

        let manager = ServiceManager::new(runtime)
            .with_job_executor(job_executor)
            .with_cron_scheduler(cron_scheduler);

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
        assert!(!exec_id.0.is_empty());
    }

    // Enable/disable round-trips through the scheduler.
    #[tokio::test]
    async fn test_service_manager_enable_disable_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));

        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        let cron_jobs = manager.list_cron_jobs().await;
        assert!(cron_jobs[0].enabled);

        manager.set_cron_enabled("cleanup", false).await;
        let cron_jobs = manager.list_cron_jobs().await;
        assert!(!cron_jobs[0].enabled);

        manager.set_cron_enabled("cleanup", true).await;
        let cron_jobs = manager.list_cron_jobs().await;
        assert!(cron_jobs[0].enabled);
    }

    // remove_service must also unregister the job from the executor.
    #[tokio::test]
    async fn test_service_manager_remove_cleans_up_job() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());

        let job_spec = mock_job_spec();
        manager
            .upsert_service("backup".to_string(), job_spec)
            .await
            .unwrap();

        let spec = job_executor.get_job_spec("backup").await;
        assert!(spec.is_some());

        manager.remove_service("backup").await.unwrap();

        let spec = job_executor.get_job_spec("backup").await;
        assert!(spec.is_none());
    }

    // remove_service must also unregister the cron job from the scheduler.
    #[tokio::test]
    async fn test_service_manager_remove_cleans_up_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));

        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        assert_eq!(cron_scheduler.job_count().await, 1);

        manager.remove_service("cleanup").await.unwrap();

        assert_eq!(cron_scheduler.job_count().await, 0);
    }

    // Triggering any job without a configured executor is a configuration error.
    #[tokio::test]
    async fn test_service_manager_job_without_executor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }

    // Upserting a cron spec without a scheduler is a configuration error.
    #[tokio::test]
    async fn test_service_manager_cron_without_scheduler() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let cron_spec = mock_cron_spec();
        let result = manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }

    // Two triggers of the same job produce two recorded executions.
    #[tokio::test]
    async fn test_service_manager_list_job_executions() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);

        let job_spec = mock_job_spec();
        manager
            .upsert_service("backup".to_string(), job_spec)
            .await
            .unwrap();

        manager
            .trigger_job("backup", JobTrigger::Cli)
            .await
            .unwrap();
        manager
            .trigger_job("backup", JobTrigger::Scheduler)
            .await
            .unwrap();

        // Give the async executions a moment to be recorded.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let executions = manager.list_job_executions("backup").await;
        assert_eq!(executions.len(), 2);
    }
2791
2792 #[tokio::test]
2795 async fn test_service_manager_with_supervisor() {
2796 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2797 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2798
2799 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
2800
2801 let spec = mock_spec();
2803 manager
2804 .upsert_service("api".to_string(), spec)
2805 .await
2806 .unwrap();
2807
2808 manager.scale_service("api", 2).await.unwrap();
2810
2811 assert_eq!(supervisor.supervised_count().await, 2);
2813
2814 manager.scale_service("api", 1).await.unwrap();
2816 assert_eq!(supervisor.supervised_count().await, 1);
2817
2818 manager.remove_service("api").await.unwrap();
2820 assert_eq!(supervisor.supervised_count().await, 0);
2821 }
2822
2823 #[tokio::test]
2824 async fn test_service_manager_supervisor_state() {
2825 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2826 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2827
2828 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
2829
2830 let spec = mock_spec();
2832 manager
2833 .upsert_service("web".to_string(), spec)
2834 .await
2835 .unwrap();
2836 manager.scale_service("web", 1).await.unwrap();
2837
2838 let container_id = ContainerId {
2840 service: "web".to_string(),
2841 replica: 1,
2842 };
2843 let state = manager.get_container_supervised_state(&container_id).await;
2844 assert_eq!(state, Some(SupervisedState::Running));
2845 }
2846
2847 #[tokio::test]
2848 async fn test_service_manager_start_supervisor() {
2849 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2850 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2851
2852 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
2853
2854 let handle = manager.start_container_supervisor().unwrap();
2856
2857 tokio::time::sleep(Duration::from_millis(50)).await;
2859 assert!(supervisor.is_running());
2860
2861 manager.shutdown_container_supervisor();
2863
2864 tokio::time::timeout(Duration::from_secs(1), handle)
2866 .await
2867 .unwrap()
2868 .unwrap();
2869
2870 assert!(!supervisor.is_running());
2871 }
2872
2873 #[tokio::test]
2874 async fn test_service_manager_supervisor_not_configured() {
2875 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2876 let manager = ServiceManager::new(runtime);
2877
2878 let result = manager.start_container_supervisor();
2880 assert!(result.is_err());
2881 assert!(result.unwrap_err().to_string().contains("not configured"));
2882 }
2883
2884 fn mock_tcp_spec() -> ServiceSpec {
2887 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2888 r"
2889version: v1
2890deployment: test
2891services:
2892 database:
2893 rtype: service
2894 image:
2895 name: postgres:latest
2896 endpoints:
2897 - name: postgresql
2898 protocol: tcp
2899 port: 5432
2900 scale:
2901 mode: fixed
2902 replicas: 1
2903",
2904 )
2905 .unwrap()
2906 .services
2907 .remove("database")
2908 .unwrap()
2909 }
2910
2911 fn mock_udp_spec() -> ServiceSpec {
2912 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2913 r"
2914version: v1
2915deployment: test
2916services:
2917 dns:
2918 rtype: service
2919 image:
2920 name: dns:latest
2921 endpoints:
2922 - name: dns
2923 protocol: udp
2924 port: 53
2925 scale:
2926 mode: fixed
2927 replicas: 1
2928",
2929 )
2930 .unwrap()
2931 .services
2932 .remove("dns")
2933 .unwrap()
2934 }
2935
2936 fn mock_mixed_spec() -> ServiceSpec {
2937 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2938 r"
2939version: v1
2940deployment: test
2941services:
2942 mixed:
2943 rtype: service
2944 image:
2945 name: mixed:latest
2946 endpoints:
2947 - name: http
2948 protocol: http
2949 port: 8080
2950 - name: grpc
2951 protocol: tcp
2952 port: 9000
2953 - name: metrics
2954 protocol: udp
2955 port: 8125
2956 scale:
2957 mode: fixed
2958 replicas: 1
2959",
2960 )
2961 .unwrap()
2962 .services
2963 .remove("mixed")
2964 .unwrap()
2965 }
2966
2967 #[tokio::test]
2968 async fn test_service_manager_with_stream_registry_tcp() {
2969 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2970 let stream_registry = Arc::new(StreamRegistry::new());
2971
2972 let mut manager = ServiceManager::new(runtime);
2973 manager.set_stream_registry(stream_registry.clone());
2974 manager.set_deployment_name("test".to_string());
2975
2976 let spec = mock_tcp_spec();
2978 manager
2979 .upsert_service("database".to_string(), spec)
2980 .await
2981 .unwrap();
2982
2983 assert_eq!(stream_registry.tcp_count(), 1);
2985 assert!(stream_registry.tcp_ports().contains(&5432));
2986
2987 manager.remove_service("database").await.unwrap();
2989 assert_eq!(stream_registry.tcp_count(), 0);
2990 }
2991
2992 #[tokio::test]
2993 async fn test_service_manager_with_stream_registry_udp() {
2994 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2995 let stream_registry = Arc::new(StreamRegistry::new());
2996
2997 let mut manager = ServiceManager::new(runtime);
2998 manager.set_stream_registry(stream_registry.clone());
2999 manager.set_deployment_name("test".to_string());
3000
3001 let spec = mock_udp_spec();
3003 manager
3004 .upsert_service("dns".to_string(), spec)
3005 .await
3006 .unwrap();
3007
3008 assert_eq!(stream_registry.udp_count(), 1);
3010 assert!(stream_registry.udp_ports().contains(&53));
3011
3012 manager.remove_service("dns").await.unwrap();
3014 assert_eq!(stream_registry.udp_count(), 0);
3015 }
3016
3017 #[tokio::test]
3018 async fn test_service_manager_with_stream_registry_mixed() {
3019 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3020 let stream_registry = Arc::new(StreamRegistry::new());
3021
3022 let mut manager = ServiceManager::new(runtime);
3023 manager.set_stream_registry(stream_registry.clone());
3024 manager.set_deployment_name("test".to_string());
3025
3026 let spec = mock_mixed_spec();
3028 manager
3029 .upsert_service("mixed".to_string(), spec)
3030 .await
3031 .unwrap();
3032
3033 assert_eq!(stream_registry.tcp_count(), 1); assert_eq!(stream_registry.udp_count(), 1); assert!(stream_registry.tcp_ports().contains(&9000));
3038 assert!(stream_registry.udp_ports().contains(&8125));
3039
3040 manager.remove_service("mixed").await.unwrap();
3042 assert_eq!(stream_registry.tcp_count(), 0);
3043 assert_eq!(stream_registry.udp_count(), 0);
3044 }
3045
3046 #[tokio::test]
3047 async fn test_service_manager_stream_registry_builder() {
3048 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3049 let stream_registry = Arc::new(StreamRegistry::new());
3050
3051 let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
3053
3054 assert!(manager.stream_registry().is_some());
3056 }
3057
3058 #[tokio::test]
3059 async fn test_tcp_service_without_stream_registry() {
3060 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3061
3062 let mut manager = ServiceManager::new(runtime);
3064 manager.set_deployment_name("test".to_string());
3065
3066 let spec = mock_tcp_spec();
3068 manager
3069 .upsert_service("database".to_string(), spec)
3070 .await
3071 .unwrap();
3072
3073 let services = manager.list_services().await;
3075 assert!(services.contains(&"database".to_string()));
3076 }
3077}