1use std::time::Duration;
31
32use enum_ordinalize::Ordinalize;
33
34use crate::maybe::Maybe;
35
36#[derive(
37 Debug,
38 Default,
39 Clone,
40 Copy,
41 strum::Display,
42 strum::IntoStaticStr,
43 strum::EnumIter,
44 strum::FromRepr,
45 strum::EnumString,
46 PartialEq,
47 Eq,
48 Ordinalize,
49 Hash,
50)]
51#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
52#[repr(u8)]
53pub enum StorageClass {
54 #[strum(serialize = "standard")]
55 Standard = 1,
56 #[default]
57 #[strum(serialize = "express")]
58 Express = 2,
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum RetentionPolicy {
63 Age(Duration),
64 Infinite(),
65}
66
67impl RetentionPolicy {
68 pub fn age(&self) -> Option<Duration> {
69 match self {
70 Self::Age(duration) => Some(*duration),
71 Self::Infinite() => None,
72 }
73 }
74}
75
76impl Default for RetentionPolicy {
77 fn default() -> Self {
78 const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
79
80 Self::Age(ONE_WEEK)
81 }
82}
83
84#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
85pub enum TimestampingMode {
86 #[default]
87 ClientPrefer,
88 ClientRequire,
89 Arrival,
90}
91
92#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
93pub struct TimestampingConfig {
94 pub mode: TimestampingMode,
95 pub uncapped: bool,
96}
97
98#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
99pub struct DeleteOnEmptyConfig {
100 pub min_age: Duration,
101}
102
103#[derive(Debug, Clone, Default, PartialEq, Eq)]
104pub struct StreamConfig {
105 pub storage_class: StorageClass,
106 pub retention_policy: RetentionPolicy,
107 pub timestamping: TimestampingConfig,
108 pub delete_on_empty: DeleteOnEmptyConfig,
109}
110
111#[derive(Debug, Clone, Default)]
112pub struct TimestampingReconfiguration {
113 pub mode: Maybe<Option<TimestampingMode>>,
114 pub uncapped: Maybe<Option<bool>>,
115}
116
117#[derive(Debug, Clone, Default)]
118pub struct DeleteOnEmptyReconfiguration {
119 pub min_age: Maybe<Option<Duration>>,
120}
121
122#[derive(Debug, Clone, Default)]
123pub struct StreamReconfiguration {
124 pub storage_class: Maybe<Option<StorageClass>>,
125 pub retention_policy: Maybe<Option<RetentionPolicy>>,
126 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
127 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
128}
129
130#[derive(Debug, Clone, Copy, Default)]
131pub struct OptionalTimestampingConfig {
132 pub mode: Option<TimestampingMode>,
133 pub uncapped: Option<bool>,
134}
135
136impl OptionalTimestampingConfig {
137 pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
138 if let Maybe::Specified(mode) = reconfiguration.mode {
139 self.mode = mode;
140 }
141 if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
142 self.uncapped = uncapped;
143 }
144 self
145 }
146
147 pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
148 let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
149 let uncapped = self
150 .uncapped
151 .or(basin_defaults.uncapped)
152 .unwrap_or_default();
153 TimestampingConfig { mode, uncapped }
154 }
155}
156
157impl From<OptionalTimestampingConfig> for TimestampingConfig {
158 fn from(value: OptionalTimestampingConfig) -> Self {
159 Self {
160 mode: value.mode.unwrap_or_default(),
161 uncapped: value.uncapped.unwrap_or_default(),
162 }
163 }
164}
165
166impl From<TimestampingConfig> for OptionalTimestampingConfig {
167 fn from(value: TimestampingConfig) -> Self {
168 Self {
169 mode: Some(value.mode),
170 uncapped: Some(value.uncapped),
171 }
172 }
173}
174
175impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
176 fn from(value: OptionalTimestampingConfig) -> Self {
177 Self {
178 mode: value.mode.into(),
179 uncapped: value.uncapped.into(),
180 }
181 }
182}
183
184#[derive(Debug, Clone, Default)]
185pub struct OptionalDeleteOnEmptyConfig {
186 pub min_age: Option<Duration>,
187}
188
189impl OptionalDeleteOnEmptyConfig {
190 pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
191 if let Maybe::Specified(min_age) = reconfiguration.min_age {
192 self.min_age = min_age;
193 }
194 self
195 }
196
197 pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
198 let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
199 DeleteOnEmptyConfig { min_age }
200 }
201}
202
203impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
204 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
205 Self {
206 min_age: value.min_age.unwrap_or_default(),
207 }
208 }
209}
210
211impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
212 fn from(value: DeleteOnEmptyConfig) -> Self {
213 Self {
214 min_age: Some(value.min_age),
215 }
216 }
217}
218
219impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
220 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
221 Self {
222 min_age: value.min_age.into(),
223 }
224 }
225}
226
227#[derive(Debug, Clone, Default)]
228pub struct OptionalStreamConfig {
229 pub storage_class: Option<StorageClass>,
230 pub retention_policy: Option<RetentionPolicy>,
231 pub timestamping: OptionalTimestampingConfig,
232 pub delete_on_empty: OptionalDeleteOnEmptyConfig,
233}
234
235impl OptionalStreamConfig {
236 pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
237 let StreamReconfiguration {
238 storage_class,
239 retention_policy,
240 timestamping,
241 delete_on_empty,
242 } = reconfiguration;
243 if let Maybe::Specified(storage_class) = storage_class {
244 self.storage_class = storage_class;
245 }
246 if let Maybe::Specified(retention_policy) = retention_policy {
247 self.retention_policy = retention_policy;
248 }
249 if let Maybe::Specified(timestamping) = timestamping {
250 self.timestamping = timestamping
251 .map(|ts| self.timestamping.reconfigure(ts))
252 .unwrap_or_default();
253 }
254 if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
255 self.delete_on_empty = delete_on_empty_reconfig
256 .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
257 .unwrap_or_default();
258 }
259 self
260 }
261
262 pub fn merge(self, basin_defaults: Self) -> StreamConfig {
263 let storage_class = self
264 .storage_class
265 .or(basin_defaults.storage_class)
266 .unwrap_or_default();
267
268 let retention_policy = self
269 .retention_policy
270 .or(basin_defaults.retention_policy)
271 .unwrap_or_default();
272
273 let timestamping = self.timestamping.merge(basin_defaults.timestamping);
274
275 let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
276
277 StreamConfig {
278 storage_class,
279 retention_policy,
280 timestamping,
281 delete_on_empty,
282 }
283 }
284}
285
286impl From<OptionalStreamConfig> for StreamReconfiguration {
287 fn from(value: OptionalStreamConfig) -> Self {
288 let OptionalStreamConfig {
289 storage_class,
290 retention_policy,
291 timestamping,
292 delete_on_empty,
293 } = value;
294
295 Self {
296 storage_class: storage_class.into(),
297 retention_policy: retention_policy.into(),
298 timestamping: Some(timestamping.into()).into(),
299 delete_on_empty: Some(delete_on_empty.into()).into(),
300 }
301 }
302}
303
304impl From<OptionalStreamConfig> for StreamConfig {
305 fn from(value: OptionalStreamConfig) -> Self {
306 let OptionalStreamConfig {
307 storage_class,
308 retention_policy,
309 timestamping,
310 delete_on_empty,
311 } = value;
312
313 Self {
314 storage_class: storage_class.unwrap_or_default(),
315 retention_policy: retention_policy.unwrap_or_default(),
316 timestamping: timestamping.into(),
317 delete_on_empty: delete_on_empty.into(),
318 }
319 }
320}
321
322impl From<StreamConfig> for OptionalStreamConfig {
323 fn from(value: StreamConfig) -> Self {
324 let StreamConfig {
325 storage_class,
326 retention_policy,
327 timestamping,
328 delete_on_empty,
329 } = value;
330
331 Self {
332 storage_class: Some(storage_class),
333 retention_policy: Some(retention_policy),
334 timestamping: timestamping.into(),
335 delete_on_empty: delete_on_empty.into(),
336 }
337 }
338}
339
340#[derive(Debug, Clone, Default)]
341pub struct BasinConfig {
342 pub default_stream_config: OptionalStreamConfig,
343 pub create_stream_on_append: bool,
344 pub create_stream_on_read: bool,
345}
346
347impl BasinConfig {
348 pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
349 let BasinReconfiguration {
350 default_stream_config,
351 create_stream_on_append,
352 create_stream_on_read,
353 } = reconfiguration;
354
355 if let Maybe::Specified(default_stream_config) = default_stream_config {
356 self.default_stream_config = default_stream_config
357 .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
358 .unwrap_or_default();
359 }
360
361 if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
362 self.create_stream_on_append = create_stream_on_append;
363 }
364
365 if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
366 self.create_stream_on_read = create_stream_on_read;
367 }
368
369 self
370 }
371}
372
373impl From<BasinConfig> for BasinReconfiguration {
374 fn from(value: BasinConfig) -> Self {
375 let BasinConfig {
376 default_stream_config,
377 create_stream_on_append,
378 create_stream_on_read,
379 } = value;
380
381 Self {
382 default_stream_config: Some(default_stream_config.into()).into(),
383 create_stream_on_append: create_stream_on_append.into(),
384 create_stream_on_read: create_stream_on_read.into(),
385 }
386 }
387}
388
389#[derive(Debug, Clone, Default)]
390pub struct BasinReconfiguration {
391 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
392 pub create_stream_on_append: Maybe<bool>,
393 pub create_stream_on_read: Maybe<bool>,
394}