1#![deny(unsafe_code)]
15#![warn(missing_docs)]
16
17#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
18pub mod api;
19pub mod audit_log;
21pub mod batch_analytics;
23pub mod batch_report;
24pub mod batch_runner;
25pub mod batch_schedule;
26pub mod chaining;
28pub mod checkpoint;
30pub mod checkpointing;
31#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
32pub mod cli;
33pub mod cluster_discovery;
35pub mod conditional_dag;
36pub mod cost_estimator;
38#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
39pub mod database;
40pub mod dead_letter_queue;
42pub mod dep_graph;
43pub mod dependency;
44pub mod error;
45pub mod error_recovery;
47#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
48pub mod examples;
49#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
50pub mod execution;
51pub mod fair_scheduler;
52pub mod graceful_shutdown;
54pub mod job;
55pub mod job_archive;
56pub mod job_deps;
58pub mod job_migration;
59pub mod job_splitting;
60pub mod metrics;
61pub mod monitoring;
62pub mod notification_hub;
64pub mod notifications;
65pub mod operations;
66pub mod output_collector;
67pub mod pipeline_validator;
68pub mod presets;
69pub mod priority_queue;
70#[cfg(not(target_arch = "wasm32"))]
71pub mod processor;
72pub mod progress_agg;
74pub mod progress_tracker;
75#[cfg(not(target_arch = "wasm32"))]
76pub mod queue;
77pub mod quota;
79pub mod quota_enforcer;
81pub mod rate_limiter;
82pub mod resource_estimator;
83pub mod resource_reservation;
84pub mod retry_policy;
85#[cfg(all(not(target_arch = "wasm32"), feature = "scripting"))]
86pub mod script;
87pub mod task_group;
88pub mod template;
89pub mod throttle;
90pub mod timeout_enforcer;
91pub mod types;
92pub mod utils;
93#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
94pub mod watch;
95pub mod work_stealing;
96
97pub use error::{BatchError, Result};
98pub use job::{BatchJob, BatchOperation, InputSpec, OutputSpec};
99pub use types::{JobId, JobState, Priority, RetryPolicy};
100
101#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
102use database::Database;
103#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
104use execution::ExecutionEngine;
105#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
106use queue::JobQueue;
107#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
108use std::sync::Arc;
109
110use std::sync::atomic::{AtomicU8, Ordering};
115
/// Timing options for `BatchEngine::request_shutdown`.
///
/// Both durations are measured from the moment shutdown is requested.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShutdownConfig {
    /// Maximum time, in milliseconds, to wait for queued and running jobs to
    /// finish before the engine is stopped regardless.
    pub drain_timeout_ms: u64,
    /// If set, time in milliseconds after which jobs that are still queued or
    /// running are cancelled.
    ///
    /// This deadline is only checked while the drain wait is still in
    /// progress, so it effectively takes effect only when it does not exceed
    /// `drain_timeout_ms`.
    pub force_after_ms: Option<u64>,
}

impl Default for ShutdownConfig {
    /// Waits up to 5 seconds (5 000 ms) for jobs to drain and never
    /// force-cancels.
    fn default() -> Self {
        Self {
            drain_timeout_ms: 5_000,
            force_after_ms: None,
        }
    }
}
136
/// Lifecycle phase of a batch engine shutdown.
///
/// The discriminants are stable because the state is stored as a `u8`
/// inside a `ShutdownFlag`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ShutdownState {
    /// Normal operation; no shutdown has been requested.
    Running = 0,
    /// A shutdown has been requested but draining has not started yet.
    ShutdownRequested = 1,
    /// Waiting for queued and running jobs to finish.
    Draining = 2,
    /// Shutdown has finished. Also the decoded value for any unknown raw byte.
    Terminated = 3,
}

