1use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6 DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, ResourceType, ServiceSpec};
24
/// One managed service: its desired-state spec plus the live container
/// replicas that realize it, with optional wiring to overlay networking,
/// proxying, DNS, and health reporting.
pub struct ServiceInstance {
    /// Service name; used in container ids, DNS records, and proxy routes.
    pub service_name: String,
    /// Desired-state specification (image, endpoints, health, scale, …).
    pub spec: ServiceSpec,
    // Runtime used to create/start/stop/remove replica containers.
    runtime: Arc<dyn Runtime + Send + Sync>,
    // Live replicas keyed by container id.
    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
    // When present, new replicas are attached to the overlay network.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    // When present, replica backends are registered with the proxy.
    proxy_manager: Option<Arc<ProxyManager>>,
    // When present, `<svc>.service.local` / `<n>.<svc>.service.local` records are kept.
    dns_server: Option<Arc<DnsServer>>,
    // Shared per-service health map updated by health-monitor callbacks.
    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
}
40
41impl ServiceInstance {
42 pub fn new(
44 service_name: String,
45 spec: ServiceSpec,
46 runtime: Arc<dyn Runtime + Send + Sync>,
47 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
48 ) -> Self {
49 Self {
50 service_name,
51 spec,
52 runtime,
53 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
54 overlay_manager,
55 proxy_manager: None,
56 dns_server: None,
57 health_states: None,
58 }
59 }
60
61 pub fn with_proxy(
63 service_name: String,
64 spec: ServiceSpec,
65 runtime: Arc<dyn Runtime + Send + Sync>,
66 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
67 proxy_manager: Arc<ProxyManager>,
68 ) -> Self {
69 Self {
70 service_name,
71 spec,
72 runtime,
73 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
74 overlay_manager,
75 proxy_manager: Some(proxy_manager),
76 dns_server: None,
77 health_states: None,
78 }
79 }
80
    /// Builder-style variant of [`Self::set_dns_server`].
    #[must_use]
    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns_server);
        self
    }

    /// Attaches (or replaces) the DNS server used for service/replica records.
    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
        self.dns_server = Some(dns_server);
    }

    /// Attaches (or replaces) the proxy manager used for backend registration.
    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
        self.proxy_manager = Some(proxy_manager);
    }

    /// Attaches the shared health-state map that health callbacks will update.
    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
        self.health_states = Some(states);
    }
