Skip to main content

oximedia_batch/
lib.rs

//! Comprehensive batch processing engine for `OxiMedia`
//!
//! This crate provides a production-ready batch processing system with:
//! - Job queuing and scheduling
//! - Worker pool management
//! - Template-based configuration
//! - Watch folder automation
//! - Distributed processing support
//! - REST API and CLI interfaces
//!
//! `BatchEngine` and the `api`, `cli`, `database`, `examples`, `execution`
//! and `watch` modules are only compiled on non-`wasm32` targets with the
//! `sqlite` feature enabled.

// `memmap2` requires one unavoidable `unsafe` block per `Mmap::map()` call;
// the block is isolated in `operations/mmap_reader.rs` with a safety comment.
// All other modules remain fully safe.
//
// `deny` (rather than `forbid`) matters here: a `forbid` level cannot be
// relaxed further down the tree, whereas `deny` lets that single module opt
// back in locally — presumably via `#[allow(unsafe_code)]`; confirm in
// `operations/mmap_reader.rs`.
#![deny(unsafe_code)]
#![warn(missing_docs)]
16
// Module tree.
//
// Gating summary:
// - `api`, `cli`, `database`, `examples`, `execution`, `watch`: require a
//   non-`wasm32` target AND the `sqlite` feature.
// - `processor`, `queue`: require only a non-`wasm32` target.
// - `script`: requires a non-`wasm32` target AND the `scripting` feature.
// Everything else is always compiled.
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod api;
/// Immutable append-only audit trail for all job lifecycle events.
pub mod audit_log;
/// Per-job and fleet-wide batch analytics: throughput, latency, error rates.
pub mod batch_analytics;
pub mod batch_report;
pub mod batch_runner;
pub mod batch_schedule;
/// Job chaining: define DAG-like sequential/parallel chains between jobs.
pub mod chaining;
/// Durable checkpoint persistence for mid-job progress and crash recovery.
pub mod checkpoint;
pub mod checkpointing;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod cli;
/// Cluster-member discovery and heartbeat tracking for distributed workers.
pub mod cluster_discovery;
pub mod conditional_dag;
/// CPU/GPU/IO cost estimation for job admission control and resource planning.
pub mod cost_estimator;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod database;
/// Dead-letter queue for failed jobs with replay and quarantine support.
pub mod dead_letter_queue;
pub mod dep_graph;
pub mod dependency;
pub mod error;
/// Error recovery strategies: retry with backoff, circuit breaker, fallback.
pub mod error_recovery;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod examples;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod execution;
pub mod fair_scheduler;
/// Graceful shutdown coordinator: drain window, force-cancel, status reporting.
pub mod graceful_shutdown;
pub mod job;
pub mod job_archive;
/// Job dependency tracker: declarative `before`/`after` constraint resolution.
pub mod job_deps;
pub mod job_migration;
pub mod job_splitting;
pub mod metrics;
pub mod monitoring;
/// Notification hub: fan-out job events to webhooks, email, and Slack.
pub mod notification_hub;
pub mod notifications;
pub mod operations;
pub mod output_collector;
pub mod pipeline_validator;
pub mod presets;
pub mod priority_queue;
#[cfg(not(target_arch = "wasm32"))]
pub mod processor;
/// Progress aggregator: roll up subtask progress into parent-job percentage.
pub mod progress_agg;
pub mod progress_tracker;
#[cfg(not(target_arch = "wasm32"))]
pub mod queue;
/// Quota definition types: resource ceilings and usage counters.
pub mod quota;
/// Quota enforcer: per-user/team hard limits on concurrent and total jobs.
pub mod quota_enforcer;
pub mod rate_limiter;
pub mod resource_estimator;
pub mod resource_reservation;
pub mod retry_policy;
#[cfg(all(not(target_arch = "wasm32"), feature = "scripting"))]
pub mod script;
pub mod task_group;
pub mod template;
pub mod throttle;
pub mod timeout_enforcer;
pub mod types;
pub mod utils;
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub mod watch;
pub mod work_stealing;
96
97pub use error::{BatchError, Result};
98pub use job::{BatchJob, BatchOperation, InputSpec, OutputSpec};
99pub use types::{JobId, JobState, Priority, RetryPolicy};
100
101#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
102use database::Database;
103#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
104use execution::ExecutionEngine;
105#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
106use queue::JobQueue;
107#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
108use std::sync::Arc;
109
110// ---------------------------------------------------------------------------
111// Graceful shutdown types (always compiled — not gated on sqlite/wasm)
112// ---------------------------------------------------------------------------
113
114use std::sync::atomic::{AtomicU8, Ordering};
115
/// Configuration for graceful shutdown of `BatchEngine`.
///
/// `BatchEngine` is only compiled on non-`wasm32` targets with the `sqlite`
/// feature, so it is named here without an intra-doc link (the link would be
/// broken in documentation built without that feature).
#[derive(Debug, Clone)]
pub struct ShutdownConfig {
    /// How long to wait (in milliseconds, measured from the start of the
    /// shutdown request) for queued/running jobs to finish. When this elapses
    /// the wait ends; nothing is cancelled because of it.
    pub drain_timeout_ms: u64,
    /// If `Some`, jobs still queued or running are cancelled once this many
    /// milliseconds have elapsed since the shutdown request.
    ///
    /// The force deadline is checked before the drain deadline, but the wait
    /// ends at `drain_timeout_ms`. It therefore only fires when it is
    /// ≤ `drain_timeout_ms`; a larger value currently has no effect.
    pub force_after_ms: Option<u64>,
}

