1use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6 DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{
24 effective_pull_policy, DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType,
25 ServiceSpec,
26};
27
/// A single managed service: its (re-deployable) spec plus the live
/// containers currently backing it and the shared infrastructure handles
/// (overlay, proxy, DNS, health) used when scaling.
pub struct ServiceInstance {
    /// Service name; also used to build DNS hostnames and proxy backend keys.
    pub service_name: String,
    /// Current specification; replaced in place on re-deploy.
    pub spec: ServiceSpec,
    /// Runtime used to create/start/stop/remove this service's containers.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Live containers keyed by id; RwLock guards concurrent scale operations.
    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
    /// Overlay network manager, when networking is enabled; new replicas are
    /// attached to it during scale-up.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Proxy manager, when present; one backend is registered per replica.
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server, when present; service and per-replica records are managed.
    dns_server: Option<Arc<DnsServer>>,
    /// Shared per-service health-state map written by health callbacks.
    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
    /// Image digest observed after the most recent successful pull, if the
    /// runtime could report one.
    last_pulled_digest: tokio::sync::RwLock<Option<String>>,
}
48
49impl ServiceInstance {
50 pub fn new(
52 service_name: String,
53 spec: ServiceSpec,
54 runtime: Arc<dyn Runtime + Send + Sync>,
55 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
56 ) -> Self {
57 Self {
58 service_name,
59 spec,
60 runtime,
61 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
62 overlay_manager,
63 proxy_manager: None,
64 dns_server: None,
65 health_states: None,
66 last_pulled_digest: tokio::sync::RwLock::new(None),
67 }
68 }
69
70 pub fn with_proxy(
72 service_name: String,
73 spec: ServiceSpec,
74 runtime: Arc<dyn Runtime + Send + Sync>,
75 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
76 proxy_manager: Arc<ProxyManager>,
77 ) -> Self {
78 Self {
79 service_name,
80 spec,
81 runtime,
82 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
83 overlay_manager,
84 proxy_manager: Some(proxy_manager),
85 dns_server: None,
86 health_states: None,
87 last_pulled_digest: tokio::sync::RwLock::new(None),
88 }
89 }
90
91 #[must_use]
93 pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
94 self.dns_server = Some(dns_server);
95 self
96 }
97
98 pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
100 self.dns_server = Some(dns_server);
101 }
102
103 pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
105 self.proxy_manager = Some(proxy_manager);
106 }
107
108 pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
110 self.health_states = Some(states);
111 }
112
113 pub async fn last_pulled_digest(&self) -> Option<String> {
117 self.last_pulled_digest.read().await.clone()
118 }
119
120 async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
128 let image_str = self.spec.image.name.to_string();
129 let effective = effective_pull_policy(&self.spec.image.name, self.spec.image.pull_policy);
130
131 if matches!(effective, PullPolicy::Never) {
132 return Ok(self.last_pulled_digest.read().await.clone());
133 }
134
135 self.runtime
136 .pull_image_with_policy(&image_str, effective, None)
137 .await
138 .map_err(|e| AgentError::PullFailed {
139 image: self.spec.image.name.to_string(),
140 reason: e.to_string(),
141 })?;
142
143 let new_digest = match self.runtime.list_images().await {
149 Ok(images) => images
150 .into_iter()
151 .find(|info| info.reference == image_str)
152 .and_then(|info| info.digest),
153 Err(e) => {
154 tracing::debug!(
155 image = %image_str,
156 error = %e,
157 "list_images unavailable; cannot record post-pull digest"
158 );
159 None
160 }
161 };
162
163 if let Some(ref digest) = new_digest {
164 *self.last_pulled_digest.write().await = Some(digest.clone());
165 }
166
167 Ok(new_digest)
168 }
169
    /// Reconcile the running replica count of this service to `replicas`.
    ///
    /// Scale-up path (per new replica): pull/refresh image (policy
    /// permitting), create + init + start the container, fetch its PID with
    /// retries, verify it is still alive, attach it to the overlay network
    /// (HCN on Windows, PID-based elsewhere), register DNS records, resolve
    /// an effective IP (overlay first, runtime fallback), wire a proxy
    /// backend plus a health monitor, and record the container.
    /// Scale-down path: untrack the replica, abort its health monitor,
    /// remove its DNS record, then stop/sync/remove the container.
    ///
    /// # Errors
    /// Returns an error when pull, create, init, start, stop, or remove
    /// fails, or when a freshly started container exits during startup.
    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
        // Snapshot the current count in its own block so the read lock is
        // released before the write paths below take the write lock.
        let current_replicas = { self.containers.read().await.len() as u32 };
        let effective = effective_pull_policy(&self.spec.image.name, self.spec.image.pull_policy);
        // Refresh the image before scaling up (or on an equal-count re-run),
        // unless the effective policy forbids pulling entirely.
        if replicas >= current_replicas && !matches!(effective, PullPolicy::Never) {
            let _ = self.pull_and_refresh_digest().await?;
        }

        if replicas > current_replicas {
            // Replica ids are 1-based: replica `i + 1` for index `i`.
            for i in current_replicas..replicas {
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // RouteToPeer passes through untouched so the caller can
                // redirect; everything else becomes CreateFailed.
                self.runtime
                    .create_container(&id, &self.spec)
                    .await
                    .map_err(|e| match e {
                        AgentError::RouteToPeer { .. } => e,
                        other => AgentError::CreateFailed {
                            id: id.to_string(),
                            reason: other.to_string(),
                        },
                    })?;

                // Run the spec's init steps (with its error policy) before
                // the container is started.
                let init_orchestrator = InitOrchestrator::with_error_policy(
                    id.clone(),
                    self.spec.init.clone(),
                    self.spec.errors.clone(),
                );
                init_orchestrator.run().await?;

                self.runtime
                    .start_container(&id)
                    .await
                    .map_err(|e| AgentError::StartFailed {
                        id: id.to_string(),
                        reason: e.to_string(),
                    })?;

                // The PID may not be visible immediately after start; retry
                // up to 5 times, 200ms apart. A missing PID is tolerated —
                // it only disables the PID-based overlay attach below.
                let mut container_pid = None;
                for attempt in 1..=5u32 {
                    match self.runtime.get_container_pid(&id).await {
                        Ok(Some(pid)) => {
                            container_pid = Some(pid);
                            break;
                        }
                        Ok(None) if attempt < 5 => {
                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                        }
                        Ok(None) => {
                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
                        }
                        Err(e) => {
                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
                            if attempt < 5 {
                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                            }
                        }
                    }
                }

                // With a PID in hand, confirm the container didn't exit
                // right after start; surface its log tail if it did. A
                // failed state query is treated as "alive" (best effort).
                if container_pid.is_some() {
                    let alive = match self.runtime.container_state(&id).await {
                        Ok(
                            ContainerState::Running
                            | ContainerState::Pending
                            | ContainerState::Initializing,
                        ) => true,
                        Ok(state) => {
                            tracing::warn!(
                                container = %id,
                                ?state,
                                "container exited before overlay attach could run"
                            );
                            false
                        }
                        Err(e) => {
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "container state query failed before overlay attach, proceeding"
                            );
                            true
                        }
                    };
                    if !alive {
                        // Pull up to 40 log lines to make the startup
                        // failure diagnosable from the error alone.
                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
                            || " <log read failed>".to_string(),
                            |entries| {
                                if entries.is_empty() {
                                    " <no log output>".to_string()
                                } else {
                                    entries
                                        .into_iter()
                                        .map(|e| format!(" {}", e.message))
                                        .collect::<Vec<_>>()
                                        .join("\n")
                                }
                            },
                        );
                        return Err(AgentError::StartFailed {
                            id: id.to_string(),
                            reason: format!("container exited during startup:\n{log_tail}"),
                        });
                    }
                }

                // Overlay attach is platform-specific: Windows goes through
                // HCN namespaces, other platforms attach via the PID.
                // Failures here are logged, not fatal — the runtime IP
                // fallback below keeps the replica usable.
                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
                    let overlay_guard = overlay.read().await;
                    #[cfg(target_os = "windows")]
                    let attach_result: Option<std::net::IpAddr> = {
                        // PID is unused on the HCN path; discard explicitly.
                        let _ = container_pid;
                        match self.runtime.get_container_namespace_id(&id).await {
                            Ok(Some(ns_id)) => {
                                let ip_override =
                                    self.runtime.get_container_ip(&id).await.ok().flatten();
                                let dns_server = overlay_guard.dns_server_addr().map(|sa| sa.ip());
                                let dns_domain =
                                    overlay_guard.dns_domain().map(ToString::to_string);
                                match overlay_guard
                                    .attach_container_hcn(
                                        ns_id,
                                        &self.service_name,
                                        ip_override,
                                        true,
                                        dns_server,
                                        dns_domain,
                                    )
                                    .await
                                {
                                    Ok(ip) => Some(ip),
                                    Err(e) => {
                                        tracing::warn!(
                                            container = %id,
                                            error = %e,
                                            "HCN overlay attach failed"
                                        );
                                        None
                                    }
                                }
                            }
                            Ok(None) => {
                                tracing::debug!(
                                    container = %id,
                                    "skipping HCN overlay attach - no namespace id available"
                                );
                                None
                            }
                            Err(e) => {
                                tracing::warn!(
                                    container = %id,
                                    error = %e,
                                    "failed to fetch HCN namespace id"
                                );
                                None
                            }
                        }
                    };
                    #[cfg(not(target_os = "windows"))]
                    let attach_result: Option<std::net::IpAddr> = {
                        if let Some(pid) = container_pid {
                            match overlay_guard
                                .attach_container(pid, &self.service_name, true)
                                .await
                            {
                                Ok(ip) => Some(ip),
                                Err(e) => {
                                    tracing::warn!(
                                        container = %id,
                                        error = %e,
                                        "failed to attach container to overlay network"
                                    );
                                    None
                                }
                            }
                        } else {
                            tracing::debug!(
                                container = %id,
                                "skipping overlay attachment - no PID available"
                            );
                            None
                        }
                    };

                    if let Some(ip) = attach_result {
                        tracing::info!(
                            container = %id,
                            overlay_ip = %ip,
                            "attached container to overlay network"
                        );

                        // Register both the service-wide name and the
                        // per-replica name; DNS failures are non-fatal.
                        if let Some(dns) = &self.dns_server {
                            let service_hostname = format!("{}.service.local", self.service_name);
                            let replica_hostname =
                                format!("{}.{}.service.local", id.replica, self.service_name);

                            match dns.add_record(&service_hostname, ip).await {
                                Ok(()) => tracing::debug!(
                                    hostname = %service_hostname,
                                    ip = %ip,
                                    "registered DNS for service"
                                ),
                                Err(e) => tracing::warn!(
                                    hostname = %service_hostname,
                                    error = %e,
                                    "failed to register DNS for service"
                                ),
                            }

                            if let Err(e) = dns.add_record(&replica_hostname, ip).await {
                                tracing::warn!(
                                    hostname = %replica_hostname,
                                    error = %e,
                                    "failed to register replica DNS"
                                );
                            } else {
                                tracing::debug!(
                                    hostname = %replica_hostname,
                                    ip = %ip,
                                    "registered DNS for replica"
                                );
                            }
                        }

                        Some(ip)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // Prefer the overlay IP; fall back to whatever IP the
                // runtime reports. With neither, proxy routing is skipped.
                let effective_ip = if overlay_ip.is_none() {
                    match self.runtime.get_container_ip(&id).await {
                        Ok(Some(ip)) => {
                            tracing::info!(
                                container = %id,
                                ip = %ip,
                                "using runtime container IP for proxy (overlay unavailable)"
                            );
                            Some(ip)
                        }
                        Ok(None) => {
                            tracing::warn!(
                                container = %id,
                                "no container IP available from runtime, proxy routing will be unavailable"
                            );
                            None
                        }
                        Err(e) => {
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "failed to get container IP from runtime"
                            );
                            None
                        }
                    }
                } else {
                    overlay_ip
                };

                tracing::info!(
                    container = %id,
                    service = %self.service_name,
                    overlay_ip = ?overlay_ip,
                    effective_ip = ?effective_ip,
                    "Container IP resolution complete"
                );

                // A runtime-assigned dynamic port takes precedence over the
                // port declared in the spec endpoints.
                let port_override = match self.runtime.get_container_port_override(&id).await {
                    Ok(Some(port)) => {
                        tracing::info!(
                            container = %id,
                            port = port,
                            "runtime assigned dynamic port override for this container"
                        );
                        Some(port)
                    }
                    Ok(None) => None,
                    Err(e) => {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to query port override from runtime, using spec port"
                        );
                        None
                    }
                };

                let health_monitor_handle = {
                    let mut check = self.spec.health.check.clone();

                    // A TCP check with port 0 means "use the effective
                    // service port": the override, else the first HTTP-ish
                    // endpoint's target port, else 8080.
                    if let HealthCheck::Tcp { ref mut port } = check {
                        if *port == 0 {
                            *port = port_override.unwrap_or_else(|| {
                                self.spec
                                    .endpoints
                                    .iter()
                                    .find(|ep| {
                                        matches!(
                                            ep.protocol,
                                            Protocol::Http | Protocol::Https | Protocol::Websocket
                                        )
                                    })
                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                            });
                        }
                    }

                    // Spec-driven timing with defaults: 5s grace, 5s
                    // timeout, 10s interval.
                    let start_grace = self
                        .spec
                        .health
                        .start_grace
                        .unwrap_or(Duration::from_secs(5));
                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
                    let retries = self.spec.health.retries;

                    let checker = HealthChecker::new(check, effective_ip);
                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
                        .with_start_grace(start_grace)
                        .with_check_timeout(check_timeout);

                    // Only wire a proxy backend when both a proxy manager
                    // and a routable IP exist.
                    if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
                        let proxy = Arc::clone(proxy);
                        let service_name = self.service_name.clone();
                        // Same port resolution as the TCP health check.
                        let port = port_override.unwrap_or_else(|| {
                            self.spec
                                .endpoints
                                .iter()
                                .find(|ep| {
                                    matches!(
                                        ep.protocol,
                                        Protocol::Http | Protocol::Https | Protocol::Websocket
                                    )
                                })
                                .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                        });

                        let backend_addr = SocketAddr::new(ip, port);

                        proxy.add_backend(&self.service_name, backend_addr).await;

                        let health_states_opt = self.health_states.clone();
                        let svc_name_for_states = self.service_name.clone();

                        // On every health transition: flip the proxy
                        // backend's health and mirror the state into the
                        // shared map. Both updates run as spawned tasks
                        // because the callback itself is synchronous.
                        let health_callback: HealthCallback =
                            Arc::new(move |container_id: ContainerId, is_healthy: bool| {
                                let proxy = Arc::clone(&proxy);
                                let service_name = service_name.clone();
                                tracing::info!(
                                    container = %container_id,
                                    service = %service_name,
                                    backend = %backend_addr,
                                    healthy = is_healthy,
                                    "health status changed, updating proxy backend"
                                );
                                tokio::spawn(async move {
                                    proxy
                                        .update_backend_health(
                                            &service_name,
                                            backend_addr,
                                            is_healthy,
                                        )
                                        .await;
                                });
                                if let Some(ref health_states) = health_states_opt {
                                    let states = Arc::clone(health_states);
                                    let svc = svc_name_for_states.clone();
                                    tokio::spawn(async move {
                                        let state = if is_healthy {
                                            HealthState::Healthy
                                        } else {
                                            // NOTE(review): failures is
                                            // hard-coded to 0 here rather
                                            // than the monitor's actual
                                            // failure count — confirm
                                            // intended.
                                            HealthState::Unhealthy {
                                                failures: 0,
                                                reason: "health check failed".into(),
                                            }
                                        };
                                        states.write().await.insert(svc, state);
                                    });
                                }
                            });

                        monitor = monitor.with_callback(health_callback);
                    }

                    monitor.start()
                };

                // Track the replica. NOTE(review): pid is stored as None
                // even when container_pid was resolved above — confirm the
                // tracked Container is not expected to carry the PID.
                {
                    let mut containers = self.containers.write().await;
                    containers.insert(
                        id.clone(),
                        Container {
                            id: id.clone(),
                            state: ContainerState::Running,
                            pid: None,
                            task: None,
                            overlay_ip: effective_ip,
                            health_monitor: Some(health_monitor_handle),
                            port_override,
                        },
                    );
                }
            }
        }

        if replicas < current_replicas {
            // Remove the highest-numbered replicas down to the target.
            for i in replicas..current_replicas {
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // Untrack first (short write-lock scope), then tear down.
                let removed_container = {
                    let mut containers = self.containers.write().await;
                    containers.remove(&id)
                };
                if let Some(container) = removed_container {
                    if let Some(handle) = container.health_monitor {
                        handle.abort();
                    }

                    // Only the per-replica record is removed; the shared
                    // service record may still serve other replicas.
                    if let Some(dns) = &self.dns_server {
                        let replica_hostname =
                            format!("{}.{}.service.local", id.replica, self.service_name);
                        if let Err(e) = dns.remove_record(&replica_hostname).await {
                            tracing::warn!(
                                hostname = %replica_hostname,
                                error = %e,
                                "failed to remove replica DNS record"
                            );
                        } else {
                            tracing::debug!(
                                hostname = %replica_hostname,
                                "removed replica DNS record"
                            );
                        }
                    }

                    // NOTE(review): if stop/remove fails here the replica
                    // has already been untracked above, so the container
                    // would leak outside the map — confirm acceptable.
                    self.runtime
                        .stop_container(&id, Duration::from_secs(30))
                        .await?;

                    // Volume sync is best effort before removal.
                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to sync volumes before removal"
                        );
                    }

                    self.runtime.remove_container(&id).await?;
                }
            }
        }

        Ok(())
    }