102
    /// Scales this service to exactly `replicas` containers.
    ///
    /// Scale-up path (per new replica): pull image (once), create, run init,
    /// start, poll for the PID, verify the container is still alive, attach
    /// it to the overlay network (platform-specific), register DNS records
    /// and a proxy backend, and spawn a health monitor.
    ///
    /// Scale-down path: for each excess replica, abort its health monitor,
    /// remove its replica DNS record, then stop, sync volumes, and remove
    /// the container.
    ///
    /// # Errors
    /// Fails on image pull, create, init, start, or stop/remove errors, and
    /// when a container exits during startup (its log tail is embedded in
    /// the error).
    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
        // Snapshot the current count; the read guard is dropped immediately
        // so the long startup sequence below doesn't hold the lock.
        let current_replicas = { self.containers.read().await.len() as u32 };

        if replicas > current_replicas {
            // Pull once up front so every new replica uses the same image.
            let image_str = self.spec.image.name.to_string();
            self.runtime
                .pull_image_with_policy(&image_str, self.spec.image.pull_policy, None)
                .await
                .map_err(|e| AgentError::PullFailed {
                    image: self.spec.image.name.to_string(),
                    reason: e.to_string(),
                })?;

            for i in current_replicas..replicas {
                // Replica numbers are 1-based.
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                self.runtime
                    .create_container(&id, &self.spec)
                    .await
                    .map_err(|e| match e {
                        // RouteToPeer is passed through untouched so callers can react to it.
                        AgentError::RouteToPeer { .. } => e,
                        other => AgentError::CreateFailed {
                            id: id.to_string(),
                            reason: other.to_string(),
                        },
                    })?;

                // Run init steps (with the spec's error policy) before starting.
                let init_orchestrator = InitOrchestrator::with_error_policy(
                    id.clone(),
                    self.spec.init.clone(),
                    self.spec.errors.clone(),
                );
                init_orchestrator.run().await?;

                self.runtime
                    .start_container(&id)
                    .await
                    .map_err(|e| AgentError::StartFailed {
                        id: id.to_string(),
                        reason: e.to_string(),
                    })?;

                // The PID may not be visible immediately after start; retry up
                // to 5 times with a 200ms backoff. A missing PID is tolerated
                // (overlay attach is skipped later on non-Windows).
                let mut container_pid = None;
                for attempt in 1..=5u32 {
                    match self.runtime.get_container_pid(&id).await {
                        Ok(Some(pid)) => {
                            container_pid = Some(pid);
                            break;
                        }
                        Ok(None) if attempt < 5 => {
                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                        }
                        Ok(None) => {
                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
                        }
                        Err(e) => {
                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
                            if attempt < 5 {
                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                            }
                        }
                    }
                }

                // If we do have a PID, verify the container didn't exit right
                // after start — attaching a dead container to the overlay is
                // pointless and masks the real failure.
                if container_pid.is_some() {
                    let alive = match self.runtime.container_state(&id).await {
                        Ok(
                            ContainerState::Running
                            | ContainerState::Pending
                            | ContainerState::Initializing,
                        ) => true,
                        Ok(state) => {
                            tracing::warn!(
                                container = %id,
                                ?state,
                                "container exited before overlay attach could run"
                            );
                            false
                        }
                        Err(e) => {
                            // State query failure is not conclusive; proceed
                            // optimistically rather than failing the scale-up.
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "container state query failed before overlay attach, proceeding"
                            );
                            true
                        }
                    };
                    if !alive {
                        // Embed the last 40 log lines in the error for diagnosis.
                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
                            || " <log read failed>".to_string(),
                            |entries| {
                                if entries.is_empty() {
                                    " <no log output>".to_string()
                                } else {
                                    entries
                                        .into_iter()
                                        .map(|e| format!(" {}", e.message))
                                        .collect::<Vec<_>>()
                                        .join("\n")
                                }
                            },
                        );
                        return Err(AgentError::StartFailed {
                            id: id.to_string(),
                            reason: format!("container exited during startup:\n{log_tail}"),
                        });
                    }
                }

                // Attach to the overlay network, if configured. The mechanism
                // is platform-specific: HCN namespaces on Windows, PID-based
                // attach elsewhere. Failures are logged and tolerated; the
                // runtime IP fallback below keeps the replica routable.
                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
                    let overlay_guard = overlay.read().await;
                    #[cfg(target_os = "windows")]
                    let attach_result: Option<std::net::IpAddr> = {
                        // PID is unused on the Windows path.
                        let _ = container_pid;
                        match self.runtime.get_container_namespace_id(&id).await {
                            Ok(Some(ns_id)) => {
                                let ip_override =
                                    self.runtime.get_container_ip(&id).await.ok().flatten();
                                let dns_server = overlay_guard.dns_server_addr().map(|sa| sa.ip());
                                let dns_domain =
                                    overlay_guard.dns_domain().map(ToString::to_string);
                                match overlay_guard
                                    .attach_container_hcn(
                                        ns_id,
                                        &self.service_name,
                                        ip_override,
                                        true,
                                        dns_server,
                                        dns_domain,
                                    )
                                    .await
                                {
                                    Ok(ip) => Some(ip),
                                    Err(e) => {
                                        tracing::warn!(
                                            container = %id,
                                            error = %e,
                                            "HCN overlay attach failed"
                                        );
                                        None
                                    }
                                }
                            }
                            Ok(None) => {
                                tracing::debug!(
                                    container = %id,
                                    "skipping HCN overlay attach - no namespace id available"
                                );
                                None
                            }
                            Err(e) => {
                                tracing::warn!(
                                    container = %id,
                                    error = %e,
                                    "failed to fetch HCN namespace id"
                                );
                                None
                            }
                        }
                    };
                    #[cfg(not(target_os = "windows"))]
                    let attach_result: Option<std::net::IpAddr> = {
                        if let Some(pid) = container_pid {
                            match overlay_guard
                                .attach_container(pid, &self.service_name, true)
                                .await
                            {
                                Ok(ip) => Some(ip),
                                Err(e) => {
                                    tracing::warn!(
                                        container = %id,
                                        error = %e,
                                        "failed to attach container to overlay network"
                                    );
                                    None
                                }
                            }
                        } else {
                            tracing::debug!(
                                container = %id,
                                "skipping overlay attachment - no PID available"
                            );
                            None
                        }
                    };

                    if let Some(ip) = attach_result {
                        tracing::info!(
                            container = %id,
                            overlay_ip = %ip,
                            "attached container to overlay network"
                        );

                        // Register both the service-level and replica-level
                        // DNS names for the new overlay IP; DNS failures are
                        // logged, not fatal.
                        if let Some(dns) = &self.dns_server {
                            let service_hostname = format!("{}.service.local", self.service_name);
                            let replica_hostname =
                                format!("{}.{}.service.local", id.replica, self.service_name);

                            match dns.add_record(&service_hostname, ip).await {
                                Ok(()) => tracing::debug!(
                                    hostname = %service_hostname,
                                    ip = %ip,
                                    "registered DNS for service"
                                ),
                                Err(e) => tracing::warn!(
                                    hostname = %service_hostname,
                                    error = %e,
                                    "failed to register DNS for service"
                                ),
                            }

                            if let Err(e) = dns.add_record(&replica_hostname, ip).await {
                                tracing::warn!(
                                    hostname = %replica_hostname,
                                    error = %e,
                                    "failed to register replica DNS"
                                );
                            } else {
                                tracing::debug!(
                                    hostname = %replica_hostname,
                                    ip = %ip,
                                    "registered DNS for replica"
                                );
                            }
                        }

                        Some(ip)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // Fall back to the runtime-reported container IP when overlay
                // attach didn't yield one; without either, proxy routing to
                // this replica is unavailable.
                let effective_ip = if overlay_ip.is_none() {
                    match self.runtime.get_container_ip(&id).await {
                        Ok(Some(ip)) => {
                            tracing::info!(
                                container = %id,
                                ip = %ip,
                                "using runtime container IP for proxy (overlay unavailable)"
                            );
                            Some(ip)
                        }
                        Ok(None) => {
                            tracing::warn!(
                                container = %id,
                                "no container IP available from runtime, proxy routing will be unavailable"
                            );
                            None
                        }
                        Err(e) => {
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "failed to get container IP from runtime"
                            );
                            None
                        }
                    }
                } else {
                    overlay_ip
                };

                tracing::info!(
                    container = %id,
                    service = %self.service_name,
                    overlay_ip = ?overlay_ip,
                    effective_ip = ?effective_ip,
                    "Container IP resolution complete"
                );

                // The runtime may have assigned a dynamic host port for this
                // replica; it takes precedence over the spec's endpoint port.
                let port_override = match self.runtime.get_container_port_override(&id).await {
                    Ok(Some(port)) => {
                        tracing::info!(
                            container = %id,
                            port = port,
                            "runtime assigned dynamic port override for this container"
                        );
                        Some(port)
                    }
                    Ok(None) => None,
                    Err(e) => {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to query port override from runtime, using spec port"
                        );
                        None
                    }
                };

                // Build and start the health monitor; if a proxy is wired,
                // its callback flips the proxy backend and the shared health
                // map on every health transition.
                let health_monitor_handle = {
                    let mut check = self.spec.health.check.clone();

                    // Port 0 in a TCP check means "use the container's real
                    // port": the override if present, else the first HTTP-ish
                    // endpoint's target port, else 8080.
                    if let HealthCheck::Tcp { ref mut port } = check {
                        if *port == 0 {
                            *port = port_override.unwrap_or_else(|| {
                                self.spec
                                    .endpoints
                                    .iter()
                                    .find(|ep| {
                                        matches!(
                                            ep.protocol,
                                            Protocol::Http | Protocol::Https | Protocol::Websocket
                                        )
                                    })
                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                            });
                        }
                    }

                    let start_grace = self
                        .spec
                        .health
                        .start_grace
                        .unwrap_or(Duration::from_secs(5));
                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
                    let retries = self.spec.health.retries;

                    let checker = HealthChecker::new(check, effective_ip);
                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
                        .with_start_grace(start_grace)
                        .with_check_timeout(check_timeout);

                    if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
                        let proxy = Arc::clone(proxy);
                        let service_name = self.service_name.clone();
                        // Same port-resolution rule as the TCP health check above.
                        let port = port_override.unwrap_or_else(|| {
                            self.spec
                                .endpoints
                                .iter()
                                .find(|ep| {
                                    matches!(
                                        ep.protocol,
                                        Protocol::Http | Protocol::Https | Protocol::Websocket
                                    )
                                })
                                .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                        });

                        let backend_addr = SocketAddr::new(ip, port);

                        proxy.add_backend(&self.service_name, backend_addr).await;

                        let health_states_opt = self.health_states.clone();
                        let svc_name_for_states = self.service_name.clone();

                        // The callback runs in monitor context, so the async
                        // updates are spawned rather than awaited.
                        let health_callback: HealthCallback =
                            Arc::new(move |container_id: ContainerId, is_healthy: bool| {
                                let proxy = Arc::clone(&proxy);
                                let service_name = service_name.clone();
                                tracing::info!(
                                    container = %container_id,
                                    service = %service_name,
                                    backend = %backend_addr,
                                    healthy = is_healthy,
                                    "health status changed, updating proxy backend"
                                );
                                tokio::spawn(async move {
                                    proxy
                                        .update_backend_health(
                                            &service_name,
                                            backend_addr,
                                            is_healthy,
                                        )
                                        .await;
                                });
                                if let Some(ref health_states) = health_states_opt {
                                    let states = Arc::clone(health_states);
                                    let svc = svc_name_for_states.clone();
                                    tokio::spawn(async move {
                                        let state = if is_healthy {
                                            HealthState::Healthy
                                        } else {
                                            HealthState::Unhealthy {
                                                failures: 0,
                                                reason: "health check failed".into(),
                                            }
                                        };
                                        states.write().await.insert(svc, state);
                                    });
                                }
                            });

                        monitor = monitor.with_callback(health_callback);
                    }

                    monitor.start()
                };

                // Record the replica; short write-lock scope.
                {
                    let mut containers = self.containers.write().await;
                    containers.insert(
                        id.clone(),
                        Container {
                            id: id.clone(),
                            state: ContainerState::Running,
                            pid: None,
                            task: None,
                            overlay_ip: effective_ip,
                            health_monitor: Some(health_monitor_handle),
                            port_override,
                        },
                    );
                }
            }
        }

        if replicas < current_replicas {
            // Tear down the highest-numbered replicas first.
            for i in replicas..current_replicas {
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // Remove from the map first (short lock scope), then clean up.
                let removed_container = {
                    let mut containers = self.containers.write().await;
                    containers.remove(&id)
                };

                if let Some(container) = removed_container {
                    // Stop health checks before the container goes away.
                    if let Some(handle) = container.health_monitor {
                        handle.abort();
                    }

                    // Only the replica-level DNS record is removed; the
                    // service-level record may still serve other replicas.
                    if let Some(dns) = &self.dns_server {
                        let replica_hostname =
                            format!("{}.{}.service.local", id.replica, self.service_name);
                        if let Err(e) = dns.remove_record(&replica_hostname).await {
                            tracing::warn!(
                                hostname = %replica_hostname,
                                error = %e,
                                "failed to remove replica DNS record"
                            );
                        } else {
                            tracing::debug!(
                                hostname = %replica_hostname,
                                "removed replica DNS record"
                            );
                        }
                    }

                    self.runtime
                        .stop_container(&id, Duration::from_secs(30))
                        .await?;

                    // Best-effort volume sync; removal proceeds regardless.
                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to sync volumes before removal"
                        );
                    }

                    self.runtime.remove_container(&id).await?;
                }
            }
        }

        Ok(())
    }