impl Default for ShutdownConfig {
    /// A 5-second drain window with no forced cancellation.
    fn default() -> Self {
        Self {
            drain_timeout_ms: 5_000,
            force_after_ms: None,
        }
    }
}
136
/// Phases an engine moves through while shutting down.
///
/// The discriminants are pinned with `#[repr(u8)]` because `ShutdownFlag`
/// stores the current phase in an `AtomicU8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ShutdownState {
    /// Normal operation: the engine is accepting new jobs.
    Running = 0,
    /// A shutdown has been requested; new jobs should no longer be accepted.
    ShutdownRequested = 1,
    /// In-progress jobs are being given time to finish.
    Draining = 2,
    /// Shutdown is over (jobs drained or a deadline hit); the engine is stopped.
    Terminated = 3,
}

impl ShutdownState {
    /// Decodes a raw byte previously produced by `state as u8`.
    ///
    /// Bytes outside `0..=3` decode as `Terminated`, so an unexpected value is
    /// never mistaken for a live engine.
    fn from_u8(raw: u8) -> Self {
        // Indexed by discriminant; must stay in sync with the `= N` values above.
        const BY_DISCRIMINANT: [ShutdownState; 4] = [
            ShutdownState::Running,
            ShutdownState::ShutdownRequested,
            ShutdownState::Draining,
            ShutdownState::Terminated,
        ];
        BY_DISCRIMINANT
            .get(usize::from(raw))
            .copied()
            .unwrap_or(ShutdownState::Terminated)
    }
}
162
163/// Summary returned by [`BatchEngine::request_shutdown`].
164#[derive(Debug, Clone)]
165pub struct ShutdownReport {
166    /// Number of jobs that finished cleanly during the drain window.
167    pub jobs_completed: usize,
168    /// Number of jobs that were cancelled (force timeout or drain timeout).
169    pub jobs_cancelled: usize,
170    /// Wall-clock time the shutdown procedure took in milliseconds.
171    pub elapsed_ms: u64,
172    /// Final shutdown state (always [`ShutdownState::Terminated`] on success).
173    pub state: ShutdownState,
174}
175
176/// Shared shutdown flag — stored as an `AtomicU8` to allow lock-free reads.
177pub struct ShutdownFlag(AtomicU8);
178
179impl ShutdownFlag {
180    /// Create a new flag in the `Running` state.
181    #[must_use]
182    pub fn new() -> Self {
183        Self(AtomicU8::new(ShutdownState::Running as u8))
184    }
185
186    /// Read the current state.
187    #[must_use]
188    pub fn state(&self) -> ShutdownState {
189        ShutdownState::from_u8(self.0.load(Ordering::Acquire))
190    }
191
192    /// Transition to the given state.
193    pub fn set(&self, state: ShutdownState) {
194        self.0.store(state as u8, Ordering::Release);
195    }
196
197    /// Returns `true` if shutdown has been requested (any non-Running state).
198    #[must_use]
199    pub fn is_shutdown_requested(&self) -> bool {
200        self.state() != ShutdownState::Running
201    }
202}
203
204impl Default for ShutdownFlag {
205    fn default() -> Self {
206        Self::new()
207    }
208}
209
/// Main batch processing engine.
///
/// Bundles the job queue, the database-backed job store and the execution
/// engine, plus the flag that tracks graceful-shutdown progress. Only compiled
/// on non-`wasm32` targets with the `sqlite` feature.
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
pub struct BatchEngine {
    /// Job queue; `new` also hands a clone of this `Arc` to the execution engine.
    queue: Arc<JobQueue>,
    /// Runs queued jobs; driven by `start`, `stop` and `request_shutdown`.
    engine: Arc<ExecutionEngine>,
    /// Persistent job store: written by `submit_job`, read by `list_jobs` and
    /// by the drain loop in `request_shutdown`.
    database: Arc<Database>,
    /// Shutdown progress, shared with observers via `shutdown_flag()`.
    shutdown_flag: Arc<ShutdownFlag>,
}
218
#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
impl BatchEngine {
    /// Create a new batch processing engine.
    ///
    /// Opens the database at `db_path`, creates an empty job queue and builds
    /// an execution engine that shares both. The shutdown flag starts in
    /// [`ShutdownState::Running`].
    ///
    /// # Arguments
    ///
    /// * `db_path` - Path to `SQLite` database file
    /// * `worker_count` - Number of worker threads
    ///
    /// # Errors
    ///
    /// Returns an error if database initialization or execution-engine
    /// construction fails.
    pub fn new(db_path: &str, worker_count: usize) -> Result<Self> {
        let database = Arc::new(Database::new(db_path)?);
        let queue = Arc::new(JobQueue::new());
        let engine = Arc::new(ExecutionEngine::new(
            worker_count,
            Arc::clone(&queue),
            Arc::clone(&database),
        )?);

        Ok(Self {
            queue,
            engine,
            database,
            shutdown_flag: Arc::new(ShutdownFlag::new()),
        })
    }

