1use crate::error::{AgentError, Result};
8use crate::init::InitOrchestrator;
9use crate::overlay_manager::OverlayManager;
10use crate::runtime::{ContainerId, Runtime};
11#[cfg(not(target_os = "windows"))]
14use crate::runtime::OverlayAttachKind;
15use std::collections::HashMap;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::RwLock;
20use tracing::{debug, error, info, warn};
21use uuid::Uuid;
22use zlayer_spec::ServiceSpec;
23
/// Records how (if at all) a job container was attached to the overlay network,
/// so that cleanup can issue the matching detach call.
///
/// On Windows the attach step is a stub that always yields
/// [`OverlayAttachment::None`], so the other variants are never constructed
/// there; hence the conditional `dead_code` allowance.
#[cfg_attr(target_os = "windows", allow(dead_code))]
#[derive(Default)]
enum OverlayAttachment {
    /// Nothing was attached; cleanup has nothing to detach.
    #[default]
    None,
    /// Attached via the container's PID; detached with `detach_container(pid)`.
    Pid(u32),
    /// Guest-managed attachment, keyed by the container id rendered as a string;
    /// detached with `detach_container_guest`.
    Guest(String),
    /// Host-shared attachment, keyed by the container id rendered as a string;
    /// detached with `detach_overlay_ip` followed by `detach_container_host_shared`.
    HostShared(String),
}
41
42#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub struct JobExecutionId(pub String);
45
46impl JobExecutionId {
47 #[must_use]
49 pub fn new() -> Self {
50 Self(Uuid::new_v4().to_string())
51 }
52}
53
54impl Default for JobExecutionId {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl std::fmt::Display for JobExecutionId {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 write!(f, "{}", self.0)
63 }
64}
65
/// Lifecycle state of a job execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// Recorded but not yet picked up by the execution task.
    Pending,
    /// The execution task has started preparing the container.
    Initializing,
    /// The execution has been marked as running (set just before the container is started).
    Running,
    /// The container exited with code 0.
    Completed { exit_code: i32, duration: Duration },
    /// The execution failed; `exit_code` is present only when the container
    /// actually ran and exited with a non-zero code.
    Failed {
        reason: String,
        exit_code: Option<i32>,
    },
    /// The execution was cancelled on request.
    Cancelled,
}

impl std::fmt::Display for JobStatus {
    /// Renders a short lowercase label; exit codes are appended in parentheses
    /// when known. The failure `reason` is intentionally not part of the label.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Pending => f.write_str("pending"),
            Self::Initializing => f.write_str("initializing"),
            Self::Running => f.write_str("running"),
            Self::Completed { exit_code, .. } => write!(f, "completed({exit_code})"),
            Self::Failed {
                exit_code: Some(code),
                ..
            } => write!(f, "failed({code})"),
            Self::Failed {
                exit_code: None, ..
            } => f.write_str("failed"),
            Self::Cancelled => f.write_str("cancelled"),
        }
    }
}
104
/// What caused a job execution to be started.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobTrigger {
    /// Started through an endpoint; the caller's address is recorded when known.
    Endpoint { remote_addr: Option<String> },
    /// Started from the command line.
    Cli,
    /// Started by the scheduler.
    Scheduler,
    /// Started internally, with a free-form reason.
    Internal { reason: String },
}

impl std::fmt::Display for JobTrigger {
    /// Renders a short lowercase label, with the address or reason in
    /// parentheses when one is available.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Endpoint {
                remote_addr: Some(addr),
            } => write!(f, "endpoint({addr})"),
            Self::Endpoint { remote_addr: None } => f.write_str("endpoint"),
            Self::Cli => f.write_str("cli"),
            Self::Scheduler => f.write_str("scheduler"),
            Self::Internal { reason } => write!(f, "internal({reason})"),
        }
    }
}
134
/// Bookkeeping record for one execution of a job.
#[derive(Debug, Clone)]
pub struct JobExecution {
    /// Identifier of this execution.
    pub id: JobExecutionId,
    /// Name of the job this execution belongs to.
    pub job_name: String,
    /// Current lifecycle state.
    pub status: JobStatus,
    /// Set when the execution task begins work; `None` while still pending.
    pub started_at: Option<Instant>,
    /// Set when a terminal status is recorded; also drives retention cleanup.
    pub completed_at: Option<Instant>,
    /// Container backing this execution; filled in before the container is created.
    pub container_id: Option<ContainerId>,
    /// Log output collected after the container exited, if collection succeeded.
    pub logs: Option<String>,
    /// What started this execution.
    pub trigger: JobTrigger,
}
149
/// Tunables for [`JobExecutor`].
#[derive(Debug, Clone)]
pub struct JobExecutorConfig {
    /// Intended cap on simultaneously active executions.
    /// NOTE(review): nothing in this file reads this field — confirm where (or
    /// whether) the limit is enforced.
    pub max_concurrent: usize,
    /// How long a finished execution record is kept before cleanup removes it.
    pub retention: Duration,
    /// Size argument handed to the runtime's fallback log call.
    /// NOTE(review): unit (bytes vs. lines) is not visible here — confirm.
    pub max_log_size: usize,
}