717
718 pub async fn replica_count(&self) -> usize {
720 self.containers.read().await.len()
721 }
722
723 pub async fn container_ids(&self) -> Vec<ContainerId> {
725 self.containers.read().await.keys().cloned().collect()
726 }
727
728 pub fn containers(
733 &self,
734 ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
735 &self.containers
736 }
737
738 pub fn has_overlay_manager(&self) -> bool {
740 self.overlay_manager.is_some()
741 }
742
743 pub fn has_proxy_manager(&self) -> bool {
745 self.proxy_manager.is_some()
746 }
747
748 pub fn has_dns_server(&self) -> bool {
750 self.dns_server.is_some()
751 }
752}
753
/// Owns every [`ServiceInstance`] in a deployment plus the shared
/// infrastructure (overlay, proxy, DNS, jobs, supervision) they use.
pub struct ServiceManager {
    /// Container runtime shared by all services.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Service instances keyed by service name.
    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
    /// Bounds concurrent scale operations.
    scale_semaphore: Arc<Semaphore>,
    /// Overlay network manager, when networking is enabled.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Registry for raw TCP/UDP stream routes.
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Proxy manager for HTTP-ish endpoints.
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server for service/replica records.
    dns_server: Option<Arc<DnsServer>>,
    /// Name of the deployment this manager serves, when known.
    deployment_name: Option<String>,
    /// Shared per-service health-state map.
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    /// Executor for one-shot jobs, when configured.
    job_executor: Option<Arc<JobExecutor>>,
    /// Scheduler for cron-triggered jobs, when configured.
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Supervisor watching container lifecycles, when configured.
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}
778
/// Fluent builder for [`ServiceManager`]; only the runtime is mandatory.
pub struct ServiceManagerBuilder {
    /// Container runtime (required).
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Optional overlay network manager.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Optional proxy manager.
    proxy_manager: Option<Arc<ProxyManager>>,
    /// Optional stream-route registry.
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Optional DNS server.
    dns_server: Option<Arc<DnsServer>>,
    /// Optional deployment name.
    deployment_name: Option<String>,
    /// Optional job executor.
    job_executor: Option<Arc<JobExecutor>>,
    /// Optional cron scheduler.
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Optional container supervisor.
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}
807
808impl ServiceManagerBuilder {
809 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
811 Self {
812 runtime,
813 overlay_manager: None,
814 proxy_manager: None,
815 stream_registry: None,
816 dns_server: None,
817 deployment_name: None,
818 job_executor: None,
819 cron_scheduler: None,
820 container_supervisor: None,
821 }
822 }
823
824 #[must_use]
826 pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
827 self.overlay_manager = Some(om);
828 self
829 }
830
831 #[must_use]
833 pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
834 self.proxy_manager = Some(pm);
835 self
836 }
837
838 #[must_use]
840 pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
841 self.stream_registry = Some(sr);
842 self
843 }
844
845 #[must_use]
847 pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
848 self.dns_server = Some(dns);
849 self
850 }
851
852 #[must_use]
854 pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
855 self.deployment_name = Some(name.into());
856 self
857 }
858
859 #[must_use]
861 pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
862 self.job_executor = Some(je);
863 self
864 }
865
866 #[must_use]
868 pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
869 self.cron_scheduler = Some(cs);
870 self
871 }
872
873 #[must_use]
875 pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
876 self.container_supervisor = Some(cs);
877 self
878 }
879
880 pub fn build(self) -> ServiceManager {
885 if self.proxy_manager.is_none() {
886 tracing::warn!("ServiceManager built without proxy_manager");
887 }
888 if self.stream_registry.is_none() {
889 tracing::warn!("ServiceManager built without stream_registry");
890 }
891 if self.container_supervisor.is_none() {
892 tracing::warn!("ServiceManager built without container_supervisor");
893 }
894 if self.deployment_name.is_none() {
895 tracing::warn!("ServiceManager built without deployment_name");
896 }
897
898 ServiceManager {
899 runtime: self.runtime,
900 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
901 scale_semaphore: Arc::new(Semaphore::new(10)),
902 overlay_manager: self.overlay_manager,
903 stream_registry: self.stream_registry,
904 proxy_manager: self.proxy_manager,
905 dns_server: self.dns_server,
906 deployment_name: self.deployment_name,
907 health_states: Arc::new(RwLock::new(HashMap::new())),
908 job_executor: self.job_executor,
909 cron_scheduler: self.cron_scheduler,
910 container_supervisor: self.container_supervisor,
911 }
912 }
913}
914
915impl ServiceManager {
916 pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
929 ServiceManagerBuilder::new(runtime)
930 }
931
932 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
934 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
935 Self {
936 runtime,
937 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
938 scale_semaphore: Arc::new(Semaphore::new(10)), overlay_manager: None,
940 stream_registry: None,
941 proxy_manager: None,
942 dns_server: None,
943 deployment_name: None,
944 health_states: Arc::new(RwLock::new(HashMap::new())),
945 job_executor: None,
946 cron_scheduler: None,
947 container_supervisor: None,
948 }
949 }
950
951 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
953 pub fn with_overlay(
954 runtime: Arc<dyn Runtime + Send + Sync>,
955 overlay_manager: Arc<RwLock<OverlayManager>>,
956 ) -> Self {
957 Self {
958 runtime,
959 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
960 scale_semaphore: Arc::new(Semaphore::new(10)),
961 overlay_manager: Some(overlay_manager),
962 stream_registry: None,
963 proxy_manager: None,
964 dns_server: None,
965 deployment_name: None,
966 health_states: Arc::new(RwLock::new(HashMap::new())),
967 job_executor: None,
968 cron_scheduler: None,
969 container_supervisor: None,
970 }
971 }
972
973 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
975 pub fn with_full_config(
976 runtime: Arc<dyn Runtime + Send + Sync>,
977 overlay_manager: Arc<RwLock<OverlayManager>>,
978 deployment_name: String,
979 ) -> Self {
980 Self {
981 runtime,
982 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
983 scale_semaphore: Arc::new(Semaphore::new(10)),
984 overlay_manager: Some(overlay_manager),
985 stream_registry: None,
986 proxy_manager: None,
987 dns_server: None,
988 deployment_name: Some(deployment_name),
989 health_states: Arc::new(RwLock::new(HashMap::new())),
990 job_executor: None,
991 cron_scheduler: None,
992 container_supervisor: None,
993 }
994 }
995
996 pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
998 Arc::clone(&self.health_states)
999 }
1000
1001 pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
1003 let mut states = self.health_states.write().await;
1004 states.insert(service_name.to_string(), state);
1005 }
1006
1007 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1009 pub fn set_deployment_name(&mut self, name: String) {
1010 self.deployment_name = Some(name);
1011 }
1012
1013 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1015 pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
1016 self.stream_registry = Some(registry);
1017 }
1018
1019 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1021 #[must_use]
1022 pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
1023 self.stream_registry = Some(registry);
1024 self
1025 }
1026
1027 pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
1029 self.stream_registry.as_ref()
1030 }
1031
1032 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1034 pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
1035 self.overlay_manager = Some(manager);
1036 }
1037
1038 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1040 pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
1041 self.proxy_manager = Some(proxy);
1042 }
1043
1044 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1046 #[must_use]
1047 pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
1048 self.proxy_manager = Some(proxy);
1049 self
1050 }
1051
1052 pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1054 self.proxy_manager.as_ref()
1055 }
1056
1057 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1059 pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
1060 self.dns_server = Some(dns);
1061 }
1062
1063 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1065 #[must_use]
1066 pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1067 self.dns_server = Some(dns);
1068 self
1069 }
1070
1071 pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
1073 self.dns_server.as_ref()
1074 }
1075
1076 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1078 pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
1079 self.job_executor = Some(executor);
1080 }
1081
1082 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1084 pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
1085 self.cron_scheduler = Some(scheduler);
1086 }
1087
1088 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1090 #[must_use]
1091 pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
1092 self.job_executor = Some(executor);
1093 self
1094 }
1095
1096 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1098 #[must_use]
1099 pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
1100 self.cron_scheduler = Some(scheduler);
1101 self
1102 }
1103
1104 pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
1106 self.job_executor.as_ref()
1107 }
1108
1109 pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
1111 self.cron_scheduler.as_ref()
1112 }
1113
1114 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1116 pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
1117 self.container_supervisor = Some(supervisor);
1118 }
1119
1120 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1122 #[must_use]
1123 pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
1124 self.container_supervisor = Some(supervisor);
1125 self
1126 }
1127
1128 pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
1130 self.container_supervisor.as_ref()
1131 }
1132
1133 pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1144 let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1145 AgentError::Configuration("Container supervisor not configured".to_string())
1146 })?;
1147
1148 let supervisor = Arc::clone(supervisor);
1149 Ok(tokio::spawn(async move {
1150 supervisor.run_loop().await;
1151 }))
1152 }
1153
1154 pub fn shutdown_container_supervisor(&self) {
1156 if let Some(supervisor) = &self.container_supervisor {
1157 supervisor.shutdown();
1158 }
1159 }
1160
1161 pub async fn get_container_supervised_state(
1163 &self,
1164 container_id: &ContainerId,
1165 ) -> Option<SupervisedState> {
1166 if let Some(supervisor) = &self.container_supervisor {
1167 supervisor.get_state(container_id).await
1168 } else {
1169 None
1170 }
1171 }
1172
1173 pub async fn take_supervisor_events(
1177 &self,
1178 ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1179 if let Some(supervisor) = &self.container_supervisor {
1180 supervisor.take_event_receiver().await
1181 } else {
1182 None
1183 }
1184 }
1185
    /// Deploy a set of services in dependency order.
    ///
    /// Builds a dependency graph from `services`, computes a startup order,
    /// and for each service in that order: waits on its declared
    /// dependencies, upserts the service, scales it to its spec'd replica
    /// count, and seeds its health state as `Unknown`. An empty map is a
    /// no-op.
    ///
    /// # Errors
    /// Fails when the graph cannot be built, a fail-policy dependency times
    /// out, or any upsert/scale step errors; services already started stay
    /// started.
    pub async fn deploy_with_dependencies(
        &self,
        services: HashMap<String, ServiceSpec>,
    ) -> Result<()> {
        if services.is_empty() {
            return Ok(());
        }

        let graph = DependencyGraph::build(&services)?;

        tracing::info!(
            service_count = services.len(),
            "Starting deployment with dependency ordering"
        );

        let order = graph.startup_order();
        tracing::debug!(order = ?order, "Computed startup order");

        for service_name in order {
            // The order comes from the graph built over `services`, so a
            // miss here indicates an internal inconsistency.
            let service_spec = services
                .get(service_name)
                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;

            if !service_spec.depends.is_empty() {
                tracing::info!(
                    service = %service_name,
                    dependency_count = service_spec.depends.len(),
                    "Waiting for dependencies"
                );
                self.wait_for_dependencies(service_name, &service_spec.depends)
                    .await?;
            }

            tracing::info!(service = %service_name, "Starting service");
            // NOTE(review): Box::pin suggests the upsert future is large or
            // indirectly recursive — confirm before removing.
            Box::pin(self.upsert_service(service_name.clone(), service_spec.clone())).await?;

            // Initial replica count per scale mode; Manual defaults to 1.
            let replicas = match &service_spec.scale {
                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min,
                zlayer_spec::ScaleSpec::Manual => 1,
            };
            self.scale_service(service_name, replicas).await?;

            // Health is unknown until the monitors report in.
            self.update_health_state(service_name, HealthState::Unknown)
                .await;

            tracing::info!(
                service = %service_name,
                replicas = replicas,
                "Service started"
            );
        }

        tracing::info!(service_count = services.len(), "Deployment complete");

        Ok(())
    }
