#![deny(unsafe_code)]
#![warn(missing_docs)]
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod api;
pub mod audit_log;
pub mod batch_analytics;
pub mod batch_report;
pub mod batch_runner;
pub mod batch_schedule;
pub mod chaining;
pub mod checkpoint;
pub mod checkpointing;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod cli;
pub mod cluster_discovery;
pub mod conditional_dag;
pub mod cost_estimator;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod database;
pub mod dead_letter_queue;
pub mod dep_graph;
pub mod dependency;
pub mod error;
pub mod error_recovery;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod examples;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod execution;
pub mod fair_scheduler;
pub mod graceful_shutdown;
pub mod job;
pub mod job_archive;
pub mod job_deps;
pub mod job_migration;
pub mod job_splitting;
pub mod metrics;
pub mod monitoring;
pub mod notification_hub;
pub mod notifications;
pub mod operations;
pub mod output_collector;
pub mod pipeline_validator;
pub mod presets;
pub mod priority_queue;
#[cfg(not(target_arch = "wasm32"))]
pub mod processor;
pub mod progress_agg;
pub mod progress_tracker;
#[cfg(not(target_arch = "wasm32"))]
pub mod queue;
pub mod quota;
pub mod quota_enforcer;
pub mod rate_limiter;
pub mod resource_estimator;
pub mod resource_reservation;
pub mod retry_policy;
#[cfg(all(not(target_arch = "wasm32"), feature = "scripting"))]
pub mod script;
pub mod task_group;
pub mod template;
pub mod throttle;
pub mod timeout_enforcer;
pub mod types;
pub mod utils;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod watch;
pub mod work_stealing;
pub use error::{BatchError, Result};
pub use job::{BatchJob, BatchOperation, InputSpec, OutputSpec};
pub use types::{JobId, JobState, Priority, RetryPolicy};
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
use database::Database;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
use execution::ExecutionEngine;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
use queue::JobQueue;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};
/// Timing parameters for a graceful shutdown request.
///
/// Both values are measured in milliseconds from the moment the shutdown
/// is requested.
#[derive(Debug, Clone)]
pub struct ShutdownConfig {
    /// How long the drain loop may keep polling for active jobs before it
    /// gives up waiting.
    pub drain_timeout_ms: u64,
    /// Optional point at which still-active jobs are cancelled instead of
    /// being waited for. `None` disables forced cancellation.
    pub force_after_ms: Option<u64>,
}

impl Default for ShutdownConfig {
    /// Returns a configuration with a five second drain timeout and no
    /// forced cancellation.
    fn default() -> Self {
        const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 5_000;

        ShutdownConfig {
            force_after_ms: None,
            drain_timeout_ms: DEFAULT_DRAIN_TIMEOUT_MS,
        }
    }
}
/// Lifecycle phase of a shutdown, stored as a single byte.
///
/// The explicit discriminants are the values written to and read from the
/// atomic backing a `ShutdownFlag`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ShutdownState {
    /// No shutdown has been requested.
    Running = 0,
    /// A shutdown has been requested.
    ShutdownRequested = 1,
    /// The shutdown is waiting for active jobs to finish.
    Draining = 2,
    /// The shutdown has finished.
    Terminated = 3,
}

impl ShutdownState {
    /// Decodes a raw byte back into a state.
    ///
    /// Any value that is not a known discriminant decodes to
    /// [`ShutdownState::Terminated`].
    fn from_u8(v: u8) -> Self {
        const RUNNING: u8 = ShutdownState::Running as u8;
        const REQUESTED: u8 = ShutdownState::ShutdownRequested as u8;
        const DRAINING: u8 = ShutdownState::Draining as u8;

        match v {
            RUNNING => Self::Running,
            REQUESTED => Self::ShutdownRequested,
            DRAINING => Self::Draining,
            // Covers the `Terminated` discriminant and every unknown byte.
            _ => Self::Terminated,
        }
    }
}
/// Summary of a shutdown, as returned by `BatchEngine::request_shutdown`.
#[derive(Debug, Clone)]
pub struct ShutdownReport {
/// Number of jobs the database listed in the `Completed` state at the last
/// successful poll of the drain loop (zero if no poll ran).
pub jobs_completed: usize,
/// Number of jobs counted as cancelled when the force deadline was reached.
pub jobs_cancelled: usize,
/// Milliseconds between the start of the shutdown request and the point at
/// which the report was built.
pub elapsed_ms: u64,
/// Shutdown state at the time the report was built.
pub state: ShutdownState,
}
/// Shutdown state held in a single atomic byte so it can be read and written
/// through a shared reference.
pub struct ShutdownFlag(AtomicU8);

