use std::time::Duration;
use thiserror::Error;
/// Default for [`SequencerConfig::batch_size`].
pub const DEFAULT_SEQUENCER_BATCH_SIZE: u32 = 1000;
/// Default for [`SequencerConfig::poll_interval`] (10 minutes).
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_mins(10);
/// Default for [`SequencerConfig::partition_batch_limit`].
pub const DEFAULT_PARTITION_BATCH_LIMIT: u32 = 128;
/// Default for [`SequencerConfig::max_inner_iterations`].
pub const DEFAULT_MAX_INNER_ITERATIONS: u32 = 8;
/// A partition count that has been checked by [`Partitions::of`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Partitions(u16);

impl Partitions {
    /// Wraps `n` as a partition count.
    ///
    /// Being a `const fn`, this can initialise constants; an invalid `n` in a
    /// const context is then rejected at compile time rather than at runtime.
    ///
    /// # Panics
    ///
    /// Panics unless `n` is one of 1, 2, 4, 8, 16, 32 or 64.
    #[must_use]
    pub const fn of(n: u16) -> Self {
        // The seven values below are exactly the powers of two in 1..=64.
        assert!(
            matches!(n, 1 | 2 | 4 | 8 | 16 | 32 | 64),
            "partition count must be a power of 2 between 1 and 64"
        );
        Self(n)
    }

    /// Returns the wrapped partition count.
    #[must_use]
    pub const fn count(self) -> u16 {
        self.0
    }
}
/// Identifier of an outbox message; a thin wrapper around the raw `i64` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OutboxMessageId(pub i64);
/// A message to be enqueued: target partition, owned payload bytes and a
/// borrowed payload-type label.
#[derive(Debug)]
pub struct EnqueueMessage<'a> {
/// Partition number the message is addressed to.
// NOTE(review): presumably checked against the queue's partition count
// (see `OutboxError::PartitionOutOfRange`) — confirm at the enqueue site.
pub partition: u32,
/// Message body as raw bytes.
pub payload: Vec<u8>,
/// Payload type label, borrowed for `'a`.
pub payload_type: &'a str,
}
/// Error type for outbox operations.
///
/// `Display` text comes from the `#[error(...)]` attribute on each variant.
#[derive(Debug, Error)]
pub enum OutboxError {
/// The named queue is not registered; carries the queue name.
#[error("queue '{0}' is not registered")]
QueueNotRegistered(String),
/// `partition` is outside the range allowed for `queue`; `max` is the
/// limit reported in the message.
#[error("partition {partition} is out of range for queue '{queue}' (max {max})")]
PartitionOutOfRange {
queue: String,
partition: u32,
max: u32,
},
/// A payload of `size` is larger than the permitted `max`.
#[error("payload size {size} exceeds maximum {max}")]
PayloadTooLarge { size: usize, max: usize },
/// The partition count `found` for `queue` differs from the `expected` one.
#[error("partition count mismatch for queue '{queue}': expected {expected}, found {found}")]
PartitionCountMismatch {
queue: String,
expected: u16,
found: usize,
},
/// The queue name was rejected; carries the offending name.
#[error("invalid queue name: '{0}'")]
InvalidQueueName(String),
/// The payload type was rejected; carries the offending value.
#[error("invalid payload type: '{0}'")]
InvalidPayloadType(String),
/// A database error from `sea_orm`. `#[from]` provides the conversion used
/// by `?`, and `transparent` forwards `Display` and `source` to the inner
/// error.
#[error(transparent)]
Database(#[from] sea_orm::DbErr),
}
/// Top-level outbox configuration. It currently has a single section, the
/// sequencer settings; the derived `Default` uses `SequencerConfig::default()`.
#[derive(Debug, Clone, Default)]
pub struct OutboxConfig {
/// Sequencer settings.
pub sequencer: SequencerConfig,
}
/// Sequencer settings.
///
/// [`Default`] fills every field from the matching `DEFAULT_*` constant.
#[derive(Debug, Clone)]
pub struct SequencerConfig {
    /// Defaults to [`DEFAULT_SEQUENCER_BATCH_SIZE`].
    pub batch_size: u32,
    /// Defaults to [`DEFAULT_POLL_INTERVAL`].
    pub poll_interval: Duration,
    /// Defaults to [`DEFAULT_PARTITION_BATCH_LIMIT`].
    pub partition_batch_limit: u32,
    /// Defaults to [`DEFAULT_MAX_INNER_ITERATIONS`].
    pub max_inner_iterations: u32,
}

impl Default for SequencerConfig {
    /// Builds the configuration from the module-level `DEFAULT_*` constants.
    fn default() -> Self {
        let batch_size = DEFAULT_SEQUENCER_BATCH_SIZE;
        let poll_interval = DEFAULT_POLL_INTERVAL;
        let partition_batch_limit = DEFAULT_PARTITION_BATCH_LIMIT;
        let max_inner_iterations = DEFAULT_MAX_INNER_ITERATIONS;

        SequencerConfig {
            batch_size,
            poll_interval,
            partition_batch_limit,
            max_inner_iterations,
        }
    }
}
/// Lease timing: the total lease `duration` and the `headroom` held back
/// from it.
#[derive(Debug, Clone, Copy)]
pub struct LeaseConfig {
    /// Total length of the lease.
    pub duration: Duration,
    /// Part of `duration` that is excluded from the handler budget.
    pub headroom: Duration,
}

impl Default for LeaseConfig {
    /// A 30-second lease with 2 seconds of headroom.
    fn default() -> Self {
        let duration = Duration::from_secs(30);
        let headroom = Duration::from_secs(2);
        Self { duration, headroom }
    }
}

impl LeaseConfig {
    /// Checks that the headroom is strictly shorter than the lease.
    ///
    /// # Panics
    ///
    /// Panics when `headroom >= duration`.
    pub(crate) fn validate(&self) {
        let headroom_fits = self.headroom < self.duration;
        assert!(
            headroom_fits,
            "LeaseConfig: headroom ({:?}) must be less than duration ({:?})",
            self.headroom,
            self.duration,
        );
    }