impl Default for JobExecutorConfig {
    /// Defaults: 10 concurrent executions, one hour retention, `1024 * 1024` log size.
    fn default() -> Self {
        const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
        const ONE_MEBI: usize = 1 << 20;

        Self {
            max_concurrent: 10,
            retention: ONE_HOUR,
            max_log_size: ONE_MEBI,
        }
    }
}
170
/// Runs jobs as one-shot containers and tracks their execution records.
pub struct JobExecutor {
    /// Container runtime used to pull, create, start, wait on and remove job containers.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// All known execution records, shared with the spawned execution tasks.
    executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
    /// Job specs registered by name.
    job_specs: Arc<RwLock<HashMap<String, ServiceSpec>>>,
    /// Optional overlay manager; when absent, jobs run without overlay networking.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Optional sink used to revoke a container's token during cleanup.
    token_sink: Option<Arc<dyn crate::auth::ContainerTokenSink>>,
    /// Executor tunables.
    config: JobExecutorConfig,
    /// Set once shutdown is requested; new triggers are rejected afterwards.
    shutdown: AtomicBool,
}
190
191impl JobExecutor {
192 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
194 Self::with_config(runtime, JobExecutorConfig::default())
195 }
196
197 pub fn with_config(runtime: Arc<dyn Runtime + Send + Sync>, config: JobExecutorConfig) -> Self {
199 Self {
200 runtime,
201 executions: Arc::new(RwLock::new(HashMap::new())),
202 job_specs: Arc::new(RwLock::new(HashMap::new())),
203 overlay_manager: None,
204 token_sink: None,
205 config,
206 shutdown: AtomicBool::new(false),
207 }
208 }
209
210 pub fn set_overlay_manager(&mut self, overlay_manager: Arc<RwLock<OverlayManager>>) {
215 self.overlay_manager = Some(overlay_manager);
216 }
217
218 pub fn set_token_sink(&mut self, sink: Arc<dyn crate::auth::ContainerTokenSink>) {
220 self.token_sink = Some(sink);
221 }
222
223 pub async fn register_job(&self, name: &str, spec: ServiceSpec) {
225 let mut specs = self.job_specs.write().await;
226 specs.insert(name.to_string(), spec);
227 info!(job = %name, "Registered job spec");
228 }
229
230 pub async fn unregister_job(&self, name: &str) {
232 let mut specs = self.job_specs.write().await;
233 specs.remove(name);
234 info!(job = %name, "Unregistered job spec");
235 }
236
237 pub async fn get_job_spec(&self, name: &str) -> Option<ServiceSpec> {
239 let specs = self.job_specs.read().await;
240 specs.get(name).cloned()
241 }
242
243 pub async fn registered_job_names(&self) -> Vec<String> {
245 let specs = self.job_specs.read().await;
246 let mut names: Vec<String> = specs.keys().cloned().collect();
247 names.sort();
248 names
249 }
250
251 pub async fn latest_execution(&self, job_name: &str) -> Option<JobExecution> {
254 let executions = self.executions.read().await;
255 executions
256 .values()
257 .filter(|e| e.job_name == job_name)
258 .max_by_key(|e| e.started_at)
259 .cloned()
260 }
261
262 pub async fn trigger(
267 &self,
268 job_name: &str,
269 spec: &ServiceSpec,
270 trigger: JobTrigger,
271 ) -> Result<JobExecutionId> {
272 if self.shutdown.load(Ordering::Relaxed) {
273 return Err(AgentError::Internal("Job executor is shutting down".into()));
274 }
275
276 let exec_id = JobExecutionId::new();
277
278 info!(
279 job = %job_name,
280 execution_id = %exec_id,
281 trigger = %trigger,
282 "Triggering job execution"
283 );
284
285 let execution = JobExecution {
287 id: exec_id.clone(),
288 job_name: job_name.to_string(),
289 status: JobStatus::Pending,
290 started_at: None,
291 completed_at: None,
292 container_id: None,
293 logs: None,
294 trigger,
295 };
296
297 {
299 let mut executions = self.executions.write().await;
300 executions.insert(exec_id.clone(), execution);
301 }
302
303 let runtime = self.runtime.clone();
305 let spec = spec.clone();
306 let exec_id_clone = exec_id.clone();
307 let executions = self.executions.clone();
308 let job_name = job_name.to_string();
309 let max_log_size = self.config.max_log_size;
310 let overlay_manager = self.overlay_manager.clone();
311 let token_sink = self.token_sink.clone();
312
313 tokio::spawn(async move {
314 Box::pin(Self::run_job(
315 runtime,
316 executions,
317 exec_id_clone,
318 &job_name,
319 spec,
320 max_log_size,
321 overlay_manager,
322 token_sink,
323 ))
324 .await;
325 });
326
327 Ok(exec_id)
328 }
329
330 #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
332 async fn run_job(
333 runtime: Arc<dyn Runtime + Send + Sync>,
334 executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
335 exec_id: JobExecutionId,
336 job_name: &str,
337 spec: ServiceSpec,
338 max_log_size: usize,
339 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
340 token_sink: Option<Arc<dyn crate::auth::ContainerTokenSink>>,
341 ) {
342 let started = Instant::now();
343
344 Self::update_status(&executions, &exec_id, |exec| {
346 exec.status = JobStatus::Initializing;
347 exec.started_at = Some(started);
348 })
349 .await;
350
351 let replica = exec_id.0.chars().take(8).collect::<String>();
354 let replica_num = u32::from_str_radix(&replica, 16).unwrap_or(0) % 10000;
355 let container_id = ContainerId::new(format!("job-{job_name}"), replica_num);
356
357 Self::update_status(&executions, &exec_id, |exec| {
359 exec.container_id = Some(container_id.clone());
360 })
361 .await;
362
363 debug!(
364 job = %job_name,
365 execution_id = %exec_id,
366 container_id = %container_id,
367 "Creating job container"
368 );
369
370 let image_str = spec.image.name.to_string();
372 if let Err(e) = runtime
373 .pull_image_with_policy(
374 &image_str,
375 spec.image.pull_policy,
376 None,
377 spec.image.source_policy.unwrap_or_default(),
378 )
379 .await
380 {
381 error!(
382 job = %job_name,
383 execution_id = %exec_id,
384 error = %e,
385 "Image pull failed"
386 );
387 Self::update_status(&executions, &exec_id, |exec| {
388 exec.status = JobStatus::Failed {
389 reason: format!("Image pull failed: {e}"),
390 exit_code: None,
391 };
392 exec.completed_at = Some(Instant::now());
393 })
394 .await;
395 return;
396 }
397
398 if let Err(e) = runtime.create_container(&container_id, &spec).await {
400 let error_msg = e.to_string();
401 error!(
402 job = %job_name,
403 execution_id = %exec_id,
404 error = %error_msg,
405 "Container create failed"
406 );
407 Self::update_status(&executions, &exec_id, |exec| {
408 exec.status = JobStatus::Failed {
409 reason: format!("Container create failed: {error_msg}"),
410 exit_code: None,
411 };
412 exec.completed_at = Some(Instant::now());
413 })
414 .await;
415 return;
416 }
417
418 let overlay_attachment = Self::attach_overlay(
426 overlay_manager.as_ref(),
427 runtime.as_ref(),
428 &container_id,
429 job_name,
430 &spec,
431 )
432 .await;
433
434 let init_orchestrator = InitOrchestrator::new(container_id.clone(), spec.init.clone());
436 if let Err(e) = init_orchestrator.run().await {
437 let error_msg = e.to_string();
438 error!(
439 job = %job_name,
440 execution_id = %exec_id,
441 error = %error_msg,
442 "Init failed"
443 );
444 Self::update_status(&executions, &exec_id, |exec| {
445 exec.status = JobStatus::Failed {
446 reason: format!("Init failed: {error_msg}"),
447 exit_code: None,
448 };
449 exec.completed_at = Some(Instant::now());
450 })
451 .await;
452 Self::detach_overlay(
454 overlay_manager.as_ref(),
455 runtime.as_ref(),
456 &container_id,
457 &overlay_attachment,
458 )
459 .await;
460 Self::maybe_teardown_job_segment(
461 overlay_manager.as_ref(),
462 &executions,
463 &exec_id,
464 job_name,
465 )
466 .await;
467 let _ = runtime.remove_container(&container_id).await;
468 Self::revoke_token(token_sink.as_ref(), &container_id).await;
469 return;
470 }
471
472 Self::update_status(&executions, &exec_id, |exec| {
474 exec.status = JobStatus::Running;
475 })
476 .await;
477
478 debug!(
479 job = %job_name,
480 execution_id = %exec_id,
481 "Starting job container"
482 );
483
484 if let Err(e) = runtime.start_container(&container_id).await {
486 let error_msg = e.to_string();
487 error!(
488 job = %job_name,
489 execution_id = %exec_id,
490 error = %error_msg,
491 "Container start failed"
492 );
493 Self::update_status(&executions, &exec_id, |exec| {
494 exec.status = JobStatus::Failed {
495 reason: format!("Container start failed: {error_msg}"),
496 exit_code: None,
497 };
498 exec.completed_at = Some(Instant::now());
499 })
500 .await;
501 Self::detach_overlay(
503 overlay_manager.as_ref(),
504 runtime.as_ref(),
505 &container_id,
506 &overlay_attachment,
507 )
508 .await;
509 Self::maybe_teardown_job_segment(
510 overlay_manager.as_ref(),
511 &executions,
512 &exec_id,
513 job_name,
514 )
515 .await;
516 let _ = runtime.remove_container(&container_id).await;
517 Self::revoke_token(token_sink.as_ref(), &container_id).await;
518 return;
519 }
520
521 let exit_code = runtime.wait_container(&container_id).await;
523 let duration = started.elapsed();
524
525 let logs = match runtime.get_logs(&container_id).await {
527 Ok(entries) => Some(
528 entries
529 .iter()
530 .map(ToString::to_string)
531 .collect::<Vec<_>>()
532 .join("\n"),
533 ),
534 Err(e) => {
535 match runtime.container_logs(&container_id, max_log_size).await {
537 Ok(entries) => Some(
538 entries
539 .iter()
540 .map(ToString::to_string)
541 .collect::<Vec<_>>()
542 .join("\n"),
543 ),
544 Err(e2) => {
545 warn!(
546 job = %job_name,
547 execution_id = %exec_id,
548 error = %e,
549 fallback_error = %e2,
550 "Failed to collect logs"
551 );
552 None
553 }
554 }
555 }
556 };
557
558 Self::update_status(&executions, &exec_id, |exec| {
560 exec.logs = logs;
561 exec.completed_at = Some(Instant::now());
562
563 match exit_code {
564 Ok(code) => {
565 if code == 0 {
566 info!(
567 job = exec.job_name,
568 execution_id = %exec.id,
569 duration_ms = duration.as_millis(),
570 "Job completed successfully"
571 );
572 exec.status = JobStatus::Completed {
573 exit_code: code,
574 duration,
575 };
576 } else {
577 warn!(
578 job = exec.job_name,
579 execution_id = %exec.id,
580 exit_code = code,
581 duration_ms = duration.as_millis(),
582 "Job failed with non-zero exit code"
583 );
584 exec.status = JobStatus::Failed {
585 reason: format!("Non-zero exit code: {code}"),
586 exit_code: Some(code),
587 };
588 }
589 }
590 Err(err) => {
591 error!(
592 job = exec.job_name,
593 execution_id = %exec.id,
594 error = %err,
595 "Job execution error"
596 );
597 exec.status = JobStatus::Failed {
598 reason: err.to_string(),
599 exit_code: None,
600 };
601 }
602 }
603 })
604 .await;
605
606 Self::detach_overlay(
609 overlay_manager.as_ref(),
610 runtime.as_ref(),
611 &container_id,
612 &overlay_attachment,
613 )
614 .await;
615 Self::maybe_teardown_job_segment(overlay_manager.as_ref(), &executions, &exec_id, job_name)
616 .await;
617
618 if let Err(e) = runtime.remove_container(&container_id).await {
620 warn!(
621 job = %job_name,
622 execution_id = %exec_id,
623 error = %e,
624 "Failed to remove job container"
625 );
626 }
627
628 Self::revoke_token(token_sink.as_ref(), &container_id).await;
630 }
631
    /// Attaches a freshly created job container to the overlay network
    /// (non-Windows builds).
    ///
    /// Returns the kind of attachment that was made so `detach_overlay` can
    /// undo it later. Every failure along the way is logged and degrades to
    /// [`OverlayAttachment::None`]; this function never fails the job.
    ///
    /// Without an overlay manager nothing is attempted.
    #[cfg(not(target_os = "windows"))]
    #[allow(clippy::too_many_lines)]
    async fn attach_overlay(
        overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
        runtime: &(dyn Runtime + Send + Sync),
        container_id: &ContainerId,
        job_name: &str,
        spec: &ServiceSpec,
    ) -> OverlayAttachment {
        let Some(overlay) = overlay_manager else {
            return OverlayAttachment::None;
        };
        // The read guard is held for the remainder of the function.
        let guard = overlay.read().await;

        // Overlay mode from the spec, or the mode type's default when the spec
        // has no overlay section.
        let mode = spec.overlay.as_ref().map(|o| o.mode).unwrap_or_default();
        if let Err(e) = guard.setup_service_overlay(job_name, mode).await {
            warn!(service = %job_name, error = %e, "failed to set up job overlay segment; job will have no overlay network");
            return OverlayAttachment::None;
        }

        // Built only when the manager reports a DNS domain AND the spec names a
        // deployment: "<deployment>.<zone> <zone>", with any trailing dot
        // stripped from the zone.
        // NOTE(review): presumably a DNS search list — confirm against the
        // attach_* implementations.
        let dns_override = guard.dns_domain().and_then(|zone| {
            spec.deployment.as_deref().map(|d| {
                let zone = zone.trim_end_matches('.');
                format!("{d}.{zone} {zone}")
            })
        });

        // An explicit isolation-network label on the spec is passed through to
        // the resolver together with the mode and job name.
        let isolation_network = {
            let explicit = spec
                .labels
                .get(zlayer_types::overlay::ISOLATION_NETWORK_LABEL)
                .cloned();
            crate::overlay_manager::resolve_isolation_network(mode, job_name, explicit)
        };

        // The runtime decides which attach strategy applies to this container.
        // NOTE(review): the literal `true` arguments to the attach_* calls
        // below are opaque from here — confirm their meaning at the callee.
        match runtime.overlay_attach_kind_for(container_id).await {
            OverlayAttachKind::HostProxy => {
                let cid = container_id.to_string();
                match guard
                    .attach_container_host_shared(
                        &cid,
                        job_name,
                        true,
                        isolation_network.clone(),
                        dns_override,
                    )
                    .await
                {
                    Ok(ip) => {
                        info!(container = %container_id, overlay_ip = %ip, "attached host-shared job container to overlay");
                        // A forwarder failure is only logged: the attachment is
                        // still reported so it gets detached during cleanup.
                        if let Err(e) = runtime.attach_overlay_ip(container_id, ip).await {
                            warn!(container = %container_id, error = %e, "failed to start host-shared overlay forwarders for job");
                        }
                        OverlayAttachment::HostShared(cid)
                    }
                    Err(e) => {
                        warn!(container = %container_id, error = %e, "failed to attach host-shared job container to overlay");
                        OverlayAttachment::None
                    }
                }
            }
            OverlayAttachKind::GuestManaged => {
                let cid = container_id.to_string();
                match guard
                    .attach_container_guest(
                        &cid,
                        job_name,
                        true,
                        isolation_network.clone(),
                        dns_override,
                    )
                    .await
                {
                    Ok(cfg) => match runtime.push_overlay_config(container_id, &cfg).await {
                        Ok(()) => {
                            info!(container = %container_id, overlay_ip = %cfg.overlay_ip, "attached job container to overlay (guest)");
                            OverlayAttachment::Guest(cid)
                        }
                        Err(e) => {
                            // The config was allocated but never reached the
                            // guest: undo the allocation (best effort).
                            warn!(container = %container_id, error = %e, "failed to push overlay config into job guest; rolling back");
                            let _ = guard.detach_container_guest(&cid).await;
                            OverlayAttachment::None
                        }
                    },
                    Err(e) => {
                        warn!(container = %container_id, error = %e, "failed to allocate guest overlay config for job");
                        OverlayAttachment::None
                    }
                }
            }
            // Every other attach kind goes through the container's PID.
            _ => match runtime.get_container_pid(container_id).await {
                Ok(Some(pid)) => match guard
                    .attach_container(pid, job_name, true, true, isolation_network, dns_override)
                    .await
                {
                    Ok(ip) => {
                        info!(container = %container_id, overlay_ip = %ip, "attached job container to overlay");
                        OverlayAttachment::Pid(pid)
                    }
                    Err(e) => {
                        warn!(container = %container_id, error = %e, "failed to attach job container to overlay network");
                        OverlayAttachment::None
                    }
                },
                Ok(None) => {
                    debug!(container = %container_id, "skipping job overlay attach - no PID available");
                    OverlayAttachment::None
                }
                Err(e) => {
                    warn!(container = %container_id, error = %e, "failed to read job container PID for overlay attach");
                    OverlayAttachment::None
                }
            },
        }
    }
