Skip to main content

zlayer_agent/
job.rs

1//! Job execution engine - run-to-completion container lifecycle
2//!
3//! This module provides the `JobExecutor` which handles run-to-completion
4//! workloads (jobs and cron jobs). Unlike services which run indefinitely,
5//! jobs run to completion and track their exit status.
6
7use crate::error::{AgentError, Result};
8use crate::init::InitOrchestrator;
9use crate::runtime::{ContainerId, Runtime};
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15use tracing::{debug, error, info, warn};
16use uuid::Uuid;
17use zlayer_spec::ServiceSpec;
18
19/// Unique identifier for a job execution
20#[derive(Debug, Clone, PartialEq, Eq, Hash)]
21pub struct JobExecutionId(pub String);
22
23impl JobExecutionId {
24    /// Create a new random execution ID
25    #[must_use]
26    pub fn new() -> Self {
27        Self(Uuid::new_v4().to_string())
28    }
29}
30
31impl Default for JobExecutionId {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl std::fmt::Display for JobExecutionId {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "{}", self.0)
40    }
41}
42
/// Status of a job execution
///
/// A new execution starts as `Pending`, moves through `Initializing` and
/// `Running`, and ends in one of `Completed`, `Failed` or `Cancelled`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// Job is queued, waiting to start
    Pending,
    /// Init steps are running
    Initializing,
    /// Main container is running
    Running,
    /// Job completed successfully
    Completed {
        /// Exit code reported by the container
        exit_code: i32,
        /// Wall-clock time from start to exit
        duration: Duration,
    },
    /// Job failed
    Failed {
        /// Human-readable failure description
        reason: String,
        /// Container exit code, when the container actually ran and exited
        exit_code: Option<i32>,
    },
    /// Job was cancelled
    Cancelled,
}

impl std::fmt::Display for JobStatus {
    /// Short lowercase label, with the exit code in parentheses when one is known.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Pending => f.write_str("pending"),
            Self::Initializing => f.write_str("initializing"),
            Self::Running => f.write_str("running"),
            Self::Completed { exit_code, .. } => write!(f, "completed({exit_code})"),
            Self::Failed {
                exit_code: Some(code),
                ..
            } => write!(f, "failed({code})"),
            Self::Failed {
                exit_code: None, ..
            } => f.write_str("failed"),
            Self::Cancelled => f.write_str("cancelled"),
        }
    }
}
81
/// How the job was triggered
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobTrigger {
    /// Triggered via HTTP endpoint
    Endpoint {
        /// Address of the caller, when known
        remote_addr: Option<String>,
    },
    /// Triggered via CLI
    Cli,
    /// Triggered by cron scheduler
    Scheduler,
    /// Triggered by internal system (dependency, etc.)
    Internal {
        /// Free-form explanation of why the job was started
        reason: String,
    },
}

impl std::fmt::Display for JobTrigger {
    /// Lowercase source label, followed by the caller address or reason in
    /// parentheses when one is available.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Endpoint {
                remote_addr: Some(addr),
            } => write!(f, "endpoint({addr})"),
            Self::Endpoint { remote_addr: None } => f.write_str("endpoint"),
            Self::Cli => f.write_str("cli"),
            Self::Scheduler => f.write_str("scheduler"),
            Self::Internal { reason } => write!(f, "internal({reason})"),
        }
    }
}
111
/// A single job execution record
///
/// One record exists per triggered run. The executor keeps records in a
/// shared map and returns clones of them from its query methods.
#[derive(Debug, Clone)]
pub struct JobExecution {
    /// Unique ID of this run
    pub id: JobExecutionId,
    /// Name of the job this run belongs to
    pub job_name: String,
    /// Current lifecycle state of the run
    pub status: JobStatus,
    /// When the run left `Pending` and began initializing; `None` before that
    pub started_at: Option<Instant>,
    /// When the run reached a terminal state (completed, failed or cancelled)
    pub completed_at: Option<Instant>,
    /// Container used for this run; assigned before the image is pulled
    pub container_id: Option<ContainerId>,
    /// Captured container logs joined with newlines, or `None` if they could
    /// not be collected. Intended to be capped at `JobExecutorConfig::max_log_size`
    /// bytes of the most recent output.
    pub logs: Option<String>,
    /// Trigger source (endpoint, cli, scheduler, etc.)
    pub trigger: JobTrigger,
}
126
/// Configuration for the job executor
#[derive(Debug, Clone)]
pub struct JobExecutorConfig {
    /// Maximum concurrent job executions per job name
    pub max_concurrent: usize,
    /// How long completed execution records are kept before
    /// `JobExecutor::cleanup_old_executions` drops them
    pub retention: Duration,
    /// Maximum log size to capture (in bytes)
    pub max_log_size: usize,
}

