1use crate::error::{AgentError, Result};
8use crate::init::InitOrchestrator;
9use crate::runtime::{ContainerId, Runtime};
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15use tracing::{debug, error, info, warn};
16use uuid::Uuid;
17use zlayer_spec::ServiceSpec;
18
/// Unique identifier for a single job execution (a UUID v4 rendered as a string).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct JobExecutionId(pub String);
22
23impl JobExecutionId {
24 #[must_use]
26 pub fn new() -> Self {
27 Self(Uuid::new_v4().to_string())
28 }
29}
30
31impl Default for JobExecutionId {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl std::fmt::Display for JobExecutionId {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "{}", self.0)
40 }
41}
42
/// Lifecycle state of a single job execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// Recorded but the background task has not started work yet.
    Pending,
    /// Container is being created and init steps are running.
    Initializing,
    /// Container has been started; waiting for it to exit.
    Running,
    /// Terminal: set when the container exited with code 0.
    Completed { exit_code: i32, duration: Duration },
    /// Terminal: non-zero exit, or a failure before/while running
    /// (`exit_code` is `None` when the container never produced one).
    Failed {
        reason: String,
        exit_code: Option<i32>,
    },
    /// Terminal: execution was cancelled via `JobExecutor::cancel`.
    Cancelled,
}
62
63impl std::fmt::Display for JobStatus {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match self {
66 JobStatus::Pending => write!(f, "pending"),
67 JobStatus::Initializing => write!(f, "initializing"),
68 JobStatus::Running => write!(f, "running"),
69 JobStatus::Completed { exit_code, .. } => write!(f, "completed({exit_code})"),
70 JobStatus::Failed { exit_code, .. } => {
71 if let Some(code) = exit_code {
72 write!(f, "failed({code})")
73 } else {
74 write!(f, "failed")
75 }
76 }
77 JobStatus::Cancelled => write!(f, "cancelled"),
78 }
79 }
80}
81
/// What initiated a job execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobTrigger {
    /// Triggered via an endpoint; optionally records the caller's address.
    Endpoint { remote_addr: Option<String> },
    /// Triggered from the command line.
    Cli,
    /// Triggered by the scheduler.
    Scheduler,
    /// Triggered internally, with a free-form reason string.
    Internal { reason: String },
}
94
95impl std::fmt::Display for JobTrigger {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 match self {
98 JobTrigger::Endpoint { remote_addr } => {
99 if let Some(addr) = remote_addr {
100 write!(f, "endpoint({addr})")
101 } else {
102 write!(f, "endpoint")
103 }
104 }
105 JobTrigger::Cli => write!(f, "cli"),
106 JobTrigger::Scheduler => write!(f, "scheduler"),
107 JobTrigger::Internal { reason } => write!(f, "internal({reason})"),
108 }
109 }
110}
111
/// Record of one execution of a job, from trigger to a terminal state.
#[derive(Debug, Clone)]
pub struct JobExecution {
    /// Unique id for this execution.
    pub id: JobExecutionId,
    /// Name of the job that was run.
    pub job_name: String,
    /// Current lifecycle state.
    pub status: JobStatus,
    /// When the background task began driving the job (None while Pending).
    pub started_at: Option<Instant>,
    /// When the execution reached a terminal state.
    pub completed_at: Option<Instant>,
    /// Container identity assigned to this run, once known.
    pub container_id: Option<ContainerId>,
    /// Collected container logs, if retrieval succeeded.
    pub logs: Option<String>,
    /// What caused this execution to start.
    pub trigger: JobTrigger,
}
126
/// Tunables for [`JobExecutor`].
#[derive(Debug, Clone)]
pub struct JobExecutorConfig {
    /// Cap on executions in a non-terminal state at once.
    pub max_concurrent: usize,
    /// How long finished execution records are kept before cleanup.
    pub retention: Duration,
    /// Size bound (bytes) passed to the runtime's fallback log-collection call.
    pub max_log_size: usize,
}
137
138impl Default for JobExecutorConfig {
139 fn default() -> Self {
140 Self {
141 max_concurrent: 10,
142 retention: Duration::from_secs(3600), max_log_size: 1024 * 1024, }
145 }
146}
147
/// Runs one-shot job services on a container runtime and tracks their
/// execution records in memory.
pub struct JobExecutor {
    /// Container runtime used to pull images and run job containers.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// All known execution records, keyed by execution id.
    executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
    /// Registered job specs, keyed by job name.
    job_specs: Arc<RwLock<HashMap<String, ServiceSpec>>>,
    /// Executor tunables.
    config: JobExecutorConfig,
    /// Set once `shutdown()` is called; `trigger` then refuses new work.
    shutdown: AtomicBool,
}
160
161impl JobExecutor {
162 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
164 Self::with_config(runtime, JobExecutorConfig::default())
165 }
166
167 pub fn with_config(runtime: Arc<dyn Runtime + Send + Sync>, config: JobExecutorConfig) -> Self {
169 Self {
170 runtime,
171 executions: Arc::new(RwLock::new(HashMap::new())),
172 job_specs: Arc::new(RwLock::new(HashMap::new())),
173 config,
174 shutdown: AtomicBool::new(false),
175 }
176 }
177
178 pub async fn register_job(&self, name: &str, spec: ServiceSpec) {
180 let mut specs = self.job_specs.write().await;
181 specs.insert(name.to_string(), spec);
182 info!(job = %name, "Registered job spec");
183 }
184
185 pub async fn unregister_job(&self, name: &str) {
187 let mut specs = self.job_specs.write().await;
188 specs.remove(name);
189 info!(job = %name, "Unregistered job spec");
190 }
191
192 pub async fn get_job_spec(&self, name: &str) -> Option<ServiceSpec> {
194 let specs = self.job_specs.read().await;
195 specs.get(name).cloned()
196 }
197
198 pub async fn trigger(
203 &self,
204 job_name: &str,
205 spec: &ServiceSpec,
206 trigger: JobTrigger,
207 ) -> Result<JobExecutionId> {
208 if self.shutdown.load(Ordering::Relaxed) {
209 return Err(AgentError::Internal("Job executor is shutting down".into()));
210 }
211
212 let exec_id = JobExecutionId::new();
213
214 info!(
215 job = %job_name,
216 execution_id = %exec_id,
217 trigger = %trigger,
218 "Triggering job execution"
219 );
220
221 let execution = JobExecution {
223 id: exec_id.clone(),
224 job_name: job_name.to_string(),
225 status: JobStatus::Pending,
226 started_at: None,
227 completed_at: None,
228 container_id: None,
229 logs: None,
230 trigger,
231 };
232
233 {
235 let mut executions = self.executions.write().await;
236 executions.insert(exec_id.clone(), execution);
237 }
238
239 let runtime = self.runtime.clone();
241 let spec = spec.clone();
242 let exec_id_clone = exec_id.clone();
243 let executions = self.executions.clone();
244 let job_name = job_name.to_string();
245 let max_log_size = self.config.max_log_size;
246
247 tokio::spawn(async move {
248 Self::run_job(
249 runtime,
250 executions,
251 exec_id_clone,
252 &job_name,
253 spec,
254 max_log_size,
255 )
256 .await;
257 });
258
259 Ok(exec_id)
260 }
261
    /// Drives one job execution end to end on a spawned task: pull image,
    /// create container, run init, start, wait for exit, collect logs,
    /// record the terminal status, and remove the container.
    ///
    /// All progress is reported by mutating the shared execution record via
    /// `update_status`; this function never returns a value.
    #[allow(clippy::too_many_lines)]
    async fn run_job(
        runtime: Arc<dyn Runtime + Send + Sync>,
        executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
        exec_id: JobExecutionId,
        job_name: &str,
        spec: ServiceSpec,
        max_log_size: usize,
    ) {
        let started = Instant::now();

        // Phase 1: mark the record as initializing and stamp the start time.
        Self::update_status(&executions, &exec_id, |exec| {
            exec.status = JobStatus::Initializing;
            exec.started_at = Some(started);
        })
        .await;

        // Derive a numeric replica id from the first 8 hex chars of the
        // execution UUID so each run gets a distinct container identity.
        // NOTE(review): the % 10000 fold means distinct executions can
        // collide on replica number — confirm the runtime tolerates reuse.
        let replica = exec_id.0.chars().take(8).collect::<String>();
        let replica_num = u32::from_str_radix(&replica, 16).unwrap_or(0) % 10000;
        let container_id = ContainerId {
            service: format!("job-{job_name}"),
            replica: replica_num,
        };

        // Publish the container id early so `cancel` can stop the container.
        Self::update_status(&executions, &exec_id, |exec| {
            exec.container_id = Some(container_id.clone());
        })
        .await;

        debug!(
            job = %job_name,
            execution_id = %exec_id,
            container_id = %container_id,
            "Creating job container"
        );

        // Phase 2: pull the image. On failure, record Failed and bail —
        // nothing has been created yet, so no cleanup is needed.
        if let Err(e) = runtime
            .pull_image_with_policy(&spec.image.name, spec.image.pull_policy, None)
            .await
        {
            error!(
                job = %job_name,
                execution_id = %exec_id,
                error = %e,
                "Image pull failed"
            );
            Self::update_status(&executions, &exec_id, |exec| {
                exec.status = JobStatus::Failed {
                    reason: format!("Image pull failed: {e}"),
                    exit_code: None,
                };
                exec.completed_at = Some(Instant::now());
            })
            .await;
            return;
        }

        // Phase 3: create the container. Still nothing to clean up on failure.
        if let Err(e) = runtime.create_container(&container_id, &spec).await {
            let error_msg = e.to_string();
            error!(
                job = %job_name,
                execution_id = %exec_id,
                error = %error_msg,
                "Container create failed"
            );
            Self::update_status(&executions, &exec_id, |exec| {
                exec.status = JobStatus::Failed {
                    reason: format!("Container create failed: {error_msg}"),
                    exit_code: None,
                };
                exec.completed_at = Some(Instant::now());
            })
            .await;
            return;
        }

        // Phase 4: run init steps. From here on the container exists, so
        // failure paths best-effort remove it (result deliberately ignored).
        let init_orchestrator = InitOrchestrator::new(container_id.clone(), spec.init.clone());
        if let Err(e) = init_orchestrator.run().await {
            let error_msg = e.to_string();
            error!(
                job = %job_name,
                execution_id = %exec_id,
                error = %error_msg,
                "Init failed"
            );
            Self::update_status(&executions, &exec_id, |exec| {
                exec.status = JobStatus::Failed {
                    reason: format!("Init failed: {error_msg}"),
                    exit_code: None,
                };
                exec.completed_at = Some(Instant::now());
            })
            .await;
            let _ = runtime.remove_container(&container_id).await;
            return;
        }

        // Phase 5: mark Running just before starting the container.
        Self::update_status(&executions, &exec_id, |exec| {
            exec.status = JobStatus::Running;
        })
        .await;

        debug!(
            job = %job_name,
            execution_id = %exec_id,
            "Starting job container"
        );

        if let Err(e) = runtime.start_container(&container_id).await {
            let error_msg = e.to_string();
            error!(
                job = %job_name,
                execution_id = %exec_id,
                error = %error_msg,
                "Container start failed"
            );
            Self::update_status(&executions, &exec_id, |exec| {
                exec.status = JobStatus::Failed {
                    reason: format!("Container start failed: {error_msg}"),
                    exit_code: None,
                };
                exec.completed_at = Some(Instant::now());
            })
            .await;
            let _ = runtime.remove_container(&container_id).await;
            return;
        }

        // Phase 6: block until the container exits; measure total duration
        // from the start of this function (includes pull/create/init time).
        let exit_code = runtime.wait_container(&container_id).await;
        let duration = started.elapsed();

        // Phase 7: collect logs. Primary path is `get_logs`; on error, fall
        // back to `container_logs` bounded by `max_log_size`. If both fail,
        // log a warning and keep `logs = None`.
        // NOTE(review): the primary path is not bounded by `max_log_size` —
        // confirm whether the cap should apply there too.
        let logs = match runtime.get_logs(&container_id).await {
            Ok(entries) => Some(
                entries
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<_>>()
                    .join("\n"),
            ),
            Err(e) => {
                match runtime.container_logs(&container_id, max_log_size).await {
                    Ok(entries) => Some(
                        entries
                            .iter()
                            .map(ToString::to_string)
                            .collect::<Vec<_>>()
                            .join("\n"),
                    ),
                    Err(e2) => {
                        warn!(
                            job = %job_name,
                            execution_id = %exec_id,
                            error = %e,
                            fallback_error = %e2,
                            "Failed to collect logs"
                        );
                        None
                    }
                }
            }
        };

        // Phase 8: record the terminal state — Completed only for exit code
        // 0; any non-zero code or wait error becomes Failed.
        Self::update_status(&executions, &exec_id, |exec| {
            exec.logs = logs;
            exec.completed_at = Some(Instant::now());

            match exit_code {
                Ok(code) => {
                    if code == 0 {
                        info!(
                            job = exec.job_name,
                            execution_id = %exec.id,
                            duration_ms = duration.as_millis(),
                            "Job completed successfully"
                        );
                        exec.status = JobStatus::Completed {
                            exit_code: code,
                            duration,
                        };
                    } else {
                        warn!(
                            job = exec.job_name,
                            execution_id = %exec.id,
                            exit_code = code,
                            duration_ms = duration.as_millis(),
                            "Job failed with non-zero exit code"
                        );
                        exec.status = JobStatus::Failed {
                            reason: format!("Non-zero exit code: {code}"),
                            exit_code: Some(code),
                        };
                    }
                }
                Err(err) => {
                    error!(
                        job = exec.job_name,
                        execution_id = %exec.id,
                        error = %err,
                        "Job execution error"
                    );
                    exec.status = JobStatus::Failed {
                        reason: err.to_string(),
                        exit_code: None,
                    };
                }
            }
        })
        .await;

        // Phase 9: best-effort container removal; failure is only a warning
        // since the execution outcome is already recorded.
        if let Err(e) = runtime.remove_container(&container_id).await {
            warn!(
                job = %job_name,
                execution_id = %exec_id,
                error = %e,
                "Failed to remove job container"
            );
        }
    }