impl ShutdownFlag {
    /// Creates a flag whose state is [`ShutdownState::Running`].
    #[must_use]
    pub fn new() -> Self {
        let initial = ShutdownState::Running as u8;
        Self(AtomicU8::new(initial))
    }

    /// Reads the current state with `Acquire` ordering.
    #[must_use]
    pub fn state(&self) -> ShutdownState {
        let raw = self.0.load(Ordering::Acquire);
        ShutdownState::from_u8(raw)
    }

    /// Overwrites the current state with `Release` ordering.
    ///
    /// No transition is rejected: any state may replace any other.
    pub fn set(&self, state: ShutdownState) {
        let raw = state as u8;
        self.0.store(raw, Ordering::Release);
    }

    /// Returns `true` for every state other than [`ShutdownState::Running`].
    #[must_use]
    pub fn is_shutdown_requested(&self) -> bool {
        !matches!(self.state(), ShutdownState::Running)
    }
}

impl Default for ShutdownFlag {
    /// Same as [`ShutdownFlag::new`].
    fn default() -> Self {
        ShutdownFlag::new()
    }
}
/// Entry point that ties together the job queue, the execution engine, the
/// job database and a shared shutdown flag.
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub struct BatchEngine {
    /// Job queue; also handed to the execution engine at construction.
    queue: Arc<JobQueue>,
    /// Execution engine started and stopped through this type.
    engine: Arc<ExecutionEngine>,
    /// Job database; also handed to the execution engine at construction.
    database: Arc<Database>,
    /// Shutdown state, updated by [`BatchEngine::request_shutdown`].
    shutdown_flag: Arc<ShutdownFlag>,
}

#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
impl BatchEngine {
    /// Builds an engine backed by the database at `db_path`.
    ///
    /// `worker_count` is passed through to the execution engine together
    /// with shared handles to the queue and the database. The shutdown flag
    /// starts in [`ShutdownState::Running`].
    ///
    /// # Errors
    ///
    /// Returns any error produced while opening the database or while
    /// constructing the execution engine.
    pub fn new(db_path: &str, worker_count: usize) -> Result<Self> {
        let database = Arc::new(Database::new(db_path)?);
        let queue = Arc::new(JobQueue::new());
        let engine = Arc::new(ExecutionEngine::new(
            worker_count,
            Arc::clone(&queue),
            Arc::clone(&database),
        )?);
        Ok(Self {
            queue,
            engine,
            database,
            shutdown_flag: Arc::new(ShutdownFlag::new()),
        })
    }

    /// Saves `job` to the database, enqueues it, and returns its id.
    ///
    /// # Errors
    ///
    /// Returns the error from saving or from enqueueing. The job is saved
    /// before it is enqueued and the save is not undone here, so a failed
    /// enqueue leaves the job in the database.
    pub async fn submit_job(&self, job: BatchJob) -> Result<JobId> {
        let job_id = job.id.clone();
        self.database.save_job(&job)?;
        self.queue.enqueue(job).await?;
        Ok(job_id)
    }

    /// Looks up the state of `job_id` in the queue.
    ///
    /// # Errors
    ///
    /// Returns whatever error the queue reports for the lookup.
    pub async fn get_job_status(&self, job_id: &JobId) -> Result<JobState> {
        self.queue.get_job_status(job_id).await
    }

    /// Asks the queue to cancel `job_id`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the queue reports for the cancellation.
    pub async fn cancel_job(&self, job_id: &JobId) -> Result<()> {
        self.queue.cancel_job(job_id).await
    }