1266
1267 async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1276 let condition_checker = DependencyConditionChecker::new(
1277 Arc::clone(&self.runtime),
1278 Arc::clone(&self.health_states),
1279 None,
1280 );
1281
1282 let waiter = DependencyWaiter::new(condition_checker);
1283 let results = waiter.wait_for_all(deps).await?;
1284
1285 for result in results {
1287 match result {
1288 WaitResult::TimedOutFail {
1289 service: dep_service,
1290 condition,
1291 timeout,
1292 } => {
1293 return Err(AgentError::DependencyTimeout {
1294 service: service.to_string(),
1295 dependency: dep_service,
1296 condition: format!("{condition:?}"),
1297 timeout,
1298 });
1299 }
1300 WaitResult::TimedOutWarn {
1301 service: dep_service,
1302 condition,
1303 } => {
1304 tracing::warn!(
1305 service = %service,
1306 dependency = %dep_service,
1307 condition = ?condition,
1308 "Dependency timed out but continuing"
1309 );
1310 }
1311 WaitResult::TimedOutContinue | WaitResult::Satisfied => {
1312 }
1314 }
1315 }
1316
1317 Ok(())
1318 }
1319
1320 pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1327 let condition_checker = DependencyConditionChecker::new(
1328 Arc::clone(&self.runtime),
1329 Arc::clone(&self.health_states),
1330 None,
1331 );
1332
1333 for dep in deps {
1334 if !condition_checker.check(dep).await? {
1335 return Ok(false);
1336 }
1337 }
1338
1339 Ok(true)
1340 }
1341
    /// Create or update a resource by name, dispatching on its `rtype`.
    ///
    /// * `Service` — if the name already exists the spec is replaced in place
    ///   and, depending on the effective pull policy, the image may be
    ///   re-pulled and the replicas recreated on digest drift. Otherwise a
    ///   fresh `ServiceInstance` is built and wired into proxy/DNS/stream
    ///   routing.
    /// * `Job` — registered with the job executor (or stored as a plain
    ///   service for reference when no executor is configured).
    /// * `Cron` — registered with the scheduler; an error if none is
    ///   configured.
    ///
    /// # Errors
    /// Image pulls, scaling, and cron registration can all fail; a missing
    /// cron scheduler is a `Configuration` error.
    #[allow(clippy::too_many_lines)]
    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
        match spec.rtype {
            ResourceType::Service => {
                let mut services = self.services.write().await;

                // Existing service: update spec in place, then decide whether
                // a re-pull / recreate is needed.
                if let Some(instance) = services.get_mut(&name) {
                    instance.spec = spec.clone();
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }

                    let effective = effective_pull_policy(&spec.image.name, spec.image.pull_policy);
                    let old_digest = instance.last_pulled_digest().await;
                    // Saturating conversion: usize replica count -> u32.
                    let current_replicas =
                        u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
                    // Release the write lock before the (potentially slow)
                    // image pull so other operations are not blocked.
                    drop(services); match effective {
                        PullPolicy::Never | PullPolicy::IfNotPresent => {
                            tracing::debug!(
                                service = %name,
                                policy = ?effective,
                                "service unchanged on re-deploy (effective pull policy skips refresh)"
                            );
                        }
                        PullPolicy::Always | PullPolicy::Newer => {
                            // Re-acquire (read) — the service may have been
                            // removed while the write lock was dropped.
                            let services_ro = self.services.read().await;
                            let new_digest = if let Some(inst) = services_ro.get(&name) {
                                inst.pull_and_refresh_digest().await?
                            } else {
                                tracing::warn!(
                                    service = %name,
                                    "service removed during upsert; skipping drift recreate"
                                );
                                return Ok(());
                            };
                            drop(services_ro);

                            // Always: recreate unconditionally. Newer: only on
                            // a digest change (and only when both are known).
                            let should_recreate = match effective {
                                PullPolicy::Always => true,
                                PullPolicy::Newer => match (&old_digest, &new_digest) {
                                    (Some(old), Some(new)) => old != new,
                                    _ => false,
                                },
                                _ => false,
                            };

                            if should_recreate && current_replicas > 0 {
                                tracing::info!(
                                    service = %name,
                                    policy = ?effective,
                                    old_digest = ?old_digest,
                                    new_digest = ?new_digest,
                                    replicas = current_replicas,
                                    "image drift detected; performing rolling recreate"
                                );
                                // Scale to zero then back: containers restart
                                // on the freshly pulled image.
                                self.scale_service(&name, 0).await?;
                                self.scale_service(&name, current_replicas).await?;
                                tracing::info!(
                                    service = %name,
                                    new_digest = ?new_digest,
                                    "service recreated with refreshed image"
                                );
                            } else {
                                tracing::debug!(
                                    service = %name,
                                    policy = ?effective,
                                    old_digest = ?old_digest,
                                    new_digest = ?new_digest,
                                    "service up to date; no recreate required"
                                );
                            }
                        }
                    }
                    return Ok(());
                }
                // New service: build an instance and wire in the optional
                // infrastructure (proxy, DNS, health states, stream routes).
                let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                let mut instance = if let Some(proxy) = &self.proxy_manager {
                    ServiceInstance::with_proxy(
                        name.clone(),
                        spec,
                        self.runtime.clone(),
                        overlay,
                        Arc::clone(proxy),
                    )
                } else {
                    ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                };
                if let Some(dns) = &self.dns_server {
                    instance.set_dns_server(Arc::clone(dns));
                }
                instance.set_health_states(Arc::clone(&self.health_states));
                if let Some(proxy) = &self.proxy_manager {
                    proxy.add_service(&name, &instance.spec).await;
                }
                if let Some(stream_registry) = &self.stream_registry {
                    // Register L4 routes now; backends are filled on scale.
                    for endpoint in &instance.spec.endpoints {
                        let svc = StreamService::new(
                            name.clone(),
                            Vec::new(), );
                        match endpoint.protocol {
                            Protocol::Tcp => {
                                stream_registry.register_tcp(endpoint.port, svc);
                                tracing::debug!(
                                    service = %name,
                                    port = endpoint.port,
                                    "Registered TCP stream route"
                                );
                            }
                            Protocol::Udp => {
                                stream_registry.register_udp(endpoint.port, svc);
                                tracing::debug!(
                                    service = %name,
                                    port = endpoint.port,
                                    "Registered UDP stream route"
                                );
                            }
                            _ => {} }
                    }
                }
                services.insert(name, instance);
            }
            ResourceType::Job => {
                if let Some(executor) = &self.job_executor {
                    executor.register_job(&name, spec).await;
                    tracing::info!(job = %name, "Registered job spec");
                } else {
                    // Fallback: keep the spec visible as a service entry.
                    tracing::warn!(
                        job = %name,
                        "Job executor not configured, storing as service for reference"
                    );
                    let mut services = self.services.write().await;
                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                    let mut instance = if let Some(proxy) = &self.proxy_manager {
                        ServiceInstance::with_proxy(
                            name.clone(),
                            spec,
                            self.runtime.clone(),
                            overlay,
                            Arc::clone(proxy),
                        )
                    } else {
                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                    };
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }
                    services.insert(name, instance);
                }
            }
            ResourceType::Cron => {
                if let Some(scheduler) = &self.cron_scheduler {
                    scheduler.register(&name, &spec).await?;
                    tracing::info!(cron = %name, "Registered cron job with scheduler");
                } else {
                    return Err(AgentError::Configuration(format!(
                        "Cron scheduler not configured for cron job '{name}'"
                    )));
                }
            }
        }

        Ok(())
    }