impl ShutdownState {
    /// Decodes a raw discriminant previously produced by `state as u8`.
    ///
    /// Values outside `0..=3` decode to [`ShutdownState::Terminated`], so an
    /// unexpected byte is never mistaken for [`ShutdownState::Running`].
    fn from_u8(v: u8) -> Self {
        match v {
            0 => Self::Running,
            1 => Self::ShutdownRequested,
            2 => Self::Draining,
            // `3` is `Terminated`; every other value falls back to it as well.
            _ => Self::Terminated,
        }
    }
}
162
163#[derive(Debug, Clone)]
165pub struct ShutdownReport {
166 pub jobs_completed: usize,
168 pub jobs_cancelled: usize,
170 pub elapsed_ms: u64,
172 pub state: ShutdownState,
174}
175
176pub struct ShutdownFlag(AtomicU8);
178
179impl ShutdownFlag {
180 #[must_use]
182 pub fn new() -> Self {
183 Self(AtomicU8::new(ShutdownState::Running as u8))
184 }
185
186 #[must_use]
188 pub fn state(&self) -> ShutdownState {
189 ShutdownState::from_u8(self.0.load(Ordering::Acquire))
190 }
191
192 pub fn set(&self, state: ShutdownState) {
194 self.0.store(state as u8, Ordering::Release);
195 }
196
197 #[must_use]
199 pub fn is_shutdown_requested(&self) -> bool {
200 self.state() != ShutdownState::Running
201 }
202}
203
204impl Default for ShutdownFlag {
205 fn default() -> Self {
206 Self::new()
207 }
208}
209
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
/// Top-level batch engine that ties together a job queue, an execution engine,
/// a database, and a shared shutdown flag.
pub struct BatchEngine {
    // Queue that submitted jobs are placed on after being persisted.
    queue: Arc<JobQueue>,
    // Execution engine, constructed with shared handles to `queue` and `database`.
    engine: Arc<ExecutionEngine>,
    // Persistent job store.
    database: Arc<Database>,
    // Shutdown progress, shareable with other tasks via `shutdown_flag()`.
    shutdown_flag: Arc<ShutdownFlag>,
}
218
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
impl BatchEngine {
    /// Creates a batch engine backed by the database at `db_path`.
    ///
    /// Opens the database, creates an empty [`JobQueue`], and builds an
    /// [`ExecutionEngine`] with `worker_count` workers that shares the queue
    /// and the database. This does not call [`BatchEngine::start`].
    ///
    /// # Errors
    ///
    /// Returns an error if opening the database or constructing the execution
    /// engine fails.
    pub fn new(db_path: &str, worker_count: usize) -> Result<Self> {
        let database = Arc::new(Database::new(db_path)?);
        let queue = Arc::new(JobQueue::new());
        let engine = Arc::new(ExecutionEngine::new(
            worker_count,
            Arc::clone(&queue),
            Arc::clone(&database),
        )?);

        Ok(Self {
            queue,
            engine,
            database,
            shutdown_flag: Arc::new(ShutdownFlag::new()),
        })
    }

    /// Persists `job` to the database, then enqueues it, and returns its id.
    ///
    /// # Errors
    ///
    /// Returns an error if saving or enqueueing the job fails. The job is saved
    /// before it is enqueued, so an enqueue failure can leave a persisted job
    /// that was never queued.
    pub async fn submit_job(&self, job: BatchJob) -> Result<JobId> {
        let job_id = job.id.clone();
        self.database.save_job(&job)?;
        self.queue.enqueue(job).await?;
        Ok(job_id)
    }

    /// Returns the state of `job_id` as reported by the job queue.
    ///
    /// # Errors
    ///
    /// Propagates any error from the queue lookup (presumably including an
    /// unknown id — confirm against `JobQueue::get_job_status`).
    pub async fn get_job_status(&self, job_id: &JobId) -> Result<JobState> {
        self.queue.get_job_status(job_id).await
    }

    /// Asks the job queue to cancel `job_id`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `JobQueue::cancel_job`.
    pub async fn cancel_job(&self, job_id: &JobId) -> Result<()> {
        self.queue.cancel_job(job_id).await
    }

