use crate::tiered_sink::CompressionCodec;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;
/// Configuration for a tiered sink.
///
/// `spool_path` is the only field without a serde default, so it is the only
/// field that must be present when deserializing; every other field falls
/// back to the default noted on it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TieredSinkConfig {
/// Path of the spool.
/// NOTE(review): presumably the on-disk location used to buffer items that
/// could not be sent directly — confirm against the sink implementation.
pub spool_path: PathBuf,
/// Send timeout in milliseconds (default: 1000).
/// `send_timeout_duration` exposes the same value as a `Duration`.
#[serde(default = "default_send_timeout_ms")]
pub send_timeout_ms: u64,
/// Compression codec; `CompressionCodec::default()` when omitted
/// (the tests in this file expect that to be `Zstd { level: 1 }`).
#[serde(default)]
pub compression: CompressionCodec,
/// Drain strategy; `DrainStrategy::default()` (the `Adaptive` variant)
/// when omitted.
#[serde(default)]
pub drain_strategy: DrainStrategy,
/// Ordering mode; `OrderingMode::Interleaved` when omitted.
#[serde(default)]
pub ordering: OrderingMode,
/// Optional spool limit in bytes; `None` when omitted.
/// NOTE(review): presumably `None` means "no byte limit" — confirm.
#[serde(default)]
pub max_spool_bytes: Option<u64>,
/// Optional spool limit in items; `None` when omitted.
/// NOTE(review): presumably `None` means "no item limit" — confirm.
#[serde(default)]
pub max_spool_items: Option<usize>,
/// Circuit failure threshold (default: 5).
/// NOTE(review): presumably the number of failures before the circuit
/// opens — confirm against the circuit-breaker code.
#[serde(default = "default_circuit_failure_threshold")]
pub circuit_failure_threshold: u32,
/// Circuit reset timeout in milliseconds (default: 30_000).
/// `circuit_reset_timeout` exposes the same value as a `Duration`.
#[serde(default = "default_circuit_reset_timeout_ms")]
pub circuit_reset_timeout_ms: u64,
/// Drain interval in milliseconds (default: 100).
/// `drain_interval` exposes the same value as a `Duration`.
#[serde(default = "default_drain_interval_ms")]
pub drain_interval_ms: u64,
/// Optional disk-aware settings; `None` when omitted.
#[serde(default)]
pub disk_aware: Option<DiskAwareConfig>,
}
/// Default for `TieredSinkConfig::send_timeout_ms`, in milliseconds.
///
/// Used both as the serde fallback and by `TieredSinkConfig::new`.
fn default_send_timeout_ms() -> u64 {
    const SEND_TIMEOUT_MS: u64 = 1_000;
    SEND_TIMEOUT_MS
}
/// Default for `TieredSinkConfig::circuit_failure_threshold`.
///
/// Used both as the serde fallback and by `TieredSinkConfig::new`.
fn default_circuit_failure_threshold() -> u32 {
    const FAILURE_THRESHOLD: u32 = 5;
    FAILURE_THRESHOLD
}
/// Default for `TieredSinkConfig::circuit_reset_timeout_ms`, in milliseconds
/// (30 seconds).
///
/// Used both as the serde fallback and by `TieredSinkConfig::new`.
fn default_circuit_reset_timeout_ms() -> u64 {
    const RESET_TIMEOUT_SECS: u64 = 30;
    const MILLIS_PER_SEC: u64 = 1_000;
    RESET_TIMEOUT_SECS * MILLIS_PER_SEC
}
/// Default for `TieredSinkConfig::drain_interval_ms`, in milliseconds.
///
/// Used both as the serde fallback and by `TieredSinkConfig::new`.
fn default_drain_interval_ms() -> u64 {
    const DRAIN_INTERVAL_MS: u64 = 100;
    DRAIN_INTERVAL_MS
}
impl TieredSinkConfig {
    /// Creates a configuration for `spool_path` with every other field set
    /// to its default.
    ///
    /// The defaults come from the same functions and `Default` impls that
    /// serde uses for fields missing from a deserialized config, so a config
    /// built here matches one deserialized from input containing only
    /// `spool_path`.
    #[must_use]
    pub fn new(spool_path: impl Into<PathBuf>) -> Self {
        Self {
            spool_path: spool_path.into(),
            send_timeout_ms: default_send_timeout_ms(),
            compression: CompressionCodec::default(),
            drain_strategy: DrainStrategy::default(),
            ordering: OrderingMode::default(),
            max_spool_bytes: None,
            max_spool_items: None,
            circuit_failure_threshold: default_circuit_failure_threshold(),
            circuit_reset_timeout_ms: default_circuit_reset_timeout_ms(),
            drain_interval_ms: default_drain_interval_ms(),
            disk_aware: None,
        }
    }