    /// Returns `duration - headroom`, clamped at zero.
    pub(crate) fn handler_budget(&self) -> Duration {
        let Self { duration, headroom } = *self;
        duration.saturating_sub(headroom)
    }
}
/// Tuning knobs for a single worker.
///
/// Start from one of the presets (for example
/// [`WorkerTuning::processor_default`]) and adjust individual fields with the
/// chainable setters. The constraints mentioned on the fields are the ones
/// enforced by [`WorkerTuning::validate`].
#[derive(Debug, Clone)]
pub struct WorkerTuning {
    /// Batch size; must be at least 1.
    pub batch_size: u32,
    /// Copied into `PacingConfig::min_interval`; must not exceed
    /// `active_interval`.
    pub min_interval: Duration,
    /// Copied into `PacingConfig::active_interval`.
    pub active_interval: Duration,
    /// Interval for the idle state.
    // NOTE(review): the consumer of this field is not visible here; presumably
    // the wait used when there is no work — confirm against the worker loop.
    pub idle_interval: Duration,
    /// Copied into `PacingConfig::ramp_step`.
    pub ramp_step: Duration,
    /// Base retry delay; must be non-zero and must not exceed `retry_max`.
    pub retry_base: Duration,
    /// Upper bound for `retry_base`.
    pub retry_max: Duration,
    /// Degradation threshold; must be at least 1.
    pub degradation_threshold: u32,
    /// Lease duration; every preset sets it to 30 seconds.
    pub lease_duration: Duration,
}

impl WorkerTuning {
    /// Returns `self` with `batch_size` replaced by `n`.
    #[must_use]
    pub fn batch_size(self, n: u32) -> Self {
        Self {
            batch_size: n,
            ..self
        }
    }

    /// Returns `self` with `min_interval` replaced by `d`.
    #[must_use]
    pub fn min_interval(self, d: Duration) -> Self {
        Self {
            min_interval: d,
            ..self
        }
    }

    /// Returns `self` with `active_interval` replaced by `d`.
    #[must_use]
    pub fn active_interval(self, d: Duration) -> Self {
        Self {
            active_interval: d,
            ..self
        }
    }

    /// Returns `self` with `idle_interval` replaced by `d`.
    #[must_use]
    pub fn idle_interval(self, d: Duration) -> Self {
        Self {
            idle_interval: d,
            ..self
        }
    }

