1use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6 DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, ResourceType, ServiceSpec};
24
/// A single managed service: its desired-state spec plus the set of running
/// replica containers and the optional infrastructure they are wired into.
pub struct ServiceInstance {
    /// Service name; used in container ids, DNS hostnames, and proxy routes.
    pub service_name: String,
    /// Desired-state specification for this service.
    pub spec: ServiceSpec,
    /// Runtime used to create/start/stop replica containers.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Live replicas keyed by container id.
    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
    /// Optional overlay network; new replicas are attached when present.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Optional proxy; replica backends are registered when present.
    proxy_manager: Option<Arc<ProxyManager>>,
    /// Optional DNS server for `<svc>.service.local` / `<n>.<svc>.service.local` records.
    dns_server: Option<Arc<DnsServer>>,
    /// Shared per-service health-state map, when health reporting is wired up.
    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
}
40
41impl ServiceInstance {
42 pub fn new(
44 service_name: String,
45 spec: ServiceSpec,
46 runtime: Arc<dyn Runtime + Send + Sync>,
47 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
48 ) -> Self {
49 Self {
50 service_name,
51 spec,
52 runtime,
53 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
54 overlay_manager,
55 proxy_manager: None,
56 dns_server: None,
57 health_states: None,
58 }
59 }
60
61 pub fn with_proxy(
63 service_name: String,
64 spec: ServiceSpec,
65 runtime: Arc<dyn Runtime + Send + Sync>,
66 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
67 proxy_manager: Arc<ProxyManager>,
68 ) -> Self {
69 Self {
70 service_name,
71 spec,
72 runtime,
73 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
74 overlay_manager,
75 proxy_manager: Some(proxy_manager),
76 dns_server: None,
77 health_states: None,
78 }
79 }
80
81 #[must_use]
83 pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
84 self.dns_server = Some(dns_server);
85 self
86 }
87
88 pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
90 self.dns_server = Some(dns_server);
91 }
92
93 pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
95 self.proxy_manager = Some(proxy_manager);
96 }
97
98 pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
100 self.health_states = Some(states);
101 }
102
    /// Reconcile the running replica count with the requested `replicas`.
    ///
    /// Scale-up: pull the image once, then for each new replica create the
    /// container, run init, start it, best-effort resolve its PID, attach it
    /// to the overlay network (or fall back to the runtime-reported IP),
    /// register DNS records and a proxy backend when configured, and spawn a
    /// health monitor. Scale-down: remove the highest-numbered replicas,
    /// aborting their health monitor, removing the replica DNS record, and
    /// stopping/removing the container.
    ///
    /// # Errors
    /// Fails on image pull, container create/start/stop/remove, init
    /// orchestration errors, or if a freshly started container exits during
    /// startup.
    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
        // Snapshot the current count in a short read-lock scope; the map is
        // not locked across the awaits below, so concurrent scale calls could
        // race — NOTE(review): confirm callers serialize scaling per service.
        let current_replicas = { self.containers.read().await.len() as u32 };
        if replicas > current_replicas {
            // Pull once for all new replicas.
            self.runtime
                .pull_image_with_policy(&self.spec.image.name, self.spec.image.pull_policy, None)
                .await
                .map_err(|e| AgentError::PullFailed {
                    image: self.spec.image.name.clone(),
                    reason: e.to_string(),
                })?;

            // Replica ids are 1-based: replica = i + 1.
            for i in current_replicas..replicas {
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // RouteToPeer is passed through untouched so the caller can
                // redirect; everything else is wrapped as CreateFailed.
                self.runtime
                    .create_container(&id, &self.spec)
                    .await
                    .map_err(|e| match e {
                        AgentError::RouteToPeer { .. } => e,
                        other => AgentError::CreateFailed {
                            id: id.to_string(),
                            reason: other.to_string(),
                        },
                    })?;

                // Run init steps (with the spec's error policy) before start.
                let init_orchestrator = InitOrchestrator::with_error_policy(
                    id.clone(),
                    self.spec.init.clone(),
                    self.spec.errors.clone(),
                );
                init_orchestrator.run().await?;

                self.runtime
                    .start_container(&id)
                    .await
                    .map_err(|e| AgentError::StartFailed {
                        id: id.to_string(),
                        reason: e.to_string(),
                    })?;

                // Best-effort PID resolution: up to 5 attempts, 200ms apart.
                // A missing PID is tolerated (overlay attach is skipped).
                let mut container_pid = None;
                for attempt in 1..=5u32 {
                    match self.runtime.get_container_pid(&id).await {
                        Ok(Some(pid)) => {
                            container_pid = Some(pid);
                            break;
                        }
                        Ok(None) if attempt < 5 => {
                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                        }
                        Ok(None) => {
                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
                        }
                        Err(e) => {
                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
                            if attempt < 5 {
                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                            }
                        }
                    }
                }

                // If we got a PID, verify the container is still alive before
                // doing overlay work; surface the log tail if it already died.
                if container_pid.is_some() {
                    let alive = match self.runtime.container_state(&id).await {
                        Ok(
                            ContainerState::Running
                            | ContainerState::Pending
                            | ContainerState::Initializing,
                        ) => true,
                        Ok(state) => {
                            tracing::warn!(
                                container = %id,
                                ?state,
                                "container exited before overlay attach could run"
                            );
                            false
                        }
                        Err(e) => {
                            // State query failure is treated as "alive":
                            // better to attempt attach than to kill a replica
                            // on a transient query error.
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "container state query failed before overlay attach, proceeding"
                            );
                            true
                        }
                    };
                    if !alive {
                        // Include up to 40 log lines in the error for triage.
                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
                            || " <log read failed>".to_string(),
                            |entries| {
                                if entries.is_empty() {
                                    " <no log output>".to_string()
                                } else {
                                    entries
                                        .into_iter()
                                        .map(|e| format!(" {}", e.message))
                                        .collect::<Vec<_>>()
                                        .join("\n")
                                }
                            },
                        );
                        return Err(AgentError::StartFailed {
                            id: id.to_string(),
                            reason: format!("container exited during startup:\n{log_tail}"),
                        });
                    }
                }

                // Overlay attach. Windows uses HCN namespaces; other targets
                // attach by PID. Failures degrade to `None` (no overlay IP).
                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
                    let overlay_guard = overlay.read().await;
                    #[cfg(target_os = "windows")]
                    let attach_result: Option<std::net::IpAddr> = {
                        // PID is unused on Windows; silence the binding.
                        let _ = container_pid;
                        match self.runtime.get_container_namespace_id(&id).await {
                            Ok(Some(ns_id)) => {
                                let ip_override =
                                    self.runtime.get_container_ip(&id).await.ok().flatten();
                                let dns_server = overlay_guard.dns_server_addr().map(|sa| sa.ip());
                                let dns_domain =
                                    overlay_guard.dns_domain().map(ToString::to_string);
                                match overlay_guard
                                    .attach_container_hcn(
                                        ns_id,
                                        &self.service_name,
                                        ip_override,
                                        true,
                                        dns_server,
                                        dns_domain,
                                    )
                                    .await
                                {
                                    Ok(ip) => Some(ip),
                                    Err(e) => {
                                        tracing::warn!(
                                            container = %id,
                                            error = %e,
                                            "HCN overlay attach failed"
                                        );
                                        None
                                    }
                                }
                            }
                            Ok(None) => {
                                tracing::debug!(
                                    container = %id,
                                    "skipping HCN overlay attach - no namespace id available"
                                );
                                None
                            }
                            Err(e) => {
                                tracing::warn!(
                                    container = %id,
                                    error = %e,
                                    "failed to fetch HCN namespace id"
                                );
                                None
                            }
                        }
                    };
                    #[cfg(not(target_os = "windows"))]
                    let attach_result: Option<std::net::IpAddr> = {
                        if let Some(pid) = container_pid {
                            match overlay_guard
                                .attach_container(pid, &self.service_name, true)
                                .await
                            {
                                Ok(ip) => Some(ip),
                                Err(e) => {
                                    tracing::warn!(
                                        container = %id,
                                        error = %e,
                                        "failed to attach container to overlay network"
                                    );
                                    None
                                }
                            }
                        } else {
                            tracing::debug!(
                                container = %id,
                                "skipping overlay attachment - no PID available"
                            );
                            None
                        }
                    };

                    if let Some(ip) = attach_result {
                        tracing::info!(
                            container = %id,
                            overlay_ip = %ip,
                            "attached container to overlay network"
                        );

                        // Register both the shared service record and the
                        // per-replica record; DNS failures are non-fatal.
                        if let Some(dns) = &self.dns_server {
                            let service_hostname = format!("{}.service.local", self.service_name);

                            let replica_hostname =
                                format!("{}.{}.service.local", id.replica, self.service_name);

                            match dns.add_record(&service_hostname, ip).await {
                                Ok(()) => tracing::debug!(
                                    hostname = %service_hostname,
                                    ip = %ip,
                                    "registered DNS for service"
                                ),
                                Err(e) => tracing::warn!(
                                    hostname = %service_hostname,
                                    error = %e,
                                    "failed to register DNS for service"
                                ),
                            }

                            if let Err(e) = dns.add_record(&replica_hostname, ip).await {
                                tracing::warn!(
                                    hostname = %replica_hostname,
                                    error = %e,
                                    "failed to register replica DNS"
                                );
                            } else {
                                tracing::debug!(
                                    hostname = %replica_hostname,
                                    ip = %ip,
                                    "registered DNS for replica"
                                );
                            }
                        }

                        Some(ip)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // Prefer the overlay IP; otherwise fall back to whatever IP
                // the runtime reports (may be None → no proxy routing).
                let effective_ip = if overlay_ip.is_none() {
                    match self.runtime.get_container_ip(&id).await {
                        Ok(Some(ip)) => {
                            tracing::info!(
                                container = %id,
                                ip = %ip,
                                "using runtime container IP for proxy (overlay unavailable)"
                            );
                            Some(ip)
                        }
                        Ok(None) => {
                            tracing::warn!(
                                container = %id,
                                "no container IP available from runtime, proxy routing will be unavailable"
                            );
                            None
                        }
                        Err(e) => {
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "failed to get container IP from runtime"
                            );
                            None
                        }
                    }
                } else {
                    overlay_ip
                };

                tracing::info!(
                    container = %id,
                    service = %self.service_name,
                    overlay_ip = ?overlay_ip,
                    effective_ip = ?effective_ip,
                    "Container IP resolution complete"
                );

                // Some runtimes hand out a dynamic host port; that overrides
                // the spec's target port for health checks and proxy backends.
                let port_override = match self.runtime.get_container_port_override(&id).await {
                    Ok(Some(port)) => {
                        tracing::info!(
                            container = %id,
                            port = port,
                            "runtime assigned dynamic port override for this container"
                        );
                        Some(port)
                    }
                    Ok(None) => None,
                    Err(e) => {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to query port override from runtime, using spec port"
                        );
                        None
                    }
                };

                // Build and start the health monitor for this replica.
                let health_monitor_handle = {
                    let mut check = self.spec.health.check.clone();

                    // Port 0 means "use the service port": prefer the runtime
                    // override, else the first HTTP-ish endpoint, else 8080.
                    if let HealthCheck::Tcp { ref mut port } = check {
                        if *port == 0 {
                            *port = port_override.unwrap_or_else(|| {
                                self.spec
                                    .endpoints
                                    .iter()
                                    .find(|ep| {
                                        matches!(
                                            ep.protocol,
                                            Protocol::Http | Protocol::Https | Protocol::Websocket
                                        )
                                    })
                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                            });
                        }
                    }

                    // Spec-driven timings with defaults: 5s grace, 5s timeout,
                    // 10s interval.
                    let start_grace = self
                        .spec
                        .health
                        .start_grace
                        .unwrap_or(Duration::from_secs(5));
                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
                    let retries = self.spec.health.retries;

                    let checker = HealthChecker::new(check, effective_ip);
                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
                        .with_start_grace(start_grace)
                        .with_check_timeout(check_timeout);

                    // With both a proxy and an IP, register the backend now and
                    // install a callback that flips backend health + the shared
                    // health-state map on transitions.
                    if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
                        let proxy = Arc::clone(proxy);
                        let service_name = self.service_name.clone();
                        let port = port_override.unwrap_or_else(|| {
                            self.spec
                                .endpoints
                                .iter()
                                .find(|ep| {
                                    matches!(
                                        ep.protocol,
                                        Protocol::Http | Protocol::Https | Protocol::Websocket
                                    )
                                })
                                .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                        });

                        let backend_addr = SocketAddr::new(ip, port);

                        proxy.add_backend(&self.service_name, backend_addr).await;

                        let health_states_opt = self.health_states.clone();
                        let svc_name_for_states = self.service_name.clone();

                        // The callback is sync, so both updates are spawned.
                        let health_callback: HealthCallback =
                            Arc::new(move |container_id: ContainerId, is_healthy: bool| {
                                let proxy = Arc::clone(&proxy);
                                let service_name = service_name.clone();
                                tracing::info!(
                                    container = %container_id,
                                    service = %service_name,
                                    backend = %backend_addr,
                                    healthy = is_healthy,
                                    "health status changed, updating proxy backend"
                                );
                                tokio::spawn(async move {
                                    proxy
                                        .update_backend_health(
                                            &service_name,
                                            backend_addr,
                                            is_healthy,
                                        )
                                        .await;
                                });
                                if let Some(ref health_states) = health_states_opt {
                                    let states = Arc::clone(health_states);
                                    let svc = svc_name_for_states.clone();
                                    tokio::spawn(async move {
                                        // NOTE(review): `failures: 0` looks like a
                                        // placeholder — the real failure count lives
                                        // in the monitor; confirm this is intended.
                                        let state = if is_healthy {
                                            HealthState::Healthy
                                        } else {
                                            HealthState::Unhealthy {
                                                failures: 0,
                                                reason: "health check failed".into(),
                                            }
                                        };
                                        states.write().await.insert(svc, state);
                                    });
                                }
                            });

                        monitor = monitor.with_callback(health_callback);
                    }

                    monitor.start()
                };

                // Record the replica. `pid`/`task` are intentionally None here;
                // state is assumed Running after a successful start.
                {
                    let mut containers = self.containers.write().await;
                    containers.insert(
                        id.clone(),
                        Container {
                            id: id.clone(),
                            state: ContainerState::Running,
                            pid: None,
                            task: None,
                            overlay_ip: effective_ip,
                            health_monitor: Some(health_monitor_handle),
                            port_override,
                        },
                    );
                }
            }
        }

        // Scale-down: tear down replicas above the target count.
        if replicas < current_replicas {
            for i in replicas..current_replicas {
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // Drop the write lock before awaiting teardown calls.
                let removed_container = {
                    let mut containers = self.containers.write().await;
                    containers.remove(&id)
                };
                if let Some(container) = removed_container {
                    if let Some(handle) = container.health_monitor {
                        handle.abort();
                    }

                    // Only the per-replica record is removed here.
                    // NOTE(review): the shared `<svc>.service.local` record added
                    // on scale-up is never removed — confirm the DNS server
                    // tolerates (or deduplicates) stale service records.
                    if let Some(dns) = &self.dns_server {
                        let replica_hostname =
                            format!("{}.{}.service.local", id.replica, self.service_name);
                        if let Err(e) = dns.remove_record(&replica_hostname).await {
                            tracing::warn!(
                                hostname = %replica_hostname,
                                error = %e,
                                "failed to remove replica DNS record"
                            );
                        } else {
                            tracing::debug!(
                                hostname = %replica_hostname,
                                "removed replica DNS record"
                            );
                        }
                    }

                    self.runtime
                        .stop_container(&id, Duration::from_secs(30))
                        .await?;

                    // Best-effort volume sync before removal.
                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to sync volumes before removal"
                        );
                    }

                    self.runtime.remove_container(&id).await?;
                }
            }
        }

        Ok(())
    }
