// s2_common/config.rs

//! Stream and basin configuration types.
//!
//! Stream configuration uses three representations:
//!
//! - Resolved (`StreamConfig`, `TimestampingConfig`, `DeleteOnEmptyConfig`): concrete values,
//!   produced by merging optional configs with defaults using `merge()`.
//!
//! - Optional (`OptionalStreamConfig`, `OptionalTimestampingConfig`,
//!   `OptionalDeleteOnEmptyConfig`): partial configuration layers, where `None` means "not set at
//!   this layer; fall back to defaults."
//!
//! - Reconfiguration (`StreamReconfiguration`, `TimestampingReconfiguration`,
//!   `DeleteOnEmptyReconfiguration`): PATCH-style updates applied with `reconfigure()`.
//!
//! Reconfiguration of nested fields (e.g. `timestamping`, `delete_on_empty`,
//! `default_stream_config`) is applied recursively: `Specified(Some(inner_reconfig))`
//! applies the inner reconfiguration to the existing value, while `Specified(None)`
//! clears it to the default.
//!
//! `merge()` resolves optional configs into resolved configs with precedence:
//! stream-level → basin-level → system default (via `Option::or` chaining).
//!
//! Basin config also carries basin-level knobs like `stream_cipher`,
//! `create_stream_on_append`, and `create_stream_on_read`.

use std::time::Duration;

use crate::{ValidationError, encryption::EncryptionAlgorithm, maybe::Maybe};

30#[derive(
31    Debug,
32    Default,
33    Clone,
34    Copy,
35    strum::Display,
36    strum::IntoStaticStr,
37    strum::EnumIter,
38    strum::FromRepr,
39    strum::EnumString,
40    PartialEq,
41    Eq,
42    Hash,
43)]
44#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
45#[repr(u8)]
46pub enum StorageClass {
47    #[strum(serialize = "standard")]
48    Standard = 1,
49    #[default]
50    #[strum(serialize = "express")]
51    Express = 2,
52}
53
/// Retention policy of a stream configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Age-based retention, bounded by the given [`Duration`].
    Age(Duration),
    /// Retention with no age bound.
    Infinite(),
}

60impl RetentionPolicy {
61    pub fn age(&self) -> Option<Duration> {
62        match self {
63            Self::Age(duration) => Some(*duration),
64            Self::Infinite() => None,
65        }
66    }
67
68    pub fn validate(self) -> Result<Self, ValidationError> {
69        match self {
70            Self::Age(duration) if duration.is_zero() => Err(ValidationError(
71                "age must be greater than 0 seconds".to_string(),
72            )),
73            policy => Ok(policy),
74        }
75    }
76}
77
78impl Default for RetentionPolicy {
79    fn default() -> Self {
80        const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
81
82        Self::Age(ONE_WEEK)
83    }
84}
85
/// Timestamping mode of a stream configuration.
///
/// `ClientPrefer` is the [`Default`] variant.
// NOTE(review): the exact semantics of each mode are defined by the consumers of this type,
// which are outside this file; confirm there before documenting the variants further.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum TimestampingMode {
    #[default]
    ClientPrefer,
    ClientRequire,
    Arrival,
}

94#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
95pub struct TimestampingConfig {
96    pub mode: TimestampingMode,
97    pub uncapped: bool,
98}
99
/// Resolved delete-on-empty configuration.
///
/// The derived `Default` has a zero `min_age`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DeleteOnEmptyConfig {
    /// Minimum age as a raw [`Duration`]; may be zero.
    pub min_age: Duration,
}

105impl DeleteOnEmptyConfig {
106    pub fn min_age(&self) -> Option<Duration> {
107        Some(self.min_age).filter(|age| !age.is_zero())
108    }
109}
110
111#[derive(Debug, Clone, Default, PartialEq, Eq)]
112pub struct StreamConfig {
113    pub storage_class: StorageClass,
114    pub retention_policy: RetentionPolicy,
115    pub timestamping: TimestampingConfig,
116    pub delete_on_empty: DeleteOnEmptyConfig,
117}
118
119#[derive(Debug, Clone, Default)]
120pub struct TimestampingReconfiguration {
121    pub mode: Maybe<Option<TimestampingMode>>,
122    pub uncapped: Maybe<Option<bool>>,
123}
124
125#[derive(Debug, Clone, Default)]
126pub struct DeleteOnEmptyReconfiguration {
127    pub min_age: Maybe<Option<Duration>>,
128}
129
130#[derive(Debug, Clone, Default)]
131pub struct StreamReconfiguration {
132    pub storage_class: Maybe<Option<StorageClass>>,
133    pub retention_policy: Maybe<Option<RetentionPolicy>>,
134    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
135    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
136}
137
138#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
139pub struct OptionalTimestampingConfig {
140    pub mode: Option<TimestampingMode>,
141    pub uncapped: Option<bool>,
142}
143
144impl OptionalTimestampingConfig {
145    pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
146        if let Maybe::Specified(mode) = reconfiguration.mode {
147            self.mode = mode;
148        }
149        if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
150            self.uncapped = uncapped;
151        }
152        self
153    }
154
155    pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
156        let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
157        let uncapped = self
158            .uncapped
159            .or(basin_defaults.uncapped)
160            .unwrap_or_default();
161        TimestampingConfig { mode, uncapped }
162    }
163}
164
165impl From<OptionalTimestampingConfig> for TimestampingConfig {
166    fn from(value: OptionalTimestampingConfig) -> Self {
167        Self {
168            mode: value.mode.unwrap_or_default(),
169            uncapped: value.uncapped.unwrap_or_default(),
170        }
171    }
172}
173
174impl From<TimestampingConfig> for OptionalTimestampingConfig {
175    fn from(value: TimestampingConfig) -> Self {
176        Self {
177            mode: Some(value.mode),
178            uncapped: Some(value.uncapped),
179        }
180    }
181}
182
/// Partial delete-on-empty configuration layer.
///
/// `None` means `min_age` is not set at this layer.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct OptionalDeleteOnEmptyConfig {
    /// Minimum age, if set at this layer.
    pub min_age: Option<Duration>,
}