    /// Returns `self` with `ramp_step` replaced by `d`.
    #[must_use]
    pub fn ramp_step(self, d: Duration) -> Self {
        Self {
            ramp_step: d,
            ..self
        }
    }

    /// Returns `self` with `retry_base` replaced by `d`.
    #[must_use]
    pub fn retry_base(self, d: Duration) -> Self {
        Self {
            retry_base: d,
            ..self
        }
    }

    /// Returns `self` with `retry_max` replaced by `d`.
    #[must_use]
    pub fn retry_max(self, d: Duration) -> Self {
        Self {
            retry_max: d,
            ..self
        }
    }

    /// Returns `self` with `degradation_threshold` replaced by `n`.
    #[must_use]
    pub fn degradation_threshold(self, n: u32) -> Self {
        Self {
            degradation_threshold: n,
            ..self
        }
    }

    /// Returns `self` with `lease_duration` replaced by `d`.
    #[must_use]
    pub fn lease_duration(self, d: Duration) -> Self {
        Self {
            lease_duration: d,
            ..self
        }
    }

    /// Shorthand for [`WorkerTuning::processor_default`].
    #[must_use]
    pub fn processor() -> Self {
        Self::processor_default()
    }

    /// Shorthand for [`WorkerTuning::sequencer_default`].
    #[must_use]
    pub fn sequencer() -> Self {
        Self::sequencer_default()
    }

    /// Preset for the vacuum worker: batches of 10 000, one-second
    /// min/active intervals and a one-hour idle interval.
    #[must_use]
    pub fn vacuum() -> Self {
        let one_second = Duration::from_secs(1);
        Self {
            batch_size: 10_000,
            min_interval: one_second,
            active_interval: one_second,
            idle_interval: Duration::from_hours(1),
            ramp_step: Duration::ZERO,
            retry_base: one_second,
            retry_max: Duration::from_mins(1),
            degradation_threshold: 1,
            lease_duration: Duration::from_secs(30),
        }
    }

    /// Preset for the reconciler worker: batches of 1, one-second min/active
    /// intervals and a one-minute idle interval.
    #[must_use]
    pub fn reconciler() -> Self {
        let one_second = Duration::from_secs(1);
        Self {
            batch_size: 1,
            min_interval: one_second,
            active_interval: one_second,
            idle_interval: Duration::from_mins(1),
            ramp_step: Duration::ZERO,
            retry_base: one_second,
            retry_max: Duration::from_mins(1),
            degradation_threshold: 1,
            lease_duration: Duration::from_secs(30),
        }
    }

    /// Baseline processor preset; the other `processor_*` presets are
    /// expressed as overrides on top of it.
    #[must_use]
    pub fn processor_default() -> Self {
        Self {
            batch_size: 10,
            min_interval: Duration::from_millis(100),
            active_interval: Duration::from_millis(500),
            idle_interval: Duration::from_mins(10),
            ramp_step: Duration::from_millis(50),
            retry_base: Duration::from_secs(1),
            retry_max: Duration::from_mins(1),
            degradation_threshold: 2,
            lease_duration: Duration::from_secs(30),
        }
    }

    /// Processor preset with millisecond-scale pacing, faster retries and a
    /// one-minute idle interval. Fields not listed (`batch_size`,
    /// `lease_duration`) come from [`WorkerTuning::processor_default`].
    #[must_use]
    pub fn processor_low_latency() -> Self {
        Self {
            min_interval: Duration::from_millis(1),
            active_interval: Duration::from_millis(2),
            idle_interval: Duration::from_mins(1),
            ramp_step: Duration::from_millis(1),
            retry_base: Duration::from_millis(100),
            retry_max: Duration::from_secs(10),
            degradation_threshold: 3,
            ..Self::processor_default()
        }
    }

    /// Processor preset with batches of 100 and tighter pacing. Idle, retry,
    /// degradation and lease settings come from
    /// [`WorkerTuning::processor_default`].
    #[must_use]
    pub fn processor_high_throughput() -> Self {
        Self {
            batch_size: 100,
            min_interval: Duration::from_millis(1),
            active_interval: Duration::from_millis(20),
            ramp_step: Duration::from_millis(2),
            ..Self::processor_default()
        }
    }