650
    /// Returns the number of currently tracked replicas.
    pub async fn replica_count(&self) -> usize {
        self.containers.read().await.len()
    }

    /// Returns the ids of all currently tracked replicas.
    pub async fn container_ids(&self) -> Vec<ContainerId> {
        self.containers.read().await.keys().cloned().collect()
    }

    /// Direct access to the replica map (callers take the lock themselves).
    pub fn containers(
        &self,
    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
        &self.containers
    }
670
    /// Whether an overlay manager is attached to this instance.
    pub fn has_overlay_manager(&self) -> bool {
        self.overlay_manager.is_some()
    }

    /// Whether a proxy manager is attached to this instance.
    pub fn has_proxy_manager(&self) -> bool {
        self.proxy_manager.is_some()
    }

    /// Whether a DNS server is attached to this instance.
    pub fn has_dns_server(&self) -> bool {
        self.dns_server.is_some()
    }
685}
686
/// Orchestrates all services on this agent: registration, dependency-ordered
/// deployment, scaling, and wiring to the networking/health/job subsystems.
pub struct ServiceManager {
    // Container runtime shared by every service instance.
    runtime: Arc<dyn Runtime + Send + Sync>,
    // Managed services keyed by name.
    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
    // Bounds concurrent scale operations (acquired in `scale_service`).
    scale_semaphore: Arc<Semaphore>,
    // Optional collaborators; absence degrades the relevant feature.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    stream_registry: Option<Arc<StreamRegistry>>,
    proxy_manager: Option<Arc<ProxyManager>>,
    dns_server: Option<Arc<DnsServer>>,
    deployment_name: Option<String>,
    // Shared per-service health map, also handed to each instance.
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    // Job/cron/supervision subsystems (optional).
    job_executor: Option<Arc<JobExecutor>>,
    cron_scheduler: Option<Arc<CronScheduler>>,
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}
711
/// Builder for [`ServiceManager`]; only the runtime is mandatory, every
/// other collaborator is optional and defaults to `None`.
pub struct ServiceManagerBuilder {
    runtime: Arc<dyn Runtime + Send + Sync>,
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    proxy_manager: Option<Arc<ProxyManager>>,
    stream_registry: Option<Arc<StreamRegistry>>,
    dns_server: Option<Arc<DnsServer>>,
    deployment_name: Option<String>,
    job_executor: Option<Arc<JobExecutor>>,
    cron_scheduler: Option<Arc<CronScheduler>>,
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}
740
741impl ServiceManagerBuilder {
742 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
744 Self {
745 runtime,
746 overlay_manager: None,
747 proxy_manager: None,
748 stream_registry: None,
749 dns_server: None,
750 deployment_name: None,
751 job_executor: None,
752 cron_scheduler: None,
753 container_supervisor: None,
754 }
755 }
756
757 #[must_use]
759 pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
760 self.overlay_manager = Some(om);
761 self
762 }
763
764 #[must_use]
766 pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
767 self.proxy_manager = Some(pm);
768 self
769 }
770
771 #[must_use]
773 pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
774 self.stream_registry = Some(sr);
775 self
776 }
777
778 #[must_use]
780 pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
781 self.dns_server = Some(dns);
782 self
783 }
784
785 #[must_use]
787 pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
788 self.deployment_name = Some(name.into());
789 self
790 }
791
792 #[must_use]
794 pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
795 self.job_executor = Some(je);
796 self
797 }
798
799 #[must_use]
801 pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
802 self.cron_scheduler = Some(cs);
803 self
804 }
805
806 #[must_use]
808 pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
809 self.container_supervisor = Some(cs);
810 self
811 }
812
813 pub fn build(self) -> ServiceManager {
818 if self.proxy_manager.is_none() {
819 tracing::warn!("ServiceManager built without proxy_manager");
820 }
821 if self.stream_registry.is_none() {
822 tracing::warn!("ServiceManager built without stream_registry");
823 }
824 if self.container_supervisor.is_none() {
825 tracing::warn!("ServiceManager built without container_supervisor");
826 }
827 if self.deployment_name.is_none() {
828 tracing::warn!("ServiceManager built without deployment_name");
829 }
830
831 ServiceManager {
832 runtime: self.runtime,
833 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
834 scale_semaphore: Arc::new(Semaphore::new(10)),
835 overlay_manager: self.overlay_manager,
836 stream_registry: self.stream_registry,
837 proxy_manager: self.proxy_manager,
838 dns_server: self.dns_server,
839 deployment_name: self.deployment_name,
840 health_states: Arc::new(RwLock::new(HashMap::new())),
841 job_executor: self.job_executor,
842 cron_scheduler: self.cron_scheduler,
843 container_supervisor: self.container_supervisor,
844 }
845 }
846}
847
848impl ServiceManager {
    /// Preferred entry point: returns a [`ServiceManagerBuilder`].
    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
        ServiceManagerBuilder::new(runtime)
    }
864
    /// Legacy constructor with no collaborators wired.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: None,
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: None,
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Legacy constructor that additionally wires an overlay manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn with_overlay(
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Arc<RwLock<OverlayManager>>,
    ) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: Some(overlay_manager),
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: None,
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Legacy constructor that wires an overlay manager and deployment name.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn with_full_config(
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Arc<RwLock<OverlayManager>>,
        deployment_name: String,
    ) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: Some(overlay_manager),
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: Some(deployment_name),
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }
928
    /// Returns a handle to the shared per-service health-state map.
    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
        Arc::clone(&self.health_states)
    }

    /// Records `state` as the current health of `service_name`.
    pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
        let mut states = self.health_states.write().await;
        states.insert(service_name.to_string(), state);
    }
939
    /// Legacy setter for the deployment name.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_deployment_name(&mut self, name: String) {
        self.deployment_name = Some(name);
    }

    /// Legacy setter for the stream registry.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
        self.stream_registry = Some(registry);
    }

    /// Legacy builder-style variant of [`Self::set_stream_registry`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
        self.stream_registry = Some(registry);
        self
    }

    /// Returns the stream registry, if configured.
    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
        self.stream_registry.as_ref()
    }

    /// Legacy setter for the overlay manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
        self.overlay_manager = Some(manager);
    }

    /// Legacy setter for the proxy manager.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
        self.proxy_manager = Some(proxy);
    }

    /// Legacy builder-style variant of [`Self::set_proxy_manager`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
        self.proxy_manager = Some(proxy);
        self
    }

    /// Returns the proxy manager, if configured.
    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
        self.proxy_manager.as_ref()
    }

    /// Legacy setter for the DNS server.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
        self.dns_server = Some(dns);
    }

    /// Legacy builder-style variant of [`Self::set_dns_server`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns);
        self
    }

    /// Returns the DNS server, if configured.
    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
        self.dns_server.as_ref()
    }

    /// Legacy setter for the job executor.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
        self.job_executor = Some(executor);
    }

    /// Legacy setter for the cron scheduler.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
        self.cron_scheduler = Some(scheduler);
    }

    /// Legacy builder-style variant of [`Self::set_job_executor`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
        self.job_executor = Some(executor);
        self
    }

    /// Legacy builder-style variant of [`Self::set_cron_scheduler`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
        self.cron_scheduler = Some(scheduler);
        self
    }

    /// Returns the job executor, if configured.
    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
        self.job_executor.as_ref()
    }

    /// Returns the cron scheduler, if configured.
    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
        self.cron_scheduler.as_ref()
    }

    /// Legacy setter for the container supervisor.
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
        self.container_supervisor = Some(supervisor);
    }

    /// Legacy builder-style variant of [`Self::set_container_supervisor`].
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
        self.container_supervisor = Some(supervisor);
        self
    }

    /// Returns the container supervisor, if configured.
    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
        self.container_supervisor.as_ref()
    }
