1use std::time::Duration;
31
32use enum_ordinalize::Ordinalize;
33use enumset::EnumSet;
34
35use crate::{encryption::EncryptionMode, maybe::Maybe};
36
/// Storage class that can be chosen for a stream (see `StreamConfig::storage_class`).
///
/// The `strum` attributes give the variants the string forms `"standard"` and
/// `"express"`, `#[repr(u8)]` fixes their discriminants to `1` and `2`, and
/// `#[default]` makes [`StorageClass::Express`] the `Default` value.
#[derive(
    Debug,
    Default,
    Clone,
    Copy,
    strum::Display,
    strum::IntoStaticStr,
    strum::EnumIter,
    strum::FromRepr,
    strum::EnumString,
    PartialEq,
    Eq,
    Ordinalize,
    Hash,
)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
#[repr(u8)]
pub enum StorageClass {
    // NOTE(review): the variant notes below are plain `//` comments on purpose;
    // with the `clap` feature enabled, `clap::ValueEnum` may surface `///`
    // variant docs as CLI help text — confirm before converting them.

    // String form "standard"; discriminant 1.
    #[strum(serialize = "standard")]
    Standard = 1,
    // String form "express"; discriminant 2; the default storage class.
    #[default]
    #[strum(serialize = "express")]
    Express = 2,
}
61
/// Retention policy: either bounded by an age limit or unbounded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Bounded by the given age limit.
    Age(Duration),
    /// No age limit.
    Infinite(),
}

impl RetentionPolicy {
    /// Returns the age limit of an [`RetentionPolicy::Age`] policy, or `None`
    /// for [`RetentionPolicy::Infinite`].
    pub fn age(&self) -> Option<Duration> {
        if let Self::Age(limit) = *self {
            Some(limit)
        } else {
            None
        }
    }
}

impl Default for RetentionPolicy {
    /// The default policy is an age limit of seven days.
    fn default() -> Self {
        const SECONDS_PER_DAY: u64 = 24 * 60 * 60;

        Self::Age(Duration::from_secs(7 * SECONDS_PER_DAY))
    }
}
84
/// Timestamping mode carried by [`TimestampingConfig`]; defaults to
/// [`TimestampingMode::ClientPrefer`].
///
/// NOTE(review): the variant names suggest a choice between client-supplied and
/// arrival-time timestamps, but the exact semantics are not visible in this
/// section — confirm before relying on that reading.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum TimestampingMode {
    /// The default mode.
    #[default]
    ClientPrefer,
    ClientRequire,
    Arrival,
}
92
/// Fully resolved timestamping settings (no unset fields).
///
/// The derived `Default` is `mode = TimestampingMode::ClientPrefer`,
/// `uncapped = false`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TimestampingConfig {
    /// Selected timestamping mode.
    pub mode: TimestampingMode,
    /// NOTE(review): presumably lifts a cap on accepted timestamps — the
    /// effect is not visible in this section; confirm.
    pub uncapped: bool,
}
98
/// Fully resolved delete-on-empty settings.
///
/// The derived `Default` has a zero `min_age`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DeleteOnEmptyConfig {
    /// NOTE(review): presumably the minimum age before an empty stream may be
    /// deleted, with zero meaning "disabled" — not shown here; confirm.
    pub min_age: Duration,
}
103
/// Fully resolved encryption settings.
///
/// `Default` is implemented by hand (not derived) so that the default set is
/// `DEFAULT_ALLOWED_ENCRYPTION_MODES`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EncryptionConfig {
    /// Set of encryption modes that are allowed.
    pub allowed_modes: EnumSet<EncryptionMode>,
}
108
/// Built-in fallback set of allowed encryption modes: only
/// [`EncryptionMode::Plain`].
///
/// Used wherever an encryption config has no explicit `allowed_modes`.
pub const DEFAULT_ALLOWED_ENCRYPTION_MODES: EnumSet<EncryptionMode> =
    enumset::enum_set!(EncryptionMode::Plain);