1546
1547 async fn update_proxy_backends(&self, service_name: &str, addrs: Vec<SocketAddr>) {
1549 if let Some(proxy) = &self.proxy_manager {
1550 proxy.update_backends(service_name, addrs).await;
1551 }
1552 }
1553
1554 fn update_stream_backends(&self, spec: &ServiceSpec, addrs: &[SocketAddr]) {
1563 let Some(stream_registry) = &self.stream_registry else {
1564 return;
1565 };
1566
1567 let primary_spec_port = spec
1571 .endpoints
1572 .iter()
1573 .find(|ep| {
1574 matches!(
1575 ep.protocol,
1576 Protocol::Http | Protocol::Https | Protocol::Websocket
1577 )
1578 })
1579 .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1580
1581 let has_port_overrides = addrs.iter().any(|addr| addr.port() != primary_spec_port);
1582
1583 for endpoint in &spec.endpoints {
1584 match endpoint.protocol {
1585 Protocol::Tcp => {
1586 let tcp_backends: Vec<SocketAddr> = if has_port_overrides {
1587 addrs.to_vec()
1590 } else {
1591 addrs
1594 .iter()
1595 .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1596 .collect()
1597 };
1598
1599 stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
1600
1601 tracing::debug!(
1602 endpoint = %endpoint.name,
1603 port = endpoint.port,
1604 backend_count = addrs.len(),
1605 "Updated TCP stream backends"
1606 );
1607 }
1608 Protocol::Udp => {
1609 let udp_backends: Vec<SocketAddr> = if has_port_overrides {
1610 addrs.to_vec()
1611 } else {
1612 addrs
1613 .iter()
1614 .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1615 .collect()
1616 };
1617
1618 stream_registry.update_udp_backends(endpoint.port, udp_backends);
1619
1620 tracing::debug!(
1621 endpoint = %endpoint.name,
1622 port = endpoint.port,
1623 backend_count = addrs.len(),
1624 "Updated UDP stream backends"
1625 );
1626 }
1627 _ => {} }
1629 }
1630 }
1631
1632 #[allow(clippy::cast_possible_truncation)]
1637 pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
1638 let _permit = self.scale_semaphore.acquire().await;
1639
1640 let services = self.services.read().await;
1641 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1642 container: name.to_string(),
1643 reason: "service not found".to_string(),
1644 })?;
1645
1646 let current_replicas = instance.replica_count().await as u32;
1648
1649 instance.scale_to(replicas).await?;
1651
1652 let addrs = self.collect_backend_addrs(instance, replicas).await;
1658
1659 if self.proxy_manager.is_some() && !addrs.is_empty() {
1661 self.update_proxy_backends(name, addrs.clone()).await;
1662 }
1663
1664 if self.stream_registry.is_some() {
1666 self.update_stream_backends(&instance.spec, &addrs);
1667 }
1668
1669 if let Some(supervisor) = &self.container_supervisor {
1671 if replicas > current_replicas {
1673 for i in current_replicas..replicas {
1674 let container_id = ContainerId {
1675 service: name.to_string(),
1676 replica: i + 1,
1677 };
1678 supervisor.supervise(&container_id, &instance.spec).await;
1679 }
1680 }
1681 if replicas < current_replicas {
1683 for i in replicas..current_replicas {
1684 let container_id = ContainerId {
1685 service: name.to_string(),
1686 replica: i + 1,
1687 };
1688 supervisor.unsupervise(&container_id).await;
1689 }
1690 }
1691 }
1692
1693 Ok(())
1694 }
1695
1696 async fn collect_backend_addrs(
1707 &self,
1708 instance: &ServiceInstance,
1709 _replicas: u32, ) -> Vec<SocketAddr> {
1711 let mut addrs = Vec::new();
1712
1713 let spec_port = instance
1715 .spec
1716 .endpoints
1717 .iter()
1718 .find(|ep| {
1719 matches!(
1720 ep.protocol,
1721 Protocol::Http | Protocol::Https | Protocol::Websocket
1722 )
1723 })
1724 .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1725
1726 let containers = instance.containers().read().await;
1728
1729 for container in containers.values() {
1730 if let Some(ip) = container.overlay_ip {
1731 let port = container.port_override.unwrap_or(spec_port);
1734 addrs.push(SocketAddr::new(ip, port));
1735 }
1736 }
1737
1738 if addrs.is_empty() && !containers.is_empty() {
1741 tracing::warn!(
1742 service = %instance.service_name,
1743 container_count = containers.len(),
1744 "no overlay IPs available for backends - containers may not be reachable via proxy"
1745 );
1746 }
1747
1748 addrs
1749 }
1750
1751 pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
1756 let services = self.services.read().await;
1757 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1758 container: name.to_string(),
1759 reason: "service not found".to_string(),
1760 })?;
1761
1762 Ok(instance.replica_count().await)
1763 }
1764
    /// Remove all traces of `name`: cron registration, job registration, L4
    /// stream routes, supervision, DNS records, and finally the service
    /// entry itself. Each stage is independent and best-effort; DNS failures
    /// are logged rather than propagated.
    pub async fn remove_service(&self, name: &str) -> Result<()> {
        if let Some(scheduler) = &self.cron_scheduler {
            scheduler.unregister(name).await;
        }

        if let Some(executor) = &self.job_executor {
            executor.unregister_job(name).await;
        }

        // Drop L4 stream routes for each TCP/UDP endpoint.
        if let Some(stream_registry) = &self.stream_registry {
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                for endpoint in &instance.spec.endpoints {
                    match endpoint.protocol {
                        Protocol::Tcp => {
                            let _ = stream_registry.unregister_tcp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered TCP stream route"
                            );
                        }
                        Protocol::Udp => {
                            let _ = stream_registry.unregister_udp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered UDP stream route"
                            );
                        }
                        _ => {} }
                }
            }
            drop(services); }

        // Stop supervising every container belonging to this service.
        if let Some(supervisor) = &self.container_supervisor {
            let containers = self.get_service_containers(name).await;
            for container_id in containers {
                supervisor.unsupervise(&container_id).await;
            }
            tracing::debug!(service = %name, "Unregistered containers from supervisor");
        }

        if let Some(dns) = &self.dns_server {
            // Service-level record: <name>.service.local
            let service_hostname = format!("{name}.service.local");
            if let Err(e) = dns.remove_record(&service_hostname).await {
                tracing::warn!(
                    hostname = %service_hostname,
                    error = %e,
                    "failed to remove service DNS record"
                );
            } else {
                tracing::debug!(
                    hostname = %service_hostname,
                    "removed service DNS record"
                );
            }

            // Per-replica records: <replica>.<name>.service.local
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                let containers = instance.containers().read().await;
                for (id, _) in containers.iter() {
                    let replica_hostname = format!("{}.{}.service.local", id.replica, name);
                    if let Err(e) = dns.remove_record(&replica_hostname).await {
                        tracing::warn!(
                            hostname = %replica_hostname,
                            error = %e,
                            "failed to remove replica DNS record during service removal"
                        );
                    }
                }
            }
            drop(services); }

        // Finally drop the instance itself under the write lock.
        let mut services = self.services.write().await;
        if services.remove(name).is_some() {
            tracing::debug!(service = %name, "Removed service from manager");
        }

        Ok(())
    }