impl Default for JobExecutorConfig {
    /// Ten concurrent runs per job, one hour of record retention, 1 MiB of logs.
    fn default() -> Self {
        const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
        const ONE_MIB: usize = 1 << 20;

        Self {
            max_concurrent: 10,
            retention: ONE_HOUR,
            max_log_size: ONE_MIB,
        }
    }
}
147
148/// Job executor handles run-to-completion workloads
149pub struct JobExecutor {
150    runtime: Arc<dyn Runtime + Send + Sync>,
151    /// Active and recent job executions
152    executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
153    /// Job specs (for jobs that need to be stored)
154    job_specs: Arc<RwLock<HashMap<String, ServiceSpec>>>,
155    /// Configuration
156    config: JobExecutorConfig,
157    /// Shutdown flag
158    shutdown: AtomicBool,
159}
160
161impl JobExecutor {
162    /// Create a new job executor with default configuration
163    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
164        Self::with_config(runtime, JobExecutorConfig::default())
165    }
166
167    /// Create a new job executor with custom configuration
168    pub fn with_config(runtime: Arc<dyn Runtime + Send + Sync>, config: JobExecutorConfig) -> Self {
169        Self {
170            runtime,
171            executions: Arc::new(RwLock::new(HashMap::new())),
172            job_specs: Arc::new(RwLock::new(HashMap::new())),
173            config,
174            shutdown: AtomicBool::new(false),
175        }
176    }
177
178    /// Register a job spec (for later triggering)
179    pub async fn register_job(&self, name: &str, spec: ServiceSpec) {
180        let mut specs = self.job_specs.write().await;
181        specs.insert(name.to_string(), spec);
182        info!(job = %name, "Registered job spec");
183    }
184
185    /// Unregister a job spec
186    pub async fn unregister_job(&self, name: &str) {
187        let mut specs = self.job_specs.write().await;
188        specs.remove(name);
189        info!(job = %name, "Unregistered job spec");
190    }
191
192    /// Get a registered job spec
193    pub async fn get_job_spec(&self, name: &str) -> Option<ServiceSpec> {
194        let specs = self.job_specs.read().await;
195        specs.get(name).cloned()
196    }
197
198    /// Trigger a job execution
199    ///
200    /// # Errors
201    /// Returns an error if the job container cannot be created or started.
202    pub async fn trigger(
203        &self,
204        job_name: &str,
205        spec: &ServiceSpec,
206        trigger: JobTrigger,
207    ) -> Result<JobExecutionId> {
208        if self.shutdown.load(Ordering::Relaxed) {
209            return Err(AgentError::Internal("Job executor is shutting down".into()));
210        }
211
212        let exec_id = JobExecutionId::new();
213
214        info!(
215            job = %job_name,
216            execution_id = %exec_id,
217            trigger = %trigger,
218            "Triggering job execution"
219        );
220
221        // Create execution record
222        let execution = JobExecution {
223            id: exec_id.clone(),
224            job_name: job_name.to_string(),
225            status: JobStatus::Pending,
226            started_at: None,
227            completed_at: None,
228            container_id: None,
229            logs: None,
230            trigger,
231        };
232
233        // Store execution record
234        {
235            let mut executions = self.executions.write().await;
236            executions.insert(exec_id.clone(), execution);
237        }
238
239        // Spawn the job execution task
240        let runtime = self.runtime.clone();
241        let spec = spec.clone();
242        let exec_id_clone = exec_id.clone();
243        let executions = self.executions.clone();
244        let job_name = job_name.to_string();
245        let max_log_size = self.config.max_log_size;
246
247        tokio::spawn(async move {
248            Self::run_job(
249                runtime,
250                executions,
251                exec_id_clone,
252                &job_name,
253                spec,
254                max_log_size,
255            )
256            .await;
257        });
258
259        Ok(exec_id)
260    }
261
262    /// Internal: Run a job to completion
263    #[allow(clippy::too_many_lines)]
264    async fn run_job(
265        runtime: Arc<dyn Runtime + Send + Sync>,
266        executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
267        exec_id: JobExecutionId,
268        job_name: &str,
269        spec: ServiceSpec,
270        max_log_size: usize,
271    ) {
272        let started = Instant::now();
273
274        // Update status to Initializing
275        Self::update_status(&executions, &exec_id, |exec| {
276            exec.status = JobStatus::Initializing;
277            exec.started_at = Some(started);
278        })
279        .await;
280
281        // Create container ID for this execution
282        // Use a unique replica number based on execution ID hash
283        let replica = exec_id.0.chars().take(8).collect::<String>();
284        let replica_num = u32::from_str_radix(&replica, 16).unwrap_or(0) % 10000;
285        let container_id = ContainerId {
286            service: format!("job-{job_name}"),
287            replica: replica_num,
288        };
289
290        // Store container ID
291        Self::update_status(&executions, &exec_id, |exec| {
292            exec.container_id = Some(container_id.clone());
293        })
294        .await;
295
296        debug!(
297            job = %job_name,
298            execution_id = %exec_id,
299            container_id = %container_id,
300            "Creating job container"
301        );
302
303        // Pull image
304        let image_str = spec.image.name.to_string();
305        if let Err(e) = runtime
306            .pull_image_with_policy(&image_str, spec.image.pull_policy, None)
307            .await
308        {
309            error!(
310                job = %job_name,
311                execution_id = %exec_id,
312                error = %e,
313                "Image pull failed"
314            );
315            Self::update_status(&executions, &exec_id, |exec| {
316                exec.status = JobStatus::Failed {
317                    reason: format!("Image pull failed: {e}"),
318                    exit_code: None,
319                };
320                exec.completed_at = Some(Instant::now());
321            })
322            .await;
323            return;
324        }
325
326        // Create container
327        if let Err(e) = runtime.create_container(&container_id, &spec).await {
328            let error_msg = e.to_string();
329            error!(
330                job = %job_name,
331                execution_id = %exec_id,
332                error = %error_msg,
333                "Container create failed"
334            );
335            Self::update_status(&executions, &exec_id, |exec| {
336                exec.status = JobStatus::Failed {
337                    reason: format!("Container create failed: {error_msg}"),
338                    exit_code: None,
339                };
340                exec.completed_at = Some(Instant::now());
341            })
342            .await;
343            return;
344        }
345
346        // Run init steps
347        let init_orchestrator = InitOrchestrator::new(container_id.clone(), spec.init.clone());
348        if let Err(e) = init_orchestrator.run().await {
349            let error_msg = e.to_string();
350            error!(
351                job = %job_name,
352                execution_id = %exec_id,
353                error = %error_msg,
354                "Init failed"
355            );
356            Self::update_status(&executions, &exec_id, |exec| {
357                exec.status = JobStatus::Failed {
358                    reason: format!("Init failed: {error_msg}"),
359                    exit_code: None,
360                };
361                exec.completed_at = Some(Instant::now());
362            })
363            .await;
364            // Cleanup container
365            let _ = runtime.remove_container(&container_id).await;
366            return;
367        }
368
369        // Update status to Running
370        Self::update_status(&executions, &exec_id, |exec| {
371            exec.status = JobStatus::Running;
372        })
373        .await;
374
375        debug!(
376            job = %job_name,
377            execution_id = %exec_id,
378            "Starting job container"
379        );
380
381        // Start container
382        if let Err(e) = runtime.start_container(&container_id).await {
383            let error_msg = e.to_string();
384            error!(
385                job = %job_name,
386                execution_id = %exec_id,
387                error = %error_msg,
388                "Container start failed"
389            );
390            Self::update_status(&executions, &exec_id, |exec| {
391                exec.status = JobStatus::Failed {
392                    reason: format!("Container start failed: {error_msg}"),
393                    exit_code: None,
394                };
395                exec.completed_at = Some(Instant::now());
396            })
397            .await;
398            let _ = runtime.remove_container(&container_id).await;
399            return;
400        }
401
402        // Wait for container to exit using the runtime's wait_container method
403        let exit_code = runtime.wait_container(&container_id).await;
404        let duration = started.elapsed();
405
406        // Collect logs before cleanup using the runtime's get_logs method
407        let logs = match runtime.get_logs(&container_id).await {
408            Ok(entries) => Some(
409                entries
410                    .iter()
411                    .map(ToString::to_string)
412                    .collect::<Vec<_>>()
413                    .join("\n"),
414            ),
415            Err(e) => {
416                // Fallback to container_logs if get_logs fails
417                match runtime.container_logs(&container_id, max_log_size).await {
418                    Ok(entries) => Some(
419                        entries
420                            .iter()
421                            .map(ToString::to_string)
422                            .collect::<Vec<_>>()
423                            .join("\n"),
424                    ),
425                    Err(e2) => {
426                        warn!(
427                            job = %job_name,
428                            execution_id = %exec_id,
429                            error = %e,
430                            fallback_error = %e2,
431                            "Failed to collect logs"
432                        );
433                        None
434                    }
435                }
436            }
437        };
438
439        // Update final status
440        Self::update_status(&executions, &exec_id, |exec| {
441            exec.logs = logs;
442            exec.completed_at = Some(Instant::now());
443
444            match exit_code {
445                Ok(code) => {
446                    if code == 0 {
447                        info!(
448                            job = exec.job_name,
449                            execution_id = %exec.id,
450                            duration_ms = duration.as_millis(),
451                            "Job completed successfully"
452                        );
453                        exec.status = JobStatus::Completed {
454                            exit_code: code,
455                            duration,
456                        };
457                    } else {
458                        warn!(
459                            job = exec.job_name,
460                            execution_id = %exec.id,
461                            exit_code = code,
462                            duration_ms = duration.as_millis(),
463                            "Job failed with non-zero exit code"
464                        );
465                        exec.status = JobStatus::Failed {
466                            reason: format!("Non-zero exit code: {code}"),
467                            exit_code: Some(code),
468                        };
469                    }
470                }
471                Err(err) => {
472                    error!(
473                        job = exec.job_name,
474                        execution_id = %exec.id,
475                        error = %err,
476                        "Job execution error"
477                    );
478                    exec.status = JobStatus::Failed {
479                        reason: err.to_string(),
480                        exit_code: None,
481                    };
482                }
483            }
484        })
485        .await;
486
487        // Cleanup container
488        if let Err(e) = runtime.remove_container(&container_id).await {
489            warn!(
490                job = %job_name,
491                execution_id = %exec_id,
492                error = %e,
493                "Failed to remove job container"
494            );
495        }
496    }
497
498    async fn update_status<F>(
499        executions: &RwLock<HashMap<JobExecutionId, JobExecution>>,
500        exec_id: &JobExecutionId,
501        f: F,
502    ) where
503        F: FnOnce(&mut JobExecution),
504    {
505        let mut execs = executions.write().await;
506        if let Some(exec) = execs.get_mut(exec_id) {
507            f(exec);
508        }
509    }
510
511    /// Get the status of a job execution
512    pub async fn get_execution(&self, exec_id: &JobExecutionId) -> Option<JobExecution> {
513        let executions = self.executions.read().await;
514        executions.get(exec_id).cloned()
515    }
516
517    /// List all executions for a job
518    pub async fn list_executions(&self, job_name: &str) -> Vec<JobExecution> {
519        let executions = self.executions.read().await;
520        executions
521            .values()
522            .filter(|e| e.job_name == job_name)
523            .cloned()
524            .collect()
525    }
526
527    /// List all executions (across all jobs)
528    pub async fn list_all_executions(&self) -> Vec<JobExecution> {
529        let executions = self.executions.read().await;
530        executions.values().cloned().collect()
531    }
532
533    /// Cancel a running job execution
534    ///
535    /// # Errors
536    /// Returns an error if the execution is not found or not in a cancellable state.
537    pub async fn cancel(&self, exec_id: &JobExecutionId) -> Result<()> {
538        let mut executions = self.executions.write().await;
539        if let Some(execution) = executions.get_mut(exec_id) {
540            if matches!(
541                execution.status,
542                JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
543            ) {
544                if let Some(ref container_id) = execution.container_id {
545                    self.runtime
546                        .stop_container(container_id, Duration::from_secs(10))
547                        .await?;
548                    self.runtime.remove_container(container_id).await?;
549                }
550                execution.status = JobStatus::Cancelled;
551                execution.completed_at = Some(Instant::now());
552                info!(
553                    job = %execution.job_name,
554                    execution_id = %exec_id,
555                    "Job execution cancelled"
556                );
557            }
558        }
559        Ok(())
560    }
561
562    /// Clean up old execution records
563    pub async fn cleanup_old_executions(&self) {
564        let now = Instant::now();
565        let mut executions = self.executions.write().await;
566        let before_count = executions.len();
567        executions.retain(|_, exec| match exec.completed_at {
568            Some(completed) => now.duration_since(completed) < self.config.retention,
569            None => true, // Keep running executions
570        });
571        let removed = before_count - executions.len();
572        if removed > 0 {
573            debug!(removed = removed, "Cleaned up old job execution records");
574        }
575    }
576
577    /// Signal shutdown
578    pub fn shutdown(&self) {
579        self.shutdown.store(true, Ordering::Relaxed);
580    }
581
582    /// Check if executor is shutting down
583    pub fn is_shutting_down(&self) -> bool {
584        self.shutdown.load(Ordering::Relaxed)
585    }
586
587    /// Get the number of active (non-completed) executions
588    pub async fn active_execution_count(&self) -> usize {
589        let executions = self.executions.read().await;
590        executions
591            .values()
592            .filter(|e| {
593                matches!(
594                    e.status,
595                    JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
596                )
597            })
598            .count()
599    }
600}
601
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;

    /// Deserialize a one-service deployment and return its `backup` job spec.
    fn mock_job_spec() -> ServiceSpec {
        use zlayer_spec::*;
        let manifest = r"
version: v1
deployment: test
services:
  backup:
    rtype: job
    image:
      name: backup:latest
";
        let mut deployment = serde_yaml::from_str::<DeploymentSpec>(manifest).unwrap();
        deployment.services.remove("backup").unwrap()
    }

    /// Executor backed by the mock runtime with default configuration.
    fn mock_executor() -> JobExecutor {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        JobExecutor::new(runtime)
    }

    #[tokio::test]
    async fn test_job_execution_id() {
        let first = JobExecutionId::new();
        let second = JobExecutionId::new();
        assert_ne!(first, second);
        assert!(!first.0.is_empty());
    }

    #[tokio::test]
    async fn test_job_executor_trigger() {
        let executor = mock_executor();
        let spec = mock_job_spec();

        let exec_id = executor
            .trigger("backup", &spec, JobTrigger::Cli)
            .await
            .unwrap();

        // Let the spawned job task make some progress before inspecting it
        tokio::time::sleep(Duration::from_millis(50)).await;

        let maybe_exec = executor.get_execution(&exec_id).await;
        assert!(maybe_exec.is_some());

        let exec = maybe_exec.unwrap();
        assert_eq!(exec.job_name, "backup");
        assert!(matches!(exec.trigger, JobTrigger::Cli));
    }

    #[tokio::test]
    async fn test_job_executor_list_executions() {
        let executor = mock_executor();
        let spec = mock_job_spec();

        // Two runs of the same job, from different trigger sources
        for source in [JobTrigger::Cli, JobTrigger::Scheduler] {
            executor.trigger("backup", &spec, source).await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(50)).await;

        let runs = executor.list_executions("backup").await;
        assert_eq!(runs.len(), 2);
    }

    #[tokio::test]
    async fn test_job_executor_register_spec() {
        let executor = mock_executor();
        let spec = mock_job_spec();

        executor.register_job("backup", spec.clone()).await;

        let stored = executor.get_job_spec("backup").await;
        assert!(stored.is_some());
        assert_eq!(stored.unwrap().image.name, spec.image.name);
    }

    #[tokio::test]
    async fn test_job_status_display() {
        assert_eq!(JobStatus::Pending.to_string(), "pending");
        assert_eq!(JobStatus::Running.to_string(), "running");

        let completed = JobStatus::Completed {
            exit_code: 0,
            duration: Duration::from_secs(10),
        };
        assert_eq!(completed.to_string(), "completed(0)");

        let failed = JobStatus::Failed {
            reason: "error".into(),
            exit_code: Some(1),
        };
        assert_eq!(failed.to_string(), "failed(1)");
    }
}