649
650 pub async fn replica_count(&self) -> usize {
652 self.containers.read().await.len()
653 }
654
655 pub async fn container_ids(&self) -> Vec<ContainerId> {
657 self.containers.read().await.keys().cloned().collect()
658 }
659
    /// Shared lock over this service's replica map; callers take the read or
    /// write guard themselves.
    pub fn containers(
        &self,
    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
        &self.containers
    }
669
670 pub fn has_overlay_manager(&self) -> bool {
672 self.overlay_manager.is_some()
673 }
674
675 pub fn has_proxy_manager(&self) -> bool {
677 self.proxy_manager.is_some()
678 }
679
680 pub fn has_dns_server(&self) -> bool {
682 self.dns_server.is_some()
683 }
684}
685
/// Top-level coordinator: owns every [`ServiceInstance`] plus the optional
/// shared infrastructure (overlay, proxy, DNS, jobs, cron, supervision).
pub struct ServiceManager {
    /// Container runtime shared by all managed services.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Managed services keyed by service name.
    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
    /// Bounds concurrent scale operations (acquired in `scale_service`).
    scale_semaphore: Arc<Semaphore>,
    /// Optional overlay network manager handed to new instances.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Optional TCP/UDP stream route registry.
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Optional proxy manager handed to new instances.
    proxy_manager: Option<Arc<ProxyManager>>,
    /// Optional DNS server handed to new instances.
    dns_server: Option<Arc<DnsServer>>,
    /// Deployment name, when running as part of a named deployment.
    deployment_name: Option<String>,
    /// Shared per-service health states (also used by dependency checks).
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    /// Optional executor for `ResourceType::Job` specs.
    job_executor: Option<Arc<JobExecutor>>,
    /// Optional scheduler for `ResourceType::Cron` specs.
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Optional restart/supervision loop for containers.
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}
710
/// Builder for [`ServiceManager`]; only the runtime is mandatory, every other
/// component is optional and defaults to `None`.
pub struct ServiceManagerBuilder {
    /// Mandatory container runtime.
    runtime: Arc<dyn Runtime + Send + Sync>,
    // Optional components, set via the fluent methods below.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    proxy_manager: Option<Arc<ProxyManager>>,
    stream_registry: Option<Arc<StreamRegistry>>,
    dns_server: Option<Arc<DnsServer>>,
    deployment_name: Option<String>,
    job_executor: Option<Arc<JobExecutor>>,
    cron_scheduler: Option<Arc<CronScheduler>>,
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}
739
740impl ServiceManagerBuilder {
741 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
743 Self {
744 runtime,
745 overlay_manager: None,
746 proxy_manager: None,
747 stream_registry: None,
748 dns_server: None,
749 deployment_name: None,
750 job_executor: None,
751 cron_scheduler: None,
752 container_supervisor: None,
753 }
754 }
755
756 #[must_use]
758 pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
759 self.overlay_manager = Some(om);
760 self
761 }
762
763 #[must_use]
765 pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
766 self.proxy_manager = Some(pm);
767 self
768 }
769
770 #[must_use]
772 pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
773 self.stream_registry = Some(sr);
774 self
775 }
776
777 #[must_use]
779 pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
780 self.dns_server = Some(dns);
781 self
782 }
783
784 #[must_use]
786 pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
787 self.deployment_name = Some(name.into());
788 self
789 }
790
791 #[must_use]
793 pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
794 self.job_executor = Some(je);
795 self
796 }
797
798 #[must_use]
800 pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
801 self.cron_scheduler = Some(cs);
802 self
803 }
804
805 #[must_use]
807 pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
808 self.container_supervisor = Some(cs);
809 self
810 }
811
812 pub fn build(self) -> ServiceManager {
817 if self.proxy_manager.is_none() {
818 tracing::warn!("ServiceManager built without proxy_manager");
819 }
820 if self.stream_registry.is_none() {
821 tracing::warn!("ServiceManager built without stream_registry");
822 }
823 if self.container_supervisor.is_none() {
824 tracing::warn!("ServiceManager built without container_supervisor");
825 }
826 if self.deployment_name.is_none() {
827 tracing::warn!("ServiceManager built without deployment_name");
828 }
829
830 ServiceManager {
831 runtime: self.runtime,
832 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
833 scale_semaphore: Arc::new(Semaphore::new(10)),
834 overlay_manager: self.overlay_manager,
835 stream_registry: self.stream_registry,
836 proxy_manager: self.proxy_manager,
837 dns_server: self.dns_server,
838 deployment_name: self.deployment_name,
839 health_states: Arc::new(RwLock::new(HashMap::new())),
840 job_executor: self.job_executor,
841 cron_scheduler: self.cron_scheduler,
842 container_supervisor: self.container_supervisor,
843 }
844 }
845}
846
847impl ServiceManager {
    /// Preferred entry point: start a [`ServiceManagerBuilder`] for this runtime.
    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
        ServiceManagerBuilder::new(runtime)
    }