1867
1868 pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
1871 let services = self.services.read().await;
1872 services.get(name).map(|i| {
1873 (
1874 i.has_overlay_manager(),
1875 i.has_proxy_manager(),
1876 i.has_dns_server(),
1877 )
1878 })
1879 }
1880
1881 pub async fn list_services(&self) -> Vec<String> {
1883 self.services.read().await.keys().cloned().collect()
1884 }
1885
    /// Collect up to `tail` log lines per container of `service_name`,
    /// optionally narrowed to one instance. `instance` is interpreted as a
    /// replica number when it parses as `u32`, otherwise as a substring of
    /// the container id. Unreadable containers are skipped with a warning.
    ///
    /// # Errors
    /// `AgentError::NotFound` when the service has no containers or the
    /// selector matches none of them.
    pub async fn get_service_logs(
        &self,
        service_name: &str,
        tail: usize,
        instance: Option<&str>,
    ) -> Result<Vec<LogEntry>> {
        let container_ids = self.get_service_containers(service_name).await;

        if container_ids.is_empty() {
            return Err(AgentError::NotFound {
                container: service_name.to_string(),
                reason: "no containers found for service".to_string(),
            });
        }

        // Selector: numeric -> exact replica match, otherwise substring of
        // the container id; absent -> all containers.
        let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
            if let Ok(replica_num) = inst.parse::<u32>() {
                container_ids
                    .iter()
                    .filter(|id| id.replica == replica_num)
                    .collect()
            } else {
                container_ids
                    .iter()
                    .filter(|id| id.to_string().contains(inst))
                    .collect()
            }
        } else {
            container_ids.iter().collect()
        };

        if target_ids.is_empty() {
            return Err(AgentError::NotFound {
                container: format!("{}/{}", service_name, instance.unwrap_or("?")),
                reason: "instance not found".to_string(),
            });
        }

        let mut all_entries: Vec<LogEntry> = Vec::new();

        for id in &target_ids {
            match self.runtime.container_logs(id, tail).await {
                Ok(mut entries) => {
                    // Backfill service/deployment labels the runtime left unset.
                    for entry in &mut entries {
                        if entry.service.is_none() {
                            entry.service = Some(service_name.to_string());
                        }
                        if entry.deployment.is_none() {
                            entry.deployment.clone_from(&self.deployment_name);
                        }
                    }
                    all_entries.extend(entries);
                }
                Err(e) => {
                    // Best-effort: one failing container does not fail the
                    // whole request.
                    tracing::warn!(
                        service = service_name,
                        container = %id,
                        error = %e,
                        "Failed to read container logs"
                    );
                }
            }
        }

        Ok(all_entries)
    }
