hyperi_rustlib/worker/mod.rs
1// Project: hyperi-rustlib
2// File: src/worker/mod.rs
3// Purpose: Adaptive worker pool with hybrid rayon + tokio execution
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Adaptive worker pool and batch processing framework.
10//!
11//! Two layers:
12//!
13//! - **Generic:** [`AdaptiveWorkerPool`] provides CPU-saturating parallelism via
14//! rayon (CPU-bound) + tokio (async I/O), with reactive pressure-based scaling.
15//! Useful for any workload -- not DFE-specific.
16//!
17//! - **Opinionated:** [`BatchProcessor`] trait + [`BatchPipeline`] provide a
18//! structured parallel-then-sequential pipeline for DFE apps. Apps implement
19//! `BatchProcessor` for their domain; the pipeline handles stats, scaling,
20//! and batch orchestration. [`PipelineStats`] provides common atomic counters.
21//!
22//! ## Quick Start
23//!
24//! ```rust,ignore
25//! use hyperi_rustlib::worker::{AdaptiveWorkerPool, WorkerPoolConfig};
26//!
27//! let pool = AdaptiveWorkerPool::from_cascade("worker_pool")?;
28//! pool.register_metrics(&metrics_manager);
29//! pool.start_scaling_loop(shutdown_token.clone());
30//!
31//! // CPU-bound parallel transform (rayon)
32//! let results = pool.process_batch(&messages, |msg| {
33//! parse_and_transform(msg)
34//! });
35//!
36//! // Async parallel enrichment (tokio)
37//! let enriched = pool.fan_out_async(&items, |item| async move {
38//! enrich(item).await
39//! }).await;
40//! ```
41
42mod accumulator;
43mod batch;
44mod config;
45#[cfg(feature = "worker-batch")]
46pub mod engine;
47pub(crate) mod metrics;
48#[cfg(feature = "worker-batch")]
49pub mod ndjson;
50mod pool;
51pub(crate) mod scaler;
52mod stats;
53
54#[cfg(feature = "transport")]
55pub use accumulator::records_into_work_batch;
56pub use accumulator::{AccumulatorConfig, AccumulatorFull, BatchAccumulator, BatchDrainer};
57pub use batch::{BatchPipeline, BatchProcessor};
58pub use config::WorkerPoolConfig;
59#[cfg(feature = "worker-batch")]
60pub use engine::{
61 BatchEngine, BatchProcessingConfig, FieldInterner, MessageMetadata, ParsedMessage,
62 PreRouteFilterConfig,
63};
64#[cfg(all(feature = "worker-batch", feature = "transport"))]
65pub use engine::{CommitMode, EngineError, FilterDlqPolicy, ParsedBatch};
66pub use pool::{AdaptiveWorkerPool, FanOutPolicy, FanOutResult};
67pub use scaler::{ScalingDecision, ScalingInput};
68pub use stats::{PipelineStats, PipelineStatsSnapshot};