863
864 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
866 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
867 Self {
868 runtime,
869 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
870 scale_semaphore: Arc::new(Semaphore::new(10)), overlay_manager: None,
872 stream_registry: None,
873 proxy_manager: None,
874 dns_server: None,
875 deployment_name: None,
876 health_states: Arc::new(RwLock::new(HashMap::new())),
877 job_executor: None,
878 cron_scheduler: None,
879 container_supervisor: None,
880 }
881 }
882
883 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
885 pub fn with_overlay(
886 runtime: Arc<dyn Runtime + Send + Sync>,
887 overlay_manager: Arc<RwLock<OverlayManager>>,
888 ) -> Self {
889 Self {
890 runtime,
891 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
892 scale_semaphore: Arc::new(Semaphore::new(10)),
893 overlay_manager: Some(overlay_manager),
894 stream_registry: None,
895 proxy_manager: None,
896 dns_server: None,
897 deployment_name: None,
898 health_states: Arc::new(RwLock::new(HashMap::new())),
899 job_executor: None,
900 cron_scheduler: None,
901 container_supervisor: None,
902 }
903 }
904
905 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
907 pub fn with_full_config(
908 runtime: Arc<dyn Runtime + Send + Sync>,
909 overlay_manager: Arc<RwLock<OverlayManager>>,
910 deployment_name: String,
911 ) -> Self {
912 Self {
913 runtime,
914 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
915 scale_semaphore: Arc::new(Semaphore::new(10)),
916 overlay_manager: Some(overlay_manager),
917 stream_registry: None,
918 proxy_manager: None,
919 dns_server: None,
920 deployment_name: Some(deployment_name),
921 health_states: Arc::new(RwLock::new(HashMap::new())),
922 job_executor: None,
923 cron_scheduler: None,
924 container_supervisor: None,
925 }
926 }
927
928 pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
930 Arc::clone(&self.health_states)
931 }
932
933 pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
935 let mut states = self.health_states.write().await;
936 states.insert(service_name.to_string(), state);
937 }
938
939 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
941 pub fn set_deployment_name(&mut self, name: String) {
942 self.deployment_name = Some(name);
943 }
944
945 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
947 pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
948 self.stream_registry = Some(registry);
949 }
950
951 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
953 #[must_use]
954 pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
955 self.stream_registry = Some(registry);
956 self
957 }
958
    /// The stream registry, if one was configured.
    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
        self.stream_registry.as_ref()
    }
963
964 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
966 pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
967 self.overlay_manager = Some(manager);
968 }
969
970 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
972 pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
973 self.proxy_manager = Some(proxy);
974 }
975
976 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
978 #[must_use]
979 pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
980 self.proxy_manager = Some(proxy);
981 self
982 }
983
    /// The proxy manager, if one was configured.
    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
        self.proxy_manager.as_ref()
    }
988
989 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
991 pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
992 self.dns_server = Some(dns);
993 }
994
995 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
997 #[must_use]
998 pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
999 self.dns_server = Some(dns);
1000 self
1001 }
1002
    /// The DNS server, if one was configured.
    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
        self.dns_server.as_ref()
    }