1968
1969 pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
1979 let services = self.services.read().await;
1980 if let Some(instance) = services.get(service_name) {
1981 instance.container_ids().await
1982 } else {
1983 Vec::new()
1984 }
1985 }
1986
1987 pub async fn exec_in_container(
2006 &self,
2007 service_name: &str,
2008 replica: Option<u32>,
2009 cmd: &[String],
2010 ) -> Result<(i32, String, String)> {
2011 let container_ids = self.get_service_containers(service_name).await;
2012
2013 if container_ids.is_empty() {
2014 return Err(AgentError::NotFound {
2015 container: service_name.to_string(),
2016 reason: "no containers found for service".to_string(),
2017 });
2018 }
2019
2020 let target = if let Some(rep) = replica {
2022 container_ids
2023 .into_iter()
2024 .find(|cid| cid.replica == rep)
2025 .ok_or_else(|| AgentError::NotFound {
2026 container: format!("{service_name}-rep-{rep}"),
2027 reason: format!("replica {rep} not found for service"),
2028 })?
2029 } else {
2030 container_ids.into_iter().next().unwrap()
2032 };
2033
2034 self.runtime.exec(&target, cmd).await
2035 }
2036
2037 pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
2052 let executor = self
2053 .job_executor
2054 .as_ref()
2055 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2056
2057 let spec = executor
2058 .get_job_spec(name)
2059 .await
2060 .ok_or_else(|| AgentError::NotFound {
2061 container: name.to_string(),
2062 reason: "job not registered".to_string(),
2063 })?;
2064
2065 executor.trigger(name, &spec, trigger).await
2066 }
2067
2068 pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
2076 if let Some(executor) = &self.job_executor {
2077 executor.get_execution(id).await
2078 } else {
2079 None
2080 }
2081 }
2082
2083 pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
2091 if let Some(executor) = &self.job_executor {
2092 executor.list_executions(name).await
2093 } else {
2094 Vec::new()
2095 }
2096 }
2097
2098 pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
2106 let executor = self
2107 .job_executor
2108 .as_ref()
2109 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2110
2111 executor.cancel(id).await
2112 }
2113
2114 pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
2127 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2128 AgentError::Configuration("Cron scheduler not configured".to_string())
2129 })?;
2130
2131 scheduler.trigger_now(name).await
2132 }
2133
2134 pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
2140 if let Some(scheduler) = &self.cron_scheduler {
2141 scheduler.set_enabled(name, enabled).await;
2142 }
2143 }
2144
2145 pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2147 if let Some(scheduler) = &self.cron_scheduler {
2148 scheduler.list_jobs().await
2149 } else {
2150 Vec::new()
2151 }
2152 }
2153
2154 pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2162 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2163 AgentError::Configuration("Cron scheduler not configured".to_string())
2164 })?;
2165
2166 let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2167 Ok(tokio::spawn(async move {
2168 scheduler.run_loop().await;
2169 }))
2170 }
2171
2172 pub fn shutdown_cron(&self) {
2174 if let Some(scheduler) = &self.cron_scheduler {
2175 scheduler.shutdown();
2176 }
2177 }
2178}
2179
2180#[cfg(test)]
2181#[allow(deprecated)]
2182mod tests {
2183 use super::*;
2184 use crate::runtime::MockRuntime;
2185
    // Happy path: upsert a service, scale to 3, then verify the replica
    // count and the service listing.
    #[tokio::test]
    async fn test_service_manager() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let spec = mock_spec();
        Box::pin(manager.upsert_service("test".to_string(), spec))
            .await
            .unwrap();
        manager.scale_service("test", 3).await.unwrap();
        let count = manager.service_replica_count("test").await.unwrap();
        assert_eq!(count, 3);
        let services = manager.list_services().await;
        assert_eq!(services, vec!["test".to_string()]);
    }