1065
    /// Spawns the container supervisor's run loop as a tokio task and
    /// returns its join handle.
    ///
    /// # Errors
    /// Returns a configuration error if no supervisor was configured.
    pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
        let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
            AgentError::Configuration("Container supervisor not configured".to_string())
        })?;

        let supervisor = Arc::clone(supervisor);
        Ok(tokio::spawn(async move {
            supervisor.run_loop().await;
        }))
    }

    /// Signals the container supervisor (if configured) to stop its run loop.
    pub fn shutdown_container_supervisor(&self) {
        if let Some(supervisor) = &self.container_supervisor {
            supervisor.shutdown();
        }
    }
1093
1094 pub async fn get_container_supervised_state(
1096 &self,
1097 container_id: &ContainerId,
1098 ) -> Option<SupervisedState> {
1099 if let Some(supervisor) = &self.container_supervisor {
1100 supervisor.get_state(container_id).await
1101 } else {
1102 None
1103 }
1104 }
1105
1106 pub async fn take_supervisor_events(
1110 &self,
1111 ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1112 if let Some(supervisor) = &self.container_supervisor {
1113 supervisor.take_event_receiver().await
1114 } else {
1115 None
1116 }
1117 }
1118
    /// Deploys a set of services in dependency order.
    ///
    /// Builds a dependency graph, iterates services in startup order, waits
    /// for each service's dependencies, registers it, scales it to its
    /// spec-derived replica count, and seeds its health state as `Unknown`.
    ///
    /// # Errors
    /// Fails on dependency cycles/graph errors, dependency timeouts with a
    /// fail policy, or registration/scaling errors.
    pub async fn deploy_with_dependencies(
        &self,
        services: HashMap<String, ServiceSpec>,
    ) -> Result<()> {
        if services.is_empty() {
            return Ok(());
        }

        let graph = DependencyGraph::build(&services)?;

        tracing::info!(
            service_count = services.len(),
            "Starting deployment with dependency ordering"
        );

        let order = graph.startup_order();
        tracing::debug!(order = ?order, "Computed startup order");

        for service_name in order {
            let service_spec = services
                .get(service_name)
                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;

            // Block until this service's dependencies are satisfied (or their
            // timeout policy allows continuing).
            if !service_spec.depends.is_empty() {
                tracing::info!(
                    service = %service_name,
                    dependency_count = service_spec.depends.len(),
                    "Waiting for dependencies"
                );
                self.wait_for_dependencies(service_name, &service_spec.depends)
                    .await?;
            }

            tracing::info!(service = %service_name, "Starting service");
            self.upsert_service(service_name.clone(), service_spec.clone())
                .await?;

            // Initial replica count: fixed count, adaptive minimum, or 1 for
            // manually scaled services.
            let replicas = match &service_spec.scale {
                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min,
                zlayer_spec::ScaleSpec::Manual => 1,
            };
            self.scale_service(service_name, replicas).await?;

            // Health starts as Unknown until the first check reports.
            self.update_health_state(service_name, HealthState::Unknown)
                .await;

            tracing::info!(
                service = %service_name,
                replicas = replicas,
                "Service started"
            );
        }

        tracing::info!(service_count = services.len(), "Deployment complete");

        Ok(())
    }
1200
    /// Waits until every entry in `deps` is satisfied.
    ///
    /// Timeout handling follows each dependency's policy: `TimedOutFail`
    /// aborts with an error, `TimedOutWarn` logs and continues,
    /// `TimedOutContinue`/`Satisfied` proceed silently.
    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
        let condition_checker = DependencyConditionChecker::new(
            Arc::clone(&self.runtime),
            Arc::clone(&self.health_states),
            None,
        );

        let waiter = DependencyWaiter::new(condition_checker);
        let results = waiter.wait_for_all(deps).await?;

        for result in results {
            match result {
                WaitResult::TimedOutFail {
                    service: dep_service,
                    condition,
                    timeout,
                } => {
                    return Err(AgentError::DependencyTimeout {
                        service: service.to_string(),
                        dependency: dep_service,
                        condition: format!("{condition:?}"),
                        timeout,
                    });
                }
                WaitResult::TimedOutWarn {
                    service: dep_service,
                    condition,
                } => {
                    tracing::warn!(
                        service = %service,
                        dependency = %dep_service,
                        condition = ?condition,
                        "Dependency timed out but continuing"
                    );
                }
                WaitResult::TimedOutContinue | WaitResult::Satisfied => {}
            }
        }

        Ok(())
    }
1253
1254 pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1261 let condition_checker = DependencyConditionChecker::new(
1262 Arc::clone(&self.runtime),
1263 Arc::clone(&self.health_states),
1264 None,
1265 );
1266
1267 for dep in deps {
1268 if !condition_checker.check(dep).await? {
1269 return Ok(false);
1270 }
1271 }
1272
1273 Ok(true)
1274 }
1275
    /// Registers or updates a resource by its spec type.
    ///
    /// * `Service` — updates the existing instance's spec in place, or
    ///   builds a new instance wired to proxy/DNS/health and registers its
    ///   TCP/UDP endpoints with the stream registry.
    /// * `Job` — hands the spec to the job executor; if none is configured,
    ///   stores it as a (never-scaled) service instance for reference.
    /// * `Cron` — registers with the cron scheduler; errors if none is
    ///   configured.
    ///
    /// # Errors
    /// Fails on cron registration errors or a missing cron scheduler.
    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
        match spec.rtype {
            ResourceType::Service => {
                let mut services = self.services.write().await;

                if let Some(instance) = services.get_mut(&name) {
                    // Existing service: replace the spec and refresh DNS wiring.
                    instance.spec = spec;
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }
                } else {
                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                    let mut instance = if let Some(proxy) = &self.proxy_manager {
                        ServiceInstance::with_proxy(
                            name.clone(),
                            spec,
                            self.runtime.clone(),
                            overlay,
                            Arc::clone(proxy),
                        )
                    } else {
                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                    };
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }
                    instance.set_health_states(Arc::clone(&self.health_states));
                    if let Some(proxy) = &self.proxy_manager {
                        proxy.add_service(&name, &instance.spec).await;
                    }
                    // Register L4 endpoints; backends are filled in later as
                    // replicas come up.
                    if let Some(stream_registry) = &self.stream_registry {
                        for endpoint in &instance.spec.endpoints {
                            let svc = StreamService::new(
                                name.clone(),
                                Vec::new(),
                            );
                            match endpoint.protocol {
                                Protocol::Tcp => {
                                    stream_registry.register_tcp(endpoint.port, svc);
                                    tracing::debug!(
                                        service = %name,
                                        port = endpoint.port,
                                        "Registered TCP stream route"
                                    );
                                }
                                Protocol::Udp => {
                                    stream_registry.register_udp(endpoint.port, svc);
                                    tracing::debug!(
                                        service = %name,
                                        port = endpoint.port,
                                        "Registered UDP stream route"
                                    );
                                }
                                // HTTP-family endpoints go through the proxy, not streams.
                                _ => {}
                            }
                        }
                    }
                    services.insert(name, instance);
                }
            }
            ResourceType::Job => {
                if let Some(executor) = &self.job_executor {
                    executor.register_job(&name, spec).await;
                    tracing::info!(job = %name, "Registered job spec");
                } else {
                    // Fallback: keep the spec around as a service instance so
                    // it remains visible, even though it won't be executed.
                    tracing::warn!(
                        job = %name,
                        "Job executor not configured, storing as service for reference"
                    );
                    let mut services = self.services.write().await;
                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                    let mut instance = if let Some(proxy) = &self.proxy_manager {
                        ServiceInstance::with_proxy(
                            name.clone(),
                            spec,
                            self.runtime.clone(),
                            overlay,
                            Arc::clone(proxy),
                        )
                    } else {
                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                    };
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }
                    services.insert(name, instance);
                }
            }
            ResourceType::Cron => {
                if let Some(scheduler) = &self.cron_scheduler {
                    scheduler.register(&name, &spec).await?;
                    tracing::info!(cron = %name, "Registered cron job with scheduler");
                } else {
                    return Err(AgentError::Configuration(format!(
                        "Cron scheduler not configured for cron job '{name}'"
                    )));
                }
            }
        }

        Ok(())
    }