1007
1008 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1010 pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
1011 self.job_executor = Some(executor);
1012 }
1013
1014 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1016 pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
1017 self.cron_scheduler = Some(scheduler);
1018 }
1019
1020 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1022 #[must_use]
1023 pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
1024 self.job_executor = Some(executor);
1025 self
1026 }
1027
1028 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1030 #[must_use]
1031 pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
1032 self.cron_scheduler = Some(scheduler);
1033 self
1034 }
1035
    /// The job executor, if one was configured.
    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
        self.job_executor.as_ref()
    }
1040
    /// The cron scheduler, if one was configured.
    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
        self.cron_scheduler.as_ref()
    }
1045
1046 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1048 pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
1049 self.container_supervisor = Some(supervisor);
1050 }
1051
1052 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1054 #[must_use]
1055 pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
1056 self.container_supervisor = Some(supervisor);
1057 self
1058 }
1059
    /// The container supervisor, if one was configured.
    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
        self.container_supervisor.as_ref()
    }
1064
1065 pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1076 let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1077 AgentError::Configuration("Container supervisor not configured".to_string())
1078 })?;
1079
1080 let supervisor = Arc::clone(supervisor);
1081 Ok(tokio::spawn(async move {
1082 supervisor.run_loop().await;
1083 }))
1084 }
1085
1086 pub fn shutdown_container_supervisor(&self) {
1088 if let Some(supervisor) = &self.container_supervisor {
1089 supervisor.shutdown();
1090 }
1091 }
1092
1093 pub async fn get_container_supervised_state(
1095 &self,
1096 container_id: &ContainerId,
1097 ) -> Option<SupervisedState> {
1098 if let Some(supervisor) = &self.container_supervisor {
1099 supervisor.get_state(container_id).await
1100 } else {
1101 None
1102 }
1103 }
1104
1105 pub async fn take_supervisor_events(
1109 &self,
1110 ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1111 if let Some(supervisor) = &self.container_supervisor {
1112 supervisor.take_event_receiver().await
1113 } else {
1114 None
1115 }
1116 }
1117
    /// Deploy a set of services in dependency order.
    ///
    /// Builds a dependency graph, walks its startup order, and for each
    /// service: waits for declared dependencies, upserts the spec, scales to
    /// the spec's replica count, and seeds its health state as `Unknown`.
    ///
    /// # Errors
    /// Fails on graph build errors (e.g. cycles), dependency wait failures,
    /// upsert errors, or scaling errors; services already started are left
    /// running.
    pub async fn deploy_with_dependencies(
        &self,
        services: HashMap<String, ServiceSpec>,
    ) -> Result<()> {
        if services.is_empty() {
            return Ok(());
        }

        let graph = DependencyGraph::build(&services)?;

        tracing::info!(
            service_count = services.len(),
            "Starting deployment with dependency ordering"
        );

        let order = graph.startup_order();
        tracing::debug!(order = ?order, "Computed startup order");

        for service_name in order {
            // The order came from the same map, so a miss is an internal bug.
            let service_spec = services
                .get(service_name)
                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;

            if !service_spec.depends.is_empty() {
                tracing::info!(
                    service = %service_name,
                    dependency_count = service_spec.depends.len(),
                    "Waiting for dependencies"
                );
                self.wait_for_dependencies(service_name, &service_spec.depends)
                    .await?;
            }

            tracing::info!(service = %service_name, "Starting service");
            self.upsert_service(service_name.clone(), service_spec.clone())
                .await?;

            // Initial replica count: fixed count, adaptive minimum, or 1 for
            // manual scaling.
            let replicas = match &service_spec.scale {
                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min,
                zlayer_spec::ScaleSpec::Manual => 1,
            };
            self.scale_service(service_name, replicas).await?;

            // Seed health as Unknown until the monitor reports.
            self.update_health_state(service_name, HealthState::Unknown)
                .await;

            tracing::info!(
                service = %service_name,
                replicas = replicas,
                "Service started"
            );
        }

        tracing::info!(service_count = services.len(), "Deployment complete");

        Ok(())
    }
1199
    /// Block until every dependency in `deps` is satisfied (or its timeout
    /// policy allows continuing).
    ///
    /// # Errors
    /// Returns `AgentError::DependencyTimeout` for a dependency whose policy
    /// is fail-on-timeout; warn/continue policies only log.
    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
        let condition_checker = DependencyConditionChecker::new(
            Arc::clone(&self.runtime),
            Arc::clone(&self.health_states),
            None,
        );

        let waiter = DependencyWaiter::new(condition_checker);
        let results = waiter.wait_for_all(deps).await?;

        for result in results {
            match result {
                // Hard failure: propagate with full context.
                WaitResult::TimedOutFail {
                    service: dep_service,
                    condition,
                    timeout,
                } => {
                    return Err(AgentError::DependencyTimeout {
                        service: service.to_string(),
                        dependency: dep_service,
                        condition: format!("{condition:?}"),
                        timeout,
                    });
                }
                // Soft failure: log and keep going.
                WaitResult::TimedOutWarn {
                    service: dep_service,
                    condition,
                } => {
                    tracing::warn!(
                        service = %service,
                        dependency = %dep_service,
                        condition = ?condition,
                        "Dependency timed out but continuing"
                    );
                }
                // Satisfied, or policy says continue silently.
                WaitResult::TimedOutContinue | WaitResult::Satisfied => {}
            }
        }

        Ok(())
    }
1252
1253 pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1260 let condition_checker = DependencyConditionChecker::new(
1261 Arc::clone(&self.runtime),
1262 Arc::clone(&self.health_states),
1263 None,
1264 );
1265
1266 for dep in deps {
1267 if !condition_checker.check(dep).await? {
1268 return Ok(false);
1269 }
1270 }
1271
1272 Ok(true)
1273 }
1274
    /// Register or update a resource by name, dispatching on its type:
    /// services get (or update) a [`ServiceInstance`], jobs go to the job
    /// executor (or are stored as a bare instance when none is configured),
    /// and cron specs go to the cron scheduler.
    ///
    /// # Errors
    /// Fails when a cron spec is given but no scheduler is configured, or on
    /// scheduler registration errors.
    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
        match spec.rtype {
            ResourceType::Service => {
                let mut services = self.services.write().await;

                if let Some(instance) = services.get_mut(&name) {
                    // Update path: swap in the new spec; replica changes are
                    // applied later via scaling, not here.
                    instance.spec = spec;
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }
                } else {
                    // Create path: wire the instance with whatever optional
                    // infrastructure this manager has.
                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                    let mut instance = if let Some(proxy) = &self.proxy_manager {
                        ServiceInstance::with_proxy(
                            name.clone(),
                            spec,
                            self.runtime.clone(),
                            overlay,
                            Arc::clone(proxy),
                        )
                    } else {
                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                    };
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }
                    instance.set_health_states(Arc::clone(&self.health_states));
                    if let Some(proxy) = &self.proxy_manager {
                        proxy.add_service(&name, &instance.spec).await;
                    }
                    // Register raw TCP/UDP routes; backends are filled in
                    // later once containers have addresses.
                    if let Some(stream_registry) = &self.stream_registry {
                        for endpoint in &instance.spec.endpoints {
                            let svc = StreamService::new(
                                name.clone(),
                                Vec::new(),
                            );
                            match endpoint.protocol {
                                Protocol::Tcp => {
                                    stream_registry.register_tcp(endpoint.port, svc);
                                    tracing::debug!(
                                        service = %name,
                                        port = endpoint.port,
                                        "Registered TCP stream route"
                                    );
                                }
                                Protocol::Udp => {
                                    stream_registry.register_udp(endpoint.port, svc);
                                    tracing::debug!(
                                        service = %name,
                                        port = endpoint.port,
                                        "Registered UDP stream route"
                                    );
                                }
                                // HTTP-family endpoints are handled by the proxy.
                                _ => {}
                            }
                        }
                    }
                    services.insert(name, instance);
                }
            }
            ResourceType::Job => {
                if let Some(executor) = &self.job_executor {
                    executor.register_job(&name, spec).await;
                    tracing::info!(job = %name, "Registered job spec");
                } else {
                    // Fallback: keep the spec around as an (unscaled) service
                    // instance so it is at least visible/queriable.
                    tracing::warn!(
                        job = %name,
                        "Job executor not configured, storing as service for reference"
                    );
                    let mut services = self.services.write().await;
                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                    let mut instance = if let Some(proxy) = &self.proxy_manager {
                        ServiceInstance::with_proxy(
                            name.clone(),
                            spec,
                            self.runtime.clone(),
                            overlay,
                            Arc::clone(proxy),
                        )
                    } else {
                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                    };
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }
                    services.insert(name, instance);
                }
            }
            ResourceType::Cron => {
                if let Some(scheduler) = &self.cron_scheduler {
                    scheduler.register(&name, &spec).await?;
                    tracing::info!(cron = %name, "Registered cron job with scheduler");
                } else {
                    // Unlike jobs, cron specs have no fallback — hard error.
                    return Err(AgentError::Configuration(format!(
                        "Cron scheduler not configured for cron job '{name}'"
                    )));
                }
            }
        }

        Ok(())
    }
