Skip to main content

s2_common/types/
config.rs

1//! Stream and basin configuration types.
2//!
3//! Stream configuration uses three representations:
4//!
5//! - Resolved (`StreamConfig`, `TimestampingConfig`, `DeleteOnEmptyConfig`): concrete values,
6//!   produced by merging optional configs with defaults using `merge()`.
7//!
8//! - Optional (`OptionalStreamConfig`, `OptionalTimestampingConfig`,
9//!   `OptionalDeleteOnEmptyConfig`): stored metadata, where `None` means "not set at this layer;
10//!   fall back to defaults."
11//!
12//! - Reconfiguration (`StreamReconfiguration`, `TimestampingReconfiguration`,
13//!   `DeleteOnEmptyReconfiguration`): PATCH-style updates applied with `reconfigure()`.
14//!
15//! Reconfiguration of nested fields (e.g. `timestamping`, `delete_on_empty`,
16//! `default_stream_config`) is applied recursively: `Specified(Some(inner_reconfig))`
17//! applies the inner reconfiguration to the existing value, while `Specified(None)`
18//! clears it to the default.
19//!
20//! `merge()` resolves optional configs into resolved configs with precedence:
21//! stream-level → basin-level → system default (via `Option::or` chaining).
22//!
23//! Basin config also carries basin-level knobs like `stream_cipher`,
24//! `create_stream_on_append`, and `create_stream_on_read`.
25
26use std::time::Duration;
27
28use enum_ordinalize::Ordinalize;
29
30use crate::{encryption::EncryptionAlgorithm, maybe::Maybe};
31
32#[derive(
33    Debug,
34    Default,
35    Clone,
36    Copy,
37    strum::Display,
38    strum::IntoStaticStr,
39    strum::EnumIter,
40    strum::FromRepr,
41    strum::EnumString,
42    PartialEq,
43    Eq,
44    Ordinalize,
45    Hash,
46)]
47#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
48#[repr(u8)]
49pub enum StorageClass {
50    #[strum(serialize = "standard")]
51    Standard = 1,
52    #[default]
53    #[strum(serialize = "express")]
54    Express = 2,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum RetentionPolicy {
59    Age(Duration),
60    Infinite(),
61}
62
63impl RetentionPolicy {
64    pub fn age(&self) -> Option<Duration> {
65        match self {
66            Self::Age(duration) => Some(*duration),
67            Self::Infinite() => None,
68        }
69    }
70}
71
72impl Default for RetentionPolicy {
73    fn default() -> Self {
74        const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
75
76        Self::Age(ONE_WEEK)
77    }
78}
79
80#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
81pub enum TimestampingMode {
82    #[default]
83    ClientPrefer,
84    ClientRequire,
85    Arrival,
86}
87
88#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
89pub struct TimestampingConfig {
90    pub mode: TimestampingMode,
91    pub uncapped: bool,
92}
93
94#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
95pub struct DeleteOnEmptyConfig {
96    pub min_age: Duration,
97}
98
99#[derive(Debug, Clone, Default, PartialEq, Eq)]
100pub struct StreamConfig {
101    pub storage_class: StorageClass,
102    pub retention_policy: RetentionPolicy,
103    pub timestamping: TimestampingConfig,
104    pub delete_on_empty: DeleteOnEmptyConfig,
105}
106
107#[derive(Debug, Clone, Default)]
108pub struct TimestampingReconfiguration {
109    pub mode: Maybe<Option<TimestampingMode>>,
110    pub uncapped: Maybe<Option<bool>>,
111}
112
113#[derive(Debug, Clone, Default)]
114pub struct DeleteOnEmptyReconfiguration {
115    pub min_age: Maybe<Option<Duration>>,
116}
117
118#[derive(Debug, Clone, Default)]
119pub struct StreamReconfiguration {
120    pub storage_class: Maybe<Option<StorageClass>>,
121    pub retention_policy: Maybe<Option<RetentionPolicy>>,
122    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
123    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
124}
125
126#[derive(Debug, Clone, Copy, Default)]
127pub struct OptionalTimestampingConfig {
128    pub mode: Option<TimestampingMode>,
129    pub uncapped: Option<bool>,
130}
131
132impl OptionalTimestampingConfig {
133    pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
134        if let Maybe::Specified(mode) = reconfiguration.mode {
135            self.mode = mode;
136        }
137        if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
138            self.uncapped = uncapped;
139        }
140        self
141    }
142
143    pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
144        let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
145        let uncapped = self
146            .uncapped
147            .or(basin_defaults.uncapped)
148            .unwrap_or_default();
149        TimestampingConfig { mode, uncapped }
150    }
151}
152
153impl From<OptionalTimestampingConfig> for TimestampingConfig {
154    fn from(value: OptionalTimestampingConfig) -> Self {
155        Self {
156            mode: value.mode.unwrap_or_default(),
157            uncapped: value.uncapped.unwrap_or_default(),
158        }
159    }
160}
161
162impl From<TimestampingConfig> for OptionalTimestampingConfig {
163    fn from(value: TimestampingConfig) -> Self {
164        Self {
165            mode: Some(value.mode),
166            uncapped: Some(value.uncapped),
167        }
168    }
169}
170
171impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
172    fn from(value: OptionalTimestampingConfig) -> Self {
173        Self {
174            mode: value.mode.into(),
175            uncapped: value.uncapped.into(),
176        }
177    }
178}
179
180#[derive(Debug, Clone, Default)]
181pub struct OptionalDeleteOnEmptyConfig {
182    pub min_age: Option<Duration>,
183}
184
185impl OptionalDeleteOnEmptyConfig {
186    pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
187        if let Maybe::Specified(min_age) = reconfiguration.min_age {
188            self.min_age = min_age;
189        }
190        self
191    }
192
193    pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
194        let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
195        DeleteOnEmptyConfig { min_age }
196    }
197}
198
199impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
200    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
201        Self {
202            min_age: value.min_age.unwrap_or_default(),
203        }
204    }
205}
206
207impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
208    fn from(value: DeleteOnEmptyConfig) -> Self {
209        Self {
210            min_age: Some(value.min_age),
211        }
212    }
213}
214
215impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
216    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
217        Self {
218            min_age: value.min_age.into(),
219        }
220    }
221}
222
223#[derive(Debug, Clone, Default)]
224pub struct OptionalStreamConfig {
225    pub storage_class: Option<StorageClass>,
226    pub retention_policy: Option<RetentionPolicy>,
227    pub timestamping: OptionalTimestampingConfig,
228    pub delete_on_empty: OptionalDeleteOnEmptyConfig,
229}
230
231impl OptionalStreamConfig {
232    pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
233        let StreamReconfiguration {
234            storage_class,
235            retention_policy,
236            timestamping,
237            delete_on_empty,
238        } = reconfiguration;
239        if let Maybe::Specified(storage_class) = storage_class {
240            self.storage_class = storage_class;
241        }
242        if let Maybe::Specified(retention_policy) = retention_policy {
243            self.retention_policy = retention_policy;
244        }
245        if let Maybe::Specified(timestamping) = timestamping {
246            self.timestamping = timestamping
247                .map(|ts| self.timestamping.reconfigure(ts))
248                .unwrap_or_default();
249        }
250        if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
251            self.delete_on_empty = delete_on_empty_reconfig
252                .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
253                .unwrap_or_default();
254        }
255        self
256    }
257
258    pub fn merge(self, basin_defaults: Self) -> StreamConfig {
259        let storage_class = self
260            .storage_class
261            .or(basin_defaults.storage_class)
262            .unwrap_or_default();
263
264        let retention_policy = self
265            .retention_policy
266            .or(basin_defaults.retention_policy)
267            .unwrap_or_default();
268
269        let timestamping = self.timestamping.merge(basin_defaults.timestamping);
270
271        let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
272
273        StreamConfig {
274            storage_class,
275            retention_policy,
276            timestamping,
277            delete_on_empty,
278        }
279    }
280}
281
282impl From<OptionalStreamConfig> for StreamReconfiguration {
283    fn from(value: OptionalStreamConfig) -> Self {
284        let OptionalStreamConfig {
285            storage_class,
286            retention_policy,
287            timestamping,
288            delete_on_empty,
289        } = value;
290
291        Self {
292            storage_class: storage_class.into(),
293            retention_policy: retention_policy.into(),
294            timestamping: Some(timestamping.into()).into(),
295            delete_on_empty: Some(delete_on_empty.into()).into(),
296        }
297    }
298}
299
300impl From<OptionalStreamConfig> for StreamConfig {
301    fn from(value: OptionalStreamConfig) -> Self {
302        let OptionalStreamConfig {
303            storage_class,
304            retention_policy,
305            timestamping,
306            delete_on_empty,
307        } = value;
308
309        Self {
310            storage_class: storage_class.unwrap_or_default(),
311            retention_policy: retention_policy.unwrap_or_default(),
312            timestamping: timestamping.into(),
313            delete_on_empty: delete_on_empty.into(),
314        }
315    }
316}
317
318impl From<StreamConfig> for OptionalStreamConfig {
319    fn from(value: StreamConfig) -> Self {
320        let StreamConfig {
321            storage_class,
322            retention_policy,
323            timestamping,
324            delete_on_empty,
325        } = value;
326
327        Self {
328            storage_class: Some(storage_class),
329            retention_policy: Some(retention_policy),
330            timestamping: timestamping.into(),
331            delete_on_empty: delete_on_empty.into(),
332        }
333    }
334}
335
336#[derive(Debug, Clone, Default)]
337pub struct BasinConfig {
338    pub default_stream_config: OptionalStreamConfig,
339    pub stream_cipher: Option<EncryptionAlgorithm>,
340    pub create_stream_on_append: bool,
341    pub create_stream_on_read: bool,
342}
343
344impl BasinConfig {
345    pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
346        let BasinReconfiguration {
347            default_stream_config,
348            stream_cipher,
349            create_stream_on_append,
350            create_stream_on_read,
351        } = reconfiguration;
352
353        if let Maybe::Specified(default_stream_config) = default_stream_config {
354            self.default_stream_config = default_stream_config
355                .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
356                .unwrap_or_default();
357        }
358
359        if let Maybe::Specified(stream_cipher) = stream_cipher {
360            self.stream_cipher = stream_cipher;
361        }
362
363        if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
364            self.create_stream_on_append = create_stream_on_append;
365        }
366
367        if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
368            self.create_stream_on_read = create_stream_on_read;
369        }
370
371        self
372    }
373}
374
375impl From<BasinConfig> for BasinReconfiguration {
376    fn from(value: BasinConfig) -> Self {
377        let BasinConfig {
378            default_stream_config,
379            stream_cipher,
380            create_stream_on_append,
381            create_stream_on_read,
382        } = value;
383
384        Self {
385            default_stream_config: Some(default_stream_config.into()).into(),
386            stream_cipher: stream_cipher.into(),
387            create_stream_on_append: create_stream_on_append.into(),
388            create_stream_on_read: create_stream_on_read.into(),
389        }
390    }
391}
392
393#[derive(Debug, Clone, Default)]
394pub struct BasinReconfiguration {
395    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
396    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
397    pub create_stream_on_append: Maybe<bool>,
398    pub create_stream_on_read: Maybe<bool>,
399}