Skip to main content

s2_common/types/
config.rs

//! Stream and basin configuration types.
//!
//! Stream configuration uses three representations:
//!
//! - Resolved (`StreamConfig`, `TimestampingConfig`, `DeleteOnEmptyConfig`): concrete values,
//!   produced by merging optional configs with defaults using `merge()`.
//!
//! - Optional (`OptionalStreamConfig`, `OptionalTimestampingConfig`,
//!   `OptionalDeleteOnEmptyConfig`): partial configuration layers, where `None` means "not set at
//!   this layer; fall back to defaults."
//!
//! - Reconfiguration (`StreamReconfiguration`, `TimestampingReconfiguration`,
//!   `DeleteOnEmptyReconfiguration`): PATCH-style updates applied with `reconfigure()`.
//!
//! Reconfiguration of nested fields (e.g. `timestamping`, `delete_on_empty`,
//! `default_stream_config`) is applied recursively: `Specified(Some(inner_reconfig))`
//! applies the inner reconfiguration to the existing value, while `Specified(None)`
//! clears it to the default.
//!
//! `merge()` resolves optional configs into resolved configs with precedence:
//! stream-level → basin-level → system default (via `Option::or` chaining).
//!
//! Basin config also carries basin-level knobs like `stream_cipher`,
//! `create_stream_on_append`, and `create_stream_on_read`.
25
26use std::time::Duration;
27
28use crate::{encryption::EncryptionAlgorithm, maybe::Maybe};
29
30#[derive(
31    Debug,
32    Default,
33    Clone,
34    Copy,
35    strum::Display,
36    strum::IntoStaticStr,
37    strum::EnumIter,
38    strum::FromRepr,
39    strum::EnumString,
40    PartialEq,
41    Eq,
42    Hash,
43)]
44#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
45#[repr(u8)]
46pub enum StorageClass {
47    #[strum(serialize = "standard")]
48    Standard = 1,
49    #[default]
50    #[strum(serialize = "express")]
51    Express = 2,
52}
53
/// Retention policy of a stream: either bounded by an age or unbounded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Age-bounded retention carrying the age as a [`Duration`].
    Age(Duration),
    /// Retention with no age bound; [`RetentionPolicy::age`] returns `None` for it.
    Infinite(),
}

impl RetentionPolicy {
    /// Returns the retention age, or `None` when the policy is [`RetentionPolicy::Infinite`].
    pub fn age(&self) -> Option<Duration> {
        match *self {
            Self::Age(duration) => Some(duration),
            Self::Infinite() => None,
        }
    }
}

impl Default for RetentionPolicy {
    /// The default policy is [`RetentionPolicy::Age`] with an age of one week.
    fn default() -> Self {
        const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);

        Self::Age(ONE_WEEK)
    }
}
76
/// Timestamping mode of a stream.
///
/// [`TimestampingMode::ClientPrefer`] is the `Default`.
// NOTE(review): the precise semantics of each mode are defined by the code that consumes this
// enum, which is not visible from this file — document the variants there or confirm here.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum TimestampingMode {
    #[default]
    ClientPrefer,
    ClientRequire,
    Arrival,
}
84
85#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
86pub struct TimestampingConfig {
87    pub mode: TimestampingMode,
88    pub uncapped: bool,
89}
90
/// Resolved delete-on-empty configuration.
///
/// The derived `Default` has a zero `min_age`, which [`DeleteOnEmptyConfig::min_age`] reports
/// as `None`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DeleteOnEmptyConfig {
    /// Raw minimum age; a zero duration is treated as "no minimum age" by the accessor.
    pub min_age: Duration,
}

