hyperi_rustlib/tiered_sink/
config.rs1use crate::tiered_sink::CompressionCodec;
12use serde::{Deserialize, Serialize};
13use std::path::PathBuf;
14use std::time::Duration;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct TieredSinkConfig {
19 pub spool_path: PathBuf,
21
22 #[serde(default = "default_send_timeout_ms")]
25 pub send_timeout_ms: u64,
26
27 #[serde(default)]
29 pub compression: CompressionCodec,
30
31 #[serde(default)]
33 pub drain_strategy: DrainStrategy,
34
35 #[serde(default)]
37 pub ordering: OrderingMode,
38
39 #[serde(default)]
42 pub max_spool_bytes: Option<u64>,
43
44 #[serde(default)]
46 pub max_spool_items: Option<usize>,
47
48 #[serde(default = "default_circuit_failure_threshold")]
50 pub circuit_failure_threshold: u32,
51
52 #[serde(default = "default_circuit_reset_timeout_ms")]
54 pub circuit_reset_timeout_ms: u64,
55
56 #[serde(default = "default_drain_interval_ms")]
58 pub drain_interval_ms: u64,
59
60 #[serde(default)]
64 pub disk_aware: Option<DiskAwareConfig>,
65}
66
67fn default_send_timeout_ms() -> u64 {
68 1000 }
70
71fn default_circuit_failure_threshold() -> u32 {
72 5
73}
74
75fn default_circuit_reset_timeout_ms() -> u64 {
76 30_000 }
78
79fn default_drain_interval_ms() -> u64 {
80 100 }
82
83impl TieredSinkConfig {
84 #[must_use]
86 pub fn new(spool_path: impl Into<PathBuf>) -> Self {
87 Self {
88 spool_path: spool_path.into(),
89 send_timeout_ms: default_send_timeout_ms(),
90 compression: CompressionCodec::default(),
91 drain_strategy: DrainStrategy::default(),
92 ordering: OrderingMode::default(),
93 max_spool_bytes: None,
94 max_spool_items: None,
95 circuit_failure_threshold: default_circuit_failure_threshold(),
96 circuit_reset_timeout_ms: default_circuit_reset_timeout_ms(),
97 drain_interval_ms: default_drain_interval_ms(),
98 disk_aware: None,
99 }
100 }
101
102 #[must_use]
104 pub fn send_timeout(mut self, timeout: Duration) -> Self {
105 self.send_timeout_ms = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX);
106 self
107 }
108
109 #[must_use]
111 pub fn compression(mut self, codec: CompressionCodec) -> Self {
112 self.compression = codec;
113 self
114 }
115
116 #[must_use]
118 pub fn drain_strategy(mut self, strategy: DrainStrategy) -> Self {
119 self.drain_strategy = strategy;
120 self
121 }
122
123 #[must_use]
125 pub fn ordering(mut self, mode: OrderingMode) -> Self {
126 self.ordering = mode;
127 self
128 }
129
130 #[must_use]
132 pub fn max_spool_bytes(mut self, max: u64) -> Self {
133 self.max_spool_bytes = Some(max);
134 self
135 }
136
137 #[must_use]
139 pub fn disk_aware(mut self, config: DiskAwareConfig) -> Self {
140 self.disk_aware = Some(config);
141 self
142 }
143
144 #[must_use]
146 pub fn send_timeout_duration(&self) -> Duration {
147 Duration::from_millis(self.send_timeout_ms)
148 }
149
150 #[must_use]
152 pub fn circuit_reset_timeout(&self) -> Duration {
153 Duration::from_millis(self.circuit_reset_timeout_ms)
154 }
155
156 #[must_use]
158 pub fn drain_interval(&self) -> Duration {
159 Duration::from_millis(self.drain_interval_ms)
160 }
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct DiskAwareConfig {
170 #[serde(default = "default_max_usage_percent")]
173 pub max_usage_percent: f64,
174
175 #[serde(default = "default_poll_interval_secs")]
178 pub poll_interval_secs: u64,
179}
180
181fn default_max_usage_percent() -> f64 {
182 0.8
183}
184
185fn default_poll_interval_secs() -> u64 {
186 5
187}
188
189impl Default for DiskAwareConfig {
190 fn default() -> Self {
191 Self {
192 max_usage_percent: default_max_usage_percent(),
193 poll_interval_secs: default_poll_interval_secs(),
194 }
195 }
196}
197
198#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
200#[serde(tag = "type", rename_all = "snake_case")]
201pub enum DrainStrategy {
202 Adaptive {
205 #[serde(default = "default_initial_rate")]
207 initial_rate: usize,
208 #[serde(default = "default_max_rate")]
210 max_rate: usize,
211 },
212
213 RateLimited {
215 msgs_per_sec: usize,
217 },
218
219 Greedy,
222}
223
224fn default_initial_rate() -> usize {
225 100
226}
227
228fn default_max_rate() -> usize {
229 10_000
230}
231
232impl Default for DrainStrategy {
233 fn default() -> Self {
234 Self::Adaptive {
235 initial_rate: default_initial_rate(),
236 max_rate: default_max_rate(),
237 }
238 }
239}
240
241impl DrainStrategy {
242 #[must_use]
244 pub fn adaptive(initial_rate: usize, max_rate: usize) -> Self {
245 Self::Adaptive {
246 initial_rate,
247 max_rate,
248 }
249 }
250
251 #[must_use]
253 pub fn rate_limited(msgs_per_sec: usize) -> Self {
254 Self::RateLimited { msgs_per_sec }
255 }
256}
257
258#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
260#[serde(rename_all = "snake_case")]
261pub enum OrderingMode {
262 #[default]
266 Interleaved,
267
268 StrictFifo,
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276
277 #[test]
278 fn test_default_config() {
279 let config = TieredSinkConfig::new("/tmp/test.queue");
280 assert_eq!(config.send_timeout_ms, 1000);
281 assert_eq!(config.compression, CompressionCodec::default());
282 assert!(matches!(
283 config.compression,
284 CompressionCodec::Zstd { level: 1 }
285 ));
286 assert!(matches!(
287 config.drain_strategy,
288 DrainStrategy::Adaptive { .. }
289 ));
290 assert_eq!(config.ordering, OrderingMode::Interleaved);
291 assert_eq!(config.circuit_failure_threshold, 5);
292 assert!(config.disk_aware.is_none());
293 }
294
295 #[test]
296 fn test_builder_pattern() {
297 let config = TieredSinkConfig::new("/tmp/test.queue")
298 .send_timeout(Duration::from_secs(5))
299 .compression(CompressionCodec::Snappy)
300 .drain_strategy(DrainStrategy::Greedy)
301 .ordering(OrderingMode::StrictFifo)
302 .max_spool_bytes(1024 * 1024 * 100);
303
304 assert_eq!(config.send_timeout_ms, 5000);
305 assert_eq!(config.compression, CompressionCodec::Snappy);
306 assert!(matches!(config.drain_strategy, DrainStrategy::Greedy));
307 assert_eq!(config.ordering, OrderingMode::StrictFifo);
308 assert_eq!(config.max_spool_bytes, Some(100 * 1024 * 1024));
309 }
310
311 #[test]
312 fn test_drain_strategy_constructors() {
313 let adaptive = DrainStrategy::adaptive(50, 5000);
314 assert!(matches!(
315 adaptive,
316 DrainStrategy::Adaptive {
317 initial_rate: 50,
318 max_rate: 5000
319 }
320 ));
321
322 let rate_limited = DrainStrategy::rate_limited(1000);
323 assert!(matches!(
324 rate_limited,
325 DrainStrategy::RateLimited { msgs_per_sec: 1000 }
326 ));
327 }
328
329 #[test]
330 fn test_duration_conversions() {
331 let config = TieredSinkConfig::new("/tmp/test.queue");
332 assert_eq!(config.send_timeout_duration(), Duration::from_secs(1));
333 assert_eq!(config.circuit_reset_timeout(), Duration::from_secs(30));
334 assert_eq!(config.drain_interval(), Duration::from_millis(100));
335 }
336}