Skip to main content

s2_common/types/
config.rs

1//! Stream and basin configuration types.
2//!
3//! Stream configuration uses three representations:
4//!
5//! - Resolved (`StreamConfig`, `TimestampingConfig`, `DeleteOnEmptyConfig`): concrete values,
6//!   produced by merging optional configs with defaults using `merge()`.
7//!
8//! - Optional (`OptionalStreamConfig`, `OptionalTimestampingConfig`,
9//!   `OptionalDeleteOnEmptyConfig`): stored metadata, where `None` means "not set at this layer;
10//!   fall back to defaults."
11//!
12//! - Reconfiguration (`StreamReconfiguration`, `TimestampingReconfiguration`,
13//!   `DeleteOnEmptyReconfiguration`): PATCH-style updates applied with `reconfigure()`.
14//!
15//! Reconfiguration of nested fields (e.g. `timestamping`, `delete_on_empty`,
16//! `default_stream_config`) is applied recursively: `Specified(Some(inner_reconfig))`
17//! applies the inner reconfiguration to the existing value, while `Specified(None)`
18//! clears it to the default.
19//!
20//! `merge()` resolves optional configs into resolved configs with precedence:
21//! stream-level → basin-level → system default (via `Option::or` chaining).
22//!
23//! Basin config also carries basin-level knobs like `stream_cipher`,
24//! `create_stream_on_append`, and `create_stream_on_read`.
25
26use std::time::Duration;
27
28use crate::{encryption::EncryptionAlgorithm, maybe::Maybe};
29
30#[derive(
31    Debug,
32    Default,
33    Clone,
34    Copy,
35    strum::Display,
36    strum::IntoStaticStr,
37    strum::EnumIter,
38    strum::FromRepr,
39    strum::EnumString,
40    PartialEq,
41    Eq,
42    Hash,
43)]
44#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
45#[repr(u8)]
46pub enum StorageClass {
47    #[strum(serialize = "standard")]
48    Standard = 1,
49    #[default]
50    #[strum(serialize = "express")]
51    Express = 2,
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum RetentionPolicy {
56    Age(Duration),
57    Infinite(),
58}
59
60impl RetentionPolicy {
61    pub fn age(&self) -> Option<Duration> {
62        match self {
63            Self::Age(duration) => Some(*duration),
64            Self::Infinite() => None,
65        }
66    }
67}
68
69impl Default for RetentionPolicy {
70    fn default() -> Self {
71        const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
72
73        Self::Age(ONE_WEEK)
74    }
75}
76
77#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
78pub enum TimestampingMode {
79    #[default]
80    ClientPrefer,
81    ClientRequire,
82    Arrival,
83}
84
85#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
86pub struct TimestampingConfig {
87    pub mode: TimestampingMode,
88    pub uncapped: bool,
89}
90
91#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
92pub struct DeleteOnEmptyConfig {
93    pub min_age: Duration,
94}
95
96#[derive(Debug, Clone, Default, PartialEq, Eq)]
97pub struct StreamConfig {
98    pub storage_class: StorageClass,
99    pub retention_policy: RetentionPolicy,
100    pub timestamping: TimestampingConfig,
101    pub delete_on_empty: DeleteOnEmptyConfig,
102}
103
104#[derive(Debug, Clone, Default)]
105pub struct TimestampingReconfiguration {
106    pub mode: Maybe<Option<TimestampingMode>>,
107    pub uncapped: Maybe<Option<bool>>,
108}
109
110#[derive(Debug, Clone, Default)]
111pub struct DeleteOnEmptyReconfiguration {
112    pub min_age: Maybe<Option<Duration>>,
113}
114
115#[derive(Debug, Clone, Default)]
116pub struct StreamReconfiguration {
117    pub storage_class: Maybe<Option<StorageClass>>,
118    pub retention_policy: Maybe<Option<RetentionPolicy>>,
119    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
120    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
121}
122
123#[derive(Debug, Clone, Copy, Default)]
124pub struct OptionalTimestampingConfig {
125    pub mode: Option<TimestampingMode>,
126    pub uncapped: Option<bool>,
127}
128
129impl OptionalTimestampingConfig {
130    pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
131        if let Maybe::Specified(mode) = reconfiguration.mode {
132            self.mode = mode;
133        }
134        if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
135            self.uncapped = uncapped;
136        }
137        self
138    }
139
140    pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
141        let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
142        let uncapped = self
143            .uncapped
144            .or(basin_defaults.uncapped)
145            .unwrap_or_default();
146        TimestampingConfig { mode, uncapped }
147    }
148}
149
150impl From<OptionalTimestampingConfig> for TimestampingConfig {
151    fn from(value: OptionalTimestampingConfig) -> Self {
152        Self {
153            mode: value.mode.unwrap_or_default(),
154            uncapped: value.uncapped.unwrap_or_default(),
155        }
156    }
157}
158
159impl From<TimestampingConfig> for OptionalTimestampingConfig {
160    fn from(value: TimestampingConfig) -> Self {
161        Self {
162            mode: Some(value.mode),
163            uncapped: Some(value.uncapped),
164        }
165    }
166}
167
168impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
169    fn from(value: OptionalTimestampingConfig) -> Self {
170        Self {
171            mode: value.mode.into(),
172            uncapped: value.uncapped.into(),
173        }
174    }
175}
176
177#[derive(Debug, Clone, Default)]
178pub struct OptionalDeleteOnEmptyConfig {
179    pub min_age: Option<Duration>,
180}
181
182impl OptionalDeleteOnEmptyConfig {
183    pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
184        if let Maybe::Specified(min_age) = reconfiguration.min_age {
185            self.min_age = min_age;
186        }
187        self
188    }
189
190    pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
191        let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
192        DeleteOnEmptyConfig { min_age }
193    }
194}
195
196impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
197    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
198        Self {
199            min_age: value.min_age.unwrap_or_default(),
200        }
201    }
202}
203
204impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
205    fn from(value: DeleteOnEmptyConfig) -> Self {
206        Self {
207            min_age: Some(value.min_age),
208        }
209    }
210}
211
212impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
213    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
214        Self {
215            min_age: value.min_age.into(),
216        }
217    }
218}
219
220#[derive(Debug, Clone, Default)]
221pub struct OptionalStreamConfig {
222    pub storage_class: Option<StorageClass>,
223    pub retention_policy: Option<RetentionPolicy>,
224    pub timestamping: OptionalTimestampingConfig,
225    pub delete_on_empty: OptionalDeleteOnEmptyConfig,
226}
227
228impl OptionalStreamConfig {
229    pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
230        let StreamReconfiguration {
231            storage_class,
232            retention_policy,
233            timestamping,
234            delete_on_empty,
235        } = reconfiguration;
236        if let Maybe::Specified(storage_class) = storage_class {
237            self.storage_class = storage_class;
238        }
239        if let Maybe::Specified(retention_policy) = retention_policy {
240            self.retention_policy = retention_policy;
241        }
242        if let Maybe::Specified(timestamping) = timestamping {
243            self.timestamping = timestamping
244                .map(|ts| self.timestamping.reconfigure(ts))
245                .unwrap_or_default();
246        }
247        if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
248            self.delete_on_empty = delete_on_empty_reconfig
249                .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
250                .unwrap_or_default();
251        }
252        self
253    }
254
255    pub fn merge(self, basin_defaults: Self) -> StreamConfig {
256        let storage_class = self
257            .storage_class
258            .or(basin_defaults.storage_class)
259            .unwrap_or_default();
260
261        let retention_policy = self
262            .retention_policy
263            .or(basin_defaults.retention_policy)
264            .unwrap_or_default();
265
266        let timestamping = self.timestamping.merge(basin_defaults.timestamping);
267
268        let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
269
270        StreamConfig {
271            storage_class,
272            retention_policy,
273            timestamping,
274            delete_on_empty,
275        }
276    }
277}
278
279impl From<OptionalStreamConfig> for StreamReconfiguration {
280    fn from(value: OptionalStreamConfig) -> Self {
281        let OptionalStreamConfig {
282            storage_class,
283            retention_policy,
284            timestamping,
285            delete_on_empty,
286        } = value;
287
288        Self {
289            storage_class: storage_class.into(),
290            retention_policy: retention_policy.into(),
291            timestamping: Some(timestamping.into()).into(),
292            delete_on_empty: Some(delete_on_empty.into()).into(),
293        }
294    }
295}
296
297impl From<OptionalStreamConfig> for StreamConfig {
298    fn from(value: OptionalStreamConfig) -> Self {
299        let OptionalStreamConfig {
300            storage_class,
301            retention_policy,
302            timestamping,
303            delete_on_empty,
304        } = value;
305
306        Self {
307            storage_class: storage_class.unwrap_or_default(),
308            retention_policy: retention_policy.unwrap_or_default(),
309            timestamping: timestamping.into(),
310            delete_on_empty: delete_on_empty.into(),
311        }
312    }
313}
314
315impl From<StreamConfig> for OptionalStreamConfig {
316    fn from(value: StreamConfig) -> Self {
317        let StreamConfig {
318            storage_class,
319            retention_policy,
320            timestamping,
321            delete_on_empty,
322        } = value;
323
324        Self {
325            storage_class: Some(storage_class),
326            retention_policy: Some(retention_policy),
327            timestamping: timestamping.into(),
328            delete_on_empty: delete_on_empty.into(),
329        }
330    }
331}
332
333#[derive(Debug, Clone, Default)]
334pub struct BasinConfig {
335    pub default_stream_config: OptionalStreamConfig,
336    pub stream_cipher: Option<EncryptionAlgorithm>,
337    pub create_stream_on_append: bool,
338    pub create_stream_on_read: bool,
339}
340
341impl BasinConfig {
342    pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
343        let BasinReconfiguration {
344            default_stream_config,
345            stream_cipher,
346            create_stream_on_append,
347            create_stream_on_read,
348        } = reconfiguration;
349
350        if let Maybe::Specified(default_stream_config) = default_stream_config {
351            self.default_stream_config = default_stream_config
352                .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
353                .unwrap_or_default();
354        }
355
356        if let Maybe::Specified(stream_cipher) = stream_cipher {
357            self.stream_cipher = stream_cipher;
358        }
359
360        if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
361            self.create_stream_on_append = create_stream_on_append;
362        }
363
364        if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
365            self.create_stream_on_read = create_stream_on_read;
366        }
367
368        self
369    }
370}
371
372impl From<BasinConfig> for BasinReconfiguration {
373    fn from(value: BasinConfig) -> Self {
374        let BasinConfig {
375            default_stream_config,
376            stream_cipher,
377            create_stream_on_append,
378            create_stream_on_read,
379        } = value;
380
381        Self {
382            default_stream_config: Some(default_stream_config.into()).into(),
383            stream_cipher: stream_cipher.into(),
384            create_stream_on_append: create_stream_on_append.into(),
385            create_stream_on_read: create_stream_on_read.into(),
386        }
387    }
388}
389
390#[derive(Debug, Clone, Default)]
391pub struct BasinReconfiguration {
392    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
393    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
394    pub create_stream_on_append: Maybe<bool>,
395    pub create_stream_on_read: Maybe<bool>,
396}