188impl OptionalDeleteOnEmptyConfig {
189    pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
190        if let Maybe::Specified(min_age) = reconfiguration.min_age {
191            self.min_age = min_age;
192        }
193        self
194    }
195
196    pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
197        let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
198        DeleteOnEmptyConfig { min_age }
199    }
200}
201
202impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
203    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
204        Self {
205            min_age: value.min_age.unwrap_or_default(),
206        }
207    }
208}
209
210impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
211    fn from(value: DeleteOnEmptyConfig) -> Self {
212        Self {
213            min_age: Some(value.min_age),
214        }
215    }
216}
217
218#[derive(Debug, Clone, Default, PartialEq, Eq)]
219pub struct OptionalStreamConfig {
220    pub storage_class: Option<StorageClass>,
221    pub retention_policy: Option<RetentionPolicy>,
222    pub timestamping: OptionalTimestampingConfig,
223    pub delete_on_empty: OptionalDeleteOnEmptyConfig,
224}
225
226impl OptionalStreamConfig {
227    pub fn validate(&self) -> Result<(), ValidationError> {
228        if let Some(retention_policy) = self.retention_policy {
229            retention_policy.validate()?;
230        }
231        Ok(())
232    }
233
234    pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
235        let StreamReconfiguration {
236            storage_class,
237            retention_policy,
238            timestamping,
239            delete_on_empty,
240        } = reconfiguration;
241        if let Maybe::Specified(storage_class) = storage_class {
242            self.storage_class = storage_class;
243        }
244        if let Maybe::Specified(retention_policy) = retention_policy {
245            self.retention_policy = retention_policy;
246        }
247        if let Maybe::Specified(timestamping) = timestamping {
248            self.timestamping = timestamping
249                .map(|ts| self.timestamping.reconfigure(ts))
250                .unwrap_or_default();
251        }
252        if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
253            self.delete_on_empty = delete_on_empty_reconfig
254                .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
255                .unwrap_or_default();
256        }
257        self
258    }
259
260    pub fn merge(self, basin_defaults: Self) -> StreamConfig {
261        let storage_class = self
262            .storage_class
263            .or(basin_defaults.storage_class)
264            .unwrap_or_default();
265
266        let retention_policy = self
267            .retention_policy
268            .or(basin_defaults.retention_policy)
269            .unwrap_or_default();
270
271        let timestamping = self.timestamping.merge(basin_defaults.timestamping);
272
273        let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
274
275        StreamConfig {
276            storage_class,
277            retention_policy,
278            timestamping,
279            delete_on_empty,
280        }
281    }
282}
283
284impl From<OptionalStreamConfig> for StreamConfig {
285    fn from(value: OptionalStreamConfig) -> Self {
286        let OptionalStreamConfig {
287            storage_class,
288            retention_policy,
289            timestamping,
290            delete_on_empty,
291        } = value;
292
293        Self {
294            storage_class: storage_class.unwrap_or_default(),
295            retention_policy: retention_policy.unwrap_or_default(),
296            timestamping: timestamping.into(),
297            delete_on_empty: delete_on_empty.into(),
298        }
299    }
300}
301
302impl From<StreamConfig> for OptionalStreamConfig {
303    fn from(value: StreamConfig) -> Self {
304        let StreamConfig {
305            storage_class,
306            retention_policy,
307            timestamping,
308            delete_on_empty,
309        } = value;
310
311        Self {
312            storage_class: Some(storage_class),
313            retention_policy: Some(retention_policy),
314            timestamping: timestamping.into(),
315            delete_on_empty: delete_on_empty.into(),
316        }
317    }
318}
319
320#[derive(Debug, Clone, Default, PartialEq, Eq)]
321pub struct BasinConfig {
322    pub default_stream_config: OptionalStreamConfig,
323    pub stream_cipher: Option<EncryptionAlgorithm>,
324    pub create_stream_on_append: bool,
325    pub create_stream_on_read: bool,
326}
327
328impl BasinConfig {
329    pub fn validate(&self) -> Result<(), ValidationError> {
330        self.default_stream_config.validate()
331    }
332
333    pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
334        let BasinReconfiguration {
335            default_stream_config,
336            stream_cipher,
337            create_stream_on_append,
338            create_stream_on_read,
339        } = reconfiguration;
340
341        if let Maybe::Specified(default_stream_config) = default_stream_config {
342            self.default_stream_config = default_stream_config
343                .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
344                .unwrap_or_default();
345        }
346
347        if let Maybe::Specified(stream_cipher) = stream_cipher {
348            self.stream_cipher = stream_cipher;
349        }
350
351        if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
352            self.create_stream_on_append = create_stream_on_append;
353        }
354
355        if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
356            self.create_stream_on_read = create_stream_on_read;
357        }
358
359        self
360    }
361}
362
363#[derive(Debug, Clone, Default)]
364pub struct BasinReconfiguration {
365    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
366    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
367    pub create_stream_on_append: Maybe<bool>,
368    pub create_stream_on_read: Maybe<bool>,
369}