772
    /// Windows stub of `attach_overlay`: performs no attachment and always
    /// returns [`OverlayAttachment::None`].
    #[cfg(target_os = "windows")]
    // Kept `async` so the call site awaits it the same way on every platform.
    #[allow(clippy::unused_async)]
    async fn attach_overlay(
        _overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
        _runtime: &(dyn Runtime + Send + Sync),
        _container_id: &ContainerId,
        _job_name: &str,
        _spec: &ServiceSpec,
    ) -> OverlayAttachment {
        OverlayAttachment::None
    }
786
787 #[cfg(not(target_os = "windows"))]
791 async fn detach_overlay(
792 overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
793 runtime: &(dyn Runtime + Send + Sync),
794 container_id: &ContainerId,
795 attachment: &OverlayAttachment,
796 ) {
797 let Some(overlay) = overlay_manager else {
798 return;
799 };
800 let guard = overlay.read().await;
801 match attachment {
802 OverlayAttachment::Pid(pid) => {
803 if let Err(e) = guard.detach_container(*pid).await {
804 warn!(pid = pid, error = %e, "failed to detach job container from overlay (veth/IP may leak)");
805 }
806 }
807 OverlayAttachment::Guest(id) => {
808 if let Err(e) = guard.detach_container_guest(id).await {
809 warn!(id = %id, error = %e, "failed to detach job guest from overlay");
810 }
811 }
812 OverlayAttachment::HostShared(id) => {
813 if let Err(e) = runtime.detach_overlay_ip(container_id).await {
814 warn!(container = %container_id, error = %e, "failed to stop host-shared overlay forwarders for job");
815 }
816 if let Err(e) = guard.detach_container_host_shared(id).await {
817 warn!(id = %id, error = %e, "failed to detach host-shared job container from overlay");
818 }
819 }
820 OverlayAttachment::None => {}
821 }
822 }
823
    /// Windows stub of `detach_overlay`: there is never anything attached, so
    /// this does nothing.
    #[cfg(target_os = "windows")]
    // Kept `async` so the call sites await it the same way on every platform.
    #[allow(clippy::unused_async)]
    async fn detach_overlay(
        _overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
        _runtime: &(dyn Runtime + Send + Sync),
        _container_id: &ContainerId,
        _attachment: &OverlayAttachment,
    ) {
    }
