Skip to main content

s2_common/types/
config.rs

1use std::time::Duration;
2
3use enum_ordinalize::Ordinalize;
4
5use crate::maybe::Maybe;
6
7#[derive(
8    Debug,
9    Default,
10    Clone,
11    Copy,
12    strum::Display,
13    strum::IntoStaticStr,
14    strum::EnumIter,
15    strum::FromRepr,
16    strum::EnumString,
17    PartialEq,
18    Eq,
19    Ordinalize,
20    Hash,
21)]
22#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
23#[repr(u8)]
24pub enum StorageClass {
25    #[strum(serialize = "standard")]
26    Standard = 1,
27    #[default]
28    #[strum(serialize = "express")]
29    Express = 2,
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33pub enum RetentionPolicy {
34    Age(Duration),
35    Infinite(),
36}
37
38impl RetentionPolicy {
39    pub fn age(&self) -> Option<Duration> {
40        match self {
41            Self::Age(duration) => Some(*duration),
42            Self::Infinite() => None,
43        }
44    }
45}
46
47impl Default for RetentionPolicy {
48    fn default() -> Self {
49        const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
50
51        Self::Age(ONE_WEEK)
52    }
53}
54
55#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
56pub enum TimestampingMode {
57    #[default]
58    ClientPrefer,
59    ClientRequire,
60    Arrival,
61}
62
63#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
64pub struct TimestampingConfig {
65    pub mode: TimestampingMode,
66    pub uncapped: bool,
67}
68
69#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
70pub struct DeleteOnEmptyConfig {
71    pub min_age: Duration,
72}
73
74#[derive(Debug, Clone, Default, PartialEq, Eq)]
75pub struct StreamConfig {
76    pub storage_class: StorageClass,
77    pub retention_policy: RetentionPolicy,
78    pub timestamping: TimestampingConfig,
79    pub delete_on_empty: DeleteOnEmptyConfig,
80}
81
82#[derive(Debug, Clone, Default)]
83pub struct TimestampingReconfiguration {
84    pub mode: Maybe<Option<TimestampingMode>>,
85    pub uncapped: Maybe<Option<bool>>,
86}
87
88#[derive(Debug, Clone, Default)]
89pub struct DeleteOnEmptyReconfiguration {
90    pub min_age: Maybe<Option<Duration>>,
91}
92
93#[derive(Debug, Clone, Default)]
94pub struct StreamReconfiguration {
95    pub storage_class: Maybe<Option<StorageClass>>,
96    pub retention_policy: Maybe<Option<RetentionPolicy>>,
97    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
98    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
99}
100
101#[derive(Debug, Clone, Copy, Default)]
102pub struct OptionalTimestampingConfig {
103    pub mode: Option<TimestampingMode>,
104    pub uncapped: Option<bool>,
105}
106
107impl OptionalTimestampingConfig {
108    pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
109        if let Maybe::Specified(mode) = reconfiguration.mode {
110            self.mode = mode;
111        }
112        if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
113            self.uncapped = uncapped;
114        }
115        self
116    }
117
118    pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
119        let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
120        let uncapped = self
121            .uncapped
122            .or(basin_defaults.uncapped)
123            .unwrap_or_default();
124        TimestampingConfig { mode, uncapped }
125    }
126}
127
128impl From<OptionalTimestampingConfig> for TimestampingConfig {
129    fn from(value: OptionalTimestampingConfig) -> Self {
130        Self {
131            mode: value.mode.unwrap_or_default(),
132            uncapped: value.uncapped.unwrap_or_default(),
133        }
134    }
135}
136
137impl From<TimestampingConfig> for OptionalTimestampingConfig {
138    fn from(value: TimestampingConfig) -> Self {
139        Self {
140            mode: Some(value.mode),
141            uncapped: Some(value.uncapped),
142        }
143    }
144}
145
146impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
147    fn from(value: OptionalTimestampingConfig) -> Self {
148        Self {
149            mode: value.mode.into(),
150            uncapped: value.uncapped.into(),
151        }
152    }
153}
154
155#[derive(Debug, Clone, Default)]
156pub struct OptionalDeleteOnEmptyConfig {
157    pub min_age: Option<Duration>,
158}
159
160impl OptionalDeleteOnEmptyConfig {
161    pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
162        if let Maybe::Specified(min_age) = reconfiguration.min_age {
163            self.min_age = min_age;
164        }
165        self
166    }
167
168    pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
169        let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
170        DeleteOnEmptyConfig { min_age }
171    }
172}
173
174impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
175    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
176        Self {
177            min_age: value.min_age.unwrap_or_default(),
178        }
179    }
180}
181
182impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
183    fn from(value: DeleteOnEmptyConfig) -> Self {
184        Self {
185            min_age: Some(value.min_age),
186        }
187    }
188}
189
190impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
191    fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
192        Self {
193            min_age: value.min_age.into(),
194        }
195    }
196}
197
198#[derive(Debug, Clone, Default)]
199pub struct OptionalStreamConfig {
200    pub storage_class: Option<StorageClass>,
201    pub retention_policy: Option<RetentionPolicy>,
202    pub timestamping: OptionalTimestampingConfig,
203    pub delete_on_empty: OptionalDeleteOnEmptyConfig,
204}
205
206impl OptionalStreamConfig {
207    pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
208        let StreamReconfiguration {
209            storage_class,
210            retention_policy,
211            timestamping,
212            delete_on_empty,
213        } = reconfiguration;
214        if let Maybe::Specified(storage_class) = storage_class {
215            self.storage_class = storage_class;
216        }
217        if let Maybe::Specified(retention_policy) = retention_policy {
218            self.retention_policy = retention_policy;
219        }
220        if let Maybe::Specified(timestamping) = timestamping {
221            self.timestamping = timestamping
222                .map(|ts| self.timestamping.reconfigure(ts))
223                .unwrap_or_default();
224        }
225        if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
226            self.delete_on_empty = delete_on_empty_reconfig
227                .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
228                .unwrap_or_default();
229        }
230        self
231    }
232
233    pub fn merge(self, basin_defaults: Self) -> StreamConfig {
234        let storage_class = self
235            .storage_class
236            .or(basin_defaults.storage_class)
237            .unwrap_or_default();
238
239        let retention_policy = self
240            .retention_policy
241            .or(basin_defaults.retention_policy)
242            .unwrap_or_default();
243
244        let timestamping = self.timestamping.merge(basin_defaults.timestamping);
245
246        let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
247
248        StreamConfig {
249            storage_class,
250            retention_policy,
251            timestamping,
252            delete_on_empty,
253        }
254    }
255}
256
257impl From<OptionalStreamConfig> for StreamReconfiguration {
258    fn from(value: OptionalStreamConfig) -> Self {
259        let OptionalStreamConfig {
260            storage_class,
261            retention_policy,
262            timestamping,
263            delete_on_empty,
264        } = value;
265
266        Self {
267            storage_class: storage_class.into(),
268            retention_policy: retention_policy.into(),
269            timestamping: Some(timestamping.into()).into(),
270            delete_on_empty: Some(delete_on_empty.into()).into(),
271        }
272    }
273}
274
275impl From<OptionalStreamConfig> for StreamConfig {
276    fn from(value: OptionalStreamConfig) -> Self {
277        let OptionalStreamConfig {
278            storage_class,
279            retention_policy,
280            timestamping,
281            delete_on_empty,
282        } = value;
283
284        Self {
285            storage_class: storage_class.unwrap_or_default(),
286            retention_policy: retention_policy.unwrap_or_default(),
287            timestamping: timestamping.into(),
288            delete_on_empty: delete_on_empty.into(),
289        }
290    }
291}
292
293impl From<StreamConfig> for OptionalStreamConfig {
294    fn from(value: StreamConfig) -> Self {
295        let StreamConfig {
296            storage_class,
297            retention_policy,
298            timestamping,
299            delete_on_empty,
300        } = value;
301
302        Self {
303            storage_class: Some(storage_class),
304            retention_policy: Some(retention_policy),
305            timestamping: timestamping.into(),
306            delete_on_empty: delete_on_empty.into(),
307        }
308    }
309}
310
311#[derive(Debug, Clone, Default)]
312pub struct BasinConfig {
313    pub default_stream_config: OptionalStreamConfig,
314    pub create_stream_on_append: bool,
315    pub create_stream_on_read: bool,
316}
317
318impl BasinConfig {
319    pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
320        let BasinReconfiguration {
321            default_stream_config,
322            create_stream_on_append,
323            create_stream_on_read,
324        } = reconfiguration;
325
326        if let Maybe::Specified(default_stream_config) = default_stream_config {
327            self.default_stream_config = default_stream_config
328                .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
329                .unwrap_or_default();
330        }
331
332        if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
333            self.create_stream_on_append = create_stream_on_append;
334        }
335
336        if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
337            self.create_stream_on_read = create_stream_on_read;
338        }
339
340        self
341    }
342}
343
344impl From<BasinConfig> for BasinReconfiguration {
345    fn from(value: BasinConfig) -> Self {
346        let BasinConfig {
347            default_stream_config,
348            create_stream_on_append,
349            create_stream_on_read,
350        } = value;
351
352        Self {
353            default_stream_config: Some(default_stream_config.into()).into(),
354            create_stream_on_append: create_stream_on_append.into(),
355            create_stream_on_read: create_stream_on_read.into(),
356        }
357    }
358}
359
360#[derive(Debug, Clone, Default)]
361pub struct BasinReconfiguration {
362    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
363    pub create_stream_on_append: Maybe<bool>,
364    pub create_stream_on_read: Maybe<bool>,
365}