    /// Returns the jobs listed by the database.
    ///
    /// # Errors
    ///
    /// Returns whatever error the database reports.
    pub fn list_jobs(&self) -> Result<Vec<BatchJob>> {
        self.database.list_jobs()
    }

    /// Starts the execution engine.
    ///
    /// # Errors
    ///
    /// Returns whatever error the execution engine reports.
    pub async fn start(&self) -> Result<()> {
        self.engine.start().await
    }

    /// Stops the execution engine without touching the shutdown flag.
    ///
    /// # Errors
    ///
    /// Returns whatever error the execution engine reports.
    pub async fn stop(&self) -> Result<()> {
        self.engine.stop().await
    }

    /// Returns a shared handle to the job queue.
    #[must_use]
    pub fn queue(&self) -> Arc<JobQueue> {
        Arc::clone(&self.queue)
    }

    /// Returns a shared handle to the execution engine.
    #[must_use]
    pub fn engine(&self) -> Arc<ExecutionEngine> {
        Arc::clone(&self.engine)
    }

    /// Returns a shared handle to the job database.
    #[must_use]
    pub fn database(&self) -> Arc<Database> {
        Arc::clone(&self.database)
    }

    /// Returns a shared handle to the shutdown flag.
    #[must_use]
    pub fn shutdown_flag(&self) -> Arc<ShutdownFlag> {
        Arc::clone(&self.shutdown_flag)
    }

    /// Returns the current shutdown state.
    #[must_use]
    pub fn shutdown_state(&self) -> ShutdownState {
        self.shutdown_flag.state()
    }