111
112impl Default for EncryptionConfig {
113 fn default() -> Self {
114 Self {
115 allowed_modes: DEFAULT_ALLOWED_ENCRYPTION_MODES,
116 }
117 }
118}
119
/// Fully resolved stream configuration: every setting has a concrete value.
///
/// Produced from an [`OptionalStreamConfig`] either via `merge` (with basin
/// defaults) or via `From` (built-in defaults only).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamConfig {
    /// Storage class of the stream.
    pub storage_class: StorageClass,
    /// Retention policy of the stream.
    pub retention_policy: RetentionPolicy,
    /// Timestamping settings.
    pub timestamping: TimestampingConfig,
    /// Delete-on-empty settings.
    pub delete_on_empty: DeleteOnEmptyConfig,
    /// Encryption settings.
    pub encryption: EncryptionConfig,
}
128
/// Partial update for an [`OptionalTimestampingConfig`].
///
/// For each field, `Maybe::Specified(v)` overwrites the stored `Option` with
/// `v` (so `Specified(None)` clears it); any other `Maybe` value leaves the
/// stored field untouched.
#[derive(Debug, Clone, Default)]
pub struct TimestampingReconfiguration {
    /// Requested change to the timestamping mode.
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Requested change to the `uncapped` flag.
    pub uncapped: Maybe<Option<bool>>,
}
134
/// Partial update for an [`OptionalDeleteOnEmptyConfig`].
#[derive(Debug, Clone, Default)]
pub struct DeleteOnEmptyReconfiguration {
    /// `Maybe::Specified(v)` overwrites the stored `min_age` with `v`
    /// (`Specified(None)` clears it); otherwise it is left unchanged.
    pub min_age: Maybe<Option<Duration>>,
}
139
/// Partial update for an [`OptionalEncryptionConfig`].
#[derive(Debug, Clone, Default)]
pub struct EncryptionReconfiguration {
    /// `Maybe::Specified(set)` replaces the stored modes; an empty `set` is
    /// treated by `OptionalEncryptionConfig::reconfigure` as "clear the
    /// setting". Any other `Maybe` value leaves the stored modes unchanged.
    pub allowed_modes: Maybe<EnumSet<EncryptionMode>>,
}
144
/// Partial update for an [`OptionalStreamConfig`].
///
/// Scalar fields: `Maybe::Specified(v)` overwrites the stored `Option` with `v`.
/// Nested fields: `Maybe::Specified(Some(r))` applies the nested
/// reconfiguration `r` to the current value, while `Maybe::Specified(None)`
/// resets that section to its `Default`. Any other `Maybe` value leaves the
/// field unchanged.
#[derive(Debug, Clone, Default)]
pub struct StreamReconfiguration {
    /// Requested change to the storage class.
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Requested change to the retention policy.
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Requested change to the timestamping section.
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Requested change to the delete-on-empty section.
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
    /// Requested change to the encryption section.
    pub encryption: Maybe<Option<EncryptionReconfiguration>>,
}
153
/// Timestamping settings in which every field may be unset.
///
/// A `None` field is filled in later: `merge` consults the basin defaults and
/// then the field type's `Default`; `From` uses the type's `Default` directly.
#[derive(Debug, Clone, Copy, Default)]
pub struct OptionalTimestampingConfig {
    /// Timestamping mode, if set.
    pub mode: Option<TimestampingMode>,
    /// `uncapped` flag, if set.
    pub uncapped: Option<bool>,
}
159
160impl OptionalTimestampingConfig {
161 pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
162 if let Maybe::Specified(mode) = reconfiguration.mode {
163 self.mode = mode;
164 }
165 if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
166 self.uncapped = uncapped;
167 }
168 self
169 }
170
171 pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
172 let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
173 let uncapped = self
174 .uncapped
175 .or(basin_defaults.uncapped)
176 .unwrap_or_default();
177 TimestampingConfig { mode, uncapped }
178 }
179}
180
181impl From<OptionalTimestampingConfig> for TimestampingConfig {
182 fn from(value: OptionalTimestampingConfig) -> Self {
183 Self {
184 mode: value.mode.unwrap_or_default(),
185 uncapped: value.uncapped.unwrap_or_default(),
186 }
187 }
188}
189
190impl From<TimestampingConfig> for OptionalTimestampingConfig {
191 fn from(value: TimestampingConfig) -> Self {
192 Self {
193 mode: Some(value.mode),
194 uncapped: Some(value.uncapped),
195 }
196 }
197}
198
199impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
200 fn from(value: OptionalTimestampingConfig) -> Self {
201 Self {
202 mode: value.mode.into(),
203 uncapped: value.uncapped.into(),
204 }
205 }
206}
207
/// Delete-on-empty settings in which the field may be unset.
///
/// A `None` `min_age` is filled in later: `merge` consults the basin defaults
/// and then `Duration::default()`; `From` uses `Duration::default()` directly.
#[derive(Debug, Clone, Default)]
pub struct OptionalDeleteOnEmptyConfig {
    /// Minimum age, if set.
    pub min_age: Option<Duration>,
}
212
213impl OptionalDeleteOnEmptyConfig {
214 pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
215 if let Maybe::Specified(min_age) = reconfiguration.min_age {
216 self.min_age = min_age;
217 }
218 self
219 }
220
221 pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
222 let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
223 DeleteOnEmptyConfig { min_age }
224 }
225}
226
227impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
228 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
229 Self {
230 min_age: value.min_age.unwrap_or_default(),
231 }
232 }
233}
234
235impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
236 fn from(value: DeleteOnEmptyConfig) -> Self {
237 Self {
238 min_age: Some(value.min_age),
239 }
240 }
241}
242
243impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
244 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
245 Self {
246 min_age: value.min_age.into(),
247 }
248 }
249}
250
/// Encryption settings in which the field may be unset.
///
/// A `None` `allowed_modes` is filled in later: `merge` consults the basin
/// defaults and then `DEFAULT_ALLOWED_ENCRYPTION_MODES`; `From` uses
/// `DEFAULT_ALLOWED_ENCRYPTION_MODES` directly.
#[derive(Debug, Clone, Default)]
pub struct OptionalEncryptionConfig {
    /// Allowed encryption modes, if set.
    pub allowed_modes: Option<EnumSet<EncryptionMode>>,
}
255
256impl OptionalEncryptionConfig {
257 pub fn reconfigure(mut self, reconfiguration: EncryptionReconfiguration) -> Self {
258 if let Maybe::Specified(allowed_modes) = reconfiguration.allowed_modes {
259 self.allowed_modes = (!allowed_modes.is_empty()).then_some(allowed_modes);
260 }
261 self
262 }
263
264 pub fn merge(self, basin_defaults: Self) -> EncryptionConfig {
265 let allowed_modes = self
266 .allowed_modes
267 .or(basin_defaults.allowed_modes)
268 .unwrap_or(DEFAULT_ALLOWED_ENCRYPTION_MODES);
269 EncryptionConfig { allowed_modes }
270 }
271}
272
273impl From<OptionalEncryptionConfig> for EncryptionConfig {
274 fn from(value: OptionalEncryptionConfig) -> Self {
275 Self {
276 allowed_modes: value
277 .allowed_modes
278 .unwrap_or(DEFAULT_ALLOWED_ENCRYPTION_MODES),
279 }
280 }
281}
282
283impl From<EncryptionConfig> for OptionalEncryptionConfig {
284 fn from(value: EncryptionConfig) -> Self {
285 Self {
286 allowed_modes: Some(value.allowed_modes),
287 }
288 }
289}
290
291impl From<OptionalEncryptionConfig> for EncryptionReconfiguration {
292 fn from(value: OptionalEncryptionConfig) -> Self {
293 Self {
294 allowed_modes: Maybe::Specified(value.allowed_modes.unwrap_or_default()),
295 }
296 }
297}
298
/// Stream configuration in which every setting may be unset.
///
/// Unset values are filled in when converting to a [`StreamConfig`]: `merge`
/// consults basin-level defaults first, `From` goes straight to the built-in
/// defaults.
#[derive(Debug, Clone, Default)]
pub struct OptionalStreamConfig {
    /// Storage class, if set.
    pub storage_class: Option<StorageClass>,
    /// Retention policy, if set.
    pub retention_policy: Option<RetentionPolicy>,
    /// Timestamping section (each field individually optional).
    pub timestamping: OptionalTimestampingConfig,
    /// Delete-on-empty section (each field individually optional).
    pub delete_on_empty: OptionalDeleteOnEmptyConfig,
    /// Encryption section (each field individually optional).
    pub encryption: OptionalEncryptionConfig,
}
307
308impl OptionalStreamConfig {
309 pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
310 let StreamReconfiguration {
311 storage_class,
312 retention_policy,
313 timestamping,
314 delete_on_empty,
315 encryption,
316 } = reconfiguration;
317 if let Maybe::Specified(storage_class) = storage_class {
318 self.storage_class = storage_class;
319 }
320 if let Maybe::Specified(retention_policy) = retention_policy {
321 self.retention_policy = retention_policy;
322 }
323 if let Maybe::Specified(timestamping) = timestamping {
324 self.timestamping = timestamping
325 .map(|ts| self.timestamping.reconfigure(ts))
326 .unwrap_or_default();
327 }
328 if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
329 self.delete_on_empty = delete_on_empty_reconfig
330 .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
331 .unwrap_or_default();
332 }
333 if let Maybe::Specified(encryption) = encryption {
334 self.encryption = encryption
335 .map(|enc| self.encryption.reconfigure(enc))
336 .unwrap_or_default();
337 }
338 self
339 }
340
341 pub fn merge(self, basin_defaults: Self) -> StreamConfig {
342 let storage_class = self
343 .storage_class
344 .or(basin_defaults.storage_class)
345 .unwrap_or_default();
346
347 let retention_policy = self
348 .retention_policy
349 .or(basin_defaults.retention_policy)
350 .unwrap_or_default();
351
352 let timestamping = self.timestamping.merge(basin_defaults.timestamping);
353
354 let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
355
356 let encryption = self.encryption.merge(basin_defaults.encryption);
357
358 StreamConfig {
359 storage_class,
360 retention_policy,
361 timestamping,
362 delete_on_empty,
363 encryption,
364 }
365 }
366}
367
368impl From<OptionalStreamConfig> for StreamReconfiguration {
369 fn from(value: OptionalStreamConfig) -> Self {
370 let OptionalStreamConfig {
371 storage_class,
372 retention_policy,
373 timestamping,
374 delete_on_empty,
375 encryption,
376 } = value;
377
378 Self {
379 storage_class: storage_class.into(),
380 retention_policy: retention_policy.into(),
381 timestamping: Some(timestamping.into()).into(),
382 delete_on_empty: Some(delete_on_empty.into()).into(),
383 encryption: Some(encryption.into()).into(),
384 }
385 }
386}
387
388impl From<OptionalStreamConfig> for StreamConfig {
389 fn from(value: OptionalStreamConfig) -> Self {
390 let OptionalStreamConfig {
391 storage_class,
392 retention_policy,
393 timestamping,
394 delete_on_empty,
395 encryption,
396 } = value;
397
398 Self {
399 storage_class: storage_class.unwrap_or_default(),
400 retention_policy: retention_policy.unwrap_or_default(),
401 timestamping: timestamping.into(),
402 delete_on_empty: delete_on_empty.into(),
403 encryption: encryption.into(),
404 }
405 }
406}
407
408impl From<StreamConfig> for OptionalStreamConfig {
409 fn from(value: StreamConfig) -> Self {
410 let StreamConfig {
411 storage_class,
412 retention_policy,
413 timestamping,
414 delete_on_empty,
415 encryption,
416 } = value;
417
418 Self {
419 storage_class: Some(storage_class),
420 retention_policy: Some(retention_policy),
421 timestamping: timestamping.into(),
422 delete_on_empty: delete_on_empty.into(),
423 encryption: encryption.into(),
424 }
425 }
426}
427
/// Basin-level configuration.
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    /// Stream settings stored at the basin level; each may be unset.
    pub default_stream_config: OptionalStreamConfig,
    /// NOTE(review): presumably whether a missing stream is created
    /// implicitly on append — the effect is not visible here; confirm.
    pub create_stream_on_append: bool,
    /// NOTE(review): presumably whether a missing stream is created
    /// implicitly on read — the effect is not visible here; confirm.
    pub create_stream_on_read: bool,
}
434
435impl BasinConfig {
436 pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
437 let BasinReconfiguration {
438 default_stream_config,
439 create_stream_on_append,
440 create_stream_on_read,
441 } = reconfiguration;
442
443 if let Maybe::Specified(default_stream_config) = default_stream_config {
444 self.default_stream_config = default_stream_config
445 .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
446 .unwrap_or_default();
447 }
448
449 if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
450 self.create_stream_on_append = create_stream_on_append;
451 }
452
453 if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
454 self.create_stream_on_read = create_stream_on_read;
455 }
456
457 self
458 }
459}
460
461impl From<BasinConfig> for BasinReconfiguration {
462 fn from(value: BasinConfig) -> Self {
463 let BasinConfig {
464 default_stream_config,
465 create_stream_on_append,
466 create_stream_on_read,
467 } = value;
468
469 Self {
470 default_stream_config: Some(default_stream_config.into()).into(),
471 create_stream_on_append: create_stream_on_append.into(),
472 create_stream_on_read: create_stream_on_read.into(),
473 }
474 }
475}
476
/// Partial update for a [`BasinConfig`].
///
/// A field set to `Maybe::Specified(..)` is applied by
/// `BasinConfig::reconfigure`; any other `Maybe` value leaves the
/// corresponding setting unchanged.
#[derive(Debug, Clone, Default)]
pub struct BasinReconfiguration {
    /// `Specified(Some(r))` applies `r` to the stored stream defaults;
    /// `Specified(None)` resets them to `OptionalStreamConfig::default()`.
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Requested new value of `create_stream_on_append`.
    pub create_stream_on_append: Maybe<bool>,
    /// Requested new value of `create_stream_on_read`.
    pub create_stream_on_read: Maybe<bool>,
}