    /// Returns every job stored in the database.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub fn list_jobs(&self) -> Result<Vec<BatchJob>> {
        self.database.list_jobs()
    }

    /// Starts the execution engine.
    ///
    /// # Errors
    ///
    /// Propagates any error from `ExecutionEngine::start`.
    pub async fn start(&self) -> Result<()> {
        self.engine.start().await
    }

    /// Stops the execution engine without draining or touching the shutdown flag.
    ///
    /// # Errors
    ///
    /// Propagates any error from `ExecutionEngine::stop`.
    pub async fn stop(&self) -> Result<()> {
        self.engine.stop().await
    }

    /// Returns a shared handle to the job queue.
    #[must_use]
    pub fn queue(&self) -> Arc<JobQueue> {
        Arc::clone(&self.queue)
    }

    /// Returns a shared handle to the execution engine.
    #[must_use]
    pub fn engine(&self) -> Arc<ExecutionEngine> {
        Arc::clone(&self.engine)
    }

    /// Returns a shared handle to the database.
    #[must_use]
    pub fn database(&self) -> Arc<Database> {
        Arc::clone(&self.database)
    }

    /// Returns a shared handle to this engine's shutdown flag so other tasks
    /// can observe shutdown progress.
    #[must_use]
    pub fn shutdown_flag(&self) -> Arc<ShutdownFlag> {
        Arc::clone(&self.shutdown_flag)
    }

    /// Returns the current shutdown state.
    #[must_use]
    pub fn shutdown_state(&self) -> ShutdownState {
        self.shutdown_flag.state()
    }

    /// Gracefully shuts the engine down, waiting for active jobs to finish.
    ///
    /// The shutdown flag moves to [`ShutdownState::ShutdownRequested`] and then
    /// [`ShutdownState::Draining`]. The database is then polled every 10 ms
    /// until one of the following happens:
    ///
    /// - no job is `Queued` or `Running`;
    /// - `config.force_after_ms` (if set) has elapsed. Every job still `Queued`
    ///   or `Running` is then cancelled through the queue;
    /// - `config.drain_timeout_ms` has elapsed. Remaining jobs are left as-is;
    /// - listing jobs from the database fails (best-effort: the wait ends).
    ///
    /// The execution engine is then stopped and the flag is set to
    /// [`ShutdownState::Terminated`].
    ///
    /// `jobs_completed` in the report is the number of `Completed` jobs in the
    /// database at the most recent listing, including jobs that finished before
    /// shutdown began. `jobs_cancelled` only counts cancellations the queue
    /// accepted.
    ///
    /// NOTE(review): the force deadline is only checked inside the drain loop.
    /// A `force_after_ms` noticeably larger than `drain_timeout_ms` therefore
    /// effectively never triggers cancellation. Confirm the intended semantics.
    ///
    /// NOTE(review): `Database::list_jobs` is a synchronous call made on the
    /// async runtime on every poll; consider `spawn_blocking` if it is slow.
    ///
    /// # Errors
    ///
    /// Returns an error if stopping the execution engine fails. In that case
    /// the flag is left at [`ShutdownState::Draining`].
    pub async fn request_shutdown(&self, config: ShutdownConfig) -> Result<ShutdownReport> {
        let start = std::time::Instant::now();

        self.shutdown_flag.set(ShutdownState::ShutdownRequested);
        self.shutdown_flag.set(ShutdownState::Draining);

        let drain_deadline = std::time::Duration::from_millis(config.drain_timeout_ms);
        let force_deadline = config.force_after_ms.map(std::time::Duration::from_millis);
        let poll_interval = std::time::Duration::from_millis(10);

        let mut jobs_completed = 0usize;
        let mut jobs_cancelled = 0usize;

        loop {
            let elapsed = start.elapsed();

            if matches!(force_deadline, Some(force) if elapsed >= force) {
                // Best-effort: if the jobs cannot be listed, nothing is cancelled.
                if let Ok(jobs) = self.database.list_jobs() {
                    // Refresh the count so the report reflects this final listing.
                    jobs_completed = jobs
                        .iter()
                        .filter(|job| Self::job_is_completed(job))
                        .count();
                    for job in jobs.iter().filter(|job| Self::job_is_active(job)) {
                        // Only count cancellations the queue actually accepted.
                        if self.queue.cancel_job(&job.id).await.is_ok() {
                            jobs_cancelled += 1;
                        }
                    }
                }
                break;
            }

            if elapsed >= drain_deadline {
                break;
            }

            match self.database.list_jobs() {
                Ok(jobs) => {
                    jobs_completed = jobs
                        .iter()
                        .filter(|job| Self::job_is_completed(job))
                        .count();
                    if !jobs.iter().any(|job| Self::job_is_active(job)) {
                        break;
                    }
                }
                // Best-effort: a database error ends the wait rather than
                // aborting the shutdown.
                Err(_) => break,
            }

            tokio::time::sleep(poll_interval).await;
        }

        self.engine.stop().await?;

        self.shutdown_flag.set(ShutdownState::Terminated);

        // `as_millis` is a u128; saturate instead of silently truncating.
        let elapsed_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

        Ok(ShutdownReport {
            jobs_completed,
            jobs_cancelled,
            elapsed_ms,
            state: ShutdownState::Terminated,
        })
    }

    /// Whether `job` has a context whose state is `Queued` or `Running`.
    fn job_is_active(job: &BatchJob) -> bool {
        matches!(
            &job.context,
            Some(ctx) if matches!(ctx.state, JobState::Queued | JobState::Running)
        )
    }

    /// Whether `job` has a context whose state is `Completed`.
    fn job_is_completed(job: &BatchJob) -> bool {
        matches!(&job.context, Some(ctx) if matches!(ctx.state, JobState::Completed))
    }
}
454
// `BatchEngine` only exists on non-wasm32 targets with the `sqlite` feature,
// so these tests must carry the same gate or wasm32 test builds fail to compile.
#[cfg(all(test, not(target_arch = "wasm32"), feature = "sqlite"))]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Creates an empty temporary file to back an engine's database.
    fn temp_db() -> NamedTempFile {
        NamedTempFile::new().expect("failed to create temp file")
    }

    /// Returns `file`'s path as UTF-8, as required by `BatchEngine::new`.
    fn db_path(file: &NamedTempFile) -> &str {
        file.path().to_str().expect("path should be valid UTF-8")
    }

    /// Builds a 4-worker engine on `file`.
    ///
    /// The caller keeps `file` alive so the database is not deleted mid-test.
    fn new_engine(file: &NamedTempFile) -> BatchEngine {
        BatchEngine::new(db_path(file), 4).expect("failed to create")
    }

    /// A simple file-copy job used by the submission tests.
    fn sample_job() -> BatchJob {
        BatchJob::new(
            "test-job".to_string(),
            BatchOperation::FileOp {
                operation: operations::FileOperation::Copy { overwrite: false },
            },
        )
    }

    #[tokio::test]
    async fn test_batch_engine_creation() {
        let file = temp_db();
        let engine = BatchEngine::new(db_path(&file), 4);
        assert!(engine.is_ok());
    }

    #[tokio::test]
    async fn test_job_submission() {
        let file = temp_db();
        let engine = new_engine(&file);

        let job_id = engine.submit_job(sample_job()).await;
        assert!(job_id.is_ok());
    }

    #[tokio::test]
    async fn test_job_status() {
        let file = temp_db();
        let engine = new_engine(&file);

        let job_id = engine
            .submit_job(sample_job())
            .await
            .expect("failed to submit job");
        let status = engine.get_job_status(&job_id).await;
        assert!(status.is_ok());
    }

    #[tokio::test]
    async fn test_job_cancellation() {
        let file = temp_db();
        let engine = new_engine(&file);

        let job_id = engine
            .submit_job(sample_job())
            .await
            .expect("failed to submit job");
        let result = engine.cancel_job(&job_id).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let file = temp_db();
        let engine = new_engine(&file);

        let jobs = engine.list_jobs();
        assert!(jobs.is_ok());
    }
}
546
#[cfg(test)]
/// Tests for the shutdown primitives (`ShutdownFlag`, `ShutdownState`,
/// `ShutdownConfig`, `ShutdownReport`) and, when built for a non-wasm32 target
/// with the `sqlite` feature, for the engine's shutdown path.
mod shutdown_tests {
    use super::*;

