1use crate::error::{AgentError, Result};
8use crate::init::InitOrchestrator;
9use crate::runtime::{ContainerId, Runtime};
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15use tracing::{debug, error, info, warn};
16use uuid::Uuid;
17use zlayer_spec::ServiceSpec;
18
19#[derive(Debug, Clone, PartialEq, Eq, Hash)]
21pub struct JobExecutionId(pub String);
22
23impl JobExecutionId {
24 #[must_use]
26 pub fn new() -> Self {
27 Self(Uuid::new_v4().to_string())
28 }
29}
30
31impl Default for JobExecutionId {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl std::fmt::Display for JobExecutionId {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "{}", self.0)
40 }
41}
42
/// Lifecycle state of a single job execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// Recorded by the executor; no work has started yet.
    Pending,
    /// Image pull, container creation and init steps are in progress.
    Initializing,
    /// The job container is being started or is running.
    Running,
    /// The container exited successfully after `duration`.
    Completed { exit_code: i32, duration: Duration },
    /// The run failed; `exit_code` is present only when the container itself
    /// exited with a code.
    Failed {
        reason: String,
        exit_code: Option<i32>,
    },
    /// The run was cancelled before it finished.
    Cancelled,
}

impl std::fmt::Display for JobStatus {
    /// Short lowercase label, with the exit code in parentheses when known
    /// (e.g. `completed(0)`, `failed(1)`, `failed`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Pending => "pending",
            Self::Initializing => "initializing",
            Self::Running => "running",
            Self::Cancelled => "cancelled",
            Self::Failed {
                exit_code: None, ..
            } => "failed",
            Self::Failed {
                exit_code: Some(code),
                ..
            } => return write!(f, "failed({code})"),
            Self::Completed { exit_code, .. } => return write!(f, "completed({exit_code})"),
        };
        f.write_str(label)
    }
}
81
/// What initiated a job execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobTrigger {
    /// Triggered through an endpoint, optionally recording the caller's address.
    Endpoint { remote_addr: Option<String> },
    /// Triggered from the command line.
    Cli,
    /// Triggered by a scheduler.
    Scheduler,
    /// Triggered internally, with a free-form reason.
    Internal { reason: String },
}

impl std::fmt::Display for JobTrigger {
    /// Lowercase trigger kind, with the address or reason in parentheses when present.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Endpoint {
                remote_addr: Some(addr),
            } => write!(f, "endpoint({addr})"),
            Self::Endpoint { remote_addr: None } => f.write_str("endpoint"),
            Self::Cli => f.write_str("cli"),
            Self::Scheduler => f.write_str("scheduler"),
            Self::Internal { reason } => write!(f, "internal({reason})"),
        }
    }
}
111
/// Record of a single job run, as returned by the executor's query methods.
#[derive(Debug, Clone)]
pub struct JobExecution {
    /// Unique id of this run.
    pub id: JobExecutionId,
    /// Name of the job this run belongs to.
    pub job_name: String,
    /// Current lifecycle state.
    pub status: JobStatus,
    /// When the run entered `Initializing`; `None` while still `Pending`.
    pub started_at: Option<Instant>,
    /// When the run reached a terminal state; `None` while still in progress.
    /// Records with this set become eligible for retention-based cleanup.
    pub completed_at: Option<Instant>,
    /// Container id assigned to this run, once one has been chosen.
    pub container_id: Option<ContainerId>,
    /// Collected container output, filled in after the container exits
    /// (`None` if collection failed or the run never got that far).
    pub logs: Option<String>,
    /// What initiated this run.
    pub trigger: JobTrigger,
}
126
/// Tunables for a job executor.
#[derive(Debug, Clone)]
pub struct JobExecutorConfig {
    /// Maximum number of job executions intended to run at the same time.
    /// NOTE(review): confirm the executor enforces this limit.
    pub max_concurrent: usize,
    /// How long a finished execution record is kept before
    /// `JobExecutor::cleanup_old_executions` drops it.
    pub retention: Duration,
    /// Limit on the amount of log output kept per execution
    /// (the 1 MiB default suggests bytes — confirm against the runtime).
    pub max_log_size: usize,
}