835
836 #[cfg(not(target_os = "windows"))]
841 async fn maybe_teardown_job_segment(
842 overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
843 executions: &Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
844 exec_id: &JobExecutionId,
845 job_name: &str,
846 ) {
847 let Some(overlay) = overlay_manager else {
848 return;
849 };
850 if Self::other_active_execution_exists(executions, exec_id, job_name).await {
855 return;
856 }
857 let guard = overlay.read().await;
858 guard.teardown_service_overlay(job_name).await;
859 }
860
    /// Windows stub of `maybe_teardown_job_segment`: no overlay segment is ever
    /// set up there, so this does nothing.
    #[cfg(target_os = "windows")]
    // Kept `async` so the call sites await it the same way on every platform.
    #[allow(clippy::unused_async)]
    async fn maybe_teardown_job_segment(
        _overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
        _executions: &Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
        _exec_id: &JobExecutionId,
        _job_name: &str,
    ) {
    }
872
873 #[cfg(not(target_os = "windows"))]
877 async fn other_active_execution_exists(
878 executions: &Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
879 exec_id: &JobExecutionId,
880 job_name: &str,
881 ) -> bool {
882 let execs = executions.read().await;
883 execs.values().any(|e| {
884 &e.id != exec_id
885 && e.job_name == job_name
886 && matches!(
887 e.status,
888 JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
889 )
890 })
891 }
892
893 async fn revoke_token(
897 token_sink: Option<&Arc<dyn crate::auth::ContainerTokenSink>>,
898 container_id: &ContainerId,
899 ) {
900 if let Some(sink) = token_sink {
901 sink.revoke(&format!(
902 "container:{}:{}-{}",
903 container_id.service, container_id.service, container_id.replica
904 ))
905 .await;
906 }
907 }
908
909 async fn update_status<F>(
910 executions: &RwLock<HashMap<JobExecutionId, JobExecution>>,
911 exec_id: &JobExecutionId,
912 f: F,
913 ) where
914 F: FnOnce(&mut JobExecution),
915 {
916 let mut execs = executions.write().await;
917 if let Some(exec) = execs.get_mut(exec_id) {
918 f(exec);
919 }
920 }
921
922 pub async fn get_execution(&self, exec_id: &JobExecutionId) -> Option<JobExecution> {
924 let executions = self.executions.read().await;
925 executions.get(exec_id).cloned()
926 }
927
928 pub async fn list_executions(&self, job_name: &str) -> Vec<JobExecution> {
930 let executions = self.executions.read().await;
931 executions
932 .values()
933 .filter(|e| e.job_name == job_name)
934 .cloned()
935 .collect()
936 }
937
938 pub async fn list_all_executions(&self) -> Vec<JobExecution> {
940 let executions = self.executions.read().await;
941 executions.values().cloned().collect()
942 }
943
944 pub async fn cancel(&self, exec_id: &JobExecutionId) -> Result<()> {
949 let mut executions = self.executions.write().await;
950 if let Some(execution) = executions.get_mut(exec_id) {
951 if matches!(
952 execution.status,
953 JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
954 ) {
955 if let Some(ref container_id) = execution.container_id {
956 self.runtime
957 .stop_container(container_id, Duration::from_secs(10))
958 .await?;
959 self.runtime.remove_container(container_id).await?;
960 Self::revoke_token(self.token_sink.as_ref(), container_id).await;
961 }
962 execution.status = JobStatus::Cancelled;
963 execution.completed_at = Some(Instant::now());
964 info!(
965 job = %execution.job_name,
966 execution_id = %exec_id,
967 "Job execution cancelled"
968 );
969 }
970 }
971 Ok(())
972 }
973
974 pub async fn cleanup_old_executions(&self) {
976 let now = Instant::now();
977 let mut executions = self.executions.write().await;
978 let before_count = executions.len();
979 executions.retain(|_, exec| match exec.completed_at {
980 Some(completed) => now.duration_since(completed) < self.config.retention,
981 None => true, });
983 let removed = before_count - executions.len();
984 if removed > 0 {
985 debug!(removed = removed, "Cleaned up old job execution records");
986 }
987 }
988
    /// Raises the shutdown flag; subsequent `trigger` calls are rejected.
    /// Executions that are already in flight are not touched by this call.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Relaxed);
    }