2208
    // Full lifecycle: upsert -> scale up -> verify count -> remove -> gone.
    #[tokio::test]
    async fn test_service_manager_basic_lifecycle() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let spec = mock_spec();
        Box::pin(manager.upsert_service("api".to_string(), spec))
            .await
            .unwrap();
        manager.scale_service("api", 2).await.unwrap();
        let count = manager.service_replica_count("api").await.unwrap();
        assert_eq!(count, 2);
        manager.remove_service("api").await.unwrap();
        let services = manager.list_services().await;
        assert!(!services.contains(&"api".to_string()));
    }
2234
    // A manager built via `with_full_config` (overlay manager + deployment
    // name) still registers services normally.
    #[tokio::test]
    async fn test_service_manager_with_full_config() {
        use tokio::sync::RwLock;
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let overlay_manager = Arc::new(RwLock::new(
            OverlayManager::new("test-deployment".to_string())
                .await
                .unwrap(),
        ));
        let manager =
            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
        let spec = mock_spec();
        Box::pin(manager.upsert_service("web".to_string(), spec))
            .await
            .unwrap();
        let services = manager.list_services().await;
        assert!(services.contains(&"web".to_string()));
    }
2261
    // Minimal single-service fixture: one fixed replica behind an HTTP
    // endpoint on port 8080.
    fn mock_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }
2286
2287 fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
2289 let mut spec = mock_spec();
2290 spec.depends = deps;
2291 spec
2292 }
2293
2294 fn dep(
2296 service: &str,
2297 condition: zlayer_spec::DependencyCondition,
2298 timeout_ms: u64,
2299 on_timeout: zlayer_spec::TimeoutAction,
2300 ) -> DependsSpec {
2301 DependsSpec {
2302 service: service.to_string(),
2303 condition,
2304 timeout: Some(Duration::from_millis(timeout_ms)),
2305 on_timeout,
2306 }
2307 }
2308
    // Two services without dependency edges deploy without any waiting.
    #[tokio::test]
    async fn test_deploy_with_dependencies_no_deps() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let mut services = HashMap::new();
        services.insert("a".to_string(), mock_spec());
        services.insert("b".to_string(), mock_spec());
        Box::pin(manager.deploy_with_dependencies(services))
            .await
            .unwrap();
        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }
2328
    // Linear chain a -> b -> c deploys in dependency order and all three
    // services end up registered.
    #[tokio::test]
    async fn test_deploy_with_dependencies_linear() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let mut services = HashMap::new();
        services.insert("c".to_string(), mock_spec());
        services.insert(
            "b".to_string(),
            mock_spec_with_deps(vec![dep(
                "c",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        Box::pin(manager.deploy_with_dependencies(services))
            .await
            .unwrap();
        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 3);
    }
2366
    // a <-> b dependency cycle must be rejected with a cycle error.
    #[tokio::test]
    async fn test_deploy_with_dependencies_cycle_detection() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let mut services = HashMap::new();
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        services.insert(
            "b".to_string(),
            mock_spec_with_deps(vec![dep(
                "a",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Cyclic dependency"));
    }
2399
    // `b` never becomes healthy, but the Continue timeout action lets `a`
    // deploy anyway after the 100ms wait expires.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_continue() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy, 100, zlayer_spec::TimeoutAction::Continue, )]),
        );
        Box::pin(manager.deploy_with_dependencies(services))
            .await
            .unwrap();
        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }
2427
    // The Warn timeout action also proceeds past an unhealthy dependency,
    // only logging a warning.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_warn() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy,
                100,
                zlayer_spec::TimeoutAction::Warn,
            )]),
        );
        Box::pin(manager.deploy_with_dependencies(services))
            .await
            .unwrap();
        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }
2455
    // The Fail timeout action aborts the whole deployment with a dependency
    // timeout error when `b` never becomes healthy.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_fail() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy,
                100,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Dependency timeout"));
    }
2483
    // Marking "db" healthy first makes a Healthy-condition dependency pass.
    #[tokio::test]
    async fn test_check_dependencies_all_satisfied() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        manager
            .update_health_state("db", HealthState::Healthy)
            .await;
        let deps = vec![DependsSpec {
            service: "db".to_string(),
            condition: zlayer_spec::DependencyCondition::Healthy,
            timeout: Some(Duration::from_secs(60)),
            on_timeout: zlayer_spec::TimeoutAction::Fail,
        }];
        let satisfied = manager.check_dependencies(&deps).await.unwrap();
        assert!(satisfied);
    }
2504
    // With no health state recorded for "db", the Healthy condition fails.
    #[tokio::test]
    async fn test_check_dependencies_not_satisfied() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let deps = vec![DependsSpec {
            service: "db".to_string(),
            condition: zlayer_spec::DependencyCondition::Healthy,
            timeout: Some(Duration::from_secs(60)),
            on_timeout: zlayer_spec::TimeoutAction::Fail,
        }];
        let satisfied = manager.check_dependencies(&deps).await.unwrap();
        assert!(!satisfied);
    }
2521
    // Health states written via `update_health_state` are visible through
    // the shared `health_states` map.
    #[tokio::test]
    async fn test_health_state_tracking() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        manager
            .update_health_state("db", HealthState::Healthy)
            .await;
        manager
            .update_health_state("cache", HealthState::Unknown)
            .await;
        let states = manager.health_states();
        let states_read = states.read().await;
        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
        assert!(matches!(
            states_read.get("cache"),
            Some(HealthState::Unknown)
        ));
    }
2545
    // Job-type fixture (rtype: job, no endpoints or scaling).
    fn mock_job_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  backup:
    rtype: job
    image:
      name: backup:latest
",
        )
        .unwrap()
        .services
        .remove("backup")
        .unwrap()
    }
2565
    // Cron-type fixture firing at the top of every minute (7-field schedule).
    fn mock_cron_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r#"
version: v1
deployment: test
services:
  cleanup:
    rtype: cron
    schedule: "0 0 * * * * *"
    image:
      name: cleanup:latest
"#,
        )
        .unwrap()
        .services
        .remove("cleanup")
        .unwrap()
    }
2584
    // Upserting a job spec routes it to the executor; a manual trigger then
    // produces a retrievable execution. The short sleep lets the spawned
    // execution record itself.
    #[tokio::test]
    async fn test_service_manager_with_job_executor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
        let job_spec = mock_job_spec();
        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
            .await
            .unwrap();
        let exec_id = manager
            .trigger_job("backup", JobTrigger::Cli)
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        let execution = manager.get_job_execution(&exec_id).await;
        assert!(execution.is_some());
        assert_eq!(execution.unwrap().job_name, "backup");
    }
2612
    // Upserting a cron spec registers it with the scheduler, enabled by
    // default.
    #[tokio::test]
    async fn test_service_manager_with_cron_scheduler() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
        let cron_spec = mock_cron_spec();
        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
            .await
            .unwrap();
        let cron_jobs = manager.list_cron_jobs().await;
        assert_eq!(cron_jobs.len(), 1);
        assert_eq!(cron_jobs[0].name, "cleanup");
        assert!(cron_jobs[0].enabled);
    }
2633
    // `trigger_cron` fires a registered cron job immediately and yields a
    // non-empty execution id.
    #[tokio::test]
    async fn test_service_manager_trigger_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
        let manager = ServiceManager::new(runtime)
            .with_job_executor(job_executor)
            .with_cron_scheduler(cron_scheduler);
        let cron_spec = mock_cron_spec();
        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
            .await
            .unwrap();
        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
        assert!(!exec_id.0.is_empty());
    }
2654
    // Toggling `set_cron_enabled` flips the job's enabled flag both ways.
    #[tokio::test]
    async fn test_service_manager_enable_disable_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
        let cron_spec = mock_cron_spec();
        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
            .await
            .unwrap();
        let cron_jobs = manager.list_cron_jobs().await;
        assert!(cron_jobs[0].enabled);
        manager.set_cron_enabled("cleanup", false).await;
        let cron_jobs = manager.list_cron_jobs().await;
        assert!(!cron_jobs[0].enabled);
        manager.set_cron_enabled("cleanup", true).await;
        let cron_jobs = manager.list_cron_jobs().await;
        assert!(cron_jobs[0].enabled);
    }
2683
    // Removing a job-type service unregisters its spec from the executor.
    #[tokio::test]
    async fn test_service_manager_remove_cleans_up_job() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
        let job_spec = mock_job_spec();
        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
            .await
            .unwrap();
        let spec = job_executor.get_job_spec("backup").await;
        assert!(spec.is_some());
        manager.remove_service("backup").await.unwrap();
        let spec = job_executor.get_job_spec("backup").await;
        assert!(spec.is_none());
    }
