Skip to main content

zlayer_agent/
job.rs

1//! Job execution engine - run-to-completion container lifecycle
2//!
3//! This module provides the `JobExecutor` which handles run-to-completion
4//! workloads (jobs and cron jobs). Unlike services which run indefinitely,
5//! jobs run to completion and track their exit status.
6
7use crate::error::{AgentError, Result};
8use crate::init::InitOrchestrator;
9use crate::runtime::{ContainerId, Runtime};
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15use tracing::{debug, error, info, warn};
16use uuid::Uuid;
17use zlayer_spec::ServiceSpec;
18
19/// Unique identifier for a job execution
20#[derive(Debug, Clone, PartialEq, Eq, Hash)]
21pub struct JobExecutionId(pub String);
22
23impl JobExecutionId {
24    /// Create a new random execution ID
25    #[must_use]
26    pub fn new() -> Self {
27        Self(Uuid::new_v4().to_string())
28    }
29}
30
31impl Default for JobExecutionId {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl std::fmt::Display for JobExecutionId {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "{}", self.0)
40    }
41}
42
/// Lifecycle state of a single job execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// Recorded, but the execution task has not started working on it yet
    Pending,
    /// Image pull, container creation and init steps are in progress
    Initializing,
    /// The main container is being started or is running
    Running,
    /// The main container exited with code 0
    Completed { exit_code: i32, duration: Duration },
    /// The execution did not finish successfully; `exit_code` is set only
    /// when the container itself exited with a non-zero code
    Failed {
        reason: String,
        exit_code: Option<i32>,
    },
    /// The execution was cancelled before it finished
    Cancelled,
}

impl std::fmt::Display for JobStatus {
    /// Renders a short lowercase label, with the exit code in parentheses
    /// when one is known.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Pending => f.write_str("pending"),
            Self::Initializing => f.write_str("initializing"),
            Self::Running => f.write_str("running"),
            Self::Cancelled => f.write_str("cancelled"),
            Self::Completed { exit_code, .. } => write!(f, "completed({exit_code})"),
            Self::Failed {
                exit_code: Some(code),
                ..
            } => write!(f, "failed({code})"),
            Self::Failed { exit_code: None, .. } => f.write_str("failed"),
        }
    }
}
81
/// Origin of a job execution request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobTrigger {
    /// Requested through the HTTP endpoint, with the caller's address when known
    Endpoint { remote_addr: Option<String> },
    /// Requested from the command line
    Cli,
    /// Fired by the cron scheduler
    Scheduler,
    /// Started by the system itself (dependency, etc.), with a free-form reason
    Internal { reason: String },
}

impl std::fmt::Display for JobTrigger {
    /// Renders `label` or `label(detail)`, e.g. `endpoint(1.2.3.4:80)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::Endpoint { remote_addr } => ("endpoint", remote_addr.as_deref()),
            Self::Cli => ("cli", None),
            Self::Scheduler => ("scheduler", None),
            Self::Internal { reason } => ("internal", Some(reason.as_str())),
        };
        match detail {
            Some(detail) => write!(f, "{label}({detail})"),
            None => f.write_str(label),
        }
    }
}
111
/// A single job execution record
///
/// Copies of these records are returned by `JobExecutor::get_execution` and
/// the `list_*` methods; the executor's background task updates the stored
/// record as the job progresses.
#[derive(Debug, Clone)]
pub struct JobExecution {
    /// Unique ID assigned when the execution was triggered
    pub id: JobExecutionId,
    /// Name of the job this execution belongs to
    pub job_name: String,
    /// Current lifecycle state
    pub status: JobStatus,
    /// When the background task began working on the execution; `None` while pending
    pub started_at: Option<Instant>,
    /// When the execution reached a terminal state. `cleanup_old_executions`
    /// measures retention from this instant and keeps records where it is `None`.
    pub completed_at: Option<Instant>,
    /// Container used for the job's main workload, once assigned
    pub container_id: Option<ContainerId>,
    /// Container log entries joined with newlines, collected after the
    /// container exits; `None` if collection failed or never happened.
    /// NOTE(review): intended to be bounded by `JobExecutorConfig::max_log_size`
    /// bytes — confirm this holds on every collection path.
    pub logs: Option<String>,
    /// Trigger source (endpoint, cli, scheduler, etc.)
    pub trigger: JobTrigger,
}
126
/// Tunables for the job executor.
#[derive(Debug, Clone)]
pub struct JobExecutorConfig {
    /// Maximum concurrent job executions per job name
    pub max_concurrent: usize,
    /// How long a finished execution record is kept before
    /// `cleanup_old_executions` may drop it
    pub retention: Duration,
    /// Maximum log size to capture (in bytes)
    pub max_log_size: usize,
}

