Skip to main content

zlayer_agent/
job.rs

1//! Job execution engine - run-to-completion container lifecycle
2//!
3//! This module provides the `JobExecutor` which handles run-to-completion
4//! workloads (jobs and cron jobs). Unlike services which run indefinitely,
5//! jobs run to completion and track their exit status.
6
7use crate::error::{AgentError, Result};
8use crate::init::InitOrchestrator;
9use crate::overlay_manager::OverlayManager;
10use crate::runtime::{ContainerId, Runtime};
11// Only the non-Windows attach path matches on the attach kind; on Windows the
12// overlay is wired at container-create time inside overlayd.
13#[cfg(not(target_os = "windows"))]
14use crate::runtime::OverlayAttachKind;
15use std::collections::HashMap;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::RwLock;
20use tracing::{debug, error, info, warn};
21use uuid::Uuid;
22use zlayer_spec::ServiceSpec;
23
/// Remembers which handle a job container was attached to the overlay with,
/// so the matching detach can be issued when the job exits. Skipping the
/// detach would leak a veth + overlay IP on every execution.
#[derive(Default)]
#[cfg_attr(target_os = "windows", allow(dead_code))]
enum OverlayAttachment {
    /// Nothing to undo: there was no overlay manager, no PID, the attach
    /// failed, or this is Windows (where the overlay is wired at
    /// container-create time inside overlayd).
    #[default]
    None,
    /// Linux host-process (youki) attachment, detached via the PID that was
    /// recorded when attaching.
    Pid(u32),
    /// macOS VZ guest attachment, detached via the container id that was used
    /// when attaching.
    Guest(String),
    /// macOS host-shared (Seatbelt/native-VZ/libkrun) attachment, detached via
    /// the container id that was used when attaching (overlayd allocated a
    /// distinct overlay /32 + utun alias).
    HostShared(String),
}
41
/// Identifier of one job execution.
///
/// A newtype over the identifier string; it is hashable and comparable so it
/// can key the executor's execution map.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct JobExecutionId(pub String);
45
46impl JobExecutionId {
47    /// Create a new random execution ID
48    #[must_use]
49    pub fn new() -> Self {
50        Self(Uuid::new_v4().to_string())
51    }
52}
53
54impl Default for JobExecutionId {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60impl std::fmt::Display for JobExecutionId {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        write!(f, "{}", self.0)
63    }
64}
65
/// Lifecycle state of a single job execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// Recorded but not started yet.
    Pending,
    /// Init steps are in progress.
    Initializing,
    /// The main container is running.
    Running,
    /// The job finished successfully; carries the exit code and how long the
    /// execution took.
    Completed { exit_code: i32, duration: Duration },
    /// The job did not succeed. `reason` is a human-readable explanation and
    /// `exit_code` is present only when the container actually reported one.
    Failed {
        reason: String,
        exit_code: Option<i32>,
    },
    /// The job was cancelled.
    Cancelled,
}
85
86impl std::fmt::Display for JobStatus {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        match self {
89            JobStatus::Pending => write!(f, "pending"),
90            JobStatus::Initializing => write!(f, "initializing"),
91            JobStatus::Running => write!(f, "running"),
92            JobStatus::Completed { exit_code, .. } => write!(f, "completed({exit_code})"),
93            JobStatus::Failed { exit_code, .. } => {
94                if let Some(code) = exit_code {
95                    write!(f, "failed({code})")
96                } else {
97                    write!(f, "failed")
98                }
99            }
100            JobStatus::Cancelled => write!(f, "cancelled"),
101        }
102    }
103}
104
/// The source that requested a job execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobTrigger {
    /// Requested through the HTTP endpoint; `remote_addr` is the caller's
    /// address when one is known.
    Endpoint { remote_addr: Option<String> },
    /// Requested from the CLI.
    Cli,
    /// Fired by the cron scheduler.
    Scheduler,
    /// Started by the system itself (a dependency, etc.); `reason` says why.
    Internal { reason: String },
}
117
118impl std::fmt::Display for JobTrigger {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        match self {
121            JobTrigger::Endpoint { remote_addr } => {
122                if let Some(addr) = remote_addr {
123                    write!(f, "endpoint({addr})")
124                } else {
125                    write!(f, "endpoint")
126                }
127            }
128            JobTrigger::Cli => write!(f, "cli"),
129            JobTrigger::Scheduler => write!(f, "scheduler"),
130            JobTrigger::Internal { reason } => write!(f, "internal({reason})"),
131        }
132    }
133}
134
135/// A single job execution record
136#[derive(Debug, Clone)]
137pub struct JobExecution {
138    pub id: JobExecutionId,
139    pub job_name: String,
140    pub status: JobStatus,
141    pub started_at: Option<Instant>,
142    pub completed_at: Option<Instant>,
143    pub container_id: Option<ContainerId>,
144    /// Captured stdout/stderr (limited to last N bytes)
145    pub logs: Option<String>,
146    /// Trigger source (endpoint, cli, scheduler, etc.)
147    pub trigger: JobTrigger,
148}
149
/// Tunables for the job executor.
#[derive(Debug, Clone)]
pub struct JobExecutorConfig {
    /// Upper bound on simultaneously running executions of one job name.
    // NOTE(review): `trigger` does not consult this value in the code visible
    // here — confirm where the limit is enforced.
    pub max_concurrent: usize,
    /// How long a finished job record is kept around.
    pub retention: Duration,
    /// Upper bound, in bytes, on the log output captured per execution.
    pub max_log_size: usize,
}
160
161impl Default for JobExecutorConfig {
162    fn default() -> Self {
163        Self {
164            max_concurrent: 10,
165            retention: Duration::from_secs(3600), // 1 hour
166            max_log_size: 1024 * 1024,            // 1 MB
167        }
168    }
169}
170
171/// Job executor handles run-to-completion workloads
172pub struct JobExecutor {
173    runtime: Arc<dyn Runtime + Send + Sync>,
174    /// Active and recent job executions
175    executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
176    /// Job specs (for jobs that need to be stored)
177    job_specs: Arc<RwLock<HashMap<String, ServiceSpec>>>,
178    /// Overlay manager used to attach each job container to the overlay network
179    /// (so the job can reach the daemon API / peer services) and detach it on
180    /// exit. `None` when the daemon runs without overlay networking.
181    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
182    /// Sink for persisting/revoking per-container scoped tokens. `None`
183    /// disables persistence (token minted without a `jti`, not revocable).
184    token_sink: Option<Arc<dyn crate::auth::ContainerTokenSink>>,
185    /// Configuration
186    config: JobExecutorConfig,
187    /// Shutdown flag
188    shutdown: AtomicBool,
189}
190
191impl JobExecutor {
192    /// Create a new job executor with default configuration
193    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
194        Self::with_config(runtime, JobExecutorConfig::default())
195    }
196
197    /// Create a new job executor with custom configuration
198    pub fn with_config(runtime: Arc<dyn Runtime + Send + Sync>, config: JobExecutorConfig) -> Self {
199        Self {
200            runtime,
201            executions: Arc::new(RwLock::new(HashMap::new())),
202            job_specs: Arc::new(RwLock::new(HashMap::new())),
203            overlay_manager: None,
204            token_sink: None,
205            config,
206            shutdown: AtomicBool::new(false),
207        }
208    }
209
210    /// Attach an overlay manager so job (and cron) containers join the overlay
211    /// network and can reach the daemon API / peer services. Without this, a job
212    /// container comes up with only a loopback netns and any call to the daemon
213    /// node IP fails with `ENETUNREACH`.
214    pub fn set_overlay_manager(&mut self, overlay_manager: Arc<RwLock<OverlayManager>>) {
215        self.overlay_manager = Some(overlay_manager);
216    }
217
218    /// Set the sink used to persist/revoke per-container scoped tokens.
219    pub fn set_token_sink(&mut self, sink: Arc<dyn crate::auth::ContainerTokenSink>) {
220        self.token_sink = Some(sink);
221    }
222
223    /// Register a job spec (for later triggering)
224    pub async fn register_job(&self, name: &str, spec: ServiceSpec) {
225        let mut specs = self.job_specs.write().await;
226        specs.insert(name.to_string(), spec);
227        info!(job = %name, "Registered job spec");
228    }
229
230    /// Unregister a job spec
231    pub async fn unregister_job(&self, name: &str) {
232        let mut specs = self.job_specs.write().await;
233        specs.remove(name);
234        info!(job = %name, "Unregistered job spec");
235    }
236
237    /// Get a registered job spec
238    pub async fn get_job_spec(&self, name: &str) -> Option<ServiceSpec> {
239        let specs = self.job_specs.read().await;
240        specs.get(name).cloned()
241    }
242
243    /// Names of all registered jobs, sorted.
244    pub async fn registered_job_names(&self) -> Vec<String> {
245        let specs = self.job_specs.read().await;
246        let mut names: Vec<String> = specs.keys().cloned().collect();
247        names.sort();
248        names
249    }
250
251    /// The most recent execution for a job name, if any (the one that started
252    /// last; a not-yet-started execution sorts before any started one).
253    pub async fn latest_execution(&self, job_name: &str) -> Option<JobExecution> {
254        let executions = self.executions.read().await;
255        executions
256            .values()
257            .filter(|e| e.job_name == job_name)
258            .max_by_key(|e| e.started_at)
259            .cloned()
260    }
261
262    /// Trigger a job execution
263    ///
264    /// # Errors
265    /// Returns an error if the job container cannot be created or started.
266    pub async fn trigger(
267        &self,
268        job_name: &str,
269        spec: &ServiceSpec,
270        trigger: JobTrigger,
271    ) -> Result<JobExecutionId> {
272        if self.shutdown.load(Ordering::Relaxed) {
273            return Err(AgentError::Internal("Job executor is shutting down".into()));
274        }
275
276        let exec_id = JobExecutionId::new();
277
278        info!(
279            job = %job_name,
280            execution_id = %exec_id,
281            trigger = %trigger,
282            "Triggering job execution"
283        );
284
285        // Create execution record
286        let execution = JobExecution {
287            id: exec_id.clone(),
288            job_name: job_name.to_string(),
289            status: JobStatus::Pending,
290            started_at: None,
291            completed_at: None,
292            container_id: None,
293            logs: None,
294            trigger,
295        };
296
297        // Store execution record
298        {
299            let mut executions = self.executions.write().await;
300            executions.insert(exec_id.clone(), execution);
301        }
302
303        // Spawn the job execution task
304        let runtime = self.runtime.clone();
305        let spec = spec.clone();
306        let exec_id_clone = exec_id.clone();
307        let executions = self.executions.clone();
308        let job_name = job_name.to_string();
309        let max_log_size = self.config.max_log_size;
310        let overlay_manager = self.overlay_manager.clone();
311        let token_sink = self.token_sink.clone();
312
313        tokio::spawn(async move {
314            Box::pin(Self::run_job(
315                runtime,
316                executions,
317                exec_id_clone,
318                &job_name,
319                spec,
320                max_log_size,
321                overlay_manager,
322                token_sink,
323            ))
324            .await;
325        });
326
327        Ok(exec_id)
328    }
329
330    /// Internal: Run a job to completion
331    #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
332    async fn run_job(
333        runtime: Arc<dyn Runtime + Send + Sync>,
334        executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
335        exec_id: JobExecutionId,
336        job_name: &str,
337        spec: ServiceSpec,
338        max_log_size: usize,
339        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
340        token_sink: Option<Arc<dyn crate::auth::ContainerTokenSink>>,
341    ) {
342        let started = Instant::now();
343
344        // Update status to Initializing
345        Self::update_status(&executions, &exec_id, |exec| {
346            exec.status = JobStatus::Initializing;
347            exec.started_at = Some(started);
348        })
349        .await;
350
351        // Create container ID for this execution
352        // Use a unique replica number based on execution ID hash
353        let replica = exec_id.0.chars().take(8).collect::<String>();
354        let replica_num = u32::from_str_radix(&replica, 16).unwrap_or(0) % 10000;
355        let container_id = ContainerId::new(format!("job-{job_name}"), replica_num);
356
357        // Store container ID
358        Self::update_status(&executions, &exec_id, |exec| {
359            exec.container_id = Some(container_id.clone());
360        })
361        .await;
362
363        debug!(
364            job = %job_name,
365            execution_id = %exec_id,
366            container_id = %container_id,
367            "Creating job container"
368        );
369
370        // Pull image
371        let image_str = spec.image.name.to_string();
372        if let Err(e) = runtime
373            .pull_image_with_policy(
374                &image_str,
375                spec.image.pull_policy,
376                None,
377                spec.image.source_policy.unwrap_or_default(),
378            )
379            .await
380        {
381            error!(
382                job = %job_name,
383                execution_id = %exec_id,
384                error = %e,
385                "Image pull failed"
386            );
387            Self::update_status(&executions, &exec_id, |exec| {
388                exec.status = JobStatus::Failed {
389                    reason: format!("Image pull failed: {e}"),
390                    exit_code: None,
391                };
392                exec.completed_at = Some(Instant::now());
393            })
394            .await;
395            return;
396        }
397
398        // Create container
399        if let Err(e) = runtime.create_container(&container_id, &spec).await {
400            let error_msg = e.to_string();
401            error!(
402                job = %job_name,
403                execution_id = %exec_id,
404                error = %error_msg,
405                "Container create failed"
406            );
407            Self::update_status(&executions, &exec_id, |exec| {
408                exec.status = JobStatus::Failed {
409                    reason: format!("Container create failed: {error_msg}"),
410                    exit_code: None,
411                };
412                exec.completed_at = Some(Instant::now());
413            })
414            .await;
415            return;
416        }
417
418        // Attach the job container to the overlay network BEFORE it starts, so
419        // its routes (eth0 service bridge + eth1 global overlay → the daemon node
420        // IP) and resolv.conf are in place before init actions and the job's own
421        // command run. Without this a job container has only loopback and any call
422        // to the daemon node IP fails with `ENETUNREACH`. youki records the init
423        // PID + netns at create (paused on the start fifo), so the attach is
424        // race-free. Best-effort: attach failure is logged, not fatal.
425        let overlay_attachment = Self::attach_overlay(
426            overlay_manager.as_ref(),
427            runtime.as_ref(),
428            &container_id,
429            job_name,
430            &spec,
431        )
432        .await;
433
434        // Run init steps
435        let init_orchestrator = InitOrchestrator::new(container_id.clone(), spec.init.clone());
436        if let Err(e) = init_orchestrator.run().await {
437            let error_msg = e.to_string();
438            error!(
439                job = %job_name,
440                execution_id = %exec_id,
441                error = %error_msg,
442                "Init failed"
443            );
444            Self::update_status(&executions, &exec_id, |exec| {
445                exec.status = JobStatus::Failed {
446                    reason: format!("Init failed: {error_msg}"),
447                    exit_code: None,
448                };
449                exec.completed_at = Some(Instant::now());
450            })
451            .await;
452            // Cleanup: detach overlay (reclaim veth/IP) then remove container.
453            Self::detach_overlay(
454                overlay_manager.as_ref(),
455                runtime.as_ref(),
456                &container_id,
457                &overlay_attachment,
458            )
459            .await;
460            Self::maybe_teardown_job_segment(
461                overlay_manager.as_ref(),
462                &executions,
463                &exec_id,
464                job_name,
465            )
466            .await;
467            let _ = runtime.remove_container(&container_id).await;
468            Self::revoke_token(token_sink.as_ref(), &container_id).await;
469            return;
470        }
471
472        // Update status to Running
473        Self::update_status(&executions, &exec_id, |exec| {
474            exec.status = JobStatus::Running;
475        })
476        .await;
477
478        debug!(
479            job = %job_name,
480            execution_id = %exec_id,
481            "Starting job container"
482        );
483
484        // Start container
485        if let Err(e) = runtime.start_container(&container_id).await {
486            let error_msg = e.to_string();
487            error!(
488                job = %job_name,
489                execution_id = %exec_id,
490                error = %error_msg,
491                "Container start failed"
492            );
493            Self::update_status(&executions, &exec_id, |exec| {
494                exec.status = JobStatus::Failed {
495                    reason: format!("Container start failed: {error_msg}"),
496                    exit_code: None,
497                };
498                exec.completed_at = Some(Instant::now());
499            })
500            .await;
501            // Cleanup: detach overlay (reclaim veth/IP) then remove container.
502            Self::detach_overlay(
503                overlay_manager.as_ref(),
504                runtime.as_ref(),
505                &container_id,
506                &overlay_attachment,
507            )
508            .await;
509            Self::maybe_teardown_job_segment(
510                overlay_manager.as_ref(),
511                &executions,
512                &exec_id,
513                job_name,
514            )
515            .await;
516            let _ = runtime.remove_container(&container_id).await;
517            Self::revoke_token(token_sink.as_ref(), &container_id).await;
518            return;
519        }
520
521        // Wait for container to exit using the runtime's wait_container method
522        let exit_code = runtime.wait_container(&container_id).await;
523        let duration = started.elapsed();
524
525        // Collect logs before cleanup using the runtime's get_logs method
526        let logs = match runtime.get_logs(&container_id).await {
527            Ok(entries) => Some(
528                entries
529                    .iter()
530                    .map(ToString::to_string)
531                    .collect::<Vec<_>>()
532                    .join("\n"),
533            ),
534            Err(e) => {
535                // Fallback to container_logs if get_logs fails
536                match runtime.container_logs(&container_id, max_log_size).await {
537                    Ok(entries) => Some(
538                        entries
539                            .iter()
540                            .map(ToString::to_string)
541                            .collect::<Vec<_>>()
542                            .join("\n"),
543                    ),
544                    Err(e2) => {
545                        warn!(
546                            job = %job_name,
547                            execution_id = %exec_id,
548                            error = %e,
549                            fallback_error = %e2,
550                            "Failed to collect logs"
551                        );
552                        None
553                    }
554                }
555            }
556        };
557
558        // Update final status
559        Self::update_status(&executions, &exec_id, |exec| {
560            exec.logs = logs;
561            exec.completed_at = Some(Instant::now());
562
563            match exit_code {
564                Ok(code) => {
565                    if code == 0 {
566                        info!(
567                            job = exec.job_name,
568                            execution_id = %exec.id,
569                            duration_ms = duration.as_millis(),
570                            "Job completed successfully"
571                        );
572                        exec.status = JobStatus::Completed {
573                            exit_code: code,
574                            duration,
575                        };
576                    } else {
577                        warn!(
578                            job = exec.job_name,
579                            execution_id = %exec.id,
580                            exit_code = code,
581                            duration_ms = duration.as_millis(),
582                            "Job failed with non-zero exit code"
583                        );
584                        exec.status = JobStatus::Failed {
585                            reason: format!("Non-zero exit code: {code}"),
586                            exit_code: Some(code),
587                        };
588                    }
589                }
590                Err(err) => {
591                    error!(
592                        job = exec.job_name,
593                        execution_id = %exec.id,
594                        error = %err,
595                        "Job execution error"
596                    );
597                    exec.status = JobStatus::Failed {
598                        reason: err.to_string(),
599                        exit_code: None,
600                    };
601                }
602            }
603        })
604        .await;
605
606        // Detach from the overlay (reclaim the veth + overlay IP) BEFORE removing
607        // the container, so each job execution doesn't leak network resources.
608        Self::detach_overlay(
609            overlay_manager.as_ref(),
610            runtime.as_ref(),
611            &container_id,
612            &overlay_attachment,
613        )
614        .await;
615        Self::maybe_teardown_job_segment(overlay_manager.as_ref(), &executions, &exec_id, job_name)
616            .await;
617
618        // Cleanup container
619        if let Err(e) = runtime.remove_container(&container_id).await {
620            warn!(
621                job = %job_name,
622                execution_id = %exec_id,
623                error = %e,
624                "Failed to remove job container"
625            );
626        }
627
628        // Revoke the job container's scoped token (best-effort).
629        Self::revoke_token(token_sink.as_ref(), &container_id).await;
630    }
631
632    /// Attach a job container to the overlay network, mirroring the service
633    /// path (`ServiceInstance` create flow): host-process runtimes (Linux youki)
634    /// plumb a veth by PID; the macOS VZ guest gets an allocated overlay config
635    /// pushed over vsock. `join_global = true` so the container gets the global
636    /// overlay interface (and thus a route to the daemon node IP). Returns how it
637    /// was attached so the caller can detach on exit. Best-effort: any failure is
638    /// logged and yields [`OverlayAttachment::None`].
639    #[cfg(not(target_os = "windows"))]
640    #[allow(clippy::too_many_lines)]
641    async fn attach_overlay(
642        overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
643        runtime: &(dyn Runtime + Send + Sync),
644        container_id: &ContainerId,
645        job_name: &str,
646        spec: &ServiceSpec,
647    ) -> OverlayAttachment {
648        let Some(overlay) = overlay_manager else {
649            return OverlayAttachment::None;
650        };
651        let guard = overlay.read().await;
652
653        // Stand up the per-service overlay segment (bridge / dedicated WG / shared
654        // bridge) BEFORE attaching — `attach_container` errors if the service
655        // bridge doesn't exist. Idempotent: a repeat job execution reuses the
656        // existing bridge. Mirrors the service restore path in the daemon.
657        let mode = spec.overlay.as_ref().map(|o| o.mode).unwrap_or_default();
658        if let Err(e) = guard.setup_service_overlay(job_name, mode).await {
659            warn!(service = %job_name, error = %e, "failed to set up job overlay segment; job will have no overlay network");
660            return OverlayAttachment::None;
661        }
662
663        // Per-deployment resolv.conf search domain (`<deployment>.<zone> <zone>`)
664        // so the job's bare `<svc>` resolves to ITS deployment, matching the
665        // service path's `dns_search_domain`. Falls back to the global zone inside
666        // `attach_container` when this is `None`.
667        let dns_override = guard.dns_domain().and_then(|zone| {
668            spec.deployment.as_deref().map(|d| {
669                let zone = zone.trim_end_matches('.');
670                format!("{d}.{zone} {zone}")
671            })
672        });
673
674        // Auto-fence isolation-scope modes (`Isolated`/`Dedicated`) to the job
675        // name when no explicit `com.zlayer.isolation_network` label is set,
676        // mirroring `ServiceInstance::isolation_network`.
677        let isolation_network = {
678            let explicit = spec
679                .labels
680                .get(zlayer_types::overlay::ISOLATION_NETWORK_LABEL)
681                .cloned();
682            crate::overlay_manager::resolve_isolation_network(mode, job_name, explicit)
683        };
684
685        match runtime.overlay_attach_kind_for(container_id).await {
686            // Host-shared native runtime (macOS Seatbelt / native-VZ / libkrun):
687            // overlayd allocates a distinct overlay /32 + utun alias for this job
688            // container so it is a first-class overlay member like every other
689            // runtime. Detach by the container id used at attach.
690            OverlayAttachKind::HostProxy => {
691                let cid = container_id.to_string();
692                match guard
693                    .attach_container_host_shared(
694                        &cid,
695                        job_name,
696                        true,
697                        isolation_network.clone(),
698                        dns_override,
699                    )
700                    .await
701                {
702                    Ok(ip) => {
703                        info!(container = %container_id, overlay_ip = %ip, "attached host-shared job container to overlay");
704                        if let Err(e) = runtime.attach_overlay_ip(container_id, ip).await {
705                            warn!(container = %container_id, error = %e, "failed to start host-shared overlay forwarders for job");
706                        }
707                        OverlayAttachment::HostShared(cid)
708                    }
709                    Err(e) => {
710                        warn!(container = %container_id, error = %e, "failed to attach host-shared job container to overlay");
711                        OverlayAttachment::None
712                    }
713                }
714            }
715            // macOS VZ-Linux guest: overlayd allocates the identity, we push it
716            // into the guest over vsock.
717            OverlayAttachKind::GuestManaged => {
718                let cid = container_id.to_string();
719                match guard
720                    .attach_container_guest(
721                        &cid,
722                        job_name,
723                        true,
724                        isolation_network.clone(),
725                        dns_override,
726                    )
727                    .await
728                {
729                    Ok(cfg) => match runtime.push_overlay_config(container_id, &cfg).await {
730                        Ok(()) => {
731                            info!(container = %container_id, overlay_ip = %cfg.overlay_ip, "attached job container to overlay (guest)");
732                            OverlayAttachment::Guest(cid)
733                        }
734                        Err(e) => {
735                            warn!(container = %container_id, error = %e, "failed to push overlay config into job guest; rolling back");
736                            let _ = guard.detach_container_guest(&cid).await;
737                            OverlayAttachment::None
738                        }
739                    },
740                    Err(e) => {
741                        warn!(container = %container_id, error = %e, "failed to allocate guest overlay config for job");
742                        OverlayAttachment::None
743                    }
744                }
745            }
746            // Host-process runtimes (Linux youki): plumb a veth by PID.
747            _ => match runtime.get_container_pid(container_id).await {
748                Ok(Some(pid)) => match guard
749                    .attach_container(pid, job_name, true, true, isolation_network, dns_override)
750                    .await
751                {
752                    Ok(ip) => {
753                        info!(container = %container_id, overlay_ip = %ip, "attached job container to overlay");
754                        OverlayAttachment::Pid(pid)
755                    }
756                    Err(e) => {
757                        warn!(container = %container_id, error = %e, "failed to attach job container to overlay network");
758                        OverlayAttachment::None
759                    }
760                },
761                Ok(None) => {
762                    debug!(container = %container_id, "skipping job overlay attach - no PID available");
763                    OverlayAttachment::None
764                }
765                Err(e) => {
766                    warn!(container = %container_id, error = %e, "failed to read job container PID for overlay attach");
767                    OverlayAttachment::None
768                }
769            },
770        }
771    }
772
773    /// Windows containers receive their overlay (HCN) at container-create time
774    /// inside overlayd, so there is no post-start attach step for jobs.
775    #[cfg(target_os = "windows")]
776    #[allow(clippy::unused_async)] // signature must match the Linux attach for the shared call site
777    async fn attach_overlay(
778        _overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
779        _runtime: &(dyn Runtime + Send + Sync),
780        _container_id: &ContainerId,
781        _job_name: &str,
782        _spec: &ServiceSpec,
783    ) -> OverlayAttachment {
784        OverlayAttachment::None
785    }
786
787    /// Release the overlay resources held by a job container. Mirrors the
788    /// service path's detach-by-recorded-handle so an exited one-shot container
789    /// never leaks its veth + overlay IP.
790    #[cfg(not(target_os = "windows"))]
791    async fn detach_overlay(
792        overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
793        runtime: &(dyn Runtime + Send + Sync),
794        container_id: &ContainerId,
795        attachment: &OverlayAttachment,
796    ) {
797        let Some(overlay) = overlay_manager else {
798            return;
799        };
800        let guard = overlay.read().await;
801        match attachment {
802            OverlayAttachment::Pid(pid) => {
803                if let Err(e) = guard.detach_container(*pid).await {
804                    warn!(pid = pid, error = %e, "failed to detach job container from overlay (veth/IP may leak)");
805                }
806            }
807            OverlayAttachment::Guest(id) => {
808                if let Err(e) = guard.detach_container_guest(id).await {
809                    warn!(id = %id, error = %e, "failed to detach job guest from overlay");
810                }
811            }
812            OverlayAttachment::HostShared(id) => {
813                if let Err(e) = runtime.detach_overlay_ip(container_id).await {
814                    warn!(container = %container_id, error = %e, "failed to stop host-shared overlay forwarders for job");
815                }
816                if let Err(e) = guard.detach_container_host_shared(id).await {
817                    warn!(id = %id, error = %e, "failed to detach host-shared job container from overlay");
818                }
819            }
820            OverlayAttachment::None => {}
821        }
822    }
823
/// Windows stand-in for the job overlay detach step.
///
/// On Windows the overlay is torn down at container-remove time inside
/// overlayd, so jobs have no explicit detach to perform. Every argument is
/// ignored.
#[cfg(target_os = "windows")]
#[allow(clippy::unused_async)] // stays `async` so the call site shared with the non-Windows detach needs no cfg split
async fn detach_overlay(
    _overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
    _runtime: &(dyn Runtime + Send + Sync),
    _container_id: &ContainerId,
    _attachment: &OverlayAttachment,
) {
    // Intentionally empty: nothing to release from this side on Windows.
}
835
836    /// Tear down the shared per-job overlay bridge segment created by
837    /// `attach_overlay`'s `setup_service_overlay(job_name, mode)`. `detach_overlay`
838    /// only reclaims the container's veth/IP — without this the per-job bridge
839    /// (`zl-<deployment>-<instance>-<jobname>-b`) leaks one segment per execution.
840    #[cfg(not(target_os = "windows"))]
841    async fn maybe_teardown_job_segment(
842        overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
843        executions: &Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
844        exec_id: &JobExecutionId,
845        job_name: &str,
846    ) {
847        let Some(overlay) = overlay_manager else {
848            return;
849        };
850        // Only tear down the shared per-job bridge when NO OTHER execution of the
851        // same job is still active — concurrent/overlapping cron runs share the
852        // bridge by job_name, so tearing it down under a live sibling would cut
853        // its network. This execution's own record is already terminal here.
854        if Self::other_active_execution_exists(executions, exec_id, job_name).await {
855            return;
856        }
857        let guard = overlay.read().await;
858        guard.teardown_service_overlay(job_name).await;
859    }
860
/// Windows stand-in for the per-job overlay segment teardown.
///
/// On Windows the overlay is torn down at container-remove time inside
/// overlayd, so there is no separate per-job segment to remove here. Every
/// argument is ignored.
#[cfg(target_os = "windows")]
#[allow(clippy::unused_async)] // stays `async` for signature parity with the non-Windows path
async fn maybe_teardown_job_segment(
    _overlay_manager: Option<&Arc<RwLock<OverlayManager>>>,
    _executions: &Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
    _exec_id: &JobExecutionId,
    _job_name: &str,
) {
    // Intentionally empty: overlayd handles teardown when the container is removed.
}
872
873    /// Whether any execution OTHER than `exec_id` is still active (pending /
874    /// initializing / running) for the same `job_name`. Used to gate per-job
875    /// bridge teardown so an overlapping cron sibling isn't cut off.
876    #[cfg(not(target_os = "windows"))]
877    async fn other_active_execution_exists(
878        executions: &Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
879        exec_id: &JobExecutionId,
880        job_name: &str,
881    ) -> bool {
882        let execs = executions.read().await;
883        execs.values().any(|e| {
884            &e.id != exec_id
885                && e.job_name == job_name
886                && matches!(
887                    e.status,
888                    JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
889                )
890        })
891    }
892
893    /// Revoke a job container's scoped token (best-effort). The jti matches the
894    /// deterministic `container:<service>:<service>-<replica>` string the runtime
895    /// minted under, where `service == container_id.service` (`job-<job_name>`).
896    async fn revoke_token(
897        token_sink: Option<&Arc<dyn crate::auth::ContainerTokenSink>>,
898        container_id: &ContainerId,
899    ) {
900        if let Some(sink) = token_sink {
901            sink.revoke(&format!(
902                "container:{}:{}-{}",
903                container_id.service, container_id.service, container_id.replica
904            ))
905            .await;
906        }
907    }
908
909    async fn update_status<F>(
910        executions: &RwLock<HashMap<JobExecutionId, JobExecution>>,
911        exec_id: &JobExecutionId,
912        f: F,
913    ) where
914        F: FnOnce(&mut JobExecution),
915    {
916        let mut execs = executions.write().await;
917        if let Some(exec) = execs.get_mut(exec_id) {
918            f(exec);
919        }
920    }
921
922    /// Get the status of a job execution
923    pub async fn get_execution(&self, exec_id: &JobExecutionId) -> Option<JobExecution> {
924        let executions = self.executions.read().await;
925        executions.get(exec_id).cloned()
926    }
927
928    /// List all executions for a job
929    pub async fn list_executions(&self, job_name: &str) -> Vec<JobExecution> {
930        let executions = self.executions.read().await;
931        executions
932            .values()
933            .filter(|e| e.job_name == job_name)
934            .cloned()
935            .collect()
936    }
937
938    /// List all executions (across all jobs)
939    pub async fn list_all_executions(&self) -> Vec<JobExecution> {
940        let executions = self.executions.read().await;
941        executions.values().cloned().collect()
942    }
943
944    /// Cancel a running job execution
945    ///
946    /// # Errors
947    /// Returns an error if the execution is not found or not in a cancellable state.
948    pub async fn cancel(&self, exec_id: &JobExecutionId) -> Result<()> {
949        let mut executions = self.executions.write().await;
950        if let Some(execution) = executions.get_mut(exec_id) {
951            if matches!(
952                execution.status,
953                JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
954            ) {
955                if let Some(ref container_id) = execution.container_id {
956                    self.runtime
957                        .stop_container(container_id, Duration::from_secs(10))
958                        .await?;
959                    self.runtime.remove_container(container_id).await?;
960                    Self::revoke_token(self.token_sink.as_ref(), container_id).await;
961                }
962                execution.status = JobStatus::Cancelled;
963                execution.completed_at = Some(Instant::now());
964                info!(
965                    job = %execution.job_name,
966                    execution_id = %exec_id,
967                    "Job execution cancelled"
968                );
969            }
970        }
971        Ok(())
972    }
973
974    /// Clean up old execution records
975    pub async fn cleanup_old_executions(&self) {
976        let now = Instant::now();
977        let mut executions = self.executions.write().await;
978        let before_count = executions.len();
979        executions.retain(|_, exec| match exec.completed_at {
980            Some(completed) => now.duration_since(completed) < self.config.retention,
981            None => true, // Keep running executions
982        });
983        let removed = before_count - executions.len();
984        if removed > 0 {
985            debug!(removed = removed, "Cleaned up old job execution records");
986        }
987    }
988
989    /// Signal shutdown
990    pub fn shutdown(&self) {
991        self.shutdown.store(true, Ordering::Relaxed);
992    }
993
994    /// Check if executor is shutting down
995    pub fn is_shutting_down(&self) -> bool {
996        self.shutdown.load(Ordering::Relaxed)
997    }
998
999    /// Get the number of active (non-completed) executions
1000    pub async fn active_execution_count(&self) -> usize {
1001        let executions = self.executions.read().await;
1002        executions
1003            .values()
1004            .filter(|e| {
1005                matches!(
1006                    e.status,
1007                    JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
1008                )
1009            })
1010            .count()
1011    }
1012}
1013
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;

    /// Parse a one-service deployment and hand back its `backup` job spec.
    fn mock_job_spec() -> ServiceSpec {
        use zlayer_spec::*;
        let mut deployment = serde_yaml::from_str::<DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  backup:
    rtype: job
    image:
      name: backup:latest
",
        )
        .unwrap();
        deployment.services.remove("backup").unwrap()
    }

    /// Two freshly generated execution ids differ and are non-empty.
    #[tokio::test]
    async fn test_job_execution_id() {
        let (first, second) = (JobExecutionId::new(), JobExecutionId::new());
        assert_ne!(first, second);
        assert!(!first.0.is_empty());
    }

    /// Triggering a job yields an execution record carrying the job name and trigger.
    #[tokio::test]
    async fn test_job_executor_trigger() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = JobExecutor::new(runtime);
        let spec = mock_job_spec();

        let exec_id = executor
            .trigger("backup", &spec, JobTrigger::Cli)
            .await
            .unwrap();

        // Give the job a moment to start
        tokio::time::sleep(Duration::from_millis(50)).await;

        let exec = executor
            .get_execution(&exec_id)
            .await
            .expect("triggered execution should have a record");
        assert_eq!(exec.job_name, "backup");
        assert!(matches!(exec.trigger, JobTrigger::Cli));
    }

    /// Two triggers of the same job produce two listed executions.
    #[tokio::test]
    async fn test_job_executor_list_executions() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = JobExecutor::new(runtime);
        let spec = mock_job_spec();

        // Trigger multiple executions
        executor
            .trigger("backup", &spec, JobTrigger::Cli)
            .await
            .unwrap();
        executor
            .trigger("backup", &spec, JobTrigger::Scheduler)
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(50)).await;

        let listed = executor.list_executions("backup").await;
        assert_eq!(listed.len(), 2);
    }

    /// A registered job spec can be read back by name.
    #[tokio::test]
    async fn test_job_executor_register_spec() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = JobExecutor::new(runtime);

        let spec = mock_job_spec();
        executor.register_job("backup", spec.clone()).await;

        let stored = executor
            .get_job_spec("backup")
            .await
            .expect("registered spec should be retrievable");
        assert_eq!(stored.image.name, spec.image.name);
    }

    /// Build a bare execution record with the given id, job name and status.
    #[cfg(not(target_os = "windows"))]
    fn job_exec(id: &str, job_name: &str, status: JobStatus) -> JobExecution {
        JobExecution {
            id: JobExecutionId(id.to_string()),
            job_name: job_name.to_string(),
            status,
            started_at: None,
            completed_at: None,
            container_id: None,
            logs: None,
            trigger: JobTrigger::Cli,
        }
    }

    /// A successful terminal status (exit code 0 after one second).
    #[cfg(not(target_os = "windows"))]
    fn finished_ok() -> JobStatus {
        JobStatus::Completed {
            exit_code: 0,
            duration: Duration::from_secs(1),
        }
    }

    /// Wrap records in the shared execution-table type, keyed by each record's own id.
    #[cfg(not(target_os = "windows"))]
    fn execution_table(
        records: Vec<JobExecution>,
    ) -> Arc<RwLock<HashMap<JobExecutionId, JobExecution>>> {
        use std::collections::HashMap;

        let map: HashMap<JobExecutionId, JobExecution> = records
            .into_iter()
            .map(|record| (record.id.clone(), record))
            .collect();
        Arc::new(RwLock::new(map))
    }

    /// Only a still-active execution of the SAME job, other than the current one, counts.
    #[cfg(not(target_os = "windows"))]
    #[tokio::test]
    async fn test_other_active_execution_exists() {
        let current = JobExecutionId("current".to_string());

        // (a) current terminal + sibling same-job Running -> true
        let running_sibling = execution_table(vec![
            job_exec("current", "backup", finished_ok()),
            job_exec("sibling", "backup", JobStatus::Running),
        ]);
        assert!(
            JobExecutor::other_active_execution_exists(&running_sibling, &current, "backup").await
        );

        // (b) current terminal + sibling same-job Completed -> false
        let finished_sibling = execution_table(vec![
            job_exec(
                "current",
                "backup",
                JobStatus::Failed {
                    reason: "boom".into(),
                    exit_code: Some(1),
                },
            ),
            job_exec("sibling", "backup", finished_ok()),
        ]);
        assert!(
            !JobExecutor::other_active_execution_exists(&finished_sibling, &current, "backup")
                .await
        );

        // (c) a Running execution of a DIFFERENT job_name -> false
        let unrelated_running = execution_table(vec![
            job_exec("current", "backup", finished_ok()),
            job_exec("other", "restore", JobStatus::Running),
        ]);
        assert!(
            !JobExecutor::other_active_execution_exists(&unrelated_running, &current, "backup")
                .await
        );
    }

    /// `Display` renders the short lowercase status labels.
    #[tokio::test]
    async fn test_job_status_display() {
        assert_eq!(JobStatus::Pending.to_string(), "pending");
        assert_eq!(JobStatus::Running.to_string(), "running");

        let completed = JobStatus::Completed {
            exit_code: 0,
            duration: Duration::from_secs(10),
        };
        assert_eq!(completed.to_string(), "completed(0)");

        let failed = JobStatus::Failed {
            reason: "error".into(),
            exit_code: Some(1),
        };
        assert_eq!(failed.to_string(), "failed(1)");
    }
}