    /// Submit a job to the queue.
    ///
    /// The job is saved to the database first and then enqueued; its ID is
    /// returned on success.
    ///
    /// NOTE(review): the shutdown flag is not consulted here, so jobs are still
    /// accepted after [`request_shutdown`](Self::request_shutdown) has begun,
    /// even though the shutdown states describe new jobs as no longer accepted.
    /// Rejecting them needs an appropriate `BatchError` variant.
    ///
    /// # Arguments
    ///
    /// * `job` - The job to submit
    ///
    /// # Errors
    ///
    /// Returns an error if saving or enqueuing the job fails. If enqueuing
    /// fails, the already-saved database record is left in place.
    pub async fn submit_job(&self, job: BatchJob) -> Result<JobId> {
        let job_id = job.id.clone();
        self.database.save_job(&job)?;
        self.queue.enqueue(job).await?;
        Ok(job_id)
    }

    /// Get a job's status as reported by the job queue.
    ///
    /// # Arguments
    ///
    /// * `job_id` - ID of the job to query
    ///
    /// # Errors
    ///
    /// Returns an error if the job is not found
    pub async fn get_job_status(&self, job_id: &JobId) -> Result<JobState> {
        self.queue.get_job_status(job_id).await
    }

    /// Cancel a job through the job queue.
    ///
    /// # Arguments
    ///
    /// * `job_id` - ID of the job to cancel
    ///
    /// # Errors
    ///
    /// Returns an error if cancellation fails
    pub async fn cancel_job(&self, job_id: &JobId) -> Result<()> {
        self.queue.cancel_job(job_id).await
    }

    /// List all jobs stored in the database.
    ///
    /// # Errors
    ///
    /// Returns an error if database query fails
    pub fn list_jobs(&self) -> Result<Vec<BatchJob>> {
        self.database.list_jobs()
    }

    /// Start the execution engine.
    ///
    /// # Errors
    ///
    /// Returns an error if engine startup fails
    pub async fn start(&self) -> Result<()> {
        self.engine.start().await
    }