    #[test]
    fn test_shutdown_flag_initial_state() {
        let fresh = ShutdownFlag::new();
        assert_eq!(fresh.state(), ShutdownState::Running);
        assert!(!fresh.is_shutdown_requested());
    }

    #[test]
    fn test_shutdown_flag_transitions() {
        let flag = ShutdownFlag::new();

        flag.set(ShutdownState::ShutdownRequested);
        assert_eq!(flag.state(), ShutdownState::ShutdownRequested);
        assert!(flag.is_shutdown_requested());

        // Walk the remaining phases in order; each must be stored exactly.
        for next in [ShutdownState::Draining, ShutdownState::Terminated] {
            flag.set(next);
            assert_eq!(flag.state(), next);
        }
    }

    #[test]
    fn test_shutdown_flag_default() {
        assert_eq!(ShutdownFlag::default().state(), ShutdownState::Running);
    }

    #[test]
    fn test_shutdown_config_default() {
        let defaults = ShutdownConfig::default();
        assert_eq!(defaults.drain_timeout_ms, 5_000);
        assert!(defaults.force_after_ms.is_none());
    }

    #[test]
    fn test_shutdown_config_with_force() {
        let forced = ShutdownConfig {
            drain_timeout_ms: 2_000,
            force_after_ms: Some(4_000),
        };
        assert_eq!(forced.drain_timeout_ms, 2_000);
        assert_eq!(forced.force_after_ms, Some(4_000));
    }