496
497 async fn update_status<F>(
498 executions: &RwLock<HashMap<JobExecutionId, JobExecution>>,
499 exec_id: &JobExecutionId,
500 f: F,
501 ) where
502 F: FnOnce(&mut JobExecution),
503 {
504 let mut execs = executions.write().await;
505 if let Some(exec) = execs.get_mut(exec_id) {
506 f(exec);
507 }
508 }
509
510 pub async fn get_execution(&self, exec_id: &JobExecutionId) -> Option<JobExecution> {
512 let executions = self.executions.read().await;
513 executions.get(exec_id).cloned()
514 }
515
516 pub async fn list_executions(&self, job_name: &str) -> Vec<JobExecution> {
518 let executions = self.executions.read().await;
519 executions
520 .values()
521 .filter(|e| e.job_name == job_name)
522 .cloned()
523 .collect()
524 }
525
526 pub async fn list_all_executions(&self) -> Vec<JobExecution> {
528 let executions = self.executions.read().await;
529 executions.values().cloned().collect()
530 }
531
    /// Cancels a pending, initializing, or running execution.
    ///
    /// Stops (10s grace) and removes the job's container when one has been
    /// assigned, then marks the record `Cancelled`. Executions already in a
    /// terminal state, and unknown ids, are left untouched and return `Ok`.
    ///
    /// # Errors
    /// Propagates runtime errors from stopping or removing the container.
    ///
    /// NOTE(review): the executions write lock is held across the stop and
    /// remove awaits (up to the 10s stop timeout), blocking every other
    /// executor operation in the meantime — confirm this is acceptable.
    pub async fn cancel(&self, exec_id: &JobExecutionId) -> Result<()> {
        let mut executions = self.executions.write().await;
        if let Some(execution) = executions.get_mut(exec_id) {
            // Only non-terminal executions can be cancelled.
            if matches!(
                execution.status,
                JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
            ) {
                if let Some(ref container_id) = execution.container_id {
                    self.runtime
                        .stop_container(container_id, Duration::from_secs(10))
                        .await?;
                    self.runtime.remove_container(container_id).await?;
                }
                execution.status = JobStatus::Cancelled;
                execution.completed_at = Some(Instant::now());
                info!(
                    job = %execution.job_name,
                    execution_id = %exec_id,
                    "Job execution cancelled"
                );
            }
        }
        Ok(())
    }
