Skip to main content

s2_common/types/
config.rs

1//! Stream and basin configuration types.
2//!
//! Each config area (stream, timestamping, delete-on-empty, encryption) has three type tiers:
4//!
5//! - Resolved (`StreamConfig`, `TimestampingConfig`, `DeleteOnEmptyConfig`): All fields are
6//!   concrete values. Produced by merging optional configs with defaults using `merge()`.
7//!
//! - Optional (`OptionalStreamConfig`, `OptionalTimestampingConfig`,
//!   `OptionalDeleteOnEmptyConfig`): The internal representation, stored in metadata. Fields are
//!   `Option<T>` where `None` means "not set at this layer, fall back to defaults."
//!   Collection-valued fields (e.g. `OptionalEncryptionConfig::allowed_modes`) use an empty
//!   collection for "not set" instead.
11//!
12//! - Reconfiguration (`StreamReconfiguration`, `TimestampingReconfiguration`,
13//!   `DeleteOnEmptyReconfiguration`): Partial updates with PATCH semantics. Most fields are
14//!   `Maybe<Option<T>>` with three states: `Unspecified` (don't change), `Specified(None)` (clear
15//!   to default), `Specified(Some(v))` (set to value). Collection-valued fields may instead use an
16//!   empty collection to mean "clear to default". Applied using `reconfigure()`.
17//!
18//! Reconfiguration of nested fields (e.g. `timestamping`, `delete_on_empty`,
19//! `default_stream_config`) is applied recursively: `Specified(Some(inner_reconfig))`
20//! applies the inner reconfiguration to the existing value, while `Specified(None)`
21//! clears it to the default.
22//!
23//! `merge()` resolves optional configs into resolved configs with precedence:
24//! stream-level → basin-level → system default (via `Option::or` chaining).
25//!
26//! The `From<Optional*> for *Reconfiguration` conversions treat every field as
27//! `Specified`. These conversions represent "set the config to exactly this state",
28//! not "update only the fields that are set."
29
30use std::time::Duration;
31
32use enum_ordinalize::Ordinalize;
33use enumset::EnumSet;
34
35use crate::{encryption::EncryptionMode, maybe::Maybe};
36
/// Storage class selectable for a stream.
///
/// The enum is `#[repr(u8)]` with explicit discriminants (`Standard = 1`, `Express = 2`), and
/// the `strum` attributes assign the string forms `"standard"` and `"express"`.
/// [`StorageClass::Express`] is the `Default` variant.
#[derive(
    Debug,
    Default,
    Clone,
    Copy,
    strum::Display,
    strum::IntoStaticStr,
    strum::EnumIter,
    strum::FromRepr,
    strum::EnumString,
    PartialEq,
    Eq,
    Ordinalize,
    Hash,
)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
#[repr(u8)]
pub enum StorageClass {
    // Discriminant 1; string form "standard".
    #[strum(serialize = "standard")]
    Standard = 1,
    // Discriminant 2; string form "express". This is the default variant.
    #[default]
    #[strum(serialize = "express")]
    Express = 2,
}
61
/// Retention policy: either bounded by an age or unbounded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Age-based retention carrying the configured duration.
    Age(Duration),
    /// No age bound.
    Infinite(),
}

impl RetentionPolicy {
    /// Returns the configured age for [`RetentionPolicy::Age`], or `None` for
    /// [`RetentionPolicy::Infinite`].
    pub fn age(&self) -> Option<Duration> {
        match *self {
            Self::Infinite() => None,
            Self::Age(max_age) => Some(max_age),
        }
    }
}