2708
    // Removing a cron-type service unregisters it from the scheduler.
    #[tokio::test]
    async fn test_service_manager_remove_cleans_up_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
        let cron_spec = mock_cron_spec();
        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
            .await
            .unwrap();
        assert_eq!(cron_scheduler.job_count().await, 1);
        manager.remove_service("cleanup").await.unwrap();
        assert_eq!(cron_scheduler.job_count().await, 0);
    }
2732
    // Triggering a job on a manager without an executor is a configuration
    // error.
    #[tokio::test]
    async fn test_service_manager_job_without_executor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }
2743
    // Upserting a cron spec without a scheduler is rejected outright.
    #[tokio::test]
    async fn test_service_manager_cron_without_scheduler() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);
        let cron_spec = mock_cron_spec();
        let result = Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec)).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }
2755
    // Two triggers of the same job yield two recorded executions; the short
    // sleep lets the spawned executions register themselves.
    #[tokio::test]
    async fn test_service_manager_list_job_executions() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
        let job_spec = mock_job_spec();
        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
            .await
            .unwrap();
        manager
            .trigger_job("backup", JobTrigger::Cli)
            .await
            .unwrap();
        manager
            .trigger_job("backup", JobTrigger::Scheduler)
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        let executions = manager.list_job_executions("backup").await;
        assert_eq!(executions.len(), 2);
    }
2786
2787 #[tokio::test]
2790 async fn test_service_manager_with_supervisor() {
2791 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2792 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2793
2794 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
2795
2796 let spec = mock_spec();
2798 Box::pin(manager.upsert_service("api".to_string(), spec))
2799 .await
2800 .unwrap();
2801
2802 manager.scale_service("api", 2).await.unwrap();
2804
2805 assert_eq!(supervisor.supervised_count().await, 2);
2807
2808 manager.scale_service("api", 1).await.unwrap();
2810 assert_eq!(supervisor.supervised_count().await, 1);
2811
2812 manager.remove_service("api").await.unwrap();
2814 assert_eq!(supervisor.supervised_count().await, 0);
2815 }
2816
2817 #[tokio::test]
2818 async fn test_service_manager_supervisor_state() {
2819 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2820 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2821
2822 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
2823
2824 let spec = mock_spec();
2826 Box::pin(manager.upsert_service("web".to_string(), spec))
2827 .await
2828 .unwrap();
2829 manager.scale_service("web", 1).await.unwrap();
2830
2831 let container_id = ContainerId {
2833 service: "web".to_string(),
2834 replica: 1,
2835 };
2836 let state = manager.get_container_supervised_state(&container_id).await;
2837 assert_eq!(state, Some(SupervisedState::Running));
2838 }
2839
2840 #[tokio::test]
2841 async fn test_service_manager_start_supervisor() {
2842 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2843 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2844
2845 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
2846
2847 let handle = manager.start_container_supervisor().unwrap();
2849
2850 tokio::time::sleep(Duration::from_millis(50)).await;
2852 assert!(supervisor.is_running());
2853
2854 manager.shutdown_container_supervisor();
2856
2857 tokio::time::timeout(Duration::from_secs(1), handle)
2859 .await
2860 .unwrap()
2861 .unwrap();
2862
2863 assert!(!supervisor.is_running());
2864 }
2865
2866 #[tokio::test]
2867 async fn test_service_manager_supervisor_not_configured() {
2868 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2869 let manager = ServiceManager::new(runtime);
2870
2871 let result = manager.start_container_supervisor();
2873 assert!(result.is_err());
2874 assert!(result.unwrap_err().to_string().contains("not configured"));
2875 }
2876
2877 fn mock_tcp_spec() -> ServiceSpec {
2880 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2881 r"
2882version: v1
2883deployment: test
2884services:
2885 database:
2886 rtype: service
2887 image:
2888 name: postgres:latest
2889 endpoints:
2890 - name: postgresql
2891 protocol: tcp
2892 port: 5432
2893 scale:
2894 mode: fixed
2895 replicas: 1
2896",
2897 )
2898 .unwrap()
2899 .services
2900 .remove("database")
2901 .unwrap()
2902 }
2903
2904 fn mock_udp_spec() -> ServiceSpec {
2905 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2906 r"
2907version: v1
2908deployment: test
2909services:
2910 dns:
2911 rtype: service
2912 image:
2913 name: dns:latest
2914 endpoints:
2915 - name: dns
2916 protocol: udp
2917 port: 53
2918 scale:
2919 mode: fixed
2920 replicas: 1
2921",
2922 )
2923 .unwrap()
2924 .services
2925 .remove("dns")
2926 .unwrap()
2927 }
2928
2929 fn mock_mixed_spec() -> ServiceSpec {
2930 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2931 r"
2932version: v1
2933deployment: test
2934services:
2935 mixed:
2936 rtype: service
2937 image:
2938 name: mixed:latest
2939 endpoints:
2940 - name: http
2941 protocol: http
2942 port: 8080
2943 - name: grpc
2944 protocol: tcp
2945 port: 9000
2946 - name: metrics
2947 protocol: udp
2948 port: 8125
2949 scale:
2950 mode: fixed
2951 replicas: 1
2952",
2953 )
2954 .unwrap()
2955 .services
2956 .remove("mixed")
2957 .unwrap()
2958 }
2959
2960 #[tokio::test]
2961 async fn test_service_manager_with_stream_registry_tcp() {
2962 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2963 let stream_registry = Arc::new(StreamRegistry::new());
2964
2965 let mut manager = ServiceManager::new(runtime);
2966 manager.set_stream_registry(stream_registry.clone());
2967 manager.set_deployment_name("test".to_string());
2968
2969 let spec = mock_tcp_spec();
2971 Box::pin(manager.upsert_service("database".to_string(), spec))
2972 .await
2973 .unwrap();
2974
2975 assert_eq!(stream_registry.tcp_count(), 1);
2977 assert!(stream_registry.tcp_ports().contains(&5432));
2978
2979 manager.remove_service("database").await.unwrap();
2981 assert_eq!(stream_registry.tcp_count(), 0);
2982 }
2983
2984 #[tokio::test]
2985 async fn test_service_manager_with_stream_registry_udp() {
2986 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2987 let stream_registry = Arc::new(StreamRegistry::new());
2988
2989 let mut manager = ServiceManager::new(runtime);
2990 manager.set_stream_registry(stream_registry.clone());
2991 manager.set_deployment_name("test".to_string());
2992
2993 let spec = mock_udp_spec();
2995 Box::pin(manager.upsert_service("dns".to_string(), spec))
2996 .await
2997 .unwrap();
2998
2999 assert_eq!(stream_registry.udp_count(), 1);
3001 assert!(stream_registry.udp_ports().contains(&53));
3002
3003 manager.remove_service("dns").await.unwrap();
3005 assert_eq!(stream_registry.udp_count(), 0);
3006 }
3007
3008 #[tokio::test]
3009 async fn test_service_manager_with_stream_registry_mixed() {
3010 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3011 let stream_registry = Arc::new(StreamRegistry::new());
3012
3013 let mut manager = ServiceManager::new(runtime);
3014 manager.set_stream_registry(stream_registry.clone());
3015 manager.set_deployment_name("test".to_string());
3016
3017 let spec = mock_mixed_spec();
3019 Box::pin(manager.upsert_service("mixed".to_string(), spec))
3020 .await
3021 .unwrap();
3022
3023 assert_eq!(stream_registry.tcp_count(), 1); assert_eq!(stream_registry.udp_count(), 1); assert!(stream_registry.tcp_ports().contains(&9000));
3028 assert!(stream_registry.udp_ports().contains(&8125));
3029
3030 manager.remove_service("mixed").await.unwrap();
3032 assert_eq!(stream_registry.tcp_count(), 0);
3033 assert_eq!(stream_registry.udp_count(), 0);
3034 }
3035
3036 #[tokio::test]
3037 async fn test_service_manager_stream_registry_builder() {
3038 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3039 let stream_registry = Arc::new(StreamRegistry::new());
3040
3041 let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
3043
3044 assert!(manager.stream_registry().is_some());
3046 }
3047
3048 #[tokio::test]
3049 async fn test_tcp_service_without_stream_registry() {
3050 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3051
3052 let mut manager = ServiceManager::new(runtime);
3054 manager.set_deployment_name("test".to_string());
3055
3056 let spec = mock_tcp_spec();
3058 Box::pin(manager.upsert_service("database".to_string(), spec))
3059 .await
3060 .unwrap();
3061
3062 let services = manager.list_services().await;
3064 assert!(services.contains(&"database".to_string()));
3065 }
3066}