Skip to main content

s2_common/types/
config.rs

//! Stream and basin configuration types.
//!
//! Each config area (stream, timestamping, delete-on-empty) has three type tiers:
//!
//! - Resolved (`StreamConfig`, `TimestampingConfig`, `DeleteOnEmptyConfig`): All fields are
//!   concrete values. Produced by merging optional configs with defaults using `merge()`.
//!
//! - Optional (`OptionalStreamConfig`, `OptionalTimestampingConfig`,
//!   `OptionalDeleteOnEmptyConfig`): The internal representation, stored in metadata. Fields are
//!   `Option<T>` where `None` means "not set at this layer, fall back to defaults."
//!
//! - Reconfiguration (`StreamReconfiguration`, `TimestampingReconfiguration`,
//!   `DeleteOnEmptyReconfiguration`): Partial updates with PATCH semantics. Most fields are
//!   `Maybe<Option<T>>` with three states: `Unspecified` (don't change), `Specified(None)` (clear
//!   to default), `Specified(Some(v))` (set to value). Collection-valued fields may instead use an
//!   empty collection to mean "clear to default". Applied using `reconfigure()`.
//!
//! Reconfiguration of nested fields (e.g. `timestamping`, `delete_on_empty`,
//! `default_stream_config`) is applied recursively: `Specified(Some(inner_reconfig))`
//! applies the inner reconfiguration to the existing value, while `Specified(None)`
//! clears it to the default.
//!
//! `merge()` resolves optional configs into resolved configs with precedence:
//! stream-level → basin-level → system default (via `Option::or` chaining).
//!
//! The `From<Optional*> for *Reconfiguration` conversions treat every field as
//! `Specified`. These conversions represent "set the config to exactly this state",
//! not "update only the fields that are set."
29
30use std::time::Duration;
31
32use enum_ordinalize::Ordinalize;
33use enumset::EnumSet;
34
35use crate::{encryption::EncryptionMode, maybe::Maybe};
36
37#[derive(
38    Debug,
39    Default,
40    Clone,
41    Copy,
42    strum::Display,
43    strum::IntoStaticStr,
44    strum::EnumIter,
45    strum::FromRepr,
46    strum::EnumString,
47    PartialEq,
48    Eq,
49    Ordinalize,
50    Hash,
51)]
52#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
53#[repr(u8)]
54pub enum StorageClass {
55    #[strum(serialize = "standard")]
56    Standard = 1,
57    #[default]
58    #[strum(serialize = "express")]
59    Express = 2,
60}
61
/// Retention policy: either bounded by an age or unbounded.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RetentionPolicy {
    /// Retention bounded by the given age.
    Age(Duration),
    /// No age bound.
    Infinite(),
}

impl RetentionPolicy {
    /// Returns the age bound for [`RetentionPolicy::Age`], or `None` for
    /// [`RetentionPolicy::Infinite`].
    pub fn age(&self) -> Option<Duration> {
        if let Self::Age(max_age) = *self {
            Some(max_age)
        } else {
            None
        }
    }
}