impl Default for RetentionPolicy {
    /// Defaults to age-based retention of seven days.
    fn default() -> Self {
        const SECS_PER_DAY: u64 = 24 * 60 * 60;
        const DEFAULT_RETENTION: Duration = Duration::from_secs(7 * SECS_PER_DAY);

        Self::Age(DEFAULT_RETENTION)
    }
}
84
/// Timestamping mode for a stream.
///
/// NOTE(review): the semantics of each mode are defined where the mode is consumed, not in this
/// file; the per-variant notes below are inferred from the names — confirm.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum TimestampingMode {
    /// Default mode. Presumably prefers a client-supplied timestamp when present — confirm.
    #[default]
    ClientPrefer,
    /// Presumably requires a client-supplied timestamp — confirm.
    ClientRequire,
    /// Presumably uses the arrival time as the timestamp — confirm.
    Arrival,
}
92
/// Resolved timestamping configuration: every field holds a concrete value.
///
/// The derived `Default` is `mode = TimestampingMode::ClientPrefer`, `uncapped = false`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TimestampingConfig {
    /// Timestamping mode in effect.
    pub mode: TimestampingMode,
    /// NOTE(review): presumably lifts a cap on client-supplied timestamps — confirm against the
    /// consumer of this flag.
    pub uncapped: bool,
}
98
/// Resolved delete-on-empty configuration.
///
/// The derived `Default` sets `min_age` to a zero `Duration`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DeleteOnEmptyConfig {
    /// NOTE(review): presumably the minimum age before an empty stream may be deleted — confirm
    /// against the consumer of this value.
    pub min_age: Duration,
}
103
/// Resolved encryption configuration.
///
/// `Default` is implemented by hand (not derived) and uses `DEFAULT_ALLOWED_ENCRYPTION_MODES`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EncryptionConfig {
    /// Set of encryption modes that are allowed.
    pub allowed_modes: EnumSet<EncryptionMode>,
}
108
/// System-default set of allowed encryption modes: only `EncryptionMode::Plain`.
///
/// Used by `EncryptionConfig::default()` and as the final fallback in
/// `OptionalEncryptionConfig::merge()` when neither layer specifies any mode.
pub const DEFAULT_ALLOWED_ENCRYPTION_MODES: EnumSet<EncryptionMode> =
    enumset::enum_set!(EncryptionMode::Plain);