impl Default for JobExecutorConfig {
    /// 10 concurrent executions per job, one hour of retention, 1 MiB of logs.
    fn default() -> Self {
        const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
        const ONE_MIB: usize = 1 << 20;

        Self {
            max_log_size: ONE_MIB,
            retention: ONE_HOUR,
            max_concurrent: 10,
        }
    }
}
147
/// Job executor handles run-to-completion workloads
///
/// Each call to `trigger` records an execution and spawns a Tokio task that
/// drives the job container through pull, create, init, start and wait, and
/// then removes the container.
pub struct JobExecutor {
    /// Container runtime used for every job container operation
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Active and recent job executions, keyed by execution ID. Shared with
    /// the spawned job tasks; finished records are dropped by
    /// `cleanup_old_executions` once older than `config.retention`.
    executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
    /// Job specs registered by name via `register_job`. `trigger` takes its
    /// spec as an argument and does not read this map.
    job_specs: Arc<RwLock<HashMap<String, ServiceSpec>>>,
    /// Configuration
    config: JobExecutorConfig,
    /// Set by `shutdown`; once set, `trigger` rejects new executions
    shutdown: AtomicBool,
}
160
161impl JobExecutor {
162    /// Create a new job executor with default configuration
163    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
164        Self::with_config(runtime, JobExecutorConfig::default())
165    }
166
167    /// Create a new job executor with custom configuration
168    pub fn with_config(runtime: Arc<dyn Runtime + Send + Sync>, config: JobExecutorConfig) -> Self {
169        Self {
170            runtime,
171            executions: Arc::new(RwLock::new(HashMap::new())),
172            job_specs: Arc::new(RwLock::new(HashMap::new())),
173            config,
174            shutdown: AtomicBool::new(false),
175        }
176    }
177
178    /// Register a job spec (for later triggering)
179    pub async fn register_job(&self, name: &str, spec: ServiceSpec) {
180        let mut specs = self.job_specs.write().await;
181        specs.insert(name.to_string(), spec);
182        info!(job = %name, "Registered job spec");
183    }
184
185    /// Unregister a job spec
186    pub async fn unregister_job(&self, name: &str) {
187        let mut specs = self.job_specs.write().await;
188        specs.remove(name);
189        info!(job = %name, "Unregistered job spec");
190    }
191
192    /// Get a registered job spec
193    pub async fn get_job_spec(&self, name: &str) -> Option<ServiceSpec> {
194        let specs = self.job_specs.read().await;
195        specs.get(name).cloned()
196    }
197
198    /// Trigger a job execution
199    ///
200    /// # Errors
201    /// Returns an error if the job container cannot be created or started.
202    pub async fn trigger(
203        &self,
204        job_name: &str,
205        spec: &ServiceSpec,
206        trigger: JobTrigger,
207    ) -> Result<JobExecutionId> {
208        if self.shutdown.load(Ordering::Relaxed) {
209            return Err(AgentError::Internal("Job executor is shutting down".into()));
210        }
211
212        let exec_id = JobExecutionId::new();
213
214        info!(
215            job = %job_name,
216            execution_id = %exec_id,
217            trigger = %trigger,
218            "Triggering job execution"
219        );
220
221        // Create execution record
222        let execution = JobExecution {
223            id: exec_id.clone(),
224            job_name: job_name.to_string(),
225            status: JobStatus::Pending,
226            started_at: None,
227            completed_at: None,
228            container_id: None,
229            logs: None,
230            trigger,
231        };
232
233        // Store execution record
234        {
235            let mut executions = self.executions.write().await;
236            executions.insert(exec_id.clone(), execution);
237        }
238
239        // Spawn the job execution task
240        let runtime = self.runtime.clone();
241        let spec = spec.clone();
242        let exec_id_clone = exec_id.clone();
243        let executions = self.executions.clone();
244        let job_name = job_name.to_string();
245        let max_log_size = self.config.max_log_size;
246
247        tokio::spawn(async move {
248            Self::run_job(
249                runtime,
250                executions,
251                exec_id_clone,
252                &job_name,
253                spec,
254                max_log_size,
255            )
256            .await;
257        });
258
259        Ok(exec_id)
260    }
261
262    /// Internal: Run a job to completion
263    #[allow(clippy::too_many_lines)]
264    async fn run_job(
265        runtime: Arc<dyn Runtime + Send + Sync>,
266        executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
267        exec_id: JobExecutionId,
268        job_name: &str,
269        spec: ServiceSpec,
270        max_log_size: usize,
271    ) {
272        let started = Instant::now();
273
274        // Update status to Initializing
275        Self::update_status(&executions, &exec_id, |exec| {
276            exec.status = JobStatus::Initializing;
277            exec.started_at = Some(started);
278        })
279        .await;
280
281        // Create container ID for this execution
282        // Use a unique replica number based on execution ID hash
283        let replica = exec_id.0.chars().take(8).collect::<String>();
284        let replica_num = u32::from_str_radix(&replica, 16).unwrap_or(0) % 10000;
285        let container_id = ContainerId {
286            service: format!("job-{job_name}"),
287            replica: replica_num,
288        };
289
290        // Store container ID
291        Self::update_status(&executions, &exec_id, |exec| {
292            exec.container_id = Some(container_id.clone());
293        })
294        .await;
295
296        debug!(
297            job = %job_name,
298            execution_id = %exec_id,
299            container_id = %container_id,
300            "Creating job container"
301        );
302
303        // Pull image
304        if let Err(e) = runtime
305            .pull_image_with_policy(&spec.image.name, spec.image.pull_policy, None)
306            .await
307        {
308            error!(
309                job = %job_name,
310                execution_id = %exec_id,
311                error = %e,
312                "Image pull failed"
313            );
314            Self::update_status(&executions, &exec_id, |exec| {
315                exec.status = JobStatus::Failed {
316                    reason: format!("Image pull failed: {e}"),
317                    exit_code: None,
318                };
319                exec.completed_at = Some(Instant::now());
320            })
321            .await;
322            return;
323        }
324
325        // Create container
326        if let Err(e) = runtime.create_container(&container_id, &spec).await {
327            let error_msg = e.to_string();
328            error!(
329                job = %job_name,
330                execution_id = %exec_id,
331                error = %error_msg,
332                "Container create failed"
333            );
334            Self::update_status(&executions, &exec_id, |exec| {
335                exec.status = JobStatus::Failed {
336                    reason: format!("Container create failed: {error_msg}"),
337                    exit_code: None,
338                };
339                exec.completed_at = Some(Instant::now());
340            })
341            .await;
342            return;
343        }
344
345        // Run init steps
346        let init_orchestrator = InitOrchestrator::new(container_id.clone(), spec.init.clone());
347        if let Err(e) = init_orchestrator.run().await {
348            let error_msg = e.to_string();
349            error!(
350                job = %job_name,
351                execution_id = %exec_id,
352                error = %error_msg,
353                "Init failed"
354            );
355            Self::update_status(&executions, &exec_id, |exec| {
356                exec.status = JobStatus::Failed {
357                    reason: format!("Init failed: {error_msg}"),
358                    exit_code: None,
359                };
360                exec.completed_at = Some(Instant::now());
361            })
362            .await;
363            // Cleanup container
364            let _ = runtime.remove_container(&container_id).await;
365            return;
366        }
367
368        // Update status to Running
369        Self::update_status(&executions, &exec_id, |exec| {
370            exec.status = JobStatus::Running;
371        })
372        .await;
373
374        debug!(
375            job = %job_name,
376            execution_id = %exec_id,
377            "Starting job container"
378        );
379
380        // Start container
381        if let Err(e) = runtime.start_container(&container_id).await {
382            let error_msg = e.to_string();
383            error!(
384                job = %job_name,
385                execution_id = %exec_id,
386                error = %error_msg,
387                "Container start failed"
388            );
389            Self::update_status(&executions, &exec_id, |exec| {
390                exec.status = JobStatus::Failed {
391                    reason: format!("Container start failed: {error_msg}"),
392                    exit_code: None,
393                };
394                exec.completed_at = Some(Instant::now());
395            })
396            .await;
397            let _ = runtime.remove_container(&container_id).await;
398            return;
399        }
400
401        // Wait for container to exit using the runtime's wait_container method
402        let exit_code = runtime.wait_container(&container_id).await;
403        let duration = started.elapsed();
404
405        // Collect logs before cleanup using the runtime's get_logs method
406        let logs = match runtime.get_logs(&container_id).await {
407            Ok(entries) => Some(
408                entries
409                    .iter()
410                    .map(ToString::to_string)
411                    .collect::<Vec<_>>()
412                    .join("\n"),
413            ),
414            Err(e) => {
415                // Fallback to container_logs if get_logs fails
416                match runtime.container_logs(&container_id, max_log_size).await {
417                    Ok(entries) => Some(
418                        entries
419                            .iter()
420                            .map(ToString::to_string)
421                            .collect::<Vec<_>>()
422                            .join("\n"),
423                    ),
424                    Err(e2) => {
425                        warn!(
426                            job = %job_name,
427                            execution_id = %exec_id,
428                            error = %e,
429                            fallback_error = %e2,
430                            "Failed to collect logs"
431                        );
432                        None
433                    }
434                }
435            }
436        };
437
438        // Update final status
439        Self::update_status(&executions, &exec_id, |exec| {
440            exec.logs = logs;
441            exec.completed_at = Some(Instant::now());
442
443            match exit_code {
444                Ok(code) => {
445                    if code == 0 {
446                        info!(
447                            job = exec.job_name,
448                            execution_id = %exec.id,
449                            duration_ms = duration.as_millis(),
450                            "Job completed successfully"
451                        );
452                        exec.status = JobStatus::Completed {
453                            exit_code: code,
454                            duration,
455                        };
456                    } else {
457                        warn!(
458                            job = exec.job_name,
459                            execution_id = %exec.id,
460                            exit_code = code,
461                            duration_ms = duration.as_millis(),
462                            "Job failed with non-zero exit code"
463                        );
464                        exec.status = JobStatus::Failed {
465                            reason: format!("Non-zero exit code: {code}"),
466                            exit_code: Some(code),
467                        };
468                    }
469                }
470                Err(err) => {
471                    error!(
472                        job = exec.job_name,
473                        execution_id = %exec.id,
474                        error = %err,
475                        "Job execution error"
476                    );
477                    exec.status = JobStatus::Failed {
478                        reason: err.to_string(),
479                        exit_code: None,
480                    };
481                }
482            }
483        })
484        .await;
485
486        // Cleanup container
487        if let Err(e) = runtime.remove_container(&container_id).await {
488            warn!(
489                job = %job_name,
490                execution_id = %exec_id,
491                error = %e,
492                "Failed to remove job container"
493            );
494        }
495    }
496
497    async fn update_status<F>(
498        executions: &RwLock<HashMap<JobExecutionId, JobExecution>>,
499        exec_id: &JobExecutionId,
500        f: F,
501    ) where
502        F: FnOnce(&mut JobExecution),
503    {
504        let mut execs = executions.write().await;
505        if let Some(exec) = execs.get_mut(exec_id) {
506            f(exec);
507        }
508    }
509
510    /// Get the status of a job execution
511    pub async fn get_execution(&self, exec_id: &JobExecutionId) -> Option<JobExecution> {
512        let executions = self.executions.read().await;
513        executions.get(exec_id).cloned()
514    }
515
516    /// List all executions for a job
517    pub async fn list_executions(&self, job_name: &str) -> Vec<JobExecution> {
518        let executions = self.executions.read().await;
519        executions
520            .values()
521            .filter(|e| e.job_name == job_name)
522            .cloned()
523            .collect()
524    }
525
526    /// List all executions (across all jobs)
527    pub async fn list_all_executions(&self) -> Vec<JobExecution> {
528        let executions = self.executions.read().await;
529        executions.values().cloned().collect()
530    }
531
532    /// Cancel a running job execution
533    ///
534    /// # Errors
535    /// Returns an error if the execution is not found or not in a cancellable state.
536    pub async fn cancel(&self, exec_id: &JobExecutionId) -> Result<()> {
537        let mut executions = self.executions.write().await;
538        if let Some(execution) = executions.get_mut(exec_id) {
539            if matches!(
540                execution.status,
541                JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
542            ) {
543                if let Some(ref container_id) = execution.container_id {
544                    self.runtime
545                        .stop_container(container_id, Duration::from_secs(10))
546                        .await?;
547                    self.runtime.remove_container(container_id).await?;
548                }
549                execution.status = JobStatus::Cancelled;
550                execution.completed_at = Some(Instant::now());
551                info!(
552                    job = %execution.job_name,
553                    execution_id = %exec_id,
554                    "Job execution cancelled"
555                );
556            }
557        }
558        Ok(())
559    }
560
561    /// Clean up old execution records
562    pub async fn cleanup_old_executions(&self) {
563        let now = Instant::now();
564        let mut executions = self.executions.write().await;
565        let before_count = executions.len();
566        executions.retain(|_, exec| match exec.completed_at {
567            Some(completed) => now.duration_since(completed) < self.config.retention,
568            None => true, // Keep running executions
569        });
570        let removed = before_count - executions.len();
571        if removed > 0 {
572            debug!(removed = removed, "Cleaned up old job execution records");
573        }
574    }
575
576    /// Signal shutdown
577    pub fn shutdown(&self) {
578        self.shutdown.store(true, Ordering::Relaxed);
579    }
580
581    /// Check if executor is shutting down
582    pub fn is_shutting_down(&self) -> bool {
583        self.shutdown.load(Ordering::Relaxed)
584    }
585
586    /// Get the number of active (non-completed) executions
587    pub async fn active_execution_count(&self) -> usize {
588        let executions = self.executions.read().await;
589        executions
590            .values()
591            .filter(|e| {
592                matches!(
593                    e.status,
594                    JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
595                )
596            })
597            .count()
598    }
599}
600
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;

    fn mock_job_spec() -> ServiceSpec {
        use zlayer_spec::*;
        serde_yaml::from_str::<DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  backup:
    rtype: job
    image:
      name: backup:latest
",
        )
        .unwrap()
        .services
        .remove("backup")
        .unwrap()
    }

    /// Executor backed by the in-crate mock runtime with default config.
    fn mock_executor() -> JobExecutor {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        JobExecutor::new(runtime)
    }

    #[test]
    fn test_job_execution_id() {
        let id1 = JobExecutionId::new();
        let id2 = JobExecutionId::new();
        assert_ne!(id1, id2);
        assert!(!id1.0.is_empty());
    }

    #[test]
    fn test_job_execution_id_display() {
        let id = JobExecutionId("abc-123".to_string());
        assert_eq!(id.to_string(), "abc-123");
    }

    #[tokio::test]
    async fn test_job_executor_trigger() {
        let executor = mock_executor();

        let spec = mock_job_spec();
        let exec_id = executor
            .trigger("backup", &spec, JobTrigger::Cli)
            .await
            .unwrap();

        // Give the job a moment to start
        tokio::time::sleep(Duration::from_millis(50)).await;

        let execution = executor.get_execution(&exec_id).await;
        assert!(execution.is_some());

        let exec = execution.unwrap();
        assert_eq!(exec.job_name, "backup");
        assert!(matches!(exec.trigger, JobTrigger::Cli));
    }

    #[tokio::test]
    async fn test_job_executor_list_executions() {
        let executor = mock_executor();

        let spec = mock_job_spec();

        // Trigger multiple executions
        executor
            .trigger("backup", &spec, JobTrigger::Cli)
            .await
            .unwrap();
        executor
            .trigger("backup", &spec, JobTrigger::Scheduler)
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(50)).await;

        let executions = executor.list_executions("backup").await;
        assert_eq!(executions.len(), 2);
    }

    #[tokio::test]
    async fn test_trigger_rejected_after_shutdown() {
        let executor = mock_executor();
        executor.shutdown();
        assert!(executor.is_shutting_down());

        let result = executor
            .trigger("backup", &mock_job_spec(), JobTrigger::Cli)
            .await;
        assert!(result.is_err());
        assert!(executor.list_all_executions().await.is_empty());
    }

    #[tokio::test]
    async fn test_cancel_unknown_execution_is_noop() {
        let executor = mock_executor();
        executor.cancel(&JobExecutionId::new()).await.unwrap();
        assert_eq!(executor.active_execution_count().await, 0);
    }

    #[tokio::test]
    async fn test_job_executor_register_spec() {
        let executor = mock_executor();

        let spec = mock_job_spec();
        executor.register_job("backup", spec.clone()).await;

        let retrieved = executor.get_job_spec("backup").await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().image.name, spec.image.name);
    }

    #[tokio::test]
    async fn test_job_executor_unregister_spec() {
        let executor = mock_executor();

        executor.register_job("backup", mock_job_spec()).await;
        executor.unregister_job("backup").await;
        assert!(executor.get_job_spec("backup").await.is_none());

        // Unregistering an unknown name is harmless.
        executor.unregister_job("missing").await;
    }

    #[test]
    fn test_job_status_display() {
        assert_eq!(format!("{}", JobStatus::Pending), "pending");
        assert_eq!(format!("{}", JobStatus::Initializing), "initializing");
        assert_eq!(format!("{}", JobStatus::Running), "running");
        assert_eq!(format!("{}", JobStatus::Cancelled), "cancelled");
        assert_eq!(
            format!(
                "{}",
                JobStatus::Completed {
                    exit_code: 0,
                    duration: Duration::from_secs(10)
                }
            ),
            "completed(0)"
        );
        assert_eq!(
            format!(
                "{}",
                JobStatus::Failed {
                    reason: "error".into(),
                    exit_code: Some(1)
                }
            ),
            "failed(1)"
        );
        assert_eq!(
            format!(
                "{}",
                JobStatus::Failed {
                    reason: "error".into(),
                    exit_code: None
                }
            ),
            "failed"
        );
    }

    #[test]
    fn test_job_trigger_display() {
        assert_eq!(JobTrigger::Cli.to_string(), "cli");
        assert_eq!(JobTrigger::Scheduler.to_string(), "scheduler");
        assert_eq!(
            JobTrigger::Endpoint { remote_addr: None }.to_string(),
            "endpoint"
        );
        assert_eq!(
            JobTrigger::Endpoint {
                remote_addr: Some("127.0.0.1:8080".into())
            }
            .to_string(),
            "endpoint(127.0.0.1:8080)"
        );
        assert_eq!(
            JobTrigger::Internal {
                reason: "dependency".into()
            }
            .to_string(),
            "internal(dependency)"
        );
    }
}
712            ),
713            "failed(1)"
714        );
715    }
716}