1399
1400 async fn update_proxy_backends(&self, service_name: &str, addrs: Vec<SocketAddr>) {
1402 if let Some(proxy) = &self.proxy_manager {
1403 proxy.update_backends(service_name, addrs).await;
1404 }
1405 }
1406
1407 fn update_stream_backends(&self, spec: &ServiceSpec, addrs: &[SocketAddr]) {
1416 let Some(stream_registry) = &self.stream_registry else {
1417 return;
1418 };
1419
1420 let primary_spec_port = spec
1424 .endpoints
1425 .iter()
1426 .find(|ep| {
1427 matches!(
1428 ep.protocol,
1429 Protocol::Http | Protocol::Https | Protocol::Websocket
1430 )
1431 })
1432 .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1433
1434 let has_port_overrides = addrs.iter().any(|addr| addr.port() != primary_spec_port);
1435
1436 for endpoint in &spec.endpoints {
1437 match endpoint.protocol {
1438 Protocol::Tcp => {
1439 let tcp_backends: Vec<SocketAddr> = if has_port_overrides {
1440 addrs.to_vec()
1443 } else {
1444 addrs
1447 .iter()
1448 .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1449 .collect()
1450 };
1451
1452 stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
1453
1454 tracing::debug!(
1455 endpoint = %endpoint.name,
1456 port = endpoint.port,
1457 backend_count = addrs.len(),
1458 "Updated TCP stream backends"
1459 );
1460 }
1461 Protocol::Udp => {
1462 let udp_backends: Vec<SocketAddr> = if has_port_overrides {
1463 addrs.to_vec()
1464 } else {
1465 addrs
1466 .iter()
1467 .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1468 .collect()
1469 };
1470
1471 stream_registry.update_udp_backends(endpoint.port, udp_backends);
1472
1473 tracing::debug!(
1474 endpoint = %endpoint.name,
1475 port = endpoint.port,
1476 backend_count = addrs.len(),
1477 "Updated UDP stream backends"
1478 );
1479 }
1480 _ => {} }
1482 }
1483 }
1484
    /// Scale a service to `replicas` replicas and propagate the new backend set
    /// to the proxy, the stream registry, and the container supervisor.
    ///
    /// # Errors
    /// Returns `AgentError::NotFound` when the service is unknown, or any error
    /// produced by the underlying scale operation.
    #[allow(clippy::cast_possible_truncation)]
    pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
        // Serialize concurrent scale operations. NOTE(review): the Result of
        // acquire() is held as the permit; if the semaphore were ever closed the
        // Err variant would silently skip serialization — confirm intended.
        let _permit = self.scale_semaphore.acquire().await;

        let services = self.services.read().await;
        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
            container: name.to_string(),
            reason: "service not found".to_string(),
        })?;

        // Snapshot the count before scaling so supervisor bookkeeping below can
        // diff old vs. new replica ranges.
        let current_replicas = instance.replica_count().await as u32;

        instance.scale_to(replicas).await?;

        let addrs = self.collect_backend_addrs(instance, replicas).await;

        // Only push proxy backends when there is something to route to.
        if self.proxy_manager.is_some() && !addrs.is_empty() {
            self.update_proxy_backends(name, addrs.clone()).await;
        }

        if self.stream_registry.is_some() {
            self.update_stream_backends(&instance.spec, &addrs);
        }

        // Keep the supervisor's watch list in sync with the new replica count.
        // Replica numbers are 1-based, hence the `i + 1`.
        if let Some(supervisor) = &self.container_supervisor {
            if replicas > current_replicas {
                // Scale up: start supervising the newly added replicas.
                for i in current_replicas..replicas {
                    let container_id = ContainerId {
                        service: name.to_string(),
                        replica: i + 1,
                    };
                    supervisor.supervise(&container_id, &instance.spec).await;
                }
            }
            if replicas < current_replicas {
                // Scale down: stop supervising the removed replicas.
                for i in replicas..current_replicas {
                    let container_id = ContainerId {
                        service: name.to_string(),
                        replica: i + 1,
                    };
                    supervisor.unsupervise(&container_id).await;
                }
            }
        }

        Ok(())
    }
1548
1549 async fn collect_backend_addrs(
1560 &self,
1561 instance: &ServiceInstance,
1562 _replicas: u32, ) -> Vec<SocketAddr> {
1564 let mut addrs = Vec::new();
1565
1566 let spec_port = instance
1568 .spec
1569 .endpoints
1570 .iter()
1571 .find(|ep| {
1572 matches!(
1573 ep.protocol,
1574 Protocol::Http | Protocol::Https | Protocol::Websocket
1575 )
1576 })
1577 .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1578
1579 let containers = instance.containers().read().await;
1581
1582 for container in containers.values() {
1583 if let Some(ip) = container.overlay_ip {
1584 let port = container.port_override.unwrap_or(spec_port);
1587 addrs.push(SocketAddr::new(ip, port));
1588 }
1589 }
1590
1591 if addrs.is_empty() && !containers.is_empty() {
1594 tracing::warn!(
1595 service = %instance.service_name,
1596 container_count = containers.len(),
1597 "no overlay IPs available for backends - containers may not be reachable via proxy"
1598 );
1599 }
1600
1601 addrs
1602 }
1603
1604 pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
1609 let services = self.services.read().await;
1610 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1611 container: name.to_string(),
1612 reason: "service not found".to_string(),
1613 })?;
1614
1615 Ok(instance.replica_count().await)
1616 }
1617
    /// Remove a service and tear down everything registered on its behalf:
    /// cron schedule, job registration, stream routes, supervisor entries,
    /// DNS records, and finally the service entry itself.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`; the `Result` return leaves room for
    /// fallible teardown steps.
    pub async fn remove_service(&self, name: &str) -> Result<()> {
        // Best-effort cleanup of scheduler/executor registrations first.
        if let Some(scheduler) = &self.cron_scheduler {
            scheduler.unregister(name).await;
        }

        if let Some(executor) = &self.job_executor {
            executor.unregister_job(name).await;
        }

        // Unregister L4 stream routes for every TCP/UDP endpoint of the service.
        if let Some(stream_registry) = &self.stream_registry {
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                for endpoint in &instance.spec.endpoints {
                    match endpoint.protocol {
                        Protocol::Tcp => {
                            // Unregister result deliberately ignored: the route may
                            // already be gone.
                            let _ = stream_registry.unregister_tcp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered TCP stream route"
                            );
                        }
                        Protocol::Udp => {
                            let _ = stream_registry.unregister_udp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered UDP stream route"
                            );
                        }
                        _ => {}
                    }
                }
            }
            // Explicitly release the read guard before later lock acquisitions.
            drop(services);
        }

        // Stop supervising every container of the service.
        if let Some(supervisor) = &self.container_supervisor {
            let containers = self.get_service_containers(name).await;
            for container_id in containers {
                supervisor.unsupervise(&container_id).await;
            }
            tracing::debug!(service = %name, "Unregistered containers from supervisor");
        }

        // Remove the service-level and per-replica DNS records (best effort;
        // failures are logged, not propagated).
        if let Some(dns) = &self.dns_server {
            let service_hostname = format!("{name}.service.local");
            if let Err(e) = dns.remove_record(&service_hostname).await {
                tracing::warn!(
                    hostname = %service_hostname,
                    error = %e,
                    "failed to remove service DNS record"
                );
            } else {
                tracing::debug!(
                    hostname = %service_hostname,
                    "removed service DNS record"
                );
            }

            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                let containers = instance.containers().read().await;
                for (id, _) in containers.iter() {
                    // Replica records follow the "<replica>.<service>.service.local" scheme.
                    let replica_hostname = format!("{}.{}.service.local", id.replica, name);
                    if let Err(e) = dns.remove_record(&replica_hostname).await {
                        tracing::warn!(
                            hostname = %replica_hostname,
                            error = %e,
                            "failed to remove replica DNS record during service removal"
                        );
                    }
                }
            }
            // Release the read guard before taking the write lock below;
            // tokio's RwLock write acquisition waits for all readers.
            drop(services);
        }

        // Finally drop the service entry itself.
        let mut services = self.services.write().await;
        if services.remove(name).is_some() {
            tracing::debug!(service = %name, "Removed service from manager");
        }

        Ok(())
    }