    /// Processor preset with slower retries (5 s base, 5 min cap) and a
    /// degradation threshold of 1. Everything else comes from
    /// [`WorkerTuning::processor_default`].
    #[must_use]
    pub fn processor_relaxed() -> Self {
        Self {
            retry_base: Duration::from_secs(5),
            retry_max: Duration::from_mins(5),
            degradation_threshold: 1,
            ..Self::processor_default()
        }
    }

    /// Baseline sequencer preset; the other `sequencer_*` presets are
    /// expressed as overrides on top of it.
    #[must_use]
    pub fn sequencer_default() -> Self {
        Self {
            batch_size: 1000,
            min_interval: Duration::from_millis(100),
            active_interval: Duration::from_millis(500),
            idle_interval: Duration::from_mins(10),
            ramp_step: Duration::from_millis(50),
            retry_base: Duration::from_millis(100),
            retry_max: Duration::from_secs(30),
            degradation_threshold: 1,
            lease_duration: Duration::from_secs(30),
        }
    }

    /// Sequencer preset with batches of 500, no minimum interval, no ramp
    /// step and a one-minute idle interval. Retry, degradation and lease
    /// settings come from [`WorkerTuning::sequencer_default`].
    #[must_use]
    pub fn sequencer_low_latency() -> Self {
        Self {
            batch_size: 500,
            min_interval: Duration::ZERO,
            active_interval: Duration::from_millis(1),
            idle_interval: Duration::from_mins(1),
            ramp_step: Duration::ZERO,
            ..Self::sequencer_default()
        }
    }

    /// Sequencer preset with batches of 2000 and tighter pacing. Idle, retry,
    /// degradation and lease settings come from
    /// [`WorkerTuning::sequencer_default`].
    #[must_use]
    pub fn sequencer_high_throughput() -> Self {
        Self {
            batch_size: 2000,
            min_interval: Duration::from_millis(10),
            active_interval: Duration::from_millis(100),
            ramp_step: Duration::from_millis(10),
            ..Self::sequencer_default()
        }
    }

    /// Sequencer preset that only widens the ramp step to 100 ms. Everything
    /// else comes from [`WorkerTuning::sequencer_default`].
    #[must_use]
    pub fn sequencer_relaxed() -> Self {
        Self {
            ramp_step: Duration::from_millis(100),
            ..Self::sequencer_default()
        }
    }
}
impl From<&WorkerTuning> for super::taskward::PacingConfig {
fn from(t: &WorkerTuning) -> Self {
Self {
min_interval: t.min_interval,
active_interval: t.active_interval,
ramp_step: t.ramp_step,
}
}
}
/// By-value conversion; delegates to the `From<&WorkerTuning>` impl.
impl From<WorkerTuning> for super::taskward::PacingConfig {
    fn from(t: WorkerTuning) -> Self {
        // Name the borrowing impl explicitly so this can never resolve back
        // to the by-value impl and recurse.
        <Self as From<&WorkerTuning>>::from(&t)
    }
}
impl WorkerTuning {
pub fn validate(&self) {
assert!(
self.batch_size >= 1,
"WorkerTuning: batch_size must be >= 1"
);
assert!(
self.min_interval <= self.active_interval,
"WorkerTuning: min_interval ({:?}) must be <= active_interval ({:?})",
self.min_interval,
self.active_interval
);
assert!(
!self.retry_base.is_zero(),
"WorkerTuning: retry_base must be > 0 (got ZERO)"
);
assert!(
self.retry_base <= self.retry_max,
"WorkerTuning: retry_base ({:?}) must be <= retry_max ({:?})",
self.retry_base,
self.retry_max
);
assert!(
self.degradation_threshold >= 1,
"WorkerTuning: degradation_threshold must be >= 1 (got {})",
self.degradation_threshold
);
}
}
/// A bundle of [`WorkerTuning`]s, one per worker kind.
#[derive(Debug, Clone)]
pub struct OutboxProfile {
    /// Tuning for the sequencer worker.
    pub sequencer: WorkerTuning,
    /// Tuning for the processor worker.
    pub processor: WorkerTuning,
    /// Tuning for the vacuum worker.
    pub vacuum: WorkerTuning,
    /// Tuning for the reconciler worker.
    pub reconciler: WorkerTuning,
}

impl OutboxProfile {
    /// Profile built from the `*_default` sequencer/processor presets and the
    /// unmodified vacuum and reconciler presets.
    #[must_use]
    pub fn default_profile() -> Self {
        let sequencer = WorkerTuning::sequencer_default();
        let processor = WorkerTuning::processor_default();
        let vacuum = WorkerTuning::vacuum();
        let reconciler = WorkerTuning::reconciler();

        Self {
            sequencer,
            processor,
            vacuum,
            reconciler,
        }
    }