    /// Stop the execution engine immediately.
    ///
    /// This neither drains jobs nor updates the shutdown flag; use
    /// [`request_shutdown`](Self::request_shutdown) for a graceful stop.
    ///
    /// # Errors
    ///
    /// Returns an error if engine shutdown fails
    pub async fn stop(&self) -> Result<()> {
        self.engine.stop().await
    }

    /// Returns a new `Arc` handle to the job queue.
    #[must_use]
    pub fn queue(&self) -> Arc<JobQueue> {
        Arc::clone(&self.queue)
    }

    /// Returns a new `Arc` handle to the execution engine.
    #[must_use]
    pub fn engine(&self) -> Arc<ExecutionEngine> {
        Arc::clone(&self.engine)
    }

    /// Returns a new `Arc` handle to the database.
    #[must_use]
    pub fn database(&self) -> Arc<Database> {
        Arc::clone(&self.database)
    }

    /// Returns a clone of the shared shutdown flag, allowing external
    /// components to observe the engine's shutdown state.
    #[must_use]
    pub fn shutdown_flag(&self) -> Arc<ShutdownFlag> {
        Arc::clone(&self.shutdown_flag)
    }

    /// Returns the current shutdown state.
    #[must_use]
    pub fn shutdown_state(&self) -> ShutdownState {
        self.shutdown_flag.state()
    }

    /// Request a graceful shutdown of the batch engine.
    ///
    /// Steps:
    /// 1. Transitions state to [`ShutdownState::ShutdownRequested`], then
    ///    immediately to [`ShutdownState::Draining`].
    /// 2. Polls the database every 10 ms until one of:
    ///    - no job is recorded as `Queued` or `Running`;
    ///    - `config.force_after_ms` elapses — every job still recorded as
    ///      `Queued`/`Running` is then cancelled through the queue;
    ///    - `config.drain_timeout_ms` elapses (nothing is cancelled);
    ///    - the database cannot be read (the wait is abandoned).
    ///
    ///    The force deadline is checked first, so it only fires when it is
    ///    ≤ `drain_timeout_ms`; a larger value is never reached.
    /// 3. Calls `stop()` on the underlying execution engine and transitions to
    ///    [`ShutdownState::Terminated`].
    ///
    /// In the returned [`ShutdownReport`], `jobs_completed` is the increase in
    /// the number of jobs recorded as `Completed` between the start of this
    /// call and the last successful poll. Jobs completed before shutdown began
    /// are excluded. `jobs_cancelled` counts only cancellations the queue
    /// accepted.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying engine `stop()` fails; the shutdown
    /// state is then left at [`ShutdownState::Draining`].
    pub async fn request_shutdown(&self, config: ShutdownConfig) -> Result<ShutdownReport> {
        let start = std::time::Instant::now();

        // --- Phase 1: signal no new jobs ---
        self.shutdown_flag.set(ShutdownState::ShutdownRequested);

        // Baseline so jobs that completed before shutdown began are not
        // reported as completed "during the drain window". A failed read falls
        // back to a zero baseline (best effort, like the poll loop below).
        let completed_before = self
            .database
            .list_jobs()
            .map(|jobs| jobs.iter().filter(|job| Self::is_completed(job)).count())
            .unwrap_or(0);

        // --- Phase 2: drain ---
        self.shutdown_flag.set(ShutdownState::Draining);

        let drain_deadline = std::time::Duration::from_millis(config.drain_timeout_ms);
        let force_deadline = config.force_after_ms.map(std::time::Duration::from_millis);
        let poll_interval = std::time::Duration::from_millis(10);

        let mut completed_seen = completed_before;
        let mut jobs_cancelled = 0usize;

        loop {
            let elapsed = start.elapsed();

            // Force-cancel deadline takes precedence over the drain deadline.
            if matches!(force_deadline, Some(force) if elapsed >= force) {
                jobs_cancelled = self.cancel_active_jobs().await;
                break;
            }

            if elapsed >= drain_deadline {
                break;
            }

            match self.database.list_jobs() {
                Ok(jobs) => {
                    completed_seen = jobs.iter().filter(|job| Self::is_completed(job)).count();
                    if !jobs.iter().any(Self::is_active) {
                        break;
                    }
                }
                // Best effort: if the store cannot be read, stop waiting and
                // still stop the engine rather than hang or leave the flag
                // stuck in `Draining`.
                Err(_) => break,
            }

            tokio::time::sleep(poll_interval).await;
        }

        // --- Phase 3: stop engine ---
        self.engine.stop().await?;
        self.shutdown_flag.set(ShutdownState::Terminated);

        // `as_millis` yields u128; clamp before narrowing so the cast cannot
        // silently truncate.
        let elapsed_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;

        Ok(ShutdownReport {
            jobs_completed: completed_seen.saturating_sub(completed_before),
            jobs_cancelled,
            elapsed_ms,
            state: ShutdownState::Terminated,
        })
    }