impl Default for RetentionPolicy {
    /// Defaults to an age-based policy of seven days.
    fn default() -> Self {
        const SECS_PER_DAY: u64 = 24 * 60 * 60;

        Self::Age(Duration::from_secs(7 * SECS_PER_DAY))
    }
}
84
/// Timestamping mode of a stream.
///
/// `ClientPrefer` is the `Default` variant.
// NOTE(review): the precise semantics of each mode are defined by the code that
// consumes this enum, not in this file.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum TimestampingMode {
    #[default]
    ClientPrefer,
    ClientRequire,
    Arrival,
}
92
93#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
94pub struct TimestampingConfig {
95    pub mode: TimestampingMode,
96    pub uncapped: bool,
97}
98
/// Resolved delete-on-empty settings.
///
/// The derived `Default` yields a zero `min_age`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteOnEmptyConfig {
    /// Resolved minimum age.
    pub min_age: Duration,
}
103
104#[derive(Debug, Clone, PartialEq, Eq)]
105pub struct EncryptionConfig {
106    pub allowed_modes: EnumSet<EncryptionMode>,
107}
108
109pub const DEFAULT_ALLOWED_ENCRYPTION_MODES: EnumSet<EncryptionMode> =
110    enumset::enum_set!(EncryptionMode::Plain);
111
112impl Default for EncryptionConfig {
113    fn default() -> Self {
114        Self {
115            allowed_modes: DEFAULT_ALLOWED_ENCRYPTION_MODES,
116        }
117    }
118}
119
120#[derive(Debug, Clone, Default, PartialEq, Eq)]
121pub struct StreamConfig {
122    pub storage_class: StorageClass,
123    pub retention_policy: RetentionPolicy,
124    pub timestamping: TimestampingConfig,
125    pub delete_on_empty: DeleteOnEmptyConfig,
126    pub encryption: EncryptionConfig,
127}
128
129#[derive(Debug, Clone, Default)]
130pub struct TimestampingReconfiguration {
131    pub mode: Maybe<Option<TimestampingMode>>,
132    pub uncapped: Maybe<Option<bool>>,
133}
134
135#[derive(Debug, Clone, Default)]
136pub struct DeleteOnEmptyReconfiguration {
137    pub min_age: Maybe<Option<Duration>>,
138}
139
140#[derive(Debug, Clone, Default)]
141pub struct EncryptionReconfiguration {
142    pub allowed_modes: Maybe<EnumSet<EncryptionMode>>,
143}
144
145#[derive(Debug, Clone, Default)]
146pub struct StreamReconfiguration {
147    pub storage_class: Maybe<Option<StorageClass>>,
148    pub retention_policy: Maybe<Option<RetentionPolicy>>,
149    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
150    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
151    pub encryption: Maybe<Option<EncryptionReconfiguration>>,
152}
153
154#[derive(Debug, Clone, Copy, Default)]
155pub struct OptionalTimestampingConfig {
156    pub mode: Option<TimestampingMode>,
157    pub uncapped: Option<bool>,
158}
159
160impl OptionalTimestampingConfig {
161    pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
162        if let Maybe::Specified(mode) = reconfiguration.mode {
163            self.mode = mode;
164        }
165        if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
166            self.uncapped = uncapped;
167        }
168        self
169    }
170
171    pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
172        let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
173        let uncapped = self
174            .uncapped
175            .or(basin_defaults.uncapped)
176            .unwrap_or_default();
177        TimestampingConfig { mode, uncapped }
178    }
179}
180
181impl From<OptionalTimestampingConfig> for TimestampingConfig {
182    fn from(value: OptionalTimestampingConfig) -> Self {
183        Self {
184            mode: value.mode.unwrap_or_default(),
185            uncapped: value.uncapped.unwrap_or_default(),
186        }
187    }
188}
189
190impl From<TimestampingConfig> for OptionalTimestampingConfig {
191    fn from(value: TimestampingConfig) -> Self {
192        Self {
193            mode: Some(value.mode),
194            uncapped: Some(value.uncapped),
195        }
196    }
197}
198
199impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
200    fn from(value: OptionalTimestampingConfig) -> Self {
201        Self {
202            mode: value.mode.into(),
203            uncapped: value.uncapped.into(),
204        }
205    }
206}
207
/// Delete-on-empty settings where the field may be unset.
///
/// `None` means "not set at this layer".
#[derive(Clone, Debug, Default)]
pub struct OptionalDeleteOnEmptyConfig {
    /// Minimum age, if set at this layer.
    pub min_age: Option<Duration>,
}
212
213impl OptionalDeleteOnEmptyConfig {
214    pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
215        if let Maybe::Specified(min_age) = reconfiguration.min_age {
216            self.min_age = min_age;
217        }
218        self
219    }
220
221    pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
222        let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
223        DeleteOnEmptyConfig { min_age }
224    }
225}
226
227impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
228    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
229        Self {
230            min_age: value.min_age.unwrap_or_default(),
231        }
232    }
233}
234
235impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
236    fn from(value: DeleteOnEmptyConfig) -> Self {
237        Self {
238            min_age: Some(value.min_age),
239        }
240    }
241}
242
243impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
244    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
245        Self {
246            min_age: value.min_age.into(),
247        }
248    }
249}
250
251#[derive(Debug, Clone, Default)]
252pub struct OptionalEncryptionConfig {
253    pub allowed_modes: Option<EnumSet<EncryptionMode>>,
254}
255
256impl OptionalEncryptionConfig {
257    pub fn reconfigure(mut self, reconfiguration: EncryptionReconfiguration) -> Self {
258        if let Maybe::Specified(allowed_modes) = reconfiguration.allowed_modes {
259            self.allowed_modes = (!allowed_modes.is_empty()).then_some(allowed_modes);
260        }
261        self
262    }
263
264    pub fn merge(self, basin_defaults: Self) -> EncryptionConfig {
265        let allowed_modes = self
266            .allowed_modes
267            .or(basin_defaults.allowed_modes)
268            .unwrap_or(DEFAULT_ALLOWED_ENCRYPTION_MODES);
269        EncryptionConfig { allowed_modes }
270    }
271}
272
273impl From<OptionalEncryptionConfig> for EncryptionConfig {
274    fn from(value: OptionalEncryptionConfig) -> Self {
275        Self {
276            allowed_modes: value
277                .allowed_modes
278                .unwrap_or(DEFAULT_ALLOWED_ENCRYPTION_MODES),
279        }
280    }
281}
282
283impl From<EncryptionConfig> for OptionalEncryptionConfig {
284    fn from(value: EncryptionConfig) -> Self {
285        Self {
286            allowed_modes: Some(value.allowed_modes),
287        }
288    }
289}
290
291impl From<OptionalEncryptionConfig> for EncryptionReconfiguration {
292    fn from(value: OptionalEncryptionConfig) -> Self {
293        Self {
294            allowed_modes: Maybe::Specified(value.allowed_modes.unwrap_or_default()),
295        }
296    }
297}
298
299#[derive(Debug, Clone, Default)]
300pub struct OptionalStreamConfig {
301    pub storage_class: Option<StorageClass>,
302    pub retention_policy: Option<RetentionPolicy>,
303    pub timestamping: OptionalTimestampingConfig,
304    pub delete_on_empty: OptionalDeleteOnEmptyConfig,
305    pub encryption: OptionalEncryptionConfig,
306}
307
308impl OptionalStreamConfig {
309    pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
310        let StreamReconfiguration {
311            storage_class,
312            retention_policy,
313            timestamping,
314            delete_on_empty,
315            encryption,
316        } = reconfiguration;
317        if let Maybe::Specified(storage_class) = storage_class {
318            self.storage_class = storage_class;
319        }
320        if let Maybe::Specified(retention_policy) = retention_policy {
321            self.retention_policy = retention_policy;
322        }
323        if let Maybe::Specified(timestamping) = timestamping {
324            self.timestamping = timestamping
325                .map(|ts| self.timestamping.reconfigure(ts))
326                .unwrap_or_default();
327        }
328        if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
329            self.delete_on_empty = delete_on_empty_reconfig
330                .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
331                .unwrap_or_default();
332        }
333        if let Maybe::Specified(encryption) = encryption {
334            self.encryption = encryption
335                .map(|enc| self.encryption.reconfigure(enc))
336                .unwrap_or_default();
337        }
338        self
339    }
340
341    pub fn merge(self, basin_defaults: Self) -> StreamConfig {
342        let storage_class = self
343            .storage_class
344            .or(basin_defaults.storage_class)
345            .unwrap_or_default();
346
347        let retention_policy = self
348            .retention_policy
349            .or(basin_defaults.retention_policy)
350            .unwrap_or_default();
351
352        let timestamping = self.timestamping.merge(basin_defaults.timestamping);
353
354        let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
355
356        let encryption = self.encryption.merge(basin_defaults.encryption);
357
358        StreamConfig {
359            storage_class,
360            retention_policy,
361            timestamping,
362            delete_on_empty,
363            encryption,
364        }
365    }
366}
367
368impl From<OptionalStreamConfig> for StreamReconfiguration {
369    fn from(value: OptionalStreamConfig) -> Self {
370        let OptionalStreamConfig {
371            storage_class,
372            retention_policy,
373            timestamping,
374            delete_on_empty,
375            encryption,
376        } = value;
377
378        Self {
379            storage_class: storage_class.into(),
380            retention_policy: retention_policy.into(),
381            timestamping: Some(timestamping.into()).into(),
382            delete_on_empty: Some(delete_on_empty.into()).into(),
383            encryption: Some(encryption.into()).into(),
384        }
385    }
386}
387
388impl From<OptionalStreamConfig> for StreamConfig {
389    fn from(value: OptionalStreamConfig) -> Self {
390        let OptionalStreamConfig {
391            storage_class,
392            retention_policy,
393            timestamping,
394            delete_on_empty,
395            encryption,
396        } = value;
397
398        Self {
399            storage_class: storage_class.unwrap_or_default(),
400            retention_policy: retention_policy.unwrap_or_default(),
401            timestamping: timestamping.into(),
402            delete_on_empty: delete_on_empty.into(),
403            encryption: encryption.into(),
404        }
405    }
406}
407
408impl From<StreamConfig> for OptionalStreamConfig {
409    fn from(value: StreamConfig) -> Self {
410        let StreamConfig {
411            storage_class,
412            retention_policy,
413            timestamping,
414            delete_on_empty,
415            encryption,
416        } = value;
417
418        Self {
419            storage_class: Some(storage_class),
420            retention_policy: Some(retention_policy),
421            timestamping: timestamping.into(),
422            delete_on_empty: delete_on_empty.into(),
423            encryption: encryption.into(),
424        }
425    }
426}
427
428#[derive(Debug, Clone, Default)]
429pub struct BasinConfig {
430    pub default_stream_config: OptionalStreamConfig,
431    pub create_stream_on_append: bool,
432    pub create_stream_on_read: bool,
433}
434
435impl BasinConfig {
436    pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
437        let BasinReconfiguration {
438            default_stream_config,
439            create_stream_on_append,
440            create_stream_on_read,
441        } = reconfiguration;
442
443        if let Maybe::Specified(default_stream_config) = default_stream_config {
444            self.default_stream_config = default_stream_config
445                .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
446                .unwrap_or_default();
447        }
448
449        if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
450            self.create_stream_on_append = create_stream_on_append;
451        }
452
453        if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
454            self.create_stream_on_read = create_stream_on_read;
455        }
456
457        self
458    }
459}
460
461impl From<BasinConfig> for BasinReconfiguration {
462    fn from(value: BasinConfig) -> Self {
463        let BasinConfig {
464            default_stream_config,
465            create_stream_on_append,
466            create_stream_on_read,
467        } = value;
468
469        Self {
470            default_stream_config: Some(default_stream_config.into()).into(),
471            create_stream_on_append: create_stream_on_append.into(),
472            create_stream_on_read: create_stream_on_read.into(),
473        }
474    }
475}
476
477#[derive(Debug, Clone, Default)]
478pub struct BasinReconfiguration {
479    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
480    pub create_stream_on_append: Maybe<bool>,
481    pub create_stream_on_read: Maybe<bool>,
482}