s2_common/types/
config.rs

1use std::time::Duration;
2
3use enum_ordinalize::Ordinalize;
4
5use crate::maybe::Maybe;
6
7#[derive(
8    Debug,
9    Default,
10    Clone,
11    Copy,
12    strum::Display,
13    strum::IntoStaticStr,
14    strum::EnumIter,
15    strum::FromRepr,
16    strum::EnumString,
17    PartialEq,
18    Eq,
19    Ordinalize,
20)]
21#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
22#[repr(u8)]
23pub enum StorageClass {
24    #[strum(serialize = "standard")]
25    Standard = 1,
26    #[default]
27    #[strum(serialize = "express")]
28    Express = 2,
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq)]
32pub enum RetentionPolicy {
33    Age(Duration),
34    Infinite(),
35}
36
37impl RetentionPolicy {
38    pub fn age(&self) -> Option<Duration> {
39        match self {
40            Self::Age(duration) => Some(*duration),
41            Self::Infinite() => None,
42        }
43    }
44}
45
46impl Default for RetentionPolicy {
47    fn default() -> Self {
48        const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
49
50        Self::Age(ONE_WEEK)
51    }
52}
53
54#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
55pub enum TimestampingMode {
56    #[default]
57    ClientPrefer,
58    ClientRequire,
59    Arrival,
60}
61
62#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
63pub struct TimestampingConfig {
64    pub mode: TimestampingMode,
65    pub uncapped: bool,
66}
67
68#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
69pub struct DeleteOnEmptyConfig {
70    pub min_age: Duration,
71}
72
73#[derive(Debug, Clone, Default, PartialEq, Eq)]
74pub struct StreamConfig {
75    pub storage_class: StorageClass,
76    pub retention_policy: RetentionPolicy,
77    pub timestamping: TimestampingConfig,
78    pub delete_on_empty: DeleteOnEmptyConfig,
79}
80
81#[derive(Debug, Clone, Default)]
82pub struct TimestampingReconfiguration {
83    pub mode: Maybe<Option<TimestampingMode>>,
84    pub uncapped: Maybe<Option<bool>>,
85}
86
87#[derive(Debug, Clone, Default)]
88pub struct DeleteOnEmptyReconfiguration {
89    pub min_age: Maybe<Option<Duration>>,
90}
91
92#[derive(Debug, Clone, Default)]
93pub struct StreamReconfiguration {
94    pub storage_class: Maybe<Option<StorageClass>>,
95    pub retention_policy: Maybe<Option<RetentionPolicy>>,
96    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
97    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
98}
99
100#[derive(Debug, Clone, Copy, Default)]
101pub struct OptionalTimestampingConfig {
102    pub mode: Option<TimestampingMode>,
103    pub uncapped: Option<bool>,
104}
105
106impl OptionalTimestampingConfig {
107    pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
108        if let Maybe::Specified(mode) = reconfiguration.mode {
109            self.mode = mode;
110        }
111        if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
112            self.uncapped = uncapped;
113        }
114        self
115    }
116
117    pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
118        let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
119        let uncapped = self
120            .uncapped
121            .or(basin_defaults.uncapped)
122            .unwrap_or_default();
123        TimestampingConfig { mode, uncapped }
124    }
125}
126
127impl From<OptionalTimestampingConfig> for TimestampingConfig {
128    fn from(value: OptionalTimestampingConfig) -> Self {
129        Self {
130            mode: value.mode.unwrap_or_default(),
131            uncapped: value.uncapped.unwrap_or_default(),
132        }
133    }
134}
135
136impl From<TimestampingConfig> for OptionalTimestampingConfig {
137    fn from(value: TimestampingConfig) -> Self {
138        Self {
139            mode: Some(value.mode),
140            uncapped: Some(value.uncapped),
141        }
142    }
143}
144
145impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
146    fn from(value: OptionalTimestampingConfig) -> Self {
147        Self {
148            mode: value.mode.into(),
149            uncapped: value.uncapped.into(),
150        }
151    }
152}
153
154#[derive(Debug, Clone, Default)]
155pub struct OptionalDeleteOnEmptyConfig {
156    pub min_age: Option<Duration>,
157}
158
159impl OptionalDeleteOnEmptyConfig {
160    pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
161        if let Maybe::Specified(min_age) = reconfiguration.min_age {
162            self.min_age = min_age;
163        }
164        self
165    }
166
167    pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
168        let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
169        DeleteOnEmptyConfig { min_age }
170    }
171}
172
173impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
174    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
175        Self {
176            min_age: value.min_age.unwrap_or_default(),
177        }
178    }
179}
180
181impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
182    fn from(value: DeleteOnEmptyConfig) -> Self {
183        Self {
184            min_age: Some(value.min_age),
185        }
186    }
187}
188
189impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
190    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
191        Self {
192            min_age: value.min_age.into(),
193        }
194    }
195}
196
197#[derive(Debug, Clone, Default)]
198pub struct OptionalStreamConfig {
199    pub storage_class: Option<StorageClass>,
200    pub retention_policy: Option<RetentionPolicy>,
201    pub timestamping: OptionalTimestampingConfig,
202    pub delete_on_empty: OptionalDeleteOnEmptyConfig,
203}
204
205impl OptionalStreamConfig {
206    pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
207        let StreamReconfiguration {
208            storage_class,
209            retention_policy,
210            timestamping,
211            delete_on_empty,
212        } = reconfiguration;
213        if let Maybe::Specified(storage_class) = storage_class {
214            self.storage_class = storage_class;
215        }
216        if let Maybe::Specified(retention_policy) = retention_policy {
217            self.retention_policy = retention_policy;
218        }
219        if let Maybe::Specified(timestamping) = timestamping {
220            self.timestamping = timestamping
221                .map(|ts| self.timestamping.reconfigure(ts))
222                .unwrap_or_default();
223        }
224        if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
225            self.delete_on_empty = delete_on_empty_reconfig
226                .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
227                .unwrap_or_default();
228        }
229        self
230    }
231
232    pub fn merge(self, basin_defaults: Self) -> StreamConfig {
233        let storage_class = self
234            .storage_class
235            .or(basin_defaults.storage_class)
236            .unwrap_or_default();
237
238        let retention_policy = self
239            .retention_policy
240            .or(basin_defaults.retention_policy)
241            .unwrap_or_default();
242
243        let timestamping = self.timestamping.merge(basin_defaults.timestamping);
244
245        let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
246
247        StreamConfig {
248            storage_class,
249            retention_policy,
250            timestamping,
251            delete_on_empty,
252        }
253    }
254}
255
256impl From<OptionalStreamConfig> for StreamReconfiguration {
257    fn from(value: OptionalStreamConfig) -> Self {
258        let OptionalStreamConfig {
259            storage_class,
260            retention_policy,
261            timestamping,
262            delete_on_empty,
263        } = value;
264
265        Self {
266            storage_class: storage_class.into(),
267            retention_policy: retention_policy.into(),
268            timestamping: Some(timestamping.into()).into(),
269            delete_on_empty: Some(delete_on_empty.into()).into(),
270        }
271    }
272}
273
274impl From<OptionalStreamConfig> for StreamConfig {
275    fn from(value: OptionalStreamConfig) -> Self {
276        let OptionalStreamConfig {
277            storage_class,
278            retention_policy,
279            timestamping,
280            delete_on_empty,
281        } = value;
282
283        Self {
284            storage_class: storage_class.unwrap_or_default(),
285            retention_policy: retention_policy.unwrap_or_default(),
286            timestamping: timestamping.into(),
287            delete_on_empty: delete_on_empty.into(),
288        }
289    }
290}
291
292impl From<StreamConfig> for OptionalStreamConfig {
293    fn from(value: StreamConfig) -> Self {
294        let StreamConfig {
295            storage_class,
296            retention_policy,
297            timestamping,
298            delete_on_empty,
299        } = value;
300
301        Self {
302            storage_class: Some(storage_class),
303            retention_policy: Some(retention_policy),
304            timestamping: timestamping.into(),
305            delete_on_empty: delete_on_empty.into(),
306        }
307    }
308}
309
310#[derive(Debug, Clone, Default)]
311pub struct BasinConfig {
312    pub default_stream_config: OptionalStreamConfig,
313    pub create_stream_on_append: bool,
314    pub create_stream_on_read: bool,
315}
316
317impl BasinConfig {
318    pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
319        let BasinReconfiguration {
320            default_stream_config,
321            create_stream_on_append,
322            create_stream_on_read,
323        } = reconfiguration;
324
325        if let Maybe::Specified(default_stream_config) = default_stream_config {
326            self.default_stream_config = default_stream_config
327                .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
328                .unwrap_or_default();
329        }
330
331        if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
332            self.create_stream_on_append = create_stream_on_append;
333        }
334
335        if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
336            self.create_stream_on_read = create_stream_on_read;
337        }
338
339        self
340    }
341}
342
343impl From<BasinConfig> for BasinReconfiguration {
344    fn from(value: BasinConfig) -> Self {
345        let BasinConfig {
346            default_stream_config,
347            create_stream_on_append,
348            create_stream_on_read,
349        } = value;
350
351        Self {
352            default_stream_config: Some(default_stream_config.into()).into(),
353            create_stream_on_append: create_stream_on_append.into(),
354            create_stream_on_read: create_stream_on_read.into(),
355        }
356    }
357}
358
359#[derive(Debug, Clone, Default)]
360pub struct BasinReconfiguration {
361    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
362    pub create_stream_on_append: Maybe<bool>,
363    pub create_stream_on_read: Maybe<bool>,
364}