1720
1721 pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
1724 let services = self.services.read().await;
1725 services.get(name).map(|i| {
1726 (
1727 i.has_overlay_manager(),
1728 i.has_proxy_manager(),
1729 i.has_dns_server(),
1730 )
1731 })
1732 }
1733
1734 pub async fn list_services(&self) -> Vec<String> {
1736 self.services.read().await.keys().cloned().collect()
1737 }
1738
1739 pub async fn get_service_logs(
1753 &self,
1754 service_name: &str,
1755 tail: usize,
1756 instance: Option<&str>,
1757 ) -> Result<Vec<LogEntry>> {
1758 let container_ids = self.get_service_containers(service_name).await;
1759
1760 if container_ids.is_empty() {
1761 return Err(AgentError::NotFound {
1762 container: service_name.to_string(),
1763 reason: "no containers found for service".to_string(),
1764 });
1765 }
1766
1767 let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
1769 if let Ok(replica_num) = inst.parse::<u32>() {
1770 container_ids
1771 .iter()
1772 .filter(|id| id.replica == replica_num)
1773 .collect()
1774 } else {
1775 container_ids
1777 .iter()
1778 .filter(|id| id.to_string().contains(inst))
1779 .collect()
1780 }
1781 } else {
1782 container_ids.iter().collect()
1783 };
1784
1785 if target_ids.is_empty() {
1786 return Err(AgentError::NotFound {
1787 container: format!("{}/{}", service_name, instance.unwrap_or("?")),
1788 reason: "instance not found".to_string(),
1789 });
1790 }
1791
1792 let mut all_entries: Vec<LogEntry> = Vec::new();
1793
1794 for id in &target_ids {
1795 match self.runtime.container_logs(id, tail).await {
1796 Ok(mut entries) => {
1797 for entry in &mut entries {
1799 if entry.service.is_none() {
1800 entry.service = Some(service_name.to_string());
1801 }
1802 if entry.deployment.is_none() {
1803 entry.deployment.clone_from(&self.deployment_name);
1804 }
1805 }
1806 all_entries.extend(entries);
1807 }
1808 Err(e) => {
1809 tracing::warn!(
1810 service = service_name,
1811 container = %id,
1812 error = %e,
1813 "Failed to read container logs"
1814 );
1815 }
1816 }
1817 }
1818
1819 Ok(all_entries)
1820 }
1821
1822 pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
1832 let services = self.services.read().await;
1833 if let Some(instance) = services.get(service_name) {
1834 instance.container_ids().await
1835 } else {
1836 Vec::new()
1837 }
1838 }
1839
1840 pub async fn exec_in_container(
1859 &self,
1860 service_name: &str,
1861 replica: Option<u32>,
1862 cmd: &[String],
1863 ) -> Result<(i32, String, String)> {
1864 let container_ids = self.get_service_containers(service_name).await;
1865
1866 if container_ids.is_empty() {
1867 return Err(AgentError::NotFound {
1868 container: service_name.to_string(),
1869 reason: "no containers found for service".to_string(),
1870 });
1871 }
1872
1873 let target = if let Some(rep) = replica {
1875 container_ids
1876 .into_iter()
1877 .find(|cid| cid.replica == rep)
1878 .ok_or_else(|| AgentError::NotFound {
1879 container: format!("{service_name}-rep-{rep}"),
1880 reason: format!("replica {rep} not found for service"),
1881 })?
1882 } else {
1883 container_ids.into_iter().next().unwrap()
1885 };
1886
1887 self.runtime.exec(&target, cmd).await
1888 }
1889
1890 pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
1905 let executor = self
1906 .job_executor
1907 .as_ref()
1908 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
1909
1910 let spec = executor
1911 .get_job_spec(name)
1912 .await
1913 .ok_or_else(|| AgentError::NotFound {
1914 container: name.to_string(),
1915 reason: "job not registered".to_string(),
1916 })?;
1917
1918 executor.trigger(name, &spec, trigger).await
1919 }
1920
1921 pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
1929 if let Some(executor) = &self.job_executor {
1930 executor.get_execution(id).await
1931 } else {
1932 None
1933 }
1934 }
1935
1936 pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
1944 if let Some(executor) = &self.job_executor {
1945 executor.list_executions(name).await
1946 } else {
1947 Vec::new()
1948 }
1949 }
1950
1951 pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
1959 let executor = self
1960 .job_executor
1961 .as_ref()
1962 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
1963
1964 executor.cancel(id).await
1965 }
1966
1967 pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
1980 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
1981 AgentError::Configuration("Cron scheduler not configured".to_string())
1982 })?;
1983
1984 scheduler.trigger_now(name).await
1985 }
1986
1987 pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
1993 if let Some(scheduler) = &self.cron_scheduler {
1994 scheduler.set_enabled(name, enabled).await;
1995 }
1996 }
1997
1998 pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2000 if let Some(scheduler) = &self.cron_scheduler {
2001 scheduler.list_jobs().await
2002 } else {
2003 Vec::new()
2004 }
2005 }
2006
2007 pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2015 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2016 AgentError::Configuration("Cron scheduler not configured".to_string())
2017 })?;
2018
2019 let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2020 Ok(tokio::spawn(async move {
2021 scheduler.run_loop().await;
2022 }))
2023 }
2024
2025 pub fn shutdown_cron(&self) {
2027 if let Some(scheduler) = &self.cron_scheduler {
2028 scheduler.shutdown();
2029 }
2030 }
2031}
2032
2033#[cfg(test)]
2034#[allow(deprecated)]
2035mod tests {
2036 use super::*;
2037 use crate::runtime::MockRuntime;
2038
    // Smoke test: register a service, scale to 3, and verify both the replica
    // count and the service listing reflect it.
    #[tokio::test]
    async fn test_service_manager() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let spec = mock_spec();
        manager
            .upsert_service("test".to_string(), spec)
            .await
            .unwrap();

        manager.scale_service("test", 3).await.unwrap();

        let count = manager.service_replica_count("test").await.unwrap();
        assert_eq!(count, 3);

        let services = manager.list_services().await;
        assert_eq!(services, vec!["test".to_string()]);
    }
2062
    // Full lifecycle: upsert, scale, verify count, remove, verify removal.
    #[tokio::test]
    async fn test_service_manager_basic_lifecycle() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let spec = mock_spec();
        manager
            .upsert_service("api".to_string(), spec)
            .await
            .unwrap();

        manager.scale_service("api", 2).await.unwrap();

        let count = manager.service_replica_count("api").await.unwrap();
        assert_eq!(count, 2);

        manager.remove_service("api").await.unwrap();

        let services = manager.list_services().await;
        assert!(!services.contains(&"api".to_string()));
    }