993
    /// Returns `true` once `shutdown` has been called.
    pub fn is_shutting_down(&self) -> bool {
        self.shutdown.load(Ordering::Relaxed)
    }
998
999 pub async fn active_execution_count(&self) -> usize {
1001 let executions = self.executions.read().await;
1002 executions
1003 .values()
1004 .filter(|e| {
1005 matches!(
1006 e.status,
1007 JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
1008 )
1009 })
1010 .count()
1011 }
1012}
1013
1014#[cfg(test)]
1015mod tests {
1016 use super::*;
1017 use crate::runtime::MockRuntime;
1018
1019 fn mock_job_spec() -> ServiceSpec {
1020 use zlayer_spec::*;
1021 serde_yaml::from_str::<DeploymentSpec>(
1022 r"
1023version: v1
1024deployment: test
1025services:
1026 backup:
1027 rtype: job
1028 image:
1029 name: backup:latest
1030",
1031 )
1032 .unwrap()
1033 .services
1034 .remove("backup")
1035 .unwrap()
1036 }
1037
1038 #[tokio::test]
1039 async fn test_job_execution_id() {
1040 let id1 = JobExecutionId::new();
1041 let id2 = JobExecutionId::new();
1042 assert_ne!(id1, id2);
1043 assert!(!id1.0.is_empty());
1044 }
1045
1046 #[tokio::test]
1047 async fn test_job_executor_trigger() {
1048 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1049 let executor = JobExecutor::new(runtime);
1050
1051 let spec = mock_job_spec();
1052 let exec_id = executor
1053 .trigger("backup", &spec, JobTrigger::Cli)
1054 .await
1055 .unwrap();
1056
1057 tokio::time::sleep(Duration::from_millis(50)).await;
1059
1060 let execution = executor.get_execution(&exec_id).await;
1061 assert!(execution.is_some());
1062
1063 let exec = execution.unwrap();
1064 assert_eq!(exec.job_name, "backup");
1065 assert!(matches!(exec.trigger, JobTrigger::Cli));
1066 }
1067
1068 #[tokio::test]
1069 async fn test_job_executor_list_executions() {
1070 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1071 let executor = JobExecutor::new(runtime);
1072
1073 let spec = mock_job_spec();
1074
1075 executor
1077 .trigger("backup", &spec, JobTrigger::Cli)
1078 .await
1079 .unwrap();
1080 executor
1081 .trigger("backup", &spec, JobTrigger::Scheduler)
1082 .await
1083 .unwrap();
1084
1085 tokio::time::sleep(Duration::from_millis(50)).await;
1086
1087 let executions = executor.list_executions("backup").await;
1088 assert_eq!(executions.len(), 2);
1089 }
1090
1091 #[tokio::test]
1092 async fn test_job_executor_register_spec() {
1093 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
1094 let executor = JobExecutor::new(runtime);
1095
1096 let spec = mock_job_spec();
1097 executor.register_job("backup", spec.clone()).await;
1098
1099 let retrieved = executor.get_job_spec("backup").await;
1100 assert!(retrieved.is_some());
1101 assert_eq!(retrieved.unwrap().image.name, spec.image.name);
1102 }
1103
1104 #[cfg(not(target_os = "windows"))]
1105 fn job_exec(id: &str, job_name: &str, status: JobStatus) -> JobExecution {
1106 JobExecution {
1107 id: JobExecutionId(id.to_string()),
1108 job_name: job_name.to_string(),
1109 status,
1110 started_at: None,
1111 completed_at: None,
1112 container_id: None,
1113 logs: None,
1114 trigger: JobTrigger::Cli,
1115 }
1116 }
1117
1118 #[cfg(not(target_os = "windows"))]
1119 #[tokio::test]
1120 async fn test_other_active_execution_exists() {
1121 use std::collections::HashMap;
1122
1123 let current = JobExecutionId("current".to_string());
1124
1125 {
1127 let mut map: HashMap<JobExecutionId, JobExecution> = HashMap::new();
1128 map.insert(
1129 current.clone(),
1130 job_exec(
1131 "current",
1132 "backup",
1133 JobStatus::Completed {
1134 exit_code: 0,
1135 duration: Duration::from_secs(1),
1136 },
1137 ),
1138 );
1139 map.insert(
1140 JobExecutionId("sibling".to_string()),
1141 job_exec("sibling", "backup", JobStatus::Running),
1142 );
1143 let executions = Arc::new(RwLock::new(map));
1144 assert!(
1145 JobExecutor::other_active_execution_exists(&executions, ¤t, "backup").await
1146 );
1147 }
1148
1149 {
1151 let mut map: HashMap<JobExecutionId, JobExecution> = HashMap::new();
1152 map.insert(
1153 current.clone(),
1154 job_exec(
1155 "current",
1156 "backup",
1157 JobStatus::Failed {
1158 reason: "boom".into(),
1159 exit_code: Some(1),
1160 },
1161 ),
1162 );
1163 map.insert(
1164 JobExecutionId("sibling".to_string()),
1165 job_exec(
1166 "sibling",
1167 "backup",
1168 JobStatus::Completed {
1169 exit_code: 0,
1170 duration: Duration::from_secs(1),
1171 },
1172 ),
1173 );
1174 let executions = Arc::new(RwLock::new(map));
1175 assert!(
1176 !JobExecutor::other_active_execution_exists(&executions, ¤t, "backup").await
1177 );
1178 }
1179
1180 {
1182 let mut map: HashMap<JobExecutionId, JobExecution> = HashMap::new();
1183 map.insert(
1184 current.clone(),
1185 job_exec(
1186 "current",
1187 "backup",
1188 JobStatus::Completed {
1189 exit_code: 0,
1190 duration: Duration::from_secs(1),
1191 },
1192 ),
1193 );
1194 map.insert(
1195 JobExecutionId("other".to_string()),
1196 job_exec("other", "restore", JobStatus::Running),
1197 );
1198 let executions = Arc::new(RwLock::new(map));
1199 assert!(
1200 !JobExecutor::other_active_execution_exists(&executions, ¤t, "backup").await
1201 );
1202 }
1203 }
1204
1205 #[tokio::test]
1206 async fn test_job_status_display() {
1207 assert_eq!(format!("{}", JobStatus::Pending), "pending");
1208 assert_eq!(format!("{}", JobStatus::Running), "running");
1209 assert_eq!(
1210 format!(
1211 "{}",
1212 JobStatus::Completed {
1213 exit_code: 0,
1214 duration: Duration::from_secs(10)
1215 }
1216 ),
1217 "completed(0)"
1218 );
1219 assert_eq!(
1220 format!(
1221 "{}",
1222 JobStatus::Failed {
1223 reason: "error".into(),
1224 exit_code: Some(1)
1225 }
1226 ),
1227 "failed(1)"
1228 );
1229 }
1230}