impl DeleteOnEmptyConfig {
    /// Returns the minimum age, or `None` when the stored duration is zero.
    pub fn min_age(&self) -> Option<Duration> {
        (!self.min_age.is_zero()).then_some(self.min_age)
    }
}
101
102#[derive(Debug, Clone, Default, PartialEq, Eq)]
103pub struct StreamConfig {
104    pub storage_class: StorageClass,
105    pub retention_policy: RetentionPolicy,
106    pub timestamping: TimestampingConfig,
107    pub delete_on_empty: DeleteOnEmptyConfig,
108}
109
110#[derive(Debug, Clone, Default)]
111pub struct TimestampingReconfiguration {
112    pub mode: Maybe<Option<TimestampingMode>>,
113    pub uncapped: Maybe<Option<bool>>,
114}
115
116#[derive(Debug, Clone, Default)]
117pub struct DeleteOnEmptyReconfiguration {
118    pub min_age: Maybe<Option<Duration>>,
119}
120
121#[derive(Debug, Clone, Default)]
122pub struct StreamReconfiguration {
123    pub storage_class: Maybe<Option<StorageClass>>,
124    pub retention_policy: Maybe<Option<RetentionPolicy>>,
125    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
126    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
127}
128
129#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
130pub struct OptionalTimestampingConfig {
131    pub mode: Option<TimestampingMode>,
132    pub uncapped: Option<bool>,
133}
134
135impl OptionalTimestampingConfig {
136    pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
137        if let Maybe::Specified(mode) = reconfiguration.mode {
138            self.mode = mode;
139        }
140        if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
141            self.uncapped = uncapped;
142        }
143        self
144    }
145
146    pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
147        let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
148        let uncapped = self
149            .uncapped
150            .or(basin_defaults.uncapped)
151            .unwrap_or_default();
152        TimestampingConfig { mode, uncapped }
153    }
154}
155
156impl From<OptionalTimestampingConfig> for TimestampingConfig {
157    fn from(value: OptionalTimestampingConfig) -> Self {
158        Self {
159            mode: value.mode.unwrap_or_default(),
160            uncapped: value.uncapped.unwrap_or_default(),
161        }
162    }
163}
164
165impl From<TimestampingConfig> for OptionalTimestampingConfig {
166    fn from(value: TimestampingConfig) -> Self {
167        Self {
168            mode: Some(value.mode),
169            uncapped: Some(value.uncapped),
170        }
171    }
172}
173
174impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
175    fn from(value: OptionalTimestampingConfig) -> Self {
176        Self {
177            mode: value.mode.into(),
178            uncapped: value.uncapped.into(),
179        }
180    }
181}
182
183#[derive(Debug, Clone, Default, PartialEq, Eq)]
184pub struct OptionalDeleteOnEmptyConfig {
185    pub min_age: Option<Duration>,
186}
187
188impl OptionalDeleteOnEmptyConfig {
189    pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
190        if let Maybe::Specified(min_age) = reconfiguration.min_age {
191            self.min_age = min_age;
192        }
193        self
194    }
195
196    pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
197        let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
198        DeleteOnEmptyConfig { min_age }
199    }
200}
201
202impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
203    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
204        Self {
205            min_age: value.min_age.unwrap_or_default(),
206        }
207    }
208}
209
210impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
211    fn from(value: DeleteOnEmptyConfig) -> Self {
212        Self {
213            min_age: Some(value.min_age),
214        }
215    }
216}
217
218impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
219    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
220        Self {
221            min_age: value.min_age.into(),
222        }
223    }
224}
225
226#[derive(Debug, Clone, Default, PartialEq, Eq)]
227pub struct OptionalStreamConfig {
228    pub storage_class: Option<StorageClass>,
229    pub retention_policy: Option<RetentionPolicy>,
230    pub timestamping: OptionalTimestampingConfig,
231    pub delete_on_empty: OptionalDeleteOnEmptyConfig,
232}
233
234impl OptionalStreamConfig {
235    pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
236        let StreamReconfiguration {
237            storage_class,
238            retention_policy,
239            timestamping,
240            delete_on_empty,
241        } = reconfiguration;
242        if let Maybe::Specified(storage_class) = storage_class {
243            self.storage_class = storage_class;
244        }
245        if let Maybe::Specified(retention_policy) = retention_policy {
246            self.retention_policy = retention_policy;
247        }
248        if let Maybe::Specified(timestamping) = timestamping {
249            self.timestamping = timestamping
250                .map(|ts| self.timestamping.reconfigure(ts))
251                .unwrap_or_default();
252        }
253        if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
254            self.delete_on_empty = delete_on_empty_reconfig
255                .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
256                .unwrap_or_default();
257        }
258        self
259    }
260
261    pub fn merge(self, basin_defaults: Self) -> StreamConfig {
262        let storage_class = self
263            .storage_class
264            .or(basin_defaults.storage_class)
265            .unwrap_or_default();
266
267        let retention_policy = self
268            .retention_policy
269            .or(basin_defaults.retention_policy)
270            .unwrap_or_default();
271
272        let timestamping = self.timestamping.merge(basin_defaults.timestamping);
273
274        let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
275
276        StreamConfig {
277            storage_class,
278            retention_policy,
279            timestamping,
280            delete_on_empty,
281        }
282    }
283}
284
285impl From<OptionalStreamConfig> for StreamReconfiguration {
286    fn from(value: OptionalStreamConfig) -> Self {
287        let OptionalStreamConfig {
288            storage_class,
289            retention_policy,
290            timestamping,
291            delete_on_empty,
292        } = value;
293
294        Self {
295            storage_class: storage_class.into(),
296            retention_policy: retention_policy.into(),
297            timestamping: Some(timestamping.into()).into(),
298            delete_on_empty: Some(delete_on_empty.into()).into(),
299        }
300    }
301}
302
303impl From<OptionalStreamConfig> for StreamConfig {
304    fn from(value: OptionalStreamConfig) -> Self {
305        let OptionalStreamConfig {
306            storage_class,
307            retention_policy,
308            timestamping,
309            delete_on_empty,
310        } = value;
311
312        Self {
313            storage_class: storage_class.unwrap_or_default(),
314            retention_policy: retention_policy.unwrap_or_default(),
315            timestamping: timestamping.into(),
316            delete_on_empty: delete_on_empty.into(),
317        }
318    }
319}
320
321impl From<StreamConfig> for OptionalStreamConfig {
322    fn from(value: StreamConfig) -> Self {
323        let StreamConfig {
324            storage_class,
325            retention_policy,
326            timestamping,
327            delete_on_empty,
328        } = value;
329
330        Self {
331            storage_class: Some(storage_class),
332            retention_policy: Some(retention_policy),
333            timestamping: timestamping.into(),
334            delete_on_empty: delete_on_empty.into(),
335        }
336    }
337}
338
339#[derive(Debug, Clone, Default, PartialEq, Eq)]
340pub struct BasinConfig {
341    pub default_stream_config: OptionalStreamConfig,
342    pub stream_cipher: Option<EncryptionAlgorithm>,
343    pub create_stream_on_append: bool,
344    pub create_stream_on_read: bool,
345}
346
347impl BasinConfig {
348    pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
349        let BasinReconfiguration {
350            default_stream_config,
351            stream_cipher,
352            create_stream_on_append,
353            create_stream_on_read,
354        } = reconfiguration;
355
356        if let Maybe::Specified(default_stream_config) = default_stream_config {
357            self.default_stream_config = default_stream_config
358                .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
359                .unwrap_or_default();
360        }
361
362        if let Maybe::Specified(stream_cipher) = stream_cipher {
363            self.stream_cipher = stream_cipher;
364        }
365
366        if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
367            self.create_stream_on_append = create_stream_on_append;
368        }
369
370        if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
371            self.create_stream_on_read = create_stream_on_read;
372        }
373
374        self
375    }
376}
377
378impl From<BasinConfig> for BasinReconfiguration {
379    fn from(value: BasinConfig) -> Self {
380        let BasinConfig {
381            default_stream_config,
382            stream_cipher,
383            create_stream_on_append,
384            create_stream_on_read,
385        } = value;
386
387        Self {
388            default_stream_config: Some(default_stream_config.into()).into(),
389            stream_cipher: stream_cipher.into(),
390            create_stream_on_append: create_stream_on_append.into(),
391            create_stream_on_read: create_stream_on_read.into(),
392        }
393    }
394}
395
396#[derive(Debug, Clone, Default)]
397pub struct BasinReconfiguration {
398    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
399    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
400    pub create_stream_on_append: Maybe<bool>,
401    pub create_stream_on_read: Maybe<bool>,
402}