impl Default for JobExecutorConfig {
    /// 10 concurrent executions, one hour of record retention, 1 MiB of logs.
    fn default() -> Self {
        const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
        const ONE_MIB: usize = 1 << 20;

        Self {
            max_concurrent: 10,
            retention: ONE_HOUR,
            max_log_size: ONE_MIB,
        }
    }
}
147
148pub struct JobExecutor {
150 runtime: Arc<dyn Runtime + Send + Sync>,
151 executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
153 job_specs: Arc<RwLock<HashMap<String, ServiceSpec>>>,
155 config: JobExecutorConfig,
157 shutdown: AtomicBool,
159}
160
161impl JobExecutor {
162 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
164 Self::with_config(runtime, JobExecutorConfig::default())
165 }
166
167 pub fn with_config(runtime: Arc<dyn Runtime + Send + Sync>, config: JobExecutorConfig) -> Self {
169 Self {
170 runtime,
171 executions: Arc::new(RwLock::new(HashMap::new())),
172 job_specs: Arc::new(RwLock::new(HashMap::new())),
173 config,
174 shutdown: AtomicBool::new(false),
175 }
176 }
177
178 pub async fn register_job(&self, name: &str, spec: ServiceSpec) {
180 let mut specs = self.job_specs.write().await;
181 specs.insert(name.to_string(), spec);
182 info!(job = %name, "Registered job spec");
183 }
184
185 pub async fn unregister_job(&self, name: &str) {
187 let mut specs = self.job_specs.write().await;
188 specs.remove(name);
189 info!(job = %name, "Unregistered job spec");
190 }
191
192 pub async fn get_job_spec(&self, name: &str) -> Option<ServiceSpec> {
194 let specs = self.job_specs.read().await;
195 specs.get(name).cloned()
196 }
197
198 pub async fn trigger(
203 &self,
204 job_name: &str,
205 spec: &ServiceSpec,
206 trigger: JobTrigger,
207 ) -> Result<JobExecutionId> {
208 if self.shutdown.load(Ordering::Relaxed) {
209 return Err(AgentError::Internal("Job executor is shutting down".into()));
210 }
211
212 let exec_id = JobExecutionId::new();
213
214 info!(
215 job = %job_name,
216 execution_id = %exec_id,
217 trigger = %trigger,
218 "Triggering job execution"
219 );
220
221 let execution = JobExecution {
223 id: exec_id.clone(),
224 job_name: job_name.to_string(),
225 status: JobStatus::Pending,
226 started_at: None,
227 completed_at: None,
228 container_id: None,
229 logs: None,
230 trigger,
231 };
232
233 {
235 let mut executions = self.executions.write().await;
236 executions.insert(exec_id.clone(), execution);
237 }
238
239 let runtime = self.runtime.clone();
241 let spec = spec.clone();
242 let exec_id_clone = exec_id.clone();
243 let executions = self.executions.clone();
244 let job_name = job_name.to_string();
245 let max_log_size = self.config.max_log_size;
246
247 tokio::spawn(async move {
248 Box::pin(Self::run_job(
249 runtime,
250 executions,
251 exec_id_clone,
252 &job_name,
253 spec,
254 max_log_size,
255 ))
256 .await;
257 });
258
259 Ok(exec_id)
260 }
261
262 #[allow(clippy::too_many_lines)]
264 async fn run_job(
265 runtime: Arc<dyn Runtime + Send + Sync>,
266 executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
267 exec_id: JobExecutionId,
268 job_name: &str,
269 spec: ServiceSpec,
270 max_log_size: usize,
271 ) {
272 let started = Instant::now();
273
274 Self::update_status(&executions, &exec_id, |exec| {
276 exec.status = JobStatus::Initializing;
277 exec.started_at = Some(started);
278 })
279 .await;
280
281 let replica = exec_id.0.chars().take(8).collect::<String>();
284 let replica_num = u32::from_str_radix(&replica, 16).unwrap_or(0) % 10000;
285 let container_id = ContainerId::new(format!("job-{job_name}"), replica_num);
286
287 Self::update_status(&executions, &exec_id, |exec| {
289 exec.container_id = Some(container_id.clone());
290 })
291 .await;
292
293 debug!(
294 job = %job_name,
295 execution_id = %exec_id,
296 container_id = %container_id,
297 "Creating job container"
298 );
299
300 let image_str = spec.image.name.to_string();
302 if let Err(e) = runtime
303 .pull_image_with_policy(
304 &image_str,
305 spec.image.pull_policy,
306 None,
307 spec.image.source_policy.unwrap_or_default(),
308 )
309 .await
310 {
311 error!(
312 job = %job_name,
313 execution_id = %exec_id,
314 error = %e,
315 "Image pull failed"
316 );
317 Self::update_status(&executions, &exec_id, |exec| {
318 exec.status = JobStatus::Failed {
319 reason: format!("Image pull failed: {e}"),
320 exit_code: None,
321 };
322 exec.completed_at = Some(Instant::now());
323 })
324 .await;
325 return;
326 }
327
328 if let Err(e) = runtime.create_container(&container_id, &spec).await {
330 let error_msg = e.to_string();
331 error!(
332 job = %job_name,
333 execution_id = %exec_id,
334 error = %error_msg,
335 "Container create failed"
336 );
337 Self::update_status(&executions, &exec_id, |exec| {
338 exec.status = JobStatus::Failed {
339 reason: format!("Container create failed: {error_msg}"),
340 exit_code: None,
341 };
342 exec.completed_at = Some(Instant::now());
343 })
344 .await;
345 return;
346 }
347
348 let init_orchestrator = InitOrchestrator::new(container_id.clone(), spec.init.clone());
350 if let Err(e) = init_orchestrator.run().await {
351 let error_msg = e.to_string();
352 error!(
353 job = %job_name,
354 execution_id = %exec_id,
355 error = %error_msg,
356 "Init failed"
357 );
358 Self::update_status(&executions, &exec_id, |exec| {
359 exec.status = JobStatus::Failed {
360 reason: format!("Init failed: {error_msg}"),
361 exit_code: None,
362 };
363 exec.completed_at = Some(Instant::now());
364 })
365 .await;
366 let _ = runtime.remove_container(&container_id).await;
368 return;
369 }
370
371 Self::update_status(&executions, &exec_id, |exec| {
373 exec.status = JobStatus::Running;
374 })
375 .await;
376
377 debug!(
378 job = %job_name,
379 execution_id = %exec_id,
380 "Starting job container"
381 );
382
383 if let Err(e) = runtime.start_container(&container_id).await {
385 let error_msg = e.to_string();
386 error!(
387 job = %job_name,
388 execution_id = %exec_id,
389 error = %error_msg,
390 "Container start failed"
391 );
392 Self::update_status(&executions, &exec_id, |exec| {
393 exec.status = JobStatus::Failed {
394 reason: format!("Container start failed: {error_msg}"),
395 exit_code: None,
396 };
397 exec.completed_at = Some(Instant::now());
398 })
399 .await;
400 let _ = runtime.remove_container(&container_id).await;
401 return;
402 }
403
404 let exit_code = runtime.wait_container(&container_id).await;
406 let duration = started.elapsed();
407
408 let logs = match runtime.get_logs(&container_id).await {
410 Ok(entries) => Some(
411 entries
412 .iter()
413 .map(ToString::to_string)
414 .collect::<Vec<_>>()
415 .join("\n"),
416 ),
417 Err(e) => {
418 match runtime.container_logs(&container_id, max_log_size).await {
420 Ok(entries) => Some(
421 entries
422 .iter()
423 .map(ToString::to_string)
424 .collect::<Vec<_>>()
425 .join("\n"),
426 ),
427 Err(e2) => {
428 warn!(
429 job = %job_name,
430 execution_id = %exec_id,
431 error = %e,
432 fallback_error = %e2,
433 "Failed to collect logs"
434 );
435 None
436 }
437 }
438 }
439 };
440
441 Self::update_status(&executions, &exec_id, |exec| {
443 exec.logs = logs;
444 exec.completed_at = Some(Instant::now());
445
446 match exit_code {
447 Ok(code) => {
448 if code == 0 {
449 info!(
450 job = exec.job_name,
451 execution_id = %exec.id,
452 duration_ms = duration.as_millis(),
453 "Job completed successfully"
454 );
455 exec.status = JobStatus::Completed {
456 exit_code: code,
457 duration,
458 };
459 } else {
460 warn!(
461 job = exec.job_name,
462 execution_id = %exec.id,
463 exit_code = code,
464 duration_ms = duration.as_millis(),
465 "Job failed with non-zero exit code"
466 );
467 exec.status = JobStatus::Failed {
468 reason: format!("Non-zero exit code: {code}"),
469 exit_code: Some(code),
470 };
471 }
472 }
473 Err(err) => {
474 error!(
475 job = exec.job_name,
476 execution_id = %exec.id,
477 error = %err,
478 "Job execution error"
479 );
480 exec.status = JobStatus::Failed {
481 reason: err.to_string(),
482 exit_code: None,
483 };
484 }
485 }
486 })
487 .await;
488
489 if let Err(e) = runtime.remove_container(&container_id).await {
491 warn!(
492 job = %job_name,
493 execution_id = %exec_id,
494 error = %e,
495 "Failed to remove job container"
496 );
497 }
498 }
499
500 async fn update_status<F>(
501 executions: &RwLock<HashMap<JobExecutionId, JobExecution>>,
502 exec_id: &JobExecutionId,
503 f: F,
504 ) where
505 F: FnOnce(&mut JobExecution),
506 {
507 let mut execs = executions.write().await;
508 if let Some(exec) = execs.get_mut(exec_id) {
509 f(exec);
510 }
511 }
512
513 pub async fn get_execution(&self, exec_id: &JobExecutionId) -> Option<JobExecution> {
515 let executions = self.executions.read().await;
516 executions.get(exec_id).cloned()
517 }
518
519 pub async fn list_executions(&self, job_name: &str) -> Vec<JobExecution> {
521 let executions = self.executions.read().await;
522 executions
523 .values()
524 .filter(|e| e.job_name == job_name)
525 .cloned()
526 .collect()
527 }
528
529 pub async fn list_all_executions(&self) -> Vec<JobExecution> {
531 let executions = self.executions.read().await;
532 executions.values().cloned().collect()
533 }
534
535 pub async fn cancel(&self, exec_id: &JobExecutionId) -> Result<()> {
540 let mut executions = self.executions.write().await;
541 if let Some(execution) = executions.get_mut(exec_id) {
542 if matches!(
543 execution.status,
544 JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
545 ) {
546 if let Some(ref container_id) = execution.container_id {
547 self.runtime
548 .stop_container(container_id, Duration::from_secs(10))
549 .await?;
550 self.runtime.remove_container(container_id).await?;
551 }
552 execution.status = JobStatus::Cancelled;
553 execution.completed_at = Some(Instant::now());
554 info!(
555 job = %execution.job_name,
556 execution_id = %exec_id,
557 "Job execution cancelled"
558 );
559 }
560 }
561 Ok(())
562 }
563
564 pub async fn cleanup_old_executions(&self) {
566 let now = Instant::now();
567 let mut executions = self.executions.write().await;
568 let before_count = executions.len();
569 executions.retain(|_, exec| match exec.completed_at {
570 Some(completed) => now.duration_since(completed) < self.config.retention,
571 None => true, });
573 let removed = before_count - executions.len();
574 if removed > 0 {
575 debug!(removed = removed, "Cleaned up old job execution records");
576 }
577 }
578
579 pub fn shutdown(&self) {
581 self.shutdown.store(true, Ordering::Relaxed);
582 }
583
584 pub fn is_shutting_down(&self) -> bool {
586 self.shutdown.load(Ordering::Relaxed)
587 }
588
589 pub async fn active_execution_count(&self) -> usize {
591 let executions = self.executions.read().await;
592 executions
593 .values()
594 .filter(|e| {
595 matches!(
596 e.status,
597 JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
598 )
599 })
600 .count()
601 }
602}
603
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;

    /// Builds the `backup` job spec from a minimal one-service deployment.
    fn mock_job_spec() -> ServiceSpec {
        use zlayer_spec::*;
        let mut deployment = serde_yaml::from_str::<DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  backup:
    rtype: job
    image:
      name: backup:latest
",
        )
        .unwrap();
        deployment.services.remove("backup").unwrap()
    }

    /// Executor backed by the mock runtime with default configuration.
    fn mock_executor() -> JobExecutor {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        JobExecutor::new(runtime)
    }

    #[tokio::test]
    async fn test_job_execution_id() {
        let first = JobExecutionId::new();
        let second = JobExecutionId::new();
        assert!(!first.0.is_empty());
        assert_ne!(first, second);
    }

    #[tokio::test]
    async fn test_job_executor_trigger() {
        let executor = mock_executor();
        let spec = mock_job_spec();

        let exec_id = executor
            .trigger("backup", &spec, JobTrigger::Cli)
            .await
            .unwrap();

        // Give the spawned run task a moment to make progress.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let exec = executor
            .get_execution(&exec_id)
            .await
            .expect("execution record should exist");
        assert_eq!(exec.job_name, "backup");
        assert!(matches!(exec.trigger, JobTrigger::Cli));
    }

    #[tokio::test]
    async fn test_job_executor_list_executions() {
        let executor = mock_executor();
        let spec = mock_job_spec();

        for trigger in [JobTrigger::Cli, JobTrigger::Scheduler] {
            executor.trigger("backup", &spec, trigger).await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(executor.list_executions("backup").await.len(), 2);
    }

    #[tokio::test]
    async fn test_job_executor_register_spec() {
        let executor = mock_executor();
        let spec = mock_job_spec();

        executor.register_job("backup", spec.clone()).await;

        let retrieved = executor
            .get_job_spec("backup")
            .await
            .expect("spec should be registered");
        assert_eq!(retrieved.image.name, spec.image.name);
    }

    #[tokio::test]
    async fn test_job_status_display() {
        assert_eq!(JobStatus::Pending.to_string(), "pending");
        assert_eq!(JobStatus::Running.to_string(), "running");

        let completed = JobStatus::Completed {
            exit_code: 0,
            duration: Duration::from_secs(10),
        };
        assert_eq!(completed.to_string(), "completed(0)");

        let failed = JobStatus::Failed {
            reason: "error".into(),
            exit_code: Some(1),
        };
        assert_eq!(failed.to_string(), "failed(1)");
    }
}