1use crate::error::{AgentError, Result};
8use crate::init::InitOrchestrator;
9use crate::runtime::{ContainerId, Runtime};
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15use tracing::{debug, error, info, warn};
16use uuid::Uuid;
17use zlayer_spec::ServiceSpec;
18
/// Unique identifier for a single job execution.
///
/// Newtype over a UUID string, keeping execution ids distinct from other
/// string-keyed identifiers in the agent.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct JobExecutionId(pub String);
22
impl JobExecutionId {
    /// Generate a fresh random execution id (a UUID v4 rendered as a string).
    #[must_use]
    pub fn new() -> Self {
        Self(Uuid::new_v4().to_string())
    }
}
30
impl Default for JobExecutionId {
    /// Equivalent to [`JobExecutionId::new`]: produces a freshly generated id.
    fn default() -> Self {
        Self::new()
    }
}
36
37impl std::fmt::Display for JobExecutionId {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "{}", self.0)
40 }
41}
42
/// Lifecycle state of a single job execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// Recorded but the background task has not started work yet.
    Pending,
    /// Image pull, container creation, and init steps are in progress.
    Initializing,
    /// Container has been started; waiting for it to exit.
    Running,
    /// Container ran to completion; carries its exit code and total runtime.
    /// (In `run_job`, this variant is only set for exit code 0.)
    Completed { exit_code: i32, duration: Duration },
    /// Execution failed. `exit_code` is `Some` when the container exited
    /// non-zero, `None` for setup or runtime errors.
    Failed {
        reason: String,
        exit_code: Option<i32>,
    },
    /// Cancelled via `JobExecutor::cancel` before reaching a terminal state.
    Cancelled,
}
62
63impl std::fmt::Display for JobStatus {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match self {
66 JobStatus::Pending => write!(f, "pending"),
67 JobStatus::Initializing => write!(f, "initializing"),
68 JobStatus::Running => write!(f, "running"),
69 JobStatus::Completed { exit_code, .. } => write!(f, "completed({exit_code})"),
70 JobStatus::Failed { exit_code, .. } => {
71 if let Some(code) = exit_code {
72 write!(f, "failed({code})")
73 } else {
74 write!(f, "failed")
75 }
76 }
77 JobStatus::Cancelled => write!(f, "cancelled"),
78 }
79 }
80}
81
/// What initiated a job execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobTrigger {
    /// Triggered through an endpoint; optionally records the caller's
    /// remote address.
    Endpoint { remote_addr: Option<String> },
    /// Triggered from the command line.
    Cli,
    /// Triggered by the scheduler component.
    Scheduler,
    /// Triggered internally by the agent, with a human-readable reason.
    Internal { reason: String },
}
94
95impl std::fmt::Display for JobTrigger {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 match self {
98 JobTrigger::Endpoint { remote_addr } => {
99 if let Some(addr) = remote_addr {
100 write!(f, "endpoint({addr})")
101 } else {
102 write!(f, "endpoint")
103 }
104 }
105 JobTrigger::Cli => write!(f, "cli"),
106 JobTrigger::Scheduler => write!(f, "scheduler"),
107 JobTrigger::Internal { reason } => write!(f, "internal({reason})"),
108 }
109 }
110}
111
/// Record of one execution of a job, kept in the executor's in-memory table.
#[derive(Debug, Clone)]
pub struct JobExecution {
    /// Unique id of this execution.
    pub id: JobExecutionId,
    /// Name of the job that was run.
    pub job_name: String,
    /// Current lifecycle state.
    pub status: JobStatus,
    /// When the background task began work (`None` while still pending).
    pub started_at: Option<Instant>,
    /// When a terminal state was reached (`None` while in flight).
    pub completed_at: Option<Instant>,
    /// Container assigned to this execution, once one exists.
    pub container_id: Option<ContainerId>,
    /// Collected container logs, if retrieval succeeded.
    pub logs: Option<String>,
    /// What initiated this execution.
    pub trigger: JobTrigger,
}
126
/// Tunables for the job executor.
#[derive(Debug, Clone)]
pub struct JobExecutorConfig {
    /// Maximum number of executions allowed to be active at once.
    pub max_concurrent: usize,
    /// How long completed execution records are retained before cleanup.
    pub retention: Duration,
    /// Cap on collected log output per execution (passed to
    /// `container_logs`; presumably bytes — TODO confirm against runtime).
    pub max_log_size: usize,
}
137
138impl Default for JobExecutorConfig {
139 fn default() -> Self {
140 Self {
141 max_concurrent: 10,
142 retention: Duration::from_secs(3600), max_log_size: 1024 * 1024, }
145 }
146}
147
/// Runs one-shot job containers and tracks their execution records.
pub struct JobExecutor {
    /// Container runtime used to pull images and manage containers.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// All known executions, keyed by execution id.
    executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
    /// Registered job specs, keyed by job name.
    job_specs: Arc<RwLock<HashMap<String, ServiceSpec>>>,
    /// Executor tunables.
    config: JobExecutorConfig,
    /// Set once `shutdown()` is called; new triggers are rejected afterwards.
    shutdown: AtomicBool,
}
160
161impl JobExecutor {
162 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
164 Self::with_config(runtime, JobExecutorConfig::default())
165 }
166
167 pub fn with_config(runtime: Arc<dyn Runtime + Send + Sync>, config: JobExecutorConfig) -> Self {
169 Self {
170 runtime,
171 executions: Arc::new(RwLock::new(HashMap::new())),
172 job_specs: Arc::new(RwLock::new(HashMap::new())),
173 config,
174 shutdown: AtomicBool::new(false),
175 }
176 }
177
178 pub async fn register_job(&self, name: &str, spec: ServiceSpec) {
180 let mut specs = self.job_specs.write().await;
181 specs.insert(name.to_string(), spec);
182 info!(job = %name, "Registered job spec");
183 }
184
185 pub async fn unregister_job(&self, name: &str) {
187 let mut specs = self.job_specs.write().await;
188 specs.remove(name);
189 info!(job = %name, "Unregistered job spec");
190 }
191
192 pub async fn get_job_spec(&self, name: &str) -> Option<ServiceSpec> {
194 let specs = self.job_specs.read().await;
195 specs.get(name).cloned()
196 }
197
198 pub async fn trigger(
203 &self,
204 job_name: &str,
205 spec: &ServiceSpec,
206 trigger: JobTrigger,
207 ) -> Result<JobExecutionId> {
208 if self.shutdown.load(Ordering::Relaxed) {
209 return Err(AgentError::Internal("Job executor is shutting down".into()));
210 }
211
212 let exec_id = JobExecutionId::new();
213
214 info!(
215 job = %job_name,
216 execution_id = %exec_id,
217 trigger = %trigger,
218 "Triggering job execution"
219 );
220
221 let execution = JobExecution {
223 id: exec_id.clone(),
224 job_name: job_name.to_string(),
225 status: JobStatus::Pending,
226 started_at: None,
227 completed_at: None,
228 container_id: None,
229 logs: None,
230 trigger,
231 };
232
233 {
235 let mut executions = self.executions.write().await;
236 executions.insert(exec_id.clone(), execution);
237 }
238
239 let runtime = self.runtime.clone();
241 let spec = spec.clone();
242 let exec_id_clone = exec_id.clone();
243 let executions = self.executions.clone();
244 let job_name = job_name.to_string();
245 let max_log_size = self.config.max_log_size;
246
247 tokio::spawn(async move {
248 Self::run_job(
249 runtime,
250 executions,
251 exec_id_clone,
252 &job_name,
253 spec,
254 max_log_size,
255 )
256 .await;
257 });
258
259 Ok(exec_id)
260 }
261
262 #[allow(clippy::too_many_lines)]
264 async fn run_job(
265 runtime: Arc<dyn Runtime + Send + Sync>,
266 executions: Arc<RwLock<HashMap<JobExecutionId, JobExecution>>>,
267 exec_id: JobExecutionId,
268 job_name: &str,
269 spec: ServiceSpec,
270 max_log_size: usize,
271 ) {
272 let started = Instant::now();
273
274 Self::update_status(&executions, &exec_id, |exec| {
276 exec.status = JobStatus::Initializing;
277 exec.started_at = Some(started);
278 })
279 .await;
280
281 let replica = exec_id.0.chars().take(8).collect::<String>();
284 let replica_num = u32::from_str_radix(&replica, 16).unwrap_or(0) % 10000;
285 let container_id = ContainerId {
286 service: format!("job-{job_name}"),
287 replica: replica_num,
288 };
289
290 Self::update_status(&executions, &exec_id, |exec| {
292 exec.container_id = Some(container_id.clone());
293 })
294 .await;
295
296 debug!(
297 job = %job_name,
298 execution_id = %exec_id,
299 container_id = %container_id,
300 "Creating job container"
301 );
302
303 let image_str = spec.image.name.to_string();
305 if let Err(e) = runtime
306 .pull_image_with_policy(&image_str, spec.image.pull_policy, None)
307 .await
308 {
309 error!(
310 job = %job_name,
311 execution_id = %exec_id,
312 error = %e,
313 "Image pull failed"
314 );
315 Self::update_status(&executions, &exec_id, |exec| {
316 exec.status = JobStatus::Failed {
317 reason: format!("Image pull failed: {e}"),
318 exit_code: None,
319 };
320 exec.completed_at = Some(Instant::now());
321 })
322 .await;
323 return;
324 }
325
326 if let Err(e) = runtime.create_container(&container_id, &spec).await {
328 let error_msg = e.to_string();
329 error!(
330 job = %job_name,
331 execution_id = %exec_id,
332 error = %error_msg,
333 "Container create failed"
334 );
335 Self::update_status(&executions, &exec_id, |exec| {
336 exec.status = JobStatus::Failed {
337 reason: format!("Container create failed: {error_msg}"),
338 exit_code: None,
339 };
340 exec.completed_at = Some(Instant::now());
341 })
342 .await;
343 return;
344 }
345
346 let init_orchestrator = InitOrchestrator::new(container_id.clone(), spec.init.clone());
348 if let Err(e) = init_orchestrator.run().await {
349 let error_msg = e.to_string();
350 error!(
351 job = %job_name,
352 execution_id = %exec_id,
353 error = %error_msg,
354 "Init failed"
355 );
356 Self::update_status(&executions, &exec_id, |exec| {
357 exec.status = JobStatus::Failed {
358 reason: format!("Init failed: {error_msg}"),
359 exit_code: None,
360 };
361 exec.completed_at = Some(Instant::now());
362 })
363 .await;
364 let _ = runtime.remove_container(&container_id).await;
366 return;
367 }
368
369 Self::update_status(&executions, &exec_id, |exec| {
371 exec.status = JobStatus::Running;
372 })
373 .await;
374
375 debug!(
376 job = %job_name,
377 execution_id = %exec_id,
378 "Starting job container"
379 );
380
381 if let Err(e) = runtime.start_container(&container_id).await {
383 let error_msg = e.to_string();
384 error!(
385 job = %job_name,
386 execution_id = %exec_id,
387 error = %error_msg,
388 "Container start failed"
389 );
390 Self::update_status(&executions, &exec_id, |exec| {
391 exec.status = JobStatus::Failed {
392 reason: format!("Container start failed: {error_msg}"),
393 exit_code: None,
394 };
395 exec.completed_at = Some(Instant::now());
396 })
397 .await;
398 let _ = runtime.remove_container(&container_id).await;
399 return;
400 }
401
402 let exit_code = runtime.wait_container(&container_id).await;
404 let duration = started.elapsed();
405
406 let logs = match runtime.get_logs(&container_id).await {
408 Ok(entries) => Some(
409 entries
410 .iter()
411 .map(ToString::to_string)
412 .collect::<Vec<_>>()
413 .join("\n"),
414 ),
415 Err(e) => {
416 match runtime.container_logs(&container_id, max_log_size).await {
418 Ok(entries) => Some(
419 entries
420 .iter()
421 .map(ToString::to_string)
422 .collect::<Vec<_>>()
423 .join("\n"),
424 ),
425 Err(e2) => {
426 warn!(
427 job = %job_name,
428 execution_id = %exec_id,
429 error = %e,
430 fallback_error = %e2,
431 "Failed to collect logs"
432 );
433 None
434 }
435 }
436 }
437 };
438
439 Self::update_status(&executions, &exec_id, |exec| {
441 exec.logs = logs;
442 exec.completed_at = Some(Instant::now());
443
444 match exit_code {
445 Ok(code) => {
446 if code == 0 {
447 info!(
448 job = exec.job_name,
449 execution_id = %exec.id,
450 duration_ms = duration.as_millis(),
451 "Job completed successfully"
452 );
453 exec.status = JobStatus::Completed {
454 exit_code: code,
455 duration,
456 };
457 } else {
458 warn!(
459 job = exec.job_name,
460 execution_id = %exec.id,
461 exit_code = code,
462 duration_ms = duration.as_millis(),
463 "Job failed with non-zero exit code"
464 );
465 exec.status = JobStatus::Failed {
466 reason: format!("Non-zero exit code: {code}"),
467 exit_code: Some(code),
468 };
469 }
470 }
471 Err(err) => {
472 error!(
473 job = exec.job_name,
474 execution_id = %exec.id,
475 error = %err,
476 "Job execution error"
477 );
478 exec.status = JobStatus::Failed {
479 reason: err.to_string(),
480 exit_code: None,
481 };
482 }
483 }
484 })
485 .await;
486
487 if let Err(e) = runtime.remove_container(&container_id).await {
489 warn!(
490 job = %job_name,
491 execution_id = %exec_id,
492 error = %e,
493 "Failed to remove job container"
494 );
495 }
496 }
497
498 async fn update_status<F>(
499 executions: &RwLock<HashMap<JobExecutionId, JobExecution>>,
500 exec_id: &JobExecutionId,
501 f: F,
502 ) where
503 F: FnOnce(&mut JobExecution),
504 {
505 let mut execs = executions.write().await;
506 if let Some(exec) = execs.get_mut(exec_id) {
507 f(exec);
508 }
509 }
510
511 pub async fn get_execution(&self, exec_id: &JobExecutionId) -> Option<JobExecution> {
513 let executions = self.executions.read().await;
514 executions.get(exec_id).cloned()
515 }
516
517 pub async fn list_executions(&self, job_name: &str) -> Vec<JobExecution> {
519 let executions = self.executions.read().await;
520 executions
521 .values()
522 .filter(|e| e.job_name == job_name)
523 .cloned()
524 .collect()
525 }
526
527 pub async fn list_all_executions(&self) -> Vec<JobExecution> {
529 let executions = self.executions.read().await;
530 executions.values().cloned().collect()
531 }
532
533 pub async fn cancel(&self, exec_id: &JobExecutionId) -> Result<()> {
538 let mut executions = self.executions.write().await;
539 if let Some(execution) = executions.get_mut(exec_id) {
540 if matches!(
541 execution.status,
542 JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
543 ) {
544 if let Some(ref container_id) = execution.container_id {
545 self.runtime
546 .stop_container(container_id, Duration::from_secs(10))
547 .await?;
548 self.runtime.remove_container(container_id).await?;
549 }
550 execution.status = JobStatus::Cancelled;
551 execution.completed_at = Some(Instant::now());
552 info!(
553 job = %execution.job_name,
554 execution_id = %exec_id,
555 "Job execution cancelled"
556 );
557 }
558 }
559 Ok(())
560 }
561
562 pub async fn cleanup_old_executions(&self) {
564 let now = Instant::now();
565 let mut executions = self.executions.write().await;
566 let before_count = executions.len();
567 executions.retain(|_, exec| match exec.completed_at {
568 Some(completed) => now.duration_since(completed) < self.config.retention,
569 None => true, });
571 let removed = before_count - executions.len();
572 if removed > 0 {
573 debug!(removed = removed, "Cleaned up old job execution records");
574 }
575 }
576
577 pub fn shutdown(&self) {
579 self.shutdown.store(true, Ordering::Relaxed);
580 }
581
582 pub fn is_shutting_down(&self) -> bool {
584 self.shutdown.load(Ordering::Relaxed)
585 }
586
587 pub async fn active_execution_count(&self) -> usize {
589 let executions = self.executions.read().await;
590 executions
591 .values()
592 .filter(|e| {
593 matches!(
594 e.status,
595 JobStatus::Pending | JobStatus::Initializing | JobStatus::Running
596 )
597 })
598 .count()
599 }
600}
601
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::MockRuntime;

    /// Build a minimal job-type `ServiceSpec` by parsing a deployment YAML
    /// document and extracting its single "backup" service.
    fn mock_job_spec() -> ServiceSpec {
        use zlayer_spec::*;
        serde_yaml::from_str::<DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  backup:
    rtype: job
    image:
      name: backup:latest
",
        )
        .unwrap()
        .services
        .remove("backup")
        .unwrap()
    }

    // Two freshly generated ids must be distinct and non-empty.
    #[tokio::test]
    async fn test_job_execution_id() {
        let id1 = JobExecutionId::new();
        let id2 = JobExecutionId::new();
        assert_ne!(id1, id2);
        assert!(!id1.0.is_empty());
    }

    // Triggering a job records an execution with the right name and trigger.
    #[tokio::test]
    async fn test_job_executor_trigger() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = JobExecutor::new(runtime);

        let spec = mock_job_spec();
        let exec_id = executor
            .trigger("backup", &spec, JobTrigger::Cli)
            .await
            .unwrap();

        // Give the spawned background task a moment to make progress.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let execution = executor.get_execution(&exec_id).await;
        assert!(execution.is_some());

        let exec = execution.unwrap();
        assert_eq!(exec.job_name, "backup");
        assert!(matches!(exec.trigger, JobTrigger::Cli));
    }

    // Both triggered executions appear under the job's name.
    #[tokio::test]
    async fn test_job_executor_list_executions() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = JobExecutor::new(runtime);

        let spec = mock_job_spec();

        executor
            .trigger("backup", &spec, JobTrigger::Cli)
            .await
            .unwrap();
        executor
            .trigger("backup", &spec, JobTrigger::Scheduler)
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(50)).await;

        let executions = executor.list_executions("backup").await;
        assert_eq!(executions.len(), 2);
    }

    // register_job/get_job_spec round-trips the stored spec.
    #[tokio::test]
    async fn test_job_executor_register_spec() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let executor = JobExecutor::new(runtime);

        let spec = mock_job_spec();
        executor.register_job("backup", spec.clone()).await;

        let retrieved = executor.get_job_spec("backup").await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().image.name, spec.image.name);
    }

    // Display formatting for each status shape, including exit-code suffixes.
    #[tokio::test]
    async fn test_job_status_display() {
        assert_eq!(format!("{}", JobStatus::Pending), "pending");
        assert_eq!(format!("{}", JobStatus::Running), "running");
        assert_eq!(
            format!(
                "{}",
                JobStatus::Completed {
                    exit_code: 0,
                    duration: Duration::from_secs(10)
                }
            ),
            "completed(0)"
        );
        assert_eq!(
            format!(
                "{}",
                JobStatus::Failed {
                    reason: "error".into(),
                    exit_code: Some(1)
                }
            ),
            "failed(1)"
        );
    }
}