1400
1401 async fn update_proxy_backends(&self, service_name: &str, addrs: Vec<SocketAddr>) {
1403 if let Some(proxy) = &self.proxy_manager {
1404 proxy.update_backends(service_name, addrs).await;
1405 }
1406 }
1407
1408 fn update_stream_backends(&self, spec: &ServiceSpec, addrs: &[SocketAddr]) {
1417 let Some(stream_registry) = &self.stream_registry else {
1418 return;
1419 };
1420
1421 let primary_spec_port = spec
1425 .endpoints
1426 .iter()
1427 .find(|ep| {
1428 matches!(
1429 ep.protocol,
1430 Protocol::Http | Protocol::Https | Protocol::Websocket
1431 )
1432 })
1433 .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1434
1435 let has_port_overrides = addrs.iter().any(|addr| addr.port() != primary_spec_port);
1436
1437 for endpoint in &spec.endpoints {
1438 match endpoint.protocol {
1439 Protocol::Tcp => {
1440 let tcp_backends: Vec<SocketAddr> = if has_port_overrides {
1441 addrs.to_vec()
1444 } else {
1445 addrs
1448 .iter()
1449 .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1450 .collect()
1451 };
1452
1453 stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
1454
1455 tracing::debug!(
1456 endpoint = %endpoint.name,
1457 port = endpoint.port,
1458 backend_count = addrs.len(),
1459 "Updated TCP stream backends"
1460 );
1461 }
1462 Protocol::Udp => {
1463 let udp_backends: Vec<SocketAddr> = if has_port_overrides {
1464 addrs.to_vec()
1465 } else {
1466 addrs
1467 .iter()
1468 .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1469 .collect()
1470 };
1471
1472 stream_registry.update_udp_backends(endpoint.port, udp_backends);
1473
1474 tracing::debug!(
1475 endpoint = %endpoint.name,
1476 port = endpoint.port,
1477 backend_count = addrs.len(),
1478 "Updated UDP stream backends"
1479 );
1480 }
1481 _ => {} }
1483 }
1484 }
1485
1486 #[allow(clippy::cast_possible_truncation)]
1491 pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
1492 let _permit = self.scale_semaphore.acquire().await;
1493
1494 let services = self.services.read().await;
1495 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1496 container: name.to_string(),
1497 reason: "service not found".to_string(),
1498 })?;
1499
1500 let current_replicas = instance.replica_count().await as u32;
1502
1503 instance.scale_to(replicas).await?;
1505
1506 let addrs = self.collect_backend_addrs(instance, replicas).await;
1512
1513 if self.proxy_manager.is_some() && !addrs.is_empty() {
1515 self.update_proxy_backends(name, addrs.clone()).await;
1516 }
1517
1518 if self.stream_registry.is_some() {
1520 self.update_stream_backends(&instance.spec, &addrs);
1521 }
1522
1523 if let Some(supervisor) = &self.container_supervisor {
1525 if replicas > current_replicas {
1527 for i in current_replicas..replicas {
1528 let container_id = ContainerId {
1529 service: name.to_string(),
1530 replica: i + 1,
1531 };
1532 supervisor.supervise(&container_id, &instance.spec).await;
1533 }
1534 }
1535 if replicas < current_replicas {
1537 for i in replicas..current_replicas {
1538 let container_id = ContainerId {
1539 service: name.to_string(),
1540 replica: i + 1,
1541 };
1542 supervisor.unsupervise(&container_id).await;
1543 }
1544 }
1545 }
1546
1547 Ok(())
1548 }
1549
1550 async fn collect_backend_addrs(
1561 &self,
1562 instance: &ServiceInstance,
1563 _replicas: u32, ) -> Vec<SocketAddr> {
1565 let mut addrs = Vec::new();
1566
1567 let spec_port = instance
1569 .spec
1570 .endpoints
1571 .iter()
1572 .find(|ep| {
1573 matches!(
1574 ep.protocol,
1575 Protocol::Http | Protocol::Https | Protocol::Websocket
1576 )
1577 })
1578 .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1579
1580 let containers = instance.containers().read().await;
1582
1583 for container in containers.values() {
1584 if let Some(ip) = container.overlay_ip {
1585 let port = container.port_override.unwrap_or(spec_port);
1588 addrs.push(SocketAddr::new(ip, port));
1589 }
1590 }
1591
1592 if addrs.is_empty() && !containers.is_empty() {
1595 tracing::warn!(
1596 service = %instance.service_name,
1597 container_count = containers.len(),
1598 "no overlay IPs available for backends - containers may not be reachable via proxy"
1599 );
1600 }
1601
1602 addrs
1603 }
1604
1605 pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
1610 let services = self.services.read().await;
1611 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1612 container: name.to_string(),
1613 reason: "service not found".to_string(),
1614 })?;
1615
1616 Ok(instance.replica_count().await)
1617 }
1618
    /// Fully remove service `name` from the manager and every piece of
    /// attached infrastructure: cron schedule, job registration, L4 stream
    /// routes, container supervision, and DNS records.
    ///
    /// Each cleanup phase is best-effort and independent; individual failures
    /// are logged rather than aborting the removal. Always returns `Ok(())`.
    pub async fn remove_service(&self, name: &str) -> Result<()> {
        // Stop scheduled runs first so nothing restarts mid-teardown.
        if let Some(scheduler) = &self.cron_scheduler {
            scheduler.unregister(name).await;
        }

        if let Some(executor) = &self.job_executor {
            executor.unregister_job(name).await;
        }

        // Tear down the TCP/UDP stream routes registered for this service's endpoints.
        if let Some(stream_registry) = &self.stream_registry {
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                for endpoint in &instance.spec.endpoints {
                    match endpoint.protocol {
                        Protocol::Tcp => {
                            let _ = stream_registry.unregister_tcp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered TCP stream route"
                            );
                        }
                        Protocol::Udp => {
                            let _ = stream_registry.unregister_udp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered UDP stream route"
                            );
                        }
                        // HTTP-family endpoints have no stream route to remove.
                        _ => {} }
                }
            }
            // Explicitly release the read lock before the next phase.
            drop(services); }

        // Stop supervising every container belonging to this service.
        if let Some(supervisor) = &self.container_supervisor {
            let containers = self.get_service_containers(name).await;
            for container_id in containers {
                supervisor.unsupervise(&container_id).await;
            }
            tracing::debug!(service = %name, "Unregistered containers from supervisor");
        }

        // Remove the service-level and per-replica DNS records.
        if let Some(dns) = &self.dns_server {
            let service_hostname = format!("{name}.service.local");
            if let Err(e) = dns.remove_record(&service_hostname).await {
                tracing::warn!(
                    hostname = %service_hostname,
                    error = %e,
                    "failed to remove service DNS record"
                );
            } else {
                tracing::debug!(
                    hostname = %service_hostname,
                    "removed service DNS record"
                );
            }

            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                let containers = instance.containers().read().await;
                for (id, _) in containers.iter() {
                    let replica_hostname = format!("{}.{}.service.local", id.replica, name);
                    if let Err(e) = dns.remove_record(&replica_hostname).await {
                        tracing::warn!(
                            hostname = %replica_hostname,
                            error = %e,
                            "failed to remove replica DNS record during service removal"
                        );
                    }
                }
            }
            // Release the read lock before taking the write lock below.
            drop(services); }

        // Finally drop the instance itself from the service map.
        let mut services = self.services.write().await;
        if services.remove(name).is_some() {
            tracing::debug!(service = %name, "Removed service from manager");
        }

        Ok(())
    }