    /// Sets the send timeout, stored with millisecond resolution.
    ///
    /// Sub-millisecond precision is dropped, and a duration whose millisecond
    /// count does not fit in a `u64` is clamped to `u64::MAX` rather than
    /// wrapping or panicking.
    #[must_use]
    pub fn send_timeout(mut self, timeout: Duration) -> Self {
        self.send_timeout_ms = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX);
        self
    }

    /// Sets the compression codec.
    #[must_use]
    pub fn compression(mut self, codec: CompressionCodec) -> Self {
        self.compression = codec;
        self
    }

    /// Sets the drain strategy.
    #[must_use]
    pub fn drain_strategy(mut self, strategy: DrainStrategy) -> Self {
        self.drain_strategy = strategy;
        self
    }

    /// Sets the ordering mode.
    #[must_use]
    pub fn ordering(mut self, mode: OrderingMode) -> Self {
        self.ordering = mode;
        self
    }

    /// Sets the spool byte limit (`max_spool_bytes`) to `Some(max)`.
    #[must_use]
    pub fn max_spool_bytes(mut self, max: u64) -> Self {
        self.max_spool_bytes = Some(max);
        self
    }

    /// Sets the spool item limit (`max_spool_items`) to `Some(max)`.
    ///
    /// Counterpart of [`Self::max_spool_bytes`], so both spool limits can be
    /// configured through the builder chain.
    #[must_use]
    pub fn max_spool_items(mut self, max: usize) -> Self {
        self.max_spool_items = Some(max);
        self
    }

    /// Stores the given disk-aware settings (`disk_aware` becomes `Some`).
    #[must_use]
    pub fn disk_aware(mut self, config: DiskAwareConfig) -> Self {
        self.disk_aware = Some(config);
        self
    }

    /// Returns `send_timeout_ms` as a [`Duration`].
    #[must_use]
    pub fn send_timeout_duration(&self) -> Duration {
        Duration::from_millis(self.send_timeout_ms)
    }

    /// Returns `circuit_reset_timeout_ms` as a [`Duration`].
    #[must_use]
    pub fn circuit_reset_timeout(&self) -> Duration {
        Duration::from_millis(self.circuit_reset_timeout_ms)
    }

    /// Returns `drain_interval_ms` as a [`Duration`].
    #[must_use]
    pub fn drain_interval(&self) -> Duration {
        Duration::from_millis(self.drain_interval_ms)
    }
}
/// Disk-aware settings, attached to `TieredSinkConfig::disk_aware`.
///
/// Both fields have serde defaults, so an empty map deserializes to the same
/// values as `DiskAwareConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiskAwareConfig {
/// Maximum disk usage (default: 0.8).
/// NOTE(review): despite the `percent` name, the default looks like a
/// fraction in `0.0..=1.0` (i.e. 80%) — confirm against the consumer.
#[serde(default = "default_max_usage_percent")]
pub max_usage_percent: f64,
/// Poll interval in seconds (default: 5).
#[serde(default = "default_poll_interval_secs")]
pub poll_interval_secs: u64,
}
/// Default for `DiskAwareConfig::max_usage_percent`.
///
/// NOTE(review): the value reads as a fraction (0.8 = 80%) rather than a
/// percentage — confirm against the code that consumes it.
fn default_max_usage_percent() -> f64 {
    const MAX_USAGE: f64 = 0.8;
    MAX_USAGE
}
/// Default for `DiskAwareConfig::poll_interval_secs`, in seconds.
fn default_poll_interval_secs() -> u64 {
    const POLL_INTERVAL_SECS: u64 = 5;
    POLL_INTERVAL_SECS
}
impl Default for DiskAwareConfig {
fn default() -> Self {
Self {
max_usage_percent: default_max_usage_percent(),
poll_interval_secs: default_poll_interval_secs(),
}
}
}
/// Strategy used when draining; the default is `Adaptive`.
///
/// Serialized with an internal `type` tag whose value is the snake_case
/// variant name: `adaptive`, `rate_limited` or `greedy`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DrainStrategy {
/// Adaptive draining, bounded by an initial and a maximum rate.
/// NOTE(review): presumably the rate starts at `initial_rate` and is
/// adjusted up to `max_rate` — confirm against the drain loop.
Adaptive {
/// Initial rate (default: 100 when omitted).
#[serde(default = "default_initial_rate")]
initial_rate: usize,
/// Maximum rate (default: 10_000 when omitted).
#[serde(default = "default_max_rate")]
max_rate: usize,
},
/// Draining at a fixed rate.
RateLimited {
/// Messages per second; has no serde default, so it must be present
/// when deserializing this variant.
msgs_per_sec: usize,
},
/// NOTE(review): presumably drains as fast as possible with no rate
/// limit — confirm against the drain loop.
Greedy,
}
/// Default `initial_rate` for `DrainStrategy::Adaptive`.
///
/// Used both as the serde fallback and by `DrainStrategy::default`.
fn default_initial_rate() -> usize {
    const INITIAL_RATE: usize = 100;
    INITIAL_RATE
}
/// Default `max_rate` for `DrainStrategy::Adaptive`.
///
/// Used both as the serde fallback and by `DrainStrategy::default`.
fn default_max_rate() -> usize {
    const MAX_RATE: usize = 10_000;
    MAX_RATE
}
impl Default for DrainStrategy {
fn default() -> Self {
Self::Adaptive {
initial_rate: default_initial_rate(),
max_rate: default_max_rate(),
}
}
}
impl DrainStrategy {
#[must_use]
pub fn adaptive(initial_rate: usize, max_rate: usize) -> Self {
Self::Adaptive {
initial_rate,
max_rate,
}
}
#[must_use]
pub fn rate_limited(msgs_per_sec: usize) -> Self {
Self::RateLimited { msgs_per_sec }
}
}
/// Ordering mode of the sink; the default is `Interleaved`.
///
/// Serialized as the snake_case variant name: `interleaved` or `strict_fifo`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OrderingMode {
/// Default mode.
/// NOTE(review): presumably new items may be sent while spooled items are
/// still draining, so delivery order is not guaranteed — confirm.
#[default]
Interleaved,
/// NOTE(review): presumably items are delivered strictly in arrival
/// order — confirm against the sink implementation.
StrictFifo,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` populates every field with its default.
    #[test]
    fn test_default_config() {
        let config = TieredSinkConfig::new("/tmp/test.queue");
        assert_eq!(config.spool_path, PathBuf::from("/tmp/test.queue"));
        assert_eq!(config.send_timeout_ms, 1000);
        assert_eq!(config.compression, CompressionCodec::default());
        assert!(matches!(
            config.compression,
            CompressionCodec::Zstd { level: 1 }
        ));
        assert!(matches!(
            config.drain_strategy,
            DrainStrategy::Adaptive { .. }
        ));
        assert_eq!(config.ordering, OrderingMode::Interleaved);
        assert_eq!(config.max_spool_bytes, None);
        assert_eq!(config.max_spool_items, None);
        assert_eq!(config.circuit_failure_threshold, 5);
        assert!(config.disk_aware.is_none());
    }

    /// Each builder method overrides exactly the field it is named after.
    #[test]
    fn test_builder_pattern() {
        let config = TieredSinkConfig::new("/tmp/test.queue")
            .send_timeout(Duration::from_secs(5))
            .compression(CompressionCodec::Snappy)
            .drain_strategy(DrainStrategy::Greedy)
            .ordering(OrderingMode::StrictFifo)
            .max_spool_bytes(1024 * 1024 * 100);
        assert_eq!(config.send_timeout_ms, 5000);
        assert_eq!(config.compression, CompressionCodec::Snappy);
        assert!(matches!(config.drain_strategy, DrainStrategy::Greedy));
        assert_eq!(config.ordering, OrderingMode::StrictFifo);
        assert_eq!(config.max_spool_bytes, Some(100 * 1024 * 1024));
    }

    /// The `disk_aware` builder stores the supplied settings, and the
    /// `DiskAwareConfig` defaults are the documented ones.
    #[test]
    fn test_disk_aware_builder() {
        let config =
            TieredSinkConfig::new("/tmp/test.queue").disk_aware(DiskAwareConfig::default());
        let disk = config
            .disk_aware
            .expect("disk_aware builder should store the config");
        // Tolerance-based comparison avoids exact float equality.
        assert!((disk.max_usage_percent - 0.8).abs() < f64::EPSILON);
        assert_eq!(disk.poll_interval_secs, 5);
    }

    /// A timeout too large for a `u64` millisecond count is clamped to
    /// `u64::MAX` instead of wrapping or panicking.
    #[test]
    fn test_send_timeout_saturates() {
        let config = TieredSinkConfig::new("/tmp/test.queue").send_timeout(Duration::MAX);
        assert_eq!(config.send_timeout_ms, u64::MAX);
        assert_eq!(
            config.send_timeout_duration(),
            Duration::from_millis(u64::MAX)
        );
    }

    /// The default drain strategy is `Adaptive` with the default rates.
    #[test]
    fn test_default_drain_strategy_rates() {
        assert!(matches!(
            DrainStrategy::default(),
            DrainStrategy::Adaptive {
                initial_rate: 100,
                max_rate: 10_000
            }
        ));
    }

    /// The convenience constructors store their arguments unchanged.
    #[test]
    fn test_drain_strategy_constructors() {
        let adaptive = DrainStrategy::adaptive(50, 5000);
        assert!(matches!(
            adaptive,
            DrainStrategy::Adaptive {
                initial_rate: 50,
                max_rate: 5000
            }
        ));
        let rate_limited = DrainStrategy::rate_limited(1000);
        assert!(matches!(
            rate_limited,
            DrainStrategy::RateLimited { msgs_per_sec: 1000 }
        ));
    }

    /// The `Duration` accessors convert the millisecond fields faithfully.
    #[test]
    fn test_duration_conversions() {
        let config = TieredSinkConfig::new("/tmp/test.queue");
        assert_eq!(config.send_timeout_duration(), Duration::from_secs(1));
        assert_eq!(config.circuit_reset_timeout(), Duration::from_secs(30));
        assert_eq!(config.drain_interval(), Duration::from_millis(100));
    }
}