    /// Drains active jobs, stops the execution engine and reports the result.
    ///
    /// The shutdown flag is moved to `Draining`, then the database is polled
    /// every 10 ms until one of the following happens:
    ///
    /// * no job is `Queued` or `Running` any more;
    /// * `config.force_after_ms` has elapsed, in which case every job that is
    ///   still `Queued` or `Running` is cancelled through the queue;
    /// * `config.drain_timeout_ms` has elapsed;
    /// * listing the jobs fails.
    ///
    /// The force deadline is checked before the drain timeout, but the drain
    /// timeout still ends the loop, so a force deadline later than the drain
    /// timeout is never reached.
    ///
    /// In the returned report, `jobs_completed` is the number of jobs in the
    /// `Completed` state at the last successful poll, and `jobs_cancelled`
    /// counts only cancellations the queue accepted.
    ///
    /// # Errors
    ///
    /// Returns the error from stopping the execution engine. In that case the
    /// shutdown flag is left in `Draining`.
    pub async fn request_shutdown(&self, config: ShutdownConfig) -> Result<ShutdownReport> {
        /// True when the job's context says it is queued or running.
        fn is_active(job: &BatchJob) -> bool {
            job.context.as_ref().map_or(false, |ctx| {
                matches!(ctx.state, JobState::Queued | JobState::Running)
            })
        }

        /// True when the job's context says it has completed.
        fn is_completed(job: &BatchJob) -> bool {
            job.context
                .as_ref()
                .map_or(false, |ctx| matches!(ctx.state, JobState::Completed))
        }

        let start = std::time::Instant::now();
        self.shutdown_flag.set(ShutdownState::ShutdownRequested);
        self.shutdown_flag.set(ShutdownState::Draining);

        let drain_deadline = std::time::Duration::from_millis(config.drain_timeout_ms);
        let force_deadline = config.force_after_ms.map(std::time::Duration::from_millis);
        let poll_interval = std::time::Duration::from_millis(10);
        let mut jobs_completed = 0usize;
        let mut jobs_cancelled = 0usize;

        loop {
            let elapsed = start.elapsed();

            if force_deadline.map_or(false, |force| elapsed >= force) {
                // Best effort: a failed listing simply means nothing is cancelled.
                if let Ok(pending) = self.database.list_jobs() {
                    for job in &pending {
                        if !is_active(job) {
                            continue;
                        }
                        // Only count jobs the queue actually agreed to cancel.
                        if self.queue.cancel_job(&job.id).await.is_ok() {
                            jobs_cancelled += 1;
                        }
                    }
                }
                break;
            }

            if elapsed >= drain_deadline {
                break;
            }

            match self.database.list_jobs() {
                Ok(jobs) => {
                    jobs_completed = jobs.iter().filter(|job| is_completed(job)).count();
                    if !jobs.iter().any(is_active) {
                        break;
                    }
                }
                // Without a job listing there is nothing to wait on; stop draining.
                Err(_) => break,
            }

            tokio::time::sleep(poll_interval).await;
        }

        self.engine.stop().await?;
        self.shutdown_flag.set(ShutdownState::Terminated);

        // `as_millis` yields a u128; saturate instead of silently wrapping.
        let elapsed_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
        Ok(ShutdownReport {
            jobs_completed,
            jobs_cancelled,
            elapsed_ms,
            state: ShutdownState::Terminated,
        })
    }
}
// `BatchEngine` only exists off wasm32 with the `sqlite` feature, so these
// tests carry the same gate in addition to `test`.
#[cfg(all(test, not(target_arch = "wasm32"), feature = "sqlite"))]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Creates a four-worker engine on a fresh temporary database file.
    ///
    /// The temp file is returned alongside the engine so that it stays alive
    /// for as long as the caller holds it.
    fn engine_with_temp_db() -> (NamedTempFile, BatchEngine) {
        let temp_file = NamedTempFile::new().expect("failed to create temp file");
        let engine = {
            let db_path = temp_file
                .path()
                .to_str()
                .expect("path should be valid UTF-8");
            BatchEngine::new(db_path, 4).expect("failed to create")
        };
        (temp_file, engine)
    }

    /// Builds the non-overwriting file-copy job shared by the tests below.
    fn copy_job() -> BatchJob {
        BatchJob::new(
            "test-job".to_string(),
            BatchOperation::FileOp {
                operation: operations::FileOperation::Copy { overwrite: false },
            },
        )
    }

    /// Constructing an engine on a fresh database succeeds.
    #[tokio::test]
    async fn test_batch_engine_creation() {
        let temp_file = NamedTempFile::new().expect("failed to create temp file");
        let db_path = temp_file
            .path()
            .to_str()
            .expect("path should be valid UTF-8");
        let engine = BatchEngine::new(db_path, 4);
        assert!(engine.is_ok());
    }

    /// Submitting a job succeeds.
    #[tokio::test]
    async fn test_job_submission() {
        let (_temp_file, engine) = engine_with_temp_db();
        let job_id = engine.submit_job(copy_job()).await;
        assert!(job_id.is_ok());
    }

    /// The status of a submitted job can be queried.
    #[tokio::test]
    async fn test_job_status() {
        let (_temp_file, engine) = engine_with_temp_db();
        let job_id = engine
            .submit_job(copy_job())
            .await
            .expect("failed to submit job");
        let status = engine.get_job_status(&job_id).await;
        assert!(status.is_ok());
    }

    /// A submitted job can be cancelled.
    #[tokio::test]
    async fn test_job_cancellation() {
        let (_temp_file, engine) = engine_with_temp_db();
        let job_id = engine
            .submit_job(copy_job())
            .await
            .expect("failed to submit job");
        let result = engine.cancel_job(&job_id).await;
        assert!(result.is_ok());
    }

    /// Listing jobs on a fresh database succeeds.
    #[tokio::test]
    async fn test_list_jobs() {
        let (_temp_file, engine) = engine_with_temp_db();
        let jobs = engine.list_jobs();
        assert!(jobs.is_ok());
    }
}
#[cfg(test)]
mod shutdown_tests {
    use super::*;

    /// A new flag starts in `Running` with no shutdown pending.
    #[test]
    fn test_shutdown_flag_initial_state() {
        let fresh = ShutdownFlag::new();
        assert_eq!(fresh.state(), ShutdownState::Running);
        assert!(!fresh.is_shutdown_requested());
    }

