Skip to main content

hyperi_rustlib/tiered_sink/
config.rs

1// Project:   hyperi-rustlib
2// File:      src/tiered_sink/config.rs
3// Purpose:   TieredSink configuration
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! TieredSink configuration.
10
11use crate::tiered_sink::CompressionCodec;
12use serde::{Deserialize, Serialize};
13use std::path::PathBuf;
14use std::time::Duration;
15
16/// Configuration for TieredSink.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct TieredSinkConfig {
19    /// Path to the spool file for disk fallback.
20    pub spool_path: PathBuf,
21
22    /// Timeout for primary sink operations.
23    /// If exceeded, message is spooled to disk.
24    #[serde(default = "default_send_timeout_ms")]
25    pub send_timeout_ms: u64,
26
27    /// Compression codec for spooled messages.
28    #[serde(default)]
29    pub compression: CompressionCodec,
30
31    /// Strategy for draining spooled messages back to primary.
32    #[serde(default)]
33    pub drain_strategy: DrainStrategy,
34
35    /// Ordering mode for message delivery.
36    #[serde(default)]
37    pub ordering: OrderingMode,
38
39    /// Maximum spool file size in bytes.
40    /// Spool operations fail when exceeded.
41    #[serde(default)]
42    pub max_spool_bytes: Option<u64>,
43
44    /// Maximum messages in spool.
45    #[serde(default)]
46    pub max_spool_items: Option<usize>,
47
48    /// Circuit breaker: failures before opening circuit.
49    #[serde(default = "default_circuit_failure_threshold")]
50    pub circuit_failure_threshold: u32,
51
52    /// Circuit breaker: how long to wait before probing.
53    #[serde(default = "default_circuit_reset_timeout_ms")]
54    pub circuit_reset_timeout_ms: u64,
55
56    /// Interval for drain task to check spool.
57    #[serde(default = "default_drain_interval_ms")]
58    pub drain_interval_ms: u64,
59
60    /// Disk-aware capacity management. When configured, a background poller
61    /// checks available disk space and stops spooling if the filesystem
62    /// exceeds the configured usage threshold.
63    #[serde(default)]
64    pub disk_aware: Option<DiskAwareConfig>,
65}
66
67fn default_send_timeout_ms() -> u64 {
68    1000 // 1 second
69}
70
71fn default_circuit_failure_threshold() -> u32 {
72    5
73}
74
75fn default_circuit_reset_timeout_ms() -> u64 {
76    30_000 // 30 seconds
77}
78
79fn default_drain_interval_ms() -> u64 {
80    100 // 100ms
81}
82
83impl TieredSinkConfig {
84    /// Create a new config with the given spool path.
85    #[must_use]
86    pub fn new(spool_path: impl Into<PathBuf>) -> Self {
87        Self {
88            spool_path: spool_path.into(),
89            send_timeout_ms: default_send_timeout_ms(),
90            compression: CompressionCodec::default(),
91            drain_strategy: DrainStrategy::default(),
92            ordering: OrderingMode::default(),
93            max_spool_bytes: None,
94            max_spool_items: None,
95            circuit_failure_threshold: default_circuit_failure_threshold(),
96            circuit_reset_timeout_ms: default_circuit_reset_timeout_ms(),
97            drain_interval_ms: default_drain_interval_ms(),
98            disk_aware: None,
99        }
100    }
101
102    /// Set send timeout.
103    #[must_use]
104    pub fn send_timeout(mut self, timeout: Duration) -> Self {
105        self.send_timeout_ms = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX);
106        self
107    }
108
109    /// Set compression codec.
110    #[must_use]
111    pub fn compression(mut self, codec: CompressionCodec) -> Self {
112        self.compression = codec;
113        self
114    }
115
116    /// Set drain strategy.
117    #[must_use]
118    pub fn drain_strategy(mut self, strategy: DrainStrategy) -> Self {
119        self.drain_strategy = strategy;
120        self
121    }
122
123    /// Set ordering mode.
124    #[must_use]
125    pub fn ordering(mut self, mode: OrderingMode) -> Self {
126        self.ordering = mode;
127        self
128    }
129
130    /// Set maximum spool size.
131    #[must_use]
132    pub fn max_spool_bytes(mut self, max: u64) -> Self {
133        self.max_spool_bytes = Some(max);
134        self
135    }
136
137    /// Enable disk-aware capacity management with default settings.
138    #[must_use]
139    pub fn disk_aware(mut self, config: DiskAwareConfig) -> Self {
140        self.disk_aware = Some(config);
141        self
142    }
143
144    /// Get send timeout as Duration.
145    #[must_use]
146    pub fn send_timeout_duration(&self) -> Duration {
147        Duration::from_millis(self.send_timeout_ms)
148    }
149
150    /// Get circuit reset timeout as Duration.
151    #[must_use]
152    pub fn circuit_reset_timeout(&self) -> Duration {
153        Duration::from_millis(self.circuit_reset_timeout_ms)
154    }
155
156    /// Get drain interval as Duration.
157    #[must_use]
158    pub fn drain_interval(&self) -> Duration {
159        Duration::from_millis(self.drain_interval_ms)
160    }
161}
162
163/// Configuration for disk-aware capacity management.
164///
165/// When enabled, a background poller checks filesystem usage and pauses
166/// spool writes when the disk exceeds the configured threshold. Writes
167/// resume automatically when space is recovered.
168#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct DiskAwareConfig {
170    /// Maximum filesystem usage percentage (0.0 - 1.0) before pausing spool writes.
171    /// Default: 0.8 (80%).
172    #[serde(default = "default_max_usage_percent")]
173    pub max_usage_percent: f64,
174
175    /// How often to check disk usage, in seconds.
176    /// Default: 5 seconds.
177    #[serde(default = "default_poll_interval_secs")]
178    pub poll_interval_secs: u64,
179}
180
181fn default_max_usage_percent() -> f64 {
182    0.8
183}
184
185fn default_poll_interval_secs() -> u64 {
186    5
187}
188
189impl Default for DiskAwareConfig {
190    fn default() -> Self {
191        Self {
192            max_usage_percent: default_max_usage_percent(),
193            poll_interval_secs: default_poll_interval_secs(),
194        }
195    }
196}
197
198/// Strategy for draining spooled messages back to the primary sink.
199#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
200#[serde(tag = "type", rename_all = "snake_case")]
201pub enum DrainStrategy {
202    /// Adaptive rate: starts slow, speeds up based on success rate.
203    /// This is the default and recommended strategy.
204    Adaptive {
205        /// Initial drain rate (messages per second).
206        #[serde(default = "default_initial_rate")]
207        initial_rate: usize,
208        /// Maximum drain rate (messages per second).
209        #[serde(default = "default_max_rate")]
210        max_rate: usize,
211    },
212
213    /// Fixed rate limit (messages per second).
214    RateLimited {
215        /// Messages per second.
216        msgs_per_sec: usize,
217    },
218
219    /// Drain as fast as possible.
220    /// Use with caution - may overwhelm a recovering sink.
221    Greedy,
222}
223
224fn default_initial_rate() -> usize {
225    100
226}
227
228fn default_max_rate() -> usize {
229    10_000
230}
231
232impl Default for DrainStrategy {
233    fn default() -> Self {
234        Self::Adaptive {
235            initial_rate: default_initial_rate(),
236            max_rate: default_max_rate(),
237        }
238    }
239}
240
241impl DrainStrategy {
242    /// Create adaptive strategy with custom rates.
243    #[must_use]
244    pub fn adaptive(initial_rate: usize, max_rate: usize) -> Self {
245        Self::Adaptive {
246            initial_rate,
247            max_rate,
248        }
249    }
250
251    /// Create rate-limited strategy.
252    #[must_use]
253    pub fn rate_limited(msgs_per_sec: usize) -> Self {
254        Self::RateLimited { msgs_per_sec }
255    }
256}
257
258/// Ordering mode for message delivery during drain.
259#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
260#[serde(rename_all = "snake_case")]
261pub enum OrderingMode {
262    /// New messages go hot path, spool drains in background (default).
263    /// Maximizes throughput with slight ordering relaxation.
264    /// New messages may arrive before older spooled messages.
265    #[default]
266    Interleaved,
267
268    /// Drain spool completely before new messages use hot path.
269    /// Guarantees strict FIFO ordering but blocks new traffic during drain.
270    StrictFifo,
271}
272
273#[cfg(test)]
274mod tests {
275    use super::*;
276
277    #[test]
278    fn test_default_config() {
279        let config = TieredSinkConfig::new("/tmp/test.queue");
280        assert_eq!(config.send_timeout_ms, 1000);
281        assert_eq!(config.compression, CompressionCodec::default());
282        assert!(matches!(
283            config.compression,
284            CompressionCodec::Zstd { level: 1 }
285        ));
286        assert!(matches!(
287            config.drain_strategy,
288            DrainStrategy::Adaptive { .. }
289        ));
290        assert_eq!(config.ordering, OrderingMode::Interleaved);
291        assert_eq!(config.circuit_failure_threshold, 5);
292        assert!(config.disk_aware.is_none());
293    }
294
295    #[test]
296    fn test_builder_pattern() {
297        let config = TieredSinkConfig::new("/tmp/test.queue")
298            .send_timeout(Duration::from_secs(5))
299            .compression(CompressionCodec::Snappy)
300            .drain_strategy(DrainStrategy::Greedy)
301            .ordering(OrderingMode::StrictFifo)
302            .max_spool_bytes(1024 * 1024 * 100);
303
304        assert_eq!(config.send_timeout_ms, 5000);
305        assert_eq!(config.compression, CompressionCodec::Snappy);
306        assert!(matches!(config.drain_strategy, DrainStrategy::Greedy));
307        assert_eq!(config.ordering, OrderingMode::StrictFifo);
308        assert_eq!(config.max_spool_bytes, Some(100 * 1024 * 1024));
309    }
310
311    #[test]
312    fn test_drain_strategy_constructors() {
313        let adaptive = DrainStrategy::adaptive(50, 5000);
314        assert!(matches!(
315            adaptive,
316            DrainStrategy::Adaptive {
317                initial_rate: 50,
318                max_rate: 5000
319            }
320        ));
321
322        let rate_limited = DrainStrategy::rate_limited(1000);
323        assert!(matches!(
324            rate_limited,
325            DrainStrategy::RateLimited { msgs_per_sec: 1000 }
326        ));
327    }
328
329    #[test]
330    fn test_duration_conversions() {
331        let config = TieredSinkConfig::new("/tmp/test.queue");
332        assert_eq!(config.send_timeout_duration(), Duration::from_secs(1));
333        assert_eq!(config.circuit_reset_timeout(), Duration::from_secs(30));
334        assert_eq!(config.drain_interval(), Duration::from_millis(100));
335    }
336}