    /// Returns `true` if the job's recorded state is `Queued` or `Running`.
    fn is_active(job: &BatchJob) -> bool {
        job.context.as_ref().map_or(false, |ctx| {
            matches!(ctx.state, JobState::Queued | JobState::Running)
        })
    }

    /// Returns `true` if the job's recorded state is `Completed`.
    fn is_completed(job: &BatchJob) -> bool {
        job.context
            .as_ref()
            .map_or(false, |ctx| matches!(ctx.state, JobState::Completed))
    }

    /// Cancels, through the queue, every job the database records as queued or
    /// running.
    ///
    /// Returns the number of cancellations the queue accepted. Failed
    /// cancellations are not counted; a failed database read yields 0.
    async fn cancel_active_jobs(&self) -> usize {
        let jobs = match self.database.list_jobs() {
            Ok(jobs) => jobs,
            Err(_) => return 0,
        };
        let mut cancelled = 0usize;
        for job in jobs.iter().filter(|job| Self::is_active(job)) {
            if self.queue.cancel_job(&job.id).await.is_ok() {
                cancelled += 1;
            }
        }
        cancelled
    }
}
454
// `BatchEngine` only exists off-wasm with the `sqlite` feature, so the test
// gate must include both conditions (matching `shutdown_tests` below).
#[cfg(all(test, not(target_arch = "wasm32"), feature = "sqlite"))]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Creates a temporary file to back the test database. The returned handle
    /// must outlive any engine that uses its path.
    fn temp_db() -> NamedTempFile {
        NamedTempFile::new().expect("failed to create temp file")
    }

    /// Borrows the temp file's path as UTF-8, as `BatchEngine::new` requires.
    fn db_path(file: &NamedTempFile) -> &str {
        file.path().to_str().expect("path should be valid UTF-8")
    }

    /// Builds a four-worker engine backed by `file`.
    fn engine_for(file: &NamedTempFile) -> BatchEngine {
        BatchEngine::new(db_path(file), 4).expect("failed to create")
    }

    /// A simple non-overwriting copy job used by the submission tests.
    fn copy_job() -> BatchJob {
        BatchJob::new(
            "test-job".to_string(),
            BatchOperation::FileOp {
                operation: operations::FileOperation::Copy { overwrite: false },
            },
        )
    }

    #[tokio::test]
    async fn test_batch_engine_creation() {
        let temp_file = temp_db();
        let engine = BatchEngine::new(db_path(&temp_file), 4);
        assert!(engine.is_ok());
    }

    #[tokio::test]
    async fn test_job_submission() {
        let temp_file = temp_db();
        let engine = engine_for(&temp_file);

        let job_id = engine.submit_job(copy_job()).await;
        assert!(job_id.is_ok());
    }

    #[tokio::test]
    async fn test_job_status() {
        let temp_file = temp_db();
        let engine = engine_for(&temp_file);

        let job_id = engine
            .submit_job(copy_job())
            .await
            .expect("failed to submit job");
        let status = engine.get_job_status(&job_id).await;
        assert!(status.is_ok());
    }

    #[tokio::test]
    async fn test_job_cancellation() {
        let temp_file = temp_db();
        let engine = engine_for(&temp_file);

        let job_id = engine
            .submit_job(copy_job())
            .await
            .expect("failed to submit job");
        let result = engine.cancel_job(&job_id).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let temp_file = temp_db();
        let engine = engine_for(&temp_file);

        let jobs = engine.list_jobs();
        assert!(jobs.is_ok());
    }
}
546
547// ---------------------------------------------------------------------------
548// Shutdown flag & config unit tests (no sqlite feature required)
549// ---------------------------------------------------------------------------
550
#[cfg(test)]
mod shutdown_tests {
    use super::*;