    /// Each `set` is visible through `state`.
    #[test]
    fn test_shutdown_flag_transitions() {
        let flag = ShutdownFlag::new();

        flag.set(ShutdownState::ShutdownRequested);
        assert_eq!(flag.state(), ShutdownState::ShutdownRequested);
        assert!(flag.is_shutdown_requested());

        for next in [ShutdownState::Draining, ShutdownState::Terminated].iter() {
            flag.set(*next);
            assert_eq!(flag.state(), *next);
        }
    }

    /// `Default` yields a flag in `Running`.
    #[test]
    fn test_shutdown_flag_default() {
        let defaulted = ShutdownFlag::default();
        assert_eq!(defaulted.state(), ShutdownState::Running);
    }

    /// The default configuration drains for five seconds and never forces.
    #[test]
    fn test_shutdown_config_default() {
        let ShutdownConfig {
            drain_timeout_ms,
            force_after_ms,
        } = ShutdownConfig::default();
        assert_eq!(drain_timeout_ms, 5_000);
        assert!(force_after_ms.is_none());
    }

    /// Explicitly supplied configuration values are stored unchanged.
    #[test]
    fn test_shutdown_config_with_force() {
        let ShutdownConfig {
            drain_timeout_ms,
            force_after_ms,
        } = ShutdownConfig {
            drain_timeout_ms: 2_000,
            force_after_ms: Some(4_000),
        };
        assert_eq!(drain_timeout_ms, 2_000);
        assert_eq!(force_after_ms, Some(4_000));
    }

    /// Known bytes decode to their variant; unknown bytes decode to `Terminated`.
    #[test]
    fn test_shutdown_state_from_u8_all_variants() {
        let expectations = [
            (0u8, ShutdownState::Running),
            (1, ShutdownState::ShutdownRequested),
            (2, ShutdownState::Draining),
            (3, ShutdownState::Terminated),
            (99, ShutdownState::Terminated),
        ];
        for (raw, expected) in expectations.iter() {
            assert_eq!(ShutdownState::from_u8(*raw), *expected);
        }
    }

    /// Report fields hold exactly what they were built with.
    #[test]
    fn test_shutdown_report_fields() {
        let ShutdownReport {
            jobs_completed,
            jobs_cancelled,
            elapsed_ms,
            state,
        } = ShutdownReport {
            jobs_completed: 5,
            jobs_cancelled: 2,
            elapsed_ms: 300,
            state: ShutdownState::Terminated,
        };
        assert_eq!(jobs_completed, 5);
        assert_eq!(jobs_cancelled, 2);
        assert_eq!(elapsed_ms, 300);
        assert_eq!(state, ShutdownState::Terminated);
    }

    /// A state set through one `Arc` handle is visible through another.
    #[test]
    fn test_shutdown_flag_arc_share() {
        use std::sync::Arc;
        let original = Arc::new(ShutdownFlag::new());
        let alias = Arc::clone(&original);
        alias.set(ShutdownState::ShutdownRequested);
        assert!(original.is_shutdown_requested());
    }

    /// A freshly built engine reports `Running`.
    #[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
    #[tokio::test]
    async fn test_engine_shutdown_state_initially_running() {
        use tempfile::NamedTempFile;
        let db_file = NamedTempFile::new().expect("tmp file");
        let engine = BatchEngine::new(db_file.path().to_str().expect("path utf8"), 2)
            .expect("create engine");
        assert_eq!(engine.shutdown_state(), ShutdownState::Running);
    }

    /// Shutting down an engine with no jobs terminates well within five seconds.
    #[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
    #[tokio::test]
    async fn test_engine_request_shutdown_empty_queue() {
        use tempfile::NamedTempFile;
        let db_file = NamedTempFile::new().expect("tmp file");
        let engine = BatchEngine::new(db_file.path().to_str().expect("path utf8"), 2)
            .expect("create engine");
        let report = engine
            .request_shutdown(ShutdownConfig {
                drain_timeout_ms: 100,
                force_after_ms: None,
            })
            .await
            .expect("shutdown failed");
        assert_eq!(report.state, ShutdownState::Terminated);
        assert!(report.elapsed_ms < 5_000);
    }
}