    /// Profile built from the `*_low_latency` presets; the reconciler's idle
    /// interval is set to 30 seconds.
    #[must_use]
    pub fn low_latency() -> Self {
        let sequencer = WorkerTuning::sequencer_low_latency();
        let processor = WorkerTuning::processor_low_latency();
        let vacuum = WorkerTuning::vacuum();
        let reconciler = WorkerTuning::reconciler().idle_interval(Duration::from_secs(30));

        Self {
            sequencer,
            processor,
            vacuum,
            reconciler,
        }
    }

    /// Profile built from the `*_high_throughput` presets with the unmodified
    /// vacuum and reconciler presets.
    #[must_use]
    pub fn high_throughput() -> Self {
        let sequencer = WorkerTuning::sequencer_high_throughput();
        let processor = WorkerTuning::processor_high_throughput();
        let vacuum = WorkerTuning::vacuum();
        let reconciler = WorkerTuning::reconciler();

        Self {
            sequencer,
            processor,
            vacuum,
            reconciler,
        }
    }

    /// Profile built from the `*_relaxed` presets; the reconciler's idle
    /// interval is set to 2 minutes.
    #[must_use]
    pub fn relaxed() -> Self {
        let sequencer = WorkerTuning::sequencer_relaxed();
        let processor = WorkerTuning::processor_relaxed();
        let vacuum = WorkerTuning::vacuum();
        let reconciler = WorkerTuning::reconciler().idle_interval(Duration::from_mins(2));

        Self {
            sequencer,
            processor,
            vacuum,
            reconciler,
        }
    }
}

impl Default for OutboxProfile {
    /// Same as [`OutboxProfile::default_profile`].
    fn default() -> Self {
        OutboxProfile::default_profile()
    }
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    /// A zero `retry_base` must be rejected by `WorkerTuning::validate`.
    #[test]
    #[should_panic(expected = "retry_base must be > 0")]
    fn validate_rejects_zero_retry_base() {
        let tuning = WorkerTuning::processor_default().retry_base(Duration::ZERO);
        tuning.validate();
    }

    /// A zero `degradation_threshold` must be rejected by
    /// `WorkerTuning::validate`.
    #[test]
    #[should_panic(expected = "degradation_threshold must be >= 1")]
    fn validate_rejects_zero_degradation_threshold() {
        let tuning = WorkerTuning::processor_default().degradation_threshold(0);
        tuning.validate();
    }

    /// The default lease is 30 s with 2 s of headroom, leaving a 28 s budget.
    #[test]
    fn lease_config_default() {
        let cfg = LeaseConfig::default();
        let LeaseConfig { duration, headroom } = cfg;

        assert_eq!(duration, Duration::from_secs(30));
        assert_eq!(headroom, Duration::from_secs(2));
        assert_eq!(cfg.handler_budget(), Duration::from_secs(28));
    }

    /// Headroom equal to the lease duration leaves no budget and is rejected.
    #[test]
    #[should_panic(expected = "headroom")]
    fn lease_config_rejects_headroom_equal_to_duration() {
        let five_seconds = Duration::from_secs(5);
        let cfg = LeaseConfig {
            duration: five_seconds,
            headroom: five_seconds,
        };
        cfg.validate();
    }

    /// Headroom longer than the lease duration is rejected.
    #[test]
    #[should_panic(expected = "headroom")]
    fn lease_config_rejects_headroom_greater_than_duration() {
        let cfg = LeaseConfig {
            duration: Duration::from_secs(5),
            headroom: Duration::from_secs(10),
        };
        cfg.validate();
    }
}