560
561 pub async fn cleanup_old_executions(&self) {
563 let now = Instant::now();
564 let mut executions = self.executions.write().await;
565 let before_count = executions.len();
566 executions.retain(|_, exec| match exec.completed_at {
567 Some(completed) => now.duration_since(completed) < self.config.retention,
568 None => true, });
570 let removed = before_count - executions.len();
571 if removed > 0 {
572 debug!(removed = removed, "Cleaned up old job execution records");
573 }
574 }
575
    /// Flags the executor as shutting down; subsequent `trigger` calls are
    /// rejected. Executions already in flight are not interrupted.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Relaxed);
    }
580
    /// Returns `true` once [`JobExecutor::shutdown`] has been called.
    pub fn is_shutting_down(&self) -> bool {
        self.shutdown.load(Ordering::Relaxed)
    }
585
586 pub async fn active_execution_count(&self) -> usize {
588 let executions = self.executions.read().await;
589 executions
590 .values()
591 .filter(|e| {
592 matches!(
593 e.status,
594 JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
595 )
596 })
597 .count()
598 }
599}
600
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;

    /// Builds a minimal job-typed `ServiceSpec` by parsing a tiny deployment
    /// manifest and extracting its single "backup" service.
    fn mock_job_spec() -> ServiceSpec {
        use zlayer_spec::*;
        serde_yaml::from_str::<DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  backup:
    rtype: job
    image:
      name: backup:latest
",
        )
        .unwrap()
        .services
        .remove("backup")
        .unwrap()
    }

    /// Ids are random and non-empty; two generated ids never collide.
    #[tokio::test]
    async fn test_job_execution_id() {
        let id1 = JobExecutionId::new();
        let id2 = JobExecutionId::new();
        assert_ne!(id1, id2);
        assert!(!id1.0.is_empty());
    }

    /// Triggering records an execution retrievable by id with the job name
    /// and trigger preserved.
    #[tokio::test]
    async fn test_job_executor_trigger() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = JobExecutor::new(runtime);

        let spec = mock_job_spec();
        let exec_id = executor
            .trigger("backup", &spec, JobTrigger::Cli)
            .await
            .unwrap();

        // Give the spawned background task time to run.
        // NOTE(review): sleep-based synchronization can be flaky on slow CI.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let execution = executor.get_execution(&exec_id).await;
        assert!(execution.is_some());

        let exec = execution.unwrap();
        assert_eq!(exec.job_name, "backup");
        assert!(matches!(exec.trigger, JobTrigger::Cli));
    }

    /// Two triggers for the same job both appear in `list_executions`.
    #[tokio::test]
    async fn test_job_executor_list_executions() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = JobExecutor::new(runtime);

        let spec = mock_job_spec();

        executor
            .trigger("backup", &spec, JobTrigger::Cli)
            .await
            .unwrap();
        executor
            .trigger("backup", &spec, JobTrigger::Scheduler)
            .await
            .unwrap();

        // Give the spawned background tasks time to run.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let executions = executor.list_executions("backup").await;
        assert_eq!(executions.len(), 2);
    }

    /// A registered spec can be fetched back with its image name intact.
    #[tokio::test]
    async fn test_job_executor_register_spec() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = JobExecutor::new(runtime);

        let spec = mock_job_spec();
        executor.register_job("backup", spec.clone()).await;

        let retrieved = executor.get_job_spec("backup").await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().image.name, spec.image.name);
    }

    /// Display renders the short labels, with exit codes in parentheses.
    #[tokio::test]
    async fn test_job_status_display() {
        assert_eq!(format!("{}", JobStatus::Pending), "pending");
        assert_eq!(format!("{}", JobStatus::Running), "running");
        assert_eq!(
            format!(
                "{}",
                JobStatus::Completed {
                    exit_code: 0,
                    duration: Duration::from_secs(10)
                }
            ),
            "completed(0)"
        );
        assert_eq!(
            format!(
                "{}",
                JobStatus::Failed {
                    reason: "error".into(),
                    exit_code: Some(1)
                }
            ),
            "failed(1)"
        );
    }
}