2089
    // Construction through `with_full_config` (overlay manager + deployment
    // name) still supports registering and listing services.
    #[tokio::test]
    async fn test_service_manager_with_full_config() {
        use tokio::sync::RwLock;

        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());

        let overlay_manager = Arc::new(RwLock::new(
            OverlayManager::new("test-deployment".to_string())
                .await
                .unwrap(),
        ));

        let manager =
            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());

        let spec = mock_spec();
        manager
            .upsert_service("web".to_string(), spec)
            .await
            .unwrap();

        let services = manager.list_services().await;
        assert!(services.contains(&"web".to_string()));
    }
2117
    // Minimal single-service spec: parse a whole DeploymentSpec from YAML (so
    // tests exercise the real schema) and extract the "test" service.
    fn mock_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }
2142
2143 fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
2145 let mut spec = mock_spec();
2146 spec.depends = deps;
2147 spec
2148 }
2149
2150 fn dep(
2152 service: &str,
2153 condition: zlayer_spec::DependencyCondition,
2154 timeout_ms: u64,
2155 on_timeout: zlayer_spec::TimeoutAction,
2156 ) -> DependsSpec {
2157 DependsSpec {
2158 service: service.to_string(),
2159 condition,
2160 timeout: Some(Duration::from_millis(timeout_ms)),
2161 on_timeout,
2162 }
2163 }
2164
    // Two services with no dependency edges both deploy successfully.
    #[tokio::test]
    async fn test_deploy_with_dependencies_no_deps() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("a".to_string(), mock_spec());
        services.insert("b".to_string(), mock_spec());

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }
2182
    // Linear chain a -> b -> c with `Started` conditions deploys all three.
    #[tokio::test]
    async fn test_deploy_with_dependencies_linear() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("c".to_string(), mock_spec());
        services.insert(
            "b".to_string(),
            mock_spec_with_deps(vec![dep(
                "c",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 3);
    }
2218
    // a <-> b dependency cycle must be rejected with a "Cyclic dependency" error.
    #[tokio::test]
    async fn test_deploy_with_dependencies_cycle_detection() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );
        services.insert(
            "b".to_string(),
            mock_spec_with_deps(vec![dep(
                "a",
                zlayer_spec::DependencyCondition::Started,
                5000,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );

        let result = manager.deploy_with_dependencies(services).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Cyclic dependency"));
    }
2251
    // A `Healthy` dependency that never becomes healthy still deploys both
    // services when the timeout action is `Continue`.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_continue() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy,
                100,
                zlayer_spec::TimeoutAction::Continue,
            )]),
        );

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }
2277
    // Same as the Continue case: timeout action `Warn` logs but still deploys.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_warn() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy,
                100,
                zlayer_spec::TimeoutAction::Warn,
            )]),
        );

        manager.deploy_with_dependencies(services).await.unwrap();

        let service_list = manager.list_services().await;
        assert_eq!(service_list.len(), 2);
    }
2303
    // Timeout action `Fail` aborts the deployment with a "Dependency timeout" error.
    #[tokio::test]
    async fn test_deploy_with_dependencies_timeout_fail() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let mut services = HashMap::new();
        services.insert("b".to_string(), mock_spec());
        services.insert(
            "a".to_string(),
            mock_spec_with_deps(vec![dep(
                "b",
                zlayer_spec::DependencyCondition::Healthy,
                100,
                zlayer_spec::TimeoutAction::Fail,
            )]),
        );

        let result = manager.deploy_with_dependencies(services).await;
        assert!(result.is_err());

        let err = result.unwrap_err().to_string();
        assert!(err.contains("Dependency timeout"));
    }
2331
    // A `Healthy` dependency is satisfied once the target reports Healthy.
    #[tokio::test]
    async fn test_check_dependencies_all_satisfied() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        manager
            .update_health_state("db", HealthState::Healthy)
            .await;

        let deps = vec![DependsSpec {
            service: "db".to_string(),
            condition: zlayer_spec::DependencyCondition::Healthy,
            timeout: Some(Duration::from_secs(60)),
            on_timeout: zlayer_spec::TimeoutAction::Fail,
        }];

        let satisfied = manager.check_dependencies(&deps).await.unwrap();
        assert!(satisfied);
    }
2352
    // Without any recorded health state, a `Healthy` dependency is unsatisfied.
    #[tokio::test]
    async fn test_check_dependencies_not_satisfied() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let deps = vec![DependsSpec {
            service: "db".to_string(),
            condition: zlayer_spec::DependencyCondition::Healthy,
            timeout: Some(Duration::from_secs(60)),
            on_timeout: zlayer_spec::TimeoutAction::Fail,
        }];

        let satisfied = manager.check_dependencies(&deps).await.unwrap();
        assert!(!satisfied);
    }
2369
    // Health states written via `update_health_state` are readable through the
    // shared `health_states` map.
    #[tokio::test]
    async fn test_health_state_tracking() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        manager
            .update_health_state("db", HealthState::Healthy)
            .await;
        manager
            .update_health_state("cache", HealthState::Unknown)
            .await;

        let states = manager.health_states();
        let states_read = states.read().await;

        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
        assert!(matches!(
            states_read.get("cache"),
            Some(HealthState::Unknown)
        ));
    }
2393
    // Minimal `rtype: job` spec, extracted from a parsed DeploymentSpec.
    fn mock_job_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  backup:
    rtype: job
    image:
      name: backup:latest
",
        )
        .unwrap()
        .services
        .remove("backup")
        .unwrap()
    }
2413
    // Minimal `rtype: cron` spec with a seconds-resolution schedule.
    fn mock_cron_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r#"
version: v1
deployment: test
services:
  cleanup:
    rtype: cron
    schedule: "0 0 * * * * *"
    image:
      name: cleanup:latest
"#,
        )
        .unwrap()
        .services
        .remove("cleanup")
        .unwrap()
    }
2432
    // Registering a job spec and triggering it produces a retrievable execution.
    #[tokio::test]
    async fn test_service_manager_with_job_executor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);

        let job_spec = mock_job_spec();
        manager
            .upsert_service("backup".to_string(), job_spec)
            .await
            .unwrap();

        let exec_id = manager
            .trigger_job("backup", JobTrigger::Cli)
            .await
            .unwrap();

        // Give the async execution a moment to be recorded.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let execution = manager.get_job_execution(&exec_id).await;
        assert!(execution.is_some());
        assert_eq!(execution.unwrap().job_name, "backup");
    }
2461
    // Upserting a cron spec registers it with the scheduler, enabled by default.
    #[tokio::test]
    async fn test_service_manager_with_cron_scheduler() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));

        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        let cron_jobs = manager.list_cron_jobs().await;
        assert_eq!(cron_jobs.len(), 1);
        assert_eq!(cron_jobs[0].name, "cleanup");
        assert!(cron_jobs[0].enabled);
    }
2483
    // A registered cron job can be triggered manually and returns an execution id.
    #[tokio::test]
    async fn test_service_manager_trigger_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));

        let manager = ServiceManager::new(runtime)
            .with_job_executor(job_executor)
            .with_cron_scheduler(cron_scheduler);

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
        assert!(!exec_id.0.is_empty());
    }
2505
    // `set_cron_enabled` toggles the job's enabled flag in both directions.
    #[tokio::test]
    async fn test_service_manager_enable_disable_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));

        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        let cron_jobs = manager.list_cron_jobs().await;
        assert!(cron_jobs[0].enabled);

        manager.set_cron_enabled("cleanup", false).await;
        let cron_jobs = manager.list_cron_jobs().await;
        assert!(!cron_jobs[0].enabled);

        manager.set_cron_enabled("cleanup", true).await;
        let cron_jobs = manager.list_cron_jobs().await;
        assert!(cron_jobs[0].enabled);
    }
2535
    // Removing a job-type service also unregisters its spec from the executor.
    #[tokio::test]
    async fn test_service_manager_remove_cleans_up_job() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());

        let job_spec = mock_job_spec();
        manager
            .upsert_service("backup".to_string(), job_spec)
            .await
            .unwrap();

        let spec = job_executor.get_job_spec("backup").await;
        assert!(spec.is_some());

        manager.remove_service("backup").await.unwrap();

        let spec = job_executor.get_job_spec("backup").await;
        assert!(spec.is_none());
    }