    #[test]
    fn test_shutdown_state_from_u8_all_variants() {
        let cases = [
            (0_u8, ShutdownState::Running),
            (1, ShutdownState::ShutdownRequested),
            (2, ShutdownState::Draining),
            (3, ShutdownState::Terminated),
            // Anything outside the known discriminants decodes as terminated.
            (99, ShutdownState::Terminated),
        ];
        for (raw, expected) in cases {
            assert_eq!(ShutdownState::from_u8(raw), expected, "raw value {}", raw);
        }
    }

    #[test]
    fn test_shutdown_report_fields() {
        let ShutdownReport {
            jobs_completed,
            jobs_cancelled,
            elapsed_ms,
            state,
        } = ShutdownReport {
            jobs_completed: 5,
            jobs_cancelled: 2,
            elapsed_ms: 300,
            state: ShutdownState::Terminated,
        };
        assert_eq!(jobs_completed, 5);
        assert_eq!(jobs_cancelled, 2);
        assert_eq!(elapsed_ms, 300);
        assert_eq!(state, ShutdownState::Terminated);
    }

    #[test]
    fn test_shutdown_flag_arc_share() {
        use std::sync::Arc;
        let owner = Arc::new(ShutdownFlag::new());
        let handle = Arc::clone(&owner);
        handle.set(ShutdownState::ShutdownRequested);
        assert!(owner.is_shutdown_requested());
    }

    #[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
    #[tokio::test]
    async fn test_engine_shutdown_state_initially_running() {
        use tempfile::NamedTempFile;
        let db_file = NamedTempFile::new().expect("tmp file");
        let path = db_file.path().to_str().expect("path utf8");
        let engine = BatchEngine::new(path, 2).expect("create engine");
        assert_eq!(engine.shutdown_state(), ShutdownState::Running);
    }

    #[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
    #[tokio::test]
    async fn test_engine_request_shutdown_empty_queue() {
        use tempfile::NamedTempFile;
        let db_file = NamedTempFile::new().expect("tmp file");
        let path = db_file.path().to_str().expect("path utf8");
        let engine = BatchEngine::new(path, 2).expect("create engine");

        // Nothing is queued, so draining should finish on the first poll.
        let quick = ShutdownConfig {
            drain_timeout_ms: 100,
            force_after_ms: None,
        };
        let report = engine
            .request_shutdown(quick)
            .await
            .expect("shutdown failed");
        assert_eq!(report.state, ShutdownState::Terminated);
        assert!(report.elapsed_ms < 5_000);
    }
}
661}