1721
1722 pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
1725 let services = self.services.read().await;
1726 services.get(name).map(|i| {
1727 (
1728 i.has_overlay_manager(),
1729 i.has_proxy_manager(),
1730 i.has_dns_server(),
1731 )
1732 })
1733 }
1734
1735 pub async fn list_services(&self) -> Vec<String> {
1737 self.services.read().await.keys().cloned().collect()
1738 }
1739
1740 pub async fn get_service_logs(
1754 &self,
1755 service_name: &str,
1756 tail: usize,
1757 instance: Option<&str>,
1758 ) -> Result<Vec<LogEntry>> {
1759 let container_ids = self.get_service_containers(service_name).await;
1760
1761 if container_ids.is_empty() {
1762 return Err(AgentError::NotFound {
1763 container: service_name.to_string(),
1764 reason: "no containers found for service".to_string(),
1765 });
1766 }
1767
1768 let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
1770 if let Ok(replica_num) = inst.parse::<u32>() {
1771 container_ids
1772 .iter()
1773 .filter(|id| id.replica == replica_num)
1774 .collect()
1775 } else {
1776 container_ids
1778 .iter()
1779 .filter(|id| id.to_string().contains(inst))
1780 .collect()
1781 }
1782 } else {
1783 container_ids.iter().collect()
1784 };
1785
1786 if target_ids.is_empty() {
1787 return Err(AgentError::NotFound {
1788 container: format!("{}/{}", service_name, instance.unwrap_or("?")),
1789 reason: "instance not found".to_string(),
1790 });
1791 }
1792
1793 let mut all_entries: Vec<LogEntry> = Vec::new();
1794
1795 for id in &target_ids {
1796 match self.runtime.container_logs(id, tail).await {
1797 Ok(mut entries) => {
1798 for entry in &mut entries {
1800 if entry.service.is_none() {
1801 entry.service = Some(service_name.to_string());
1802 }
1803 if entry.deployment.is_none() {
1804 entry.deployment.clone_from(&self.deployment_name);
1805 }
1806 }
1807 all_entries.extend(entries);
1808 }
1809 Err(e) => {
1810 tracing::warn!(
1811 service = service_name,
1812 container = %id,
1813 error = %e,
1814 "Failed to read container logs"
1815 );
1816 }
1817 }
1818 }
1819
1820 Ok(all_entries)
1821 }
1822
1823 pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
1833 let services = self.services.read().await;
1834 if let Some(instance) = services.get(service_name) {
1835 instance.container_ids().await
1836 } else {
1837 Vec::new()
1838 }
1839 }
1840
1841 pub async fn exec_in_container(
1860 &self,
1861 service_name: &str,
1862 replica: Option<u32>,
1863 cmd: &[String],
1864 ) -> Result<(i32, String, String)> {
1865 let container_ids = self.get_service_containers(service_name).await;
1866
1867 if container_ids.is_empty() {
1868 return Err(AgentError::NotFound {
1869 container: service_name.to_string(),
1870 reason: "no containers found for service".to_string(),
1871 });
1872 }
1873
1874 let target = if let Some(rep) = replica {
1876 container_ids
1877 .into_iter()
1878 .find(|cid| cid.replica == rep)
1879 .ok_or_else(|| AgentError::NotFound {
1880 container: format!("{service_name}-rep-{rep}"),
1881 reason: format!("replica {rep} not found for service"),
1882 })?
1883 } else {
1884 container_ids.into_iter().next().unwrap()
1886 };
1887
1888 self.runtime.exec(&target, cmd).await
1889 }
1890
1891 pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
1906 let executor = self
1907 .job_executor
1908 .as_ref()
1909 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
1910
1911 let spec = executor
1912 .get_job_spec(name)
1913 .await
1914 .ok_or_else(|| AgentError::NotFound {
1915 container: name.to_string(),
1916 reason: "job not registered".to_string(),
1917 })?;
1918
1919 executor.trigger(name, &spec, trigger).await
1920 }
1921
1922 pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
1930 if let Some(executor) = &self.job_executor {
1931 executor.get_execution(id).await
1932 } else {
1933 None
1934 }
1935 }
1936
1937 pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
1945 if let Some(executor) = &self.job_executor {
1946 executor.list_executions(name).await
1947 } else {
1948 Vec::new()
1949 }
1950 }
1951
1952 pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
1960 let executor = self
1961 .job_executor
1962 .as_ref()
1963 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
1964
1965 executor.cancel(id).await
1966 }
1967
1968 pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
1981 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
1982 AgentError::Configuration("Cron scheduler not configured".to_string())
1983 })?;
1984
1985 scheduler.trigger_now(name).await
1986 }
1987
1988 pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
1994 if let Some(scheduler) = &self.cron_scheduler {
1995 scheduler.set_enabled(name, enabled).await;
1996 }
1997 }
1998
1999 pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2001 if let Some(scheduler) = &self.cron_scheduler {
2002 scheduler.list_jobs().await
2003 } else {
2004 Vec::new()
2005 }
2006 }
2007
2008 pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2016 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2017 AgentError::Configuration("Cron scheduler not configured".to_string())
2018 })?;
2019
2020 let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2021 Ok(tokio::spawn(async move {
2022 scheduler.run_loop().await;
2023 }))
2024 }
2025
2026 pub fn shutdown_cron(&self) {
2028 if let Some(scheduler) = &self.cron_scheduler {
2029 scheduler.shutdown();
2030 }
2031 }
2032}
2033
2034#[cfg(test)]
2035#[allow(deprecated)]
2036mod tests {
2037 use super::*;
2038 use crate::runtime::MockRuntime;
2039
    // Smoke test: upsert a service, scale to 3, verify replica count and listing.
    #[tokio::test]
    async fn test_service_manager() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let spec = mock_spec();
        manager
            .upsert_service("test".to_string(), spec)
            .await
            .unwrap();

        manager.scale_service("test", 3).await.unwrap();

        let count = manager.service_replica_count("test").await.unwrap();
        assert_eq!(count, 3);

        let services = manager.list_services().await;
        assert_eq!(services, vec!["test".to_string()]);
    }
2063
    // Full lifecycle: upsert, scale, then remove; service must disappear from the list.
    #[tokio::test]
    async fn test_service_manager_basic_lifecycle() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let spec = mock_spec();
        manager
            .upsert_service("api".to_string(), spec)
            .await
            .unwrap();

        manager.scale_service("api", 2).await.unwrap();

        let count = manager.service_replica_count("api").await.unwrap();
        assert_eq!(count, 2);

        manager.remove_service("api").await.unwrap();

        let services = manager.list_services().await;
        assert!(!services.contains(&"api".to_string()));
    }