2561
    // Removing a cron-type service also unregisters it from the scheduler.
    #[tokio::test]
    async fn test_service_manager_remove_cleans_up_cron() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));

        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());

        let cron_spec = mock_cron_spec();
        manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await
            .unwrap();

        assert_eq!(cron_scheduler.job_count().await, 1);

        manager.remove_service("cleanup").await.unwrap();

        assert_eq!(cron_scheduler.job_count().await, 0);
    }
2586
    // Triggering a job without a configured executor yields a configuration error.
    #[tokio::test]
    async fn test_service_manager_job_without_executor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }
2597
    // Upserting a cron spec without a scheduler fails with a configuration error.
    #[tokio::test]
    async fn test_service_manager_cron_without_scheduler() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let cron_spec = mock_cron_spec();
        let result = manager
            .upsert_service("cleanup".to_string(), cron_spec)
            .await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }
2611
    // Two triggers of the same job produce two listed executions.
    #[tokio::test]
    async fn test_service_manager_list_job_executions() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);

        let job_spec = mock_job_spec();
        manager
            .upsert_service("backup".to_string(), job_spec)
            .await
            .unwrap();

        manager
            .trigger_job("backup", JobTrigger::Cli)
            .await
            .unwrap();
        manager
            .trigger_job("backup", JobTrigger::Scheduler)
            .await
            .unwrap();

        // Give the async executions a moment to be recorded.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let executions = manager.list_job_executions("backup").await;
        assert_eq!(executions.len(), 2);
    }
2643
    // Scale up/down and removal keep the supervisor's watch count in sync.
    #[tokio::test]
    async fn test_service_manager_with_supervisor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());

        let spec = mock_spec();
        manager
            .upsert_service("api".to_string(), spec)
            .await
            .unwrap();

        manager.scale_service("api", 2).await.unwrap();

        assert_eq!(supervisor.supervised_count().await, 2);

        manager.scale_service("api", 1).await.unwrap();
        assert_eq!(supervisor.supervised_count().await, 1);

        manager.remove_service("api").await.unwrap();
        assert_eq!(supervisor.supervised_count().await, 0);
    }
2674
    // A freshly scaled replica is reported as Running by the supervisor.
    #[tokio::test]
    async fn test_service_manager_supervisor_state() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);

        let spec = mock_spec();
        manager
            .upsert_service("web".to_string(), spec)
            .await
            .unwrap();
        manager.scale_service("web", 1).await.unwrap();

        // Replica numbers are 1-based.
        let container_id = ContainerId {
            service: "web".to_string(),
            replica: 1,
        };
        let state = manager.get_container_supervised_state(&container_id).await;
        assert_eq!(state, Some(SupervisedState::Running));
    }
2698
    // The supervisor loop starts via the manager, reports running, and shuts
    // down cleanly within the timeout.
    #[tokio::test]
    async fn test_service_manager_start_supervisor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());

        let handle = manager.start_container_supervisor().unwrap();

        // Give the spawned loop a moment to come up.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(supervisor.is_running());

        manager.shutdown_container_supervisor();

        tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .unwrap()
            .unwrap();

        assert!(!supervisor.is_running());
    }
2724
    // Starting the supervisor without one configured yields a configuration error.
    #[tokio::test]
    async fn test_service_manager_supervisor_not_configured() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        let result = manager.start_container_supervisor();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }
2735
    // Service spec with a single TCP endpoint (port 5432) for stream-route tests.
    fn mock_tcp_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  database:
    rtype: service
    image:
      name: postgres:latest
    endpoints:
      - name: postgresql
        protocol: tcp
        port: 5432
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("database")
        .unwrap()
    }
2762
    // Service spec with a single UDP endpoint (port 53) for stream-route tests.
    fn mock_udp_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  dns:
    rtype: service
    image:
      name: dns:latest
    endpoints:
      - name: dns
        protocol: udp
        port: 53
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("dns")
        .unwrap()
    }
2787
2788 fn mock_mixed_spec() -> ServiceSpec {
2789 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2790 r"
2791version: v1
2792deployment: test
2793services:
2794 mixed:
2795 rtype: service
2796 image:
2797 name: mixed:latest
2798 endpoints:
2799 - name: http
2800 protocol: http
2801 port: 8080
2802 - name: grpc
2803 protocol: tcp
2804 port: 9000
2805 - name: metrics
2806 protocol: udp
2807 port: 8125
2808 scale:
2809 mode: fixed
2810 replicas: 1
2811",
2812 )
2813 .unwrap()
2814 .services
2815 .remove("mixed")
2816 .unwrap()
2817 }
2818
2819 #[tokio::test]
2820 async fn test_service_manager_with_stream_registry_tcp() {
2821 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2822 let stream_registry = Arc::new(StreamRegistry::new());
2823
2824 let mut manager = ServiceManager::new(runtime);
2825 manager.set_stream_registry(stream_registry.clone());
2826 manager.set_deployment_name("test".to_string());
2827
2828 let spec = mock_tcp_spec();
2830 manager
2831 .upsert_service("database".to_string(), spec)
2832 .await
2833 .unwrap();
2834
2835 assert_eq!(stream_registry.tcp_count(), 1);
2837 assert!(stream_registry.tcp_ports().contains(&5432));
2838
2839 manager.remove_service("database").await.unwrap();
2841 assert_eq!(stream_registry.tcp_count(), 0);
2842 }
2843
2844 #[tokio::test]
2845 async fn test_service_manager_with_stream_registry_udp() {
2846 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2847 let stream_registry = Arc::new(StreamRegistry::new());
2848
2849 let mut manager = ServiceManager::new(runtime);
2850 manager.set_stream_registry(stream_registry.clone());
2851 manager.set_deployment_name("test".to_string());
2852
2853 let spec = mock_udp_spec();
2855 manager
2856 .upsert_service("dns".to_string(), spec)
2857 .await
2858 .unwrap();
2859
2860 assert_eq!(stream_registry.udp_count(), 1);
2862 assert!(stream_registry.udp_ports().contains(&53));
2863
2864 manager.remove_service("dns").await.unwrap();
2866 assert_eq!(stream_registry.udp_count(), 0);
2867 }
2868
2869 #[tokio::test]
2870 async fn test_service_manager_with_stream_registry_mixed() {
2871 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2872 let stream_registry = Arc::new(StreamRegistry::new());
2873
2874 let mut manager = ServiceManager::new(runtime);
2875 manager.set_stream_registry(stream_registry.clone());
2876 manager.set_deployment_name("test".to_string());
2877
2878 let spec = mock_mixed_spec();
2880 manager
2881 .upsert_service("mixed".to_string(), spec)
2882 .await
2883 .unwrap();
2884
2885 assert_eq!(stream_registry.tcp_count(), 1); assert_eq!(stream_registry.udp_count(), 1); assert!(stream_registry.tcp_ports().contains(&9000));
2890 assert!(stream_registry.udp_ports().contains(&8125));
2891
2892 manager.remove_service("mixed").await.unwrap();
2894 assert_eq!(stream_registry.tcp_count(), 0);
2895 assert_eq!(stream_registry.udp_count(), 0);
2896 }
2897
2898 #[tokio::test]
2899 async fn test_service_manager_stream_registry_builder() {
2900 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2901 let stream_registry = Arc::new(StreamRegistry::new());
2902
2903 let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
2905
2906 assert!(manager.stream_registry().is_some());
2908 }
2909
2910 #[tokio::test]
2911 async fn test_tcp_service_without_stream_registry() {
2912 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2913
2914 let mut manager = ServiceManager::new(runtime);
2916 manager.set_deployment_name("test".to_string());
2917
2918 let spec = mock_tcp_spec();
2920 manager
2921 .upsert_service("database".to_string(), spec)
2922 .await
2923 .unwrap();
2924
2925 let services = manager.list_services().await;
2927 assert!(services.contains(&"database".to_string()));
2928 }
2929}