111
112impl Default for EncryptionConfig {
113    fn default() -> Self {
114        Self {
115            allowed_modes: DEFAULT_ALLOWED_ENCRYPTION_MODES,
116        }
117    }
118}
119
/// Resolved stream configuration: every field holds a concrete value.
///
/// Produced from an `OptionalStreamConfig` either by `merge()` (stream-level, then basin-level,
/// then system default) or by the `From<OptionalStreamConfig>` conversion. The derived `Default`
/// uses each field type's own default.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamConfig {
    /// Storage class in effect.
    pub storage_class: StorageClass,
    /// Retention policy in effect.
    pub retention_policy: RetentionPolicy,
    /// Resolved timestamping settings.
    pub timestamping: TimestampingConfig,
    /// Resolved delete-on-empty settings.
    pub delete_on_empty: DeleteOnEmptyConfig,
    /// Resolved encryption settings.
    pub encryption: EncryptionConfig,
}
128
/// Partial update (PATCH semantics) for timestamping settings.
///
/// For each field: `Unspecified` leaves the stored value unchanged, `Specified(None)` clears it
/// so defaults apply, and `Specified(Some(v))` sets it to `v`. Applied by
/// `OptionalTimestampingConfig::reconfigure`.
#[derive(Debug, Clone, Default)]
pub struct TimestampingReconfiguration {
    /// Requested change to the timestamping mode.
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Requested change to the `uncapped` flag.
    pub uncapped: Maybe<Option<bool>>,
}
134
/// Partial update (PATCH semantics) for delete-on-empty settings.
///
/// Applied by `OptionalDeleteOnEmptyConfig::reconfigure`.
#[derive(Debug, Clone, Default)]
pub struct DeleteOnEmptyReconfiguration {
    /// Requested change to `min_age`: `Unspecified` keeps the stored value, `Specified(None)`
    /// clears it so defaults apply, `Specified(Some(v))` sets it to `v`.
    pub min_age: Maybe<Option<Duration>>,
}
139
/// Partial update (PATCH semantics) for encryption settings.
///
/// Applied by `OptionalEncryptionConfig::reconfigure`.
#[derive(Debug, Clone, Default)]
pub struct EncryptionReconfiguration {
    /// Requested change to the allowed modes. Unlike the `Maybe<Option<T>>` fields elsewhere,
    /// this is a plain set: `Specified(set)` replaces the stored set, and an empty set is later
    /// treated as "not set" by `OptionalEncryptionConfig::merge`.
    pub allowed_modes: Maybe<EnumSet<EncryptionMode>>,
}
144
/// Partial update (PATCH semantics) for a stream configuration.
///
/// Scalar fields follow the three-state `Maybe<Option<T>>` convention. For the nested fields,
/// `Specified(Some(inner))` applies `inner` to the existing nested value and `Specified(None)`
/// resets the nested value to its default. Applied by `OptionalStreamConfig::reconfigure`.
#[derive(Debug, Clone, Default)]
pub struct StreamReconfiguration {
    /// Requested change to the storage class.
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Requested change to the retention policy.
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Nested reconfiguration of the timestamping settings.
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Nested reconfiguration of the delete-on-empty settings.
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
    /// Nested reconfiguration of the encryption settings.
    pub encryption: Maybe<Option<EncryptionReconfiguration>>,
}
153
/// Optional-tier timestamping configuration.
///
/// `None` in a field means "not set at this layer"; `merge()` then falls back to the basin
/// defaults and finally to the field type's default.
#[derive(Debug, Clone, Copy, Default)]
pub struct OptionalTimestampingConfig {
    /// Timestamping mode, if set at this layer.
    pub mode: Option<TimestampingMode>,
    /// `uncapped` flag, if set at this layer.
    pub uncapped: Option<bool>,
}
159
160impl OptionalTimestampingConfig {
161    pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
162        if let Maybe::Specified(mode) = reconfiguration.mode {
163            self.mode = mode;
164        }
165        if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
166            self.uncapped = uncapped;
167        }
168        self
169    }
170
171    pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
172        let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
173        let uncapped = self
174            .uncapped
175            .or(basin_defaults.uncapped)
176            .unwrap_or_default();
177        TimestampingConfig { mode, uncapped }
178    }
179}
180
181impl From<OptionalTimestampingConfig> for TimestampingConfig {
182    fn from(value: OptionalTimestampingConfig) -> Self {
183        Self {
184            mode: value.mode.unwrap_or_default(),
185            uncapped: value.uncapped.unwrap_or_default(),
186        }
187    }
188}
189
190impl From<TimestampingConfig> for OptionalTimestampingConfig {
191    fn from(value: TimestampingConfig) -> Self {
192        Self {
193            mode: Some(value.mode),
194            uncapped: Some(value.uncapped),
195        }
196    }
197}
198
199impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
200    fn from(value: OptionalTimestampingConfig) -> Self {
201        Self {
202            mode: value.mode.into(),
203            uncapped: value.uncapped.into(),
204        }
205    }
206}
207
/// Optional-tier delete-on-empty configuration.
///
/// `None` means "not set at this layer"; `merge()` then falls back to the basin defaults and
/// finally to `Duration::default()`.
#[derive(Debug, Clone, Default)]
pub struct OptionalDeleteOnEmptyConfig {
    /// `min_age`, if set at this layer.
    pub min_age: Option<Duration>,
}
212
213impl OptionalDeleteOnEmptyConfig {
214    pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
215        if let Maybe::Specified(min_age) = reconfiguration.min_age {
216            self.min_age = min_age;
217        }
218        self
219    }
220
221    pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
222        let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
223        DeleteOnEmptyConfig { min_age }
224    }
225}
226
227impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
228    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
229        Self {
230            min_age: value.min_age.unwrap_or_default(),
231        }
232    }
233}
234
235impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
236    fn from(value: DeleteOnEmptyConfig) -> Self {
237        Self {
238            min_age: Some(value.min_age),
239        }
240    }
241}
242
243impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
244    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
245        Self {
246            min_age: value.min_age.into(),
247        }
248    }
249}
250
/// Optional-tier encryption configuration.
///
/// This type has no `Option` wrapper: an empty `allowed_modes` set is what `merge()` treats as
/// "not set at this layer".
#[derive(Debug, Clone, Default)]
pub struct OptionalEncryptionConfig {
    /// Allowed encryption modes at this layer; empty means "not set".
    pub allowed_modes: EnumSet<EncryptionMode>,
}
255
256impl OptionalEncryptionConfig {
257    pub fn reconfigure(mut self, reconfiguration: EncryptionReconfiguration) -> Self {
258        if let Maybe::Specified(allowed_modes) = reconfiguration.allowed_modes {
259            self.allowed_modes = allowed_modes;
260        }
261        self
262    }
263
264    pub fn merge(self, basin_defaults: Self) -> EncryptionConfig {
265        let allowed_modes = if !self.allowed_modes.is_empty() {
266            self.allowed_modes
267        } else if !basin_defaults.allowed_modes.is_empty() {
268            basin_defaults.allowed_modes
269        } else {
270            DEFAULT_ALLOWED_ENCRYPTION_MODES
271        };
272        EncryptionConfig { allowed_modes }
273    }
274}
275
276impl From<OptionalEncryptionConfig> for EncryptionConfig {
277    fn from(value: OptionalEncryptionConfig) -> Self {
278        Self {
279            allowed_modes: value.allowed_modes,
280        }
281    }
282}
283
284impl From<EncryptionConfig> for OptionalEncryptionConfig {
285    fn from(value: EncryptionConfig) -> Self {
286        Self {
287            allowed_modes: value.allowed_modes,
288        }
289    }
290}
291
292impl From<OptionalEncryptionConfig> for EncryptionReconfiguration {
293    fn from(value: OptionalEncryptionConfig) -> Self {
294        Self {
295            allowed_modes: Maybe::Specified(value.allowed_modes),
296        }
297    }
298}
299
/// Optional-tier stream configuration.
///
/// Scalar fields are `Option<T>` where `None` means "not set at this layer". The nested configs
/// are always present and track "not set" per field themselves.
#[derive(Debug, Clone, Default)]
pub struct OptionalStreamConfig {
    /// Storage class, if set at this layer.
    pub storage_class: Option<StorageClass>,
    /// Retention policy, if set at this layer.
    pub retention_policy: Option<RetentionPolicy>,
    /// Optional-tier timestamping settings.
    pub timestamping: OptionalTimestampingConfig,
    /// Optional-tier delete-on-empty settings.
    pub delete_on_empty: OptionalDeleteOnEmptyConfig,
    /// Optional-tier encryption settings (an empty mode set means "not set").
    pub encryption: OptionalEncryptionConfig,
}
308
309impl OptionalStreamConfig {
310    pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
311        let StreamReconfiguration {
312            storage_class,
313            retention_policy,
314            timestamping,
315            delete_on_empty,
316            encryption,
317        } = reconfiguration;
318        if let Maybe::Specified(storage_class) = storage_class {
319            self.storage_class = storage_class;
320        }
321        if let Maybe::Specified(retention_policy) = retention_policy {
322            self.retention_policy = retention_policy;
323        }
324        if let Maybe::Specified(timestamping) = timestamping {
325            self.timestamping = timestamping
326                .map(|ts| self.timestamping.reconfigure(ts))
327                .unwrap_or_default();
328        }
329        if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
330            self.delete_on_empty = delete_on_empty_reconfig
331                .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
332                .unwrap_or_default();
333        }
334        if let Maybe::Specified(encryption) = encryption {
335            self.encryption = encryption
336                .map(|enc| self.encryption.reconfigure(enc))
337                .unwrap_or_default();
338        }
339        self
340    }
341
342    pub fn merge(self, basin_defaults: Self) -> StreamConfig {
343        let storage_class = self
344            .storage_class
345            .or(basin_defaults.storage_class)
346            .unwrap_or_default();
347
348        let retention_policy = self
349            .retention_policy
350            .or(basin_defaults.retention_policy)
351            .unwrap_or_default();
352
353        let timestamping = self.timestamping.merge(basin_defaults.timestamping);
354
355        let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
356
357        let encryption = self.encryption.merge(basin_defaults.encryption);
358
359        StreamConfig {
360            storage_class,
361            retention_policy,
362            timestamping,
363            delete_on_empty,
364            encryption,
365        }
366    }
367}
368
369impl From<OptionalStreamConfig> for StreamReconfiguration {
370    fn from(value: OptionalStreamConfig) -> Self {
371        let OptionalStreamConfig {
372            storage_class,
373            retention_policy,
374            timestamping,
375            delete_on_empty,
376            encryption,
377        } = value;
378
379        Self {
380            storage_class: storage_class.into(),
381            retention_policy: retention_policy.into(),
382            timestamping: Some(timestamping.into()).into(),
383            delete_on_empty: Some(delete_on_empty.into()).into(),
384            encryption: Some(encryption.into()).into(),
385        }
386    }
387}
388
389impl From<OptionalStreamConfig> for StreamConfig {
390    fn from(value: OptionalStreamConfig) -> Self {
391        let OptionalStreamConfig {
392            storage_class,
393            retention_policy,
394            timestamping,
395            delete_on_empty,
396            encryption,
397        } = value;
398
399        Self {
400            storage_class: storage_class.unwrap_or_default(),
401            retention_policy: retention_policy.unwrap_or_default(),
402            timestamping: timestamping.into(),
403            delete_on_empty: delete_on_empty.into(),
404            encryption: encryption.into(),
405        }
406    }
407}
408
409impl From<StreamConfig> for OptionalStreamConfig {
410    fn from(value: StreamConfig) -> Self {
411        let StreamConfig {
412            storage_class,
413            retention_policy,
414            timestamping,
415            delete_on_empty,
416            encryption,
417        } = value;
418
419        Self {
420            storage_class: Some(storage_class),
421            retention_policy: Some(retention_policy),
422            timestamping: timestamping.into(),
423            delete_on_empty: delete_on_empty.into(),
424            encryption: encryption.into(),
425        }
426    }
427}
428
/// Basin-level configuration.
///
/// The derived `Default` is an all-unset `default_stream_config` with both flags `false`.
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    /// Optional-tier stream config supplied by the basin; it is the `basin_defaults` argument
    /// shape expected by `OptionalStreamConfig::merge`.
    pub default_stream_config: OptionalStreamConfig,
    /// NOTE(review): presumably enables creating a missing stream on append — confirm against
    /// the consumer of this flag.
    pub create_stream_on_append: bool,
    /// NOTE(review): presumably enables creating a missing stream on read — confirm against the
    /// consumer of this flag.
    pub create_stream_on_read: bool,
}
435
436impl BasinConfig {
437    pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
438        let BasinReconfiguration {
439            default_stream_config,
440            create_stream_on_append,
441            create_stream_on_read,
442        } = reconfiguration;
443
444        if let Maybe::Specified(default_stream_config) = default_stream_config {
445            self.default_stream_config = default_stream_config
446                .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
447                .unwrap_or_default();
448        }
449
450        if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
451            self.create_stream_on_append = create_stream_on_append;
452        }
453
454        if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
455            self.create_stream_on_read = create_stream_on_read;
456        }
457
458        self
459    }
460}
461
462impl From<BasinConfig> for BasinReconfiguration {
463    fn from(value: BasinConfig) -> Self {
464        let BasinConfig {
465            default_stream_config,
466            create_stream_on_append,
467            create_stream_on_read,
468        } = value;
469
470        Self {
471            default_stream_config: Some(default_stream_config.into()).into(),
472            create_stream_on_append: create_stream_on_append.into(),
473            create_stream_on_read: create_stream_on_read.into(),
474        }
475    }
476}
477
/// Partial update (PATCH semantics) for a basin configuration.
///
/// Applied by `BasinConfig::reconfigure`.
#[derive(Debug, Clone, Default)]
pub struct BasinReconfiguration {
    /// Nested reconfiguration of the default stream config: `Specified(Some(inner))` applies
    /// `inner` to the existing value, `Specified(None)` resets it to its default.
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// New value for `create_stream_on_append`, applied only when `Specified`.
    pub create_stream_on_append: Maybe<bool>,
    /// New value for `create_stream_on_read`, applied only when `Specified`.
    pub create_stream_on_read: Maybe<bool>,
}