2090
    // Upsert still works when the manager is built with the full (overlay-enabled) config.
    #[tokio::test]
    async fn test_service_manager_with_full_config() {
        use tokio::sync::RwLock;

        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());

        let overlay_manager = Arc::new(RwLock::new(
            OverlayManager::new("test-deployment".to_string())
                .await
                .unwrap(),
        ));

        let manager =
            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());

        let spec = mock_spec();
        manager
            .upsert_service("web".to_string(), spec)
            .await
            .unwrap();

        let services = manager.list_services().await;
        assert!(services.contains(&"web".to_string()));
    }
2118
    // Minimal HTTP service fixture parsed from YAML; panics if the fixture drifts.
    fn mock_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }
2143
    // `mock_spec` variant with the given dependency list attached.
    fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
        let mut spec = mock_spec();
        spec.depends = deps;
        spec
    }
2150
    // Shorthand for building a `DependsSpec` with a millisecond timeout.
    fn dep(
        service: &str,
        condition: zlayer_spec::DependencyCondition,
        timeout_ms: u64,
        on_timeout: zlayer_spec::TimeoutAction,
    ) -> DependsSpec {
        DependsSpec {
            service: service.to_string(),
            condition,
            timeout: Some(Duration::from_millis(timeout_ms)),
            on_timeout,
        }
    }
2165
    // Two independent services deploy cleanly when nothing depends on anything.
    #[tokio::test]
    async fn test_deploy_with_dependencies_no_deps() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("a".to_string(), mock_spec());
        services.insert("b".to_string(), mock_spec());

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }
2183
    // Linear chain a -> b -> c (Started condition) resolves and deploys all three.
    #[tokio::test]
    async fn test_deploy_with_dependencies_linear() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("c".to_string(), mock_spec());
        services.insert(
            "b".to_string(),
            mock_spec_with_deps(vec![dep(
                "c",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 3);
    }
2219
    // Mutual dependency a <-> b must be rejected with a cyclic-dependency error.
    #[tokio::test]
    async fn test_deploy_with_dependencies_cycle_detection() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        services.insert(
            "b".to_string(),
            mock_spec_with_deps(vec![dep(
                "a",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );

        let result = manager.deploy_with_dependencies(services).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Cyclic dependency"));
    }
2252
    // An unmet Healthy dependency with TimeoutAction::Continue still deploys both services.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_continue() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy,
                100,
                zlayer_spec::TimeoutAction::Continue,
            )]),
        );

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }
2278
    // TimeoutAction::Warn: a timed-out Healthy dependency only warns, deployment succeeds.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_warn() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy,
                100,
                zlayer_spec::TimeoutAction::Warn,
            )]),
        );

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }
2304
    // TimeoutAction::Fail: a timed-out Healthy dependency aborts the deployment.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_fail() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy,
                100,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );

        let result = manager.deploy_with_dependencies(services).await;
        assert!(result.is_err());

        let err = result.unwrap_err().to_string();
        assert!(err.contains("Dependency timeout"));
    }
2332
    // check_dependencies returns true once the depended-on service is marked Healthy.
    #[tokio::test]
    async fn test_check_dependencies_all_satisfied() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        manager
            .update_health_state("db", HealthState::Healthy)
            .await;

        let deps = vec![DependsSpec {
            service: "db".to_string(),
            condition: zlayer_spec::DependencyCondition::Healthy,
            timeout: Some(Duration::from_secs(60)),
            on_timeout: zlayer_spec::TimeoutAction::Fail,
        }];

        let satisfied = manager.check_dependencies(&deps).await.unwrap();
        assert!(satisfied);
    }
2353
    // check_dependencies returns false when no health state was ever recorded.
    #[tokio::test]
    async fn test_check_dependencies_not_satisfied() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let deps = vec![DependsSpec {
            service: "db".to_string(),
            condition: zlayer_spec::DependencyCondition::Healthy,
            timeout: Some(Duration::from_secs(60)),
            on_timeout: zlayer_spec::TimeoutAction::Fail,
        }];

        let satisfied = manager.check_dependencies(&deps).await.unwrap();
        assert!(!satisfied);
    }
2370
    // update_health_state stores per-service states retrievable via health_states().
    #[tokio::test]
    async fn test_health_state_tracking() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        manager
            .update_health_state("db", HealthState::Healthy)
            .await;
        manager
            .update_health_state("cache", HealthState::Unknown)
            .await;

        let states = manager.health_states();
        let states_read = states.read().await;

        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
        assert!(matches!(
            states_read.get("cache"),
            Some(HealthState::Unknown)
        ));
    }
2394
    // Minimal job-type fixture (rtype: job) parsed from YAML.
    fn mock_job_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  backup:
    rtype: job
    image:
      name: backup:latest
",
        )
        .unwrap()
        .services
        .remove("backup")
        .unwrap()
    }
2414
    // Minimal cron-type fixture with a 7-field schedule, parsed from YAML.
    fn mock_cron_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r#"
version: v1
deployment: test
services:
  cleanup:
    rtype: cron
    schedule: "0 0 * * * * *"
    image:
      name: cleanup:latest
"#,
        )
        .unwrap()
        .services
        .remove("cleanup")
        .unwrap()
    }
2433
    // Registering a job spec and triggering it yields a retrievable execution record.
    #[tokio::test]
    async fn test_service_manager_with_job_executor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);

        let job_spec = mock_job_spec();
        manager
            .upsert_service("backup".to_string(), job_spec)
            .await
            .unwrap();

        let exec_id = manager
            .trigger_job("backup", JobTrigger::Cli)
            .await
            .unwrap();

        // Give the spawned execution a moment to be recorded.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let execution = manager.get_job_execution(&exec_id).await;
        assert!(execution.is_some());
        assert_eq!(execution.unwrap().job_name, "backup");
    }
2462
    // Upserting a cron spec registers it with the scheduler, enabled by default.
    #[tokio::test]
    async fn test_service_manager_with_cron_scheduler() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));

        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        let cron_jobs = manager.list_cron_jobs().await;
        assert_eq!(cron_jobs.len(), 1);
        assert_eq!(cron_jobs[0].name, "cleanup");
        assert!(cron_jobs[0].enabled);
    }
2484
    // trigger_cron starts an out-of-schedule run and returns a non-empty execution id.
    #[tokio::test]
    async fn test_service_manager_trigger_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));

        let manager = ServiceManager::new(runtime)
            .with_job_executor(job_executor)
            .with_cron_scheduler(cron_scheduler);

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
        assert!(!exec_id.0.is_empty());
    }
2506
    // set_cron_enabled toggles the scheduler's enabled flag in both directions.
    #[tokio::test]
    async fn test_service_manager_enable_disable_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));

        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        let cron_jobs = manager.list_cron_jobs().await;
        assert!(cron_jobs[0].enabled);

        manager.set_cron_enabled("cleanup", false).await;
        let cron_jobs = manager.list_cron_jobs().await;
        assert!(!cron_jobs[0].enabled);

        manager.set_cron_enabled("cleanup", true).await;
        let cron_jobs = manager.list_cron_jobs().await;
        assert!(cron_jobs[0].enabled);
    }
2536
    // remove_service must unregister the job spec from the executor.
    #[tokio::test]
    async fn test_service_manager_remove_cleans_up_job() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());

        let job_spec = mock_job_spec();
        manager
            .upsert_service("backup".to_string(), job_spec)
            .await
            .unwrap();

        let spec = job_executor.get_job_spec("backup").await;
        assert!(spec.is_some());

        manager.remove_service("backup").await.unwrap();

        let spec = job_executor.get_job_spec("backup").await;
        assert!(spec.is_none());
    }
2562
    // remove_service must unregister the cron job from the scheduler.
    #[tokio::test]
    async fn test_service_manager_remove_cleans_up_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));

        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        assert_eq!(cron_scheduler.job_count().await, 1);

        manager.remove_service("cleanup").await.unwrap();

        assert_eq!(cron_scheduler.job_count().await, 0);
    }
