Skip to main content

s2_common/types/
config.rs

1//! Stream and basin configuration types.
2//!
3//! Each config area (stream, timestamping, delete-on-empty) has three type tiers:
4//!
5//! - Resolved (`StreamConfig`, `TimestampingConfig`, `DeleteOnEmptyConfig`): All fields are
6//!   concrete values. Produced by merging optional configs with defaults using `merge()`.
7//!
8//! - Optional (`OptionalStreamConfig`, `OptionalTimestampingConfig`,
9//!   `OptionalDeleteOnEmptyConfig`): The internal representation, stored in metadata. Fields are
10//!   `Option<T>` where `None` means "not set at this layer, fall back to defaults."
11//!
12//! - Reconfiguration (`StreamReconfiguration`, `TimestampingReconfiguration`,
13//!   `DeleteOnEmptyReconfiguration`): Partial updates with PATCH semantics. Fields are
14//!   `Maybe<Option<T>>` with three states: `Unspecified` (don't change), `Specified(None)` (clear
15//!   to default), `Specified(Some(v))` (set to value). Applied using `reconfigure()`.
16//!
17//! Reconfiguration of nested fields (e.g. `timestamping`, `delete_on_empty`,
18//! `default_stream_config`) is applied recursively: `Specified(Some(inner_reconfig))`
19//! applies the inner reconfiguration to the existing value, while `Specified(None)`
20//! clears it to the default.
21//!
22//! `merge()` resolves optional configs into resolved configs with precedence:
23//! stream-level → basin-level → system default (via `Option::or` chaining).
24//!
25//! The `From<Optional*> for *Reconfiguration` conversions treat every field as
26//! `Specified` (including `None` → `Specified(None)`). These
27//! conversions represent "set the config to exactly this state", not "update only
28//! the fields that are set."
29
30use std::time::Duration;
31
32use enum_ordinalize::Ordinalize;
33
34use crate::maybe::Maybe;
35
36#[derive(
37    Debug,
38    Default,
39    Clone,
40    Copy,
41    strum::Display,
42    strum::IntoStaticStr,
43    strum::EnumIter,
44    strum::FromRepr,
45    strum::EnumString,
46    PartialEq,
47    Eq,
48    Ordinalize,
49    Hash,
50)]
51#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
52#[repr(u8)]
53pub enum StorageClass {
54    #[strum(serialize = "standard")]
55    Standard = 1,
56    #[default]
57    #[strum(serialize = "express")]
58    Express = 2,
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum RetentionPolicy {
63    Age(Duration),
64    Infinite(),
65}
66
67impl RetentionPolicy {
68    pub fn age(&self) -> Option<Duration> {
69        match self {
70            Self::Age(duration) => Some(*duration),
71            Self::Infinite() => None,
72        }
73    }
74}
75
76impl Default for RetentionPolicy {
77    fn default() -> Self {
78        const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
79
80        Self::Age(ONE_WEEK)
81    }
82}
83
84#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
85pub enum TimestampingMode {
86    #[default]
87    ClientPrefer,
88    ClientRequire,
89    Arrival,
90}
91
92#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
93pub struct TimestampingConfig {
94    pub mode: TimestampingMode,
95    pub uncapped: bool,
96}
97
98#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
99pub struct DeleteOnEmptyConfig {
100    pub min_age: Duration,
101}
102
103#[derive(Debug, Clone, Default, PartialEq, Eq)]
104pub struct StreamConfig {
105    pub storage_class: StorageClass,
106    pub retention_policy: RetentionPolicy,
107    pub timestamping: TimestampingConfig,
108    pub delete_on_empty: DeleteOnEmptyConfig,
109}
110
111#[derive(Debug, Clone, Default)]
112pub struct TimestampingReconfiguration {
113    pub mode: Maybe<Option<TimestampingMode>>,
114    pub uncapped: Maybe<Option<bool>>,
115}
116
117#[derive(Debug, Clone, Default)]
118pub struct DeleteOnEmptyReconfiguration {
119    pub min_age: Maybe<Option<Duration>>,
120}
121
122#[derive(Debug, Clone, Default)]
123pub struct StreamReconfiguration {
124    pub storage_class: Maybe<Option<StorageClass>>,
125    pub retention_policy: Maybe<Option<RetentionPolicy>>,
126    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
127    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
128}
129
130#[derive(Debug, Clone, Copy, Default)]
131pub struct OptionalTimestampingConfig {
132    pub mode: Option<TimestampingMode>,
133    pub uncapped: Option<bool>,
134}
135
136impl OptionalTimestampingConfig {
137    pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
138        if let Maybe::Specified(mode) = reconfiguration.mode {
139            self.mode = mode;
140        }
141        if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
142            self.uncapped = uncapped;
143        }
144        self
145    }
146
147    pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
148        let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
149        let uncapped = self
150            .uncapped
151            .or(basin_defaults.uncapped)
152            .unwrap_or_default();
153        TimestampingConfig { mode, uncapped }
154    }
155}
156
157impl From<OptionalTimestampingConfig> for TimestampingConfig {
158    fn from(value: OptionalTimestampingConfig) -> Self {
159        Self {
160            mode: value.mode.unwrap_or_default(),
161            uncapped: value.uncapped.unwrap_or_default(),
162        }
163    }
164}
165
166impl From<TimestampingConfig> for OptionalTimestampingConfig {
167    fn from(value: TimestampingConfig) -> Self {
168        Self {
169            mode: Some(value.mode),
170            uncapped: Some(value.uncapped),
171        }
172    }
173}
174
175impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
176    fn from(value: OptionalTimestampingConfig) -> Self {
177        Self {
178            mode: value.mode.into(),
179            uncapped: value.uncapped.into(),
180        }
181    }
182}
183
184#[derive(Debug, Clone, Default)]
185pub struct OptionalDeleteOnEmptyConfig {
186    pub min_age: Option<Duration>,
187}
188
189impl OptionalDeleteOnEmptyConfig {
190    pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
191        if let Maybe::Specified(min_age) = reconfiguration.min_age {
192            self.min_age = min_age;
193        }
194        self
195    }
196
197    pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
198        let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
199        DeleteOnEmptyConfig { min_age }
200    }
201}
202
203impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
204    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
205        Self {
206            min_age: value.min_age.unwrap_or_default(),
207        }
208    }
209}
210
211impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
212    fn from(value: DeleteOnEmptyConfig) -> Self {
213        Self {
214            min_age: Some(value.min_age),
215        }
216    }
217}
218
219impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
220    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
221        Self {
222            min_age: value.min_age.into(),
223        }
224    }
225}
226
227#[derive(Debug, Clone, Default)]
228pub struct OptionalStreamConfig {
229    pub storage_class: Option<StorageClass>,
230    pub retention_policy: Option<RetentionPolicy>,
231    pub timestamping: OptionalTimestampingConfig,
232    pub delete_on_empty: OptionalDeleteOnEmptyConfig,
233}
234
235impl OptionalStreamConfig {
236    pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
237        let StreamReconfiguration {
238            storage_class,
239            retention_policy,
240            timestamping,
241            delete_on_empty,
242        } = reconfiguration;
243        if let Maybe::Specified(storage_class) = storage_class {
244            self.storage_class = storage_class;
245        }
246        if let Maybe::Specified(retention_policy) = retention_policy {
247            self.retention_policy = retention_policy;
248        }
249        if let Maybe::Specified(timestamping) = timestamping {
250            self.timestamping = timestamping
251                .map(|ts| self.timestamping.reconfigure(ts))
252                .unwrap_or_default();
253        }
254        if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
255            self.delete_on_empty = delete_on_empty_reconfig
256                .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
257                .unwrap_or_default();
258        }
259        self
260    }
261
262    pub fn merge(self, basin_defaults: Self) -> StreamConfig {
263        let storage_class = self
264            .storage_class
265            .or(basin_defaults.storage_class)
266            .unwrap_or_default();
267
268        let retention_policy = self
269            .retention_policy
270            .or(basin_defaults.retention_policy)
271            .unwrap_or_default();
272
273        let timestamping = self.timestamping.merge(basin_defaults.timestamping);
274
275        let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
276
277        StreamConfig {
278            storage_class,
279            retention_policy,
280            timestamping,
281            delete_on_empty,
282        }
283    }
284}
285
286impl From<OptionalStreamConfig> for StreamReconfiguration {
287    fn from(value: OptionalStreamConfig) -> Self {
288        let OptionalStreamConfig {
289            storage_class,
290            retention_policy,
291            timestamping,
292            delete_on_empty,
293        } = value;
294
295        Self {
296            storage_class: storage_class.into(),
297            retention_policy: retention_policy.into(),
298            timestamping: Some(timestamping.into()).into(),
299            delete_on_empty: Some(delete_on_empty.into()).into(),
300        }
301    }
302}
303
304impl From<OptionalStreamConfig> for StreamConfig {
305    fn from(value: OptionalStreamConfig) -> Self {
306        let OptionalStreamConfig {
307            storage_class,
308            retention_policy,
309            timestamping,
310            delete_on_empty,
311        } = value;
312
313        Self {
314            storage_class: storage_class.unwrap_or_default(),
315            retention_policy: retention_policy.unwrap_or_default(),
316            timestamping: timestamping.into(),
317            delete_on_empty: delete_on_empty.into(),
318        }
319    }
320}
321
322impl From<StreamConfig> for OptionalStreamConfig {
323    fn from(value: StreamConfig) -> Self {
324        let StreamConfig {
325            storage_class,
326            retention_policy,
327            timestamping,
328            delete_on_empty,
329        } = value;
330
331        Self {
332            storage_class: Some(storage_class),
333            retention_policy: Some(retention_policy),
334            timestamping: timestamping.into(),
335            delete_on_empty: delete_on_empty.into(),
336        }
337    }
338}
339
340#[derive(Debug, Clone, Default)]
341pub struct BasinConfig {
342    pub default_stream_config: OptionalStreamConfig,
343    pub create_stream_on_append: bool,
344    pub create_stream_on_read: bool,
345}
346
347impl BasinConfig {
348    pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
349        let BasinReconfiguration {
350            default_stream_config,
351            create_stream_on_append,
352            create_stream_on_read,
353        } = reconfiguration;
354
355        if let Maybe::Specified(default_stream_config) = default_stream_config {
356            self.default_stream_config = default_stream_config
357                .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
358                .unwrap_or_default();
359        }
360
361        if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
362            self.create_stream_on_append = create_stream_on_append;
363        }
364
365        if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
366            self.create_stream_on_read = create_stream_on_read;
367        }
368
369        self
370    }
371}
372
373impl From<BasinConfig> for BasinReconfiguration {
374    fn from(value: BasinConfig) -> Self {
375        let BasinConfig {
376            default_stream_config,
377            create_stream_on_append,
378            create_stream_on_read,
379        } = value;
380
381        Self {
382            default_stream_config: Some(default_stream_config.into()).into(),
383            create_stream_on_append: create_stream_on_append.into(),
384            create_stream_on_read: create_stream_on_read.into(),
385        }
386    }
387}
388
389#[derive(Debug, Clone, Default)]
390pub struct BasinReconfiguration {
391    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
392    pub create_stream_on_append: Maybe<bool>,
393    pub create_stream_on_read: Maybe<bool>,
394}