    #[test]
    fn test_shutdown_flag_initial_state() {
        let fresh = ShutdownFlag::new();
        assert!(!fresh.is_shutdown_requested());
        assert_eq!(fresh.state(), ShutdownState::Running);
    }

    #[test]
    fn test_shutdown_flag_transitions() {
        let flag = ShutdownFlag::new();

        flag.set(ShutdownState::ShutdownRequested);
        assert_eq!(flag.state(), ShutdownState::ShutdownRequested);
        assert!(flag.is_shutdown_requested());

        // Remaining phases: each `set` must be observable via `state`.
        for later in [ShutdownState::Draining, ShutdownState::Terminated] {
            flag.set(later);
            assert_eq!(flag.state(), later);
        }
    }

    #[test]
    fn test_shutdown_flag_default() {
        assert_eq!(ShutdownFlag::default().state(), ShutdownState::Running);
    }

    #[test]
    fn test_shutdown_config_default() {
        let ShutdownConfig {
            drain_timeout_ms,
            force_after_ms,
        } = ShutdownConfig::default();
        assert_eq!(drain_timeout_ms, 5_000);
        assert!(force_after_ms.is_none());
    }

    #[test]
    fn test_shutdown_config_with_force() {
        let forced = ShutdownConfig {
            drain_timeout_ms: 2_000,
            force_after_ms: Some(4_000),
        };
        assert_eq!(
            (forced.drain_timeout_ms, forced.force_after_ms),
            (2_000, Some(4_000))
        );
    }

    #[test]
    fn test_shutdown_state_from_u8_all_variants() {
        let cases = [
            (0_u8, ShutdownState::Running),
            (1, ShutdownState::ShutdownRequested),
            (2, ShutdownState::Draining),
            (3, ShutdownState::Terminated),
            // Out of range → Terminated
            (99, ShutdownState::Terminated),
        ];
        for &(raw, expected) in &cases {
            assert_eq!(ShutdownState::from_u8(raw), expected);
        }
    }

    #[test]
    fn test_shutdown_report_fields() {
        let summary = ShutdownReport {
            jobs_completed: 5,
            jobs_cancelled: 2,
            elapsed_ms: 300,
            state: ShutdownState::Terminated,
        };
        assert_eq!(summary.jobs_completed, 5);
        assert_eq!(summary.jobs_cancelled, 2);
        assert_eq!(summary.elapsed_ms, 300);
        assert_eq!(summary.state, ShutdownState::Terminated);
    }

    #[test]
    fn test_shutdown_flag_arc_share() {
        use std::sync::Arc;
        let owner = Arc::new(ShutdownFlag::new());
        let observer = Arc::clone(&owner);
        observer.set(ShutdownState::ShutdownRequested);
        assert!(owner.is_shutdown_requested());
    }

    #[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
    #[tokio::test]
    async fn test_engine_shutdown_state_initially_running() {
        use tempfile::NamedTempFile;
        let tmp = NamedTempFile::new().expect("tmp file");
        let engine = BatchEngine::new(tmp.path().to_str().expect("path utf8"), 2)
            .expect("create engine");
        assert_eq!(engine.shutdown_state(), ShutdownState::Running);
    }

    #[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))]
    #[tokio::test]
    async fn test_engine_request_shutdown_empty_queue() {
        use tempfile::NamedTempFile;
        let tmp = NamedTempFile::new().expect("tmp file");
        let engine = BatchEngine::new(tmp.path().to_str().expect("path utf8"), 2)
            .expect("create engine");

        let report = engine
            .request_shutdown(ShutdownConfig {
                drain_timeout_ms: 100,
                force_after_ms: None,
            })
            .await
            .expect("shutdown failed");
        assert_eq!(report.state, ShutdownState::Terminated);
        assert!(report.elapsed_ms < 5_000);
    }
}