2587
    // trigger_job without a configured executor surfaces a Configuration error.
    #[tokio::test]
    async fn test_service_manager_job_without_executor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }
2598
    // Upserting a cron spec without a scheduler configured must fail loudly.
    #[tokio::test]
    async fn test_service_manager_cron_without_scheduler() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let cron_spec = mock_cron_spec();
        let result = manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }
2612
    // Two triggers of the same job produce two listed executions.
    #[tokio::test]
    async fn test_service_manager_list_job_executions() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);

        let job_spec = mock_job_spec();
        manager
            .upsert_service("backup".to_string(), job_spec)
            .await
            .unwrap();

        manager
            .trigger_job("backup", JobTrigger::Cli)
            .await
            .unwrap();
        manager
            .trigger_job("backup", JobTrigger::Scheduler)
            .await
            .unwrap();

        // Give the spawned executions a moment to be recorded.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let executions = manager.list_job_executions("backup").await;
        assert_eq!(executions.len(), 2);
    }
2644
    // Supervised-container count tracks scaling up, scaling down, and removal.
    #[tokio::test]
    async fn test_service_manager_with_supervisor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());

        let spec = mock_spec();
        manager
            .upsert_service("api".to_string(), spec)
            .await
            .unwrap();

        manager.scale_service("api", 2).await.unwrap();

        assert_eq!(supervisor.supervised_count().await, 2);

        manager.scale_service("api", 1).await.unwrap();
        assert_eq!(supervisor.supervised_count().await, 1);

        manager.remove_service("api").await.unwrap();
        assert_eq!(supervisor.supervised_count().await, 0);
    }
2675
    // A freshly scaled replica is reported as Running by the supervisor.
    #[tokio::test]
    async fn test_service_manager_supervisor_state() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);

        let spec = mock_spec();
        manager
            .upsert_service("web".to_string(), spec)
            .await
            .unwrap();
        manager.scale_service("web", 1).await.unwrap();

        // Replica numbering is 1-based.
        let container_id = ContainerId {
            service: "web".to_string(),
            replica: 1,
        };
        let state = manager.get_container_supervised_state(&container_id).await;
        assert_eq!(state, Some(SupervisedState::Running));
    }
2699
    // start/shutdown of the supervisor loop: running flag flips and the task joins.
    #[tokio::test]
    async fn test_service_manager_start_supervisor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());

        let handle = manager.start_container_supervisor().unwrap();

        // Give the spawned loop a moment to start.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(supervisor.is_running());

        manager.shutdown_container_supervisor();

        // The loop must exit promptly after the shutdown signal.
        tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .unwrap()
            .unwrap();

        assert!(!supervisor.is_running());
    }
2725
    // start_container_supervisor without a supervisor yields a Configuration error.
    #[tokio::test]
    async fn test_service_manager_supervisor_not_configured() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let result = manager.start_container_supervisor();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }
2736
    // Fixture: service with a single TCP endpoint on port 5432.
    fn mock_tcp_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  database:
    rtype: service
    image:
      name: postgres:latest
    endpoints:
      - name: postgresql
        protocol: tcp
        port: 5432
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("database")
        .unwrap()
    }
2763
2764 fn mock_udp_spec() -> ServiceSpec {
2765 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2766 r"
2767version: v1
2768deployment: test
2769services:
2770 dns:
2771 rtype: service
2772 image:
2773 name: dns:latest
2774 endpoints:
2775 - name: dns
2776 protocol: udp
2777 port: 53
2778 scale:
2779 mode: fixed
2780 replicas: 1
2781",
2782 )
2783 .unwrap()
2784 .services
2785 .remove("dns")
2786 .unwrap()
2787 }
2788
2789 fn mock_mixed_spec() -> ServiceSpec {
2790 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2791 r"
2792version: v1
2793deployment: test
2794services:
2795 mixed:
2796 rtype: service
2797 image:
2798 name: mixed:latest
2799 endpoints:
2800 - name: http
2801 protocol: http
2802 port: 8080
2803 - name: grpc
2804 protocol: tcp
2805 port: 9000
2806 - name: metrics
2807 protocol: udp
2808 port: 8125
2809 scale:
2810 mode: fixed
2811 replicas: 1
2812",
2813 )
2814 .unwrap()
2815 .services
2816 .remove("mixed")
2817 .unwrap()
2818 }
2819
2820 #[tokio::test]
2821 async fn test_service_manager_with_stream_registry_tcp() {
2822 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2823 let stream_registry = Arc::new(StreamRegistry::new());
2824
2825 let mut manager = ServiceManager::new(runtime);
2826 manager.set_stream_registry(stream_registry.clone());
2827 manager.set_deployment_name("test".to_string());
2828
2829 let spec = mock_tcp_spec();
2831 manager
2832 .upsert_service("database".to_string(), spec)
2833 .await
2834 .unwrap();
2835
2836 assert_eq!(stream_registry.tcp_count(), 1);
2838 assert!(stream_registry.tcp_ports().contains(&5432));
2839
2840 manager.remove_service("database").await.unwrap();
2842 assert_eq!(stream_registry.tcp_count(), 0);
2843 }
2844
2845 #[tokio::test]
2846 async fn test_service_manager_with_stream_registry_udp() {
2847 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2848 let stream_registry = Arc::new(StreamRegistry::new());
2849
2850 let mut manager = ServiceManager::new(runtime);
2851 manager.set_stream_registry(stream_registry.clone());
2852 manager.set_deployment_name("test".to_string());
2853
2854 let spec = mock_udp_spec();
2856 manager
2857 .upsert_service("dns".to_string(), spec)
2858 .await
2859 .unwrap();
2860
2861 assert_eq!(stream_registry.udp_count(), 1);
2863 assert!(stream_registry.udp_ports().contains(&53));
2864
2865 manager.remove_service("dns").await.unwrap();
2867 assert_eq!(stream_registry.udp_count(), 0);
2868 }
2869
2870 #[tokio::test]
2871 async fn test_service_manager_with_stream_registry_mixed() {
2872 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2873 let stream_registry = Arc::new(StreamRegistry::new());
2874
2875 let mut manager = ServiceManager::new(runtime);
2876 manager.set_stream_registry(stream_registry.clone());
2877 manager.set_deployment_name("test".to_string());
2878
2879 let spec = mock_mixed_spec();
2881 manager
2882 .upsert_service("mixed".to_string(), spec)
2883 .await
2884 .unwrap();
2885
2886 assert_eq!(stream_registry.tcp_count(), 1); assert_eq!(stream_registry.udp_count(), 1); assert!(stream_registry.tcp_ports().contains(&9000));
2891 assert!(stream_registry.udp_ports().contains(&8125));
2892
2893 manager.remove_service("mixed").await.unwrap();
2895 assert_eq!(stream_registry.tcp_count(), 0);
2896 assert_eq!(stream_registry.udp_count(), 0);
2897 }
2898
2899 #[tokio::test]
2900 async fn test_service_manager_stream_registry_builder() {
2901 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2902 let stream_registry = Arc::new(StreamRegistry::new());
2903
2904 let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
2906
2907 assert!(manager.stream_registry().is_some());
2909 }
2910
2911 #[tokio::test]
2912 async fn test_tcp_service_without_stream_registry() {
2913 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2914
2915 let mut manager = ServiceManager::new(runtime);
2917 manager.set_deployment_name("test".to_string());
2918
2919 let spec = mock_tcp_spec();
2921 manager
2922 .upsert_service("database".to_string(), spec)
2923 .await
2924 .unwrap();
2925
2926 let services = manager.list_services().await;
2928 assert!(services.contains(&"database".to_string()));
2929 }
2930}