1use std::time::Duration;
31
32use enum_ordinalize::Ordinalize;
33use enumset::EnumSet;
34
35use crate::{encryption::EncryptionMode, maybe::Maybe};
36
37#[derive(
38 Debug,
39 Default,
40 Clone,
41 Copy,
42 strum::Display,
43 strum::IntoStaticStr,
44 strum::EnumIter,
45 strum::FromRepr,
46 strum::EnumString,
47 PartialEq,
48 Eq,
49 Ordinalize,
50 Hash,
51)]
52#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
53#[repr(u8)]
54pub enum StorageClass {
55 #[strum(serialize = "standard")]
56 Standard = 1,
57 #[default]
58 #[strum(serialize = "express")]
59 Express = 2,
60}
61
/// Retention policy: either bounded by an age or unbounded.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RetentionPolicy {
    /// Age-based policy carrying the age limit.
    Age(Duration),
    /// Policy without an age limit.
    Infinite(),
}
67
68impl RetentionPolicy {
69 pub fn age(&self) -> Option<Duration> {
70 match self {
71 Self::Age(duration) => Some(*duration),
72 Self::Infinite() => None,
73 }
74 }
75}
76
77impl Default for RetentionPolicy {
78 fn default() -> Self {
79 const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
80
81 Self::Age(ONE_WEEK)
82 }
83}
84
/// Timestamping mode; [`TimestampingMode::ClientPrefer`] is the default.
// NOTE(review): the runtime meaning of each variant is defined by whatever
// consumes this enum, which is not visible in this file.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum TimestampingMode {
    /// Default variant.
    #[default]
    ClientPrefer,
    ClientRequire,
    Arrival,
}
92
93#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
94pub struct TimestampingConfig {
95 pub mode: TimestampingMode,
96 pub uncapped: bool,
97}
98
/// Fully resolved delete-on-empty settings.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteOnEmptyConfig {
    /// Minimum age; the derived default is a zero duration.
    pub min_age: Duration,
}
103
104#[derive(Debug, Clone, PartialEq, Eq)]
105pub struct EncryptionConfig {
106 pub allowed_modes: EnumSet<EncryptionMode>,
107}
108
109pub const DEFAULT_ALLOWED_ENCRYPTION_MODES: EnumSet<EncryptionMode> =
110 enumset::enum_set!(EncryptionMode::Plain);
111
112impl Default for EncryptionConfig {
113 fn default() -> Self {
114 Self {
115 allowed_modes: DEFAULT_ALLOWED_ENCRYPTION_MODES,
116 }
117 }
118}
119
120#[derive(Debug, Clone, Default, PartialEq, Eq)]
121pub struct StreamConfig {
122 pub storage_class: StorageClass,
123 pub retention_policy: RetentionPolicy,
124 pub timestamping: TimestampingConfig,
125 pub delete_on_empty: DeleteOnEmptyConfig,
126 pub encryption: EncryptionConfig,
127}
128
129#[derive(Debug, Clone, Default)]
130pub struct TimestampingReconfiguration {
131 pub mode: Maybe<Option<TimestampingMode>>,
132 pub uncapped: Maybe<Option<bool>>,
133}
134
135#[derive(Debug, Clone, Default)]
136pub struct DeleteOnEmptyReconfiguration {
137 pub min_age: Maybe<Option<Duration>>,
138}
139
140#[derive(Debug, Clone, Default)]
141pub struct EncryptionReconfiguration {
142 pub allowed_modes: Maybe<EnumSet<EncryptionMode>>,
143}
144
145#[derive(Debug, Clone, Default)]
146pub struct StreamReconfiguration {
147 pub storage_class: Maybe<Option<StorageClass>>,
148 pub retention_policy: Maybe<Option<RetentionPolicy>>,
149 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
150 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
151 pub encryption: Maybe<Option<EncryptionReconfiguration>>,
152}
153
154#[derive(Debug, Clone, Copy, Default)]
155pub struct OptionalTimestampingConfig {
156 pub mode: Option<TimestampingMode>,
157 pub uncapped: Option<bool>,
158}
159
160impl OptionalTimestampingConfig {
161 pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
162 if let Maybe::Specified(mode) = reconfiguration.mode {
163 self.mode = mode;
164 }
165 if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
166 self.uncapped = uncapped;
167 }
168 self
169 }
170
171 pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
172 let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
173 let uncapped = self
174 .uncapped
175 .or(basin_defaults.uncapped)
176 .unwrap_or_default();
177 TimestampingConfig { mode, uncapped }
178 }
179}
180
181impl From<OptionalTimestampingConfig> for TimestampingConfig {
182 fn from(value: OptionalTimestampingConfig) -> Self {
183 Self {
184 mode: value.mode.unwrap_or_default(),
185 uncapped: value.uncapped.unwrap_or_default(),
186 }
187 }
188}
189
190impl From<TimestampingConfig> for OptionalTimestampingConfig {
191 fn from(value: TimestampingConfig) -> Self {
192 Self {
193 mode: Some(value.mode),
194 uncapped: Some(value.uncapped),
195 }
196 }
197}
198
199impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
200 fn from(value: OptionalTimestampingConfig) -> Self {
201 Self {
202 mode: value.mode.into(),
203 uncapped: value.uncapped.into(),
204 }
205 }
206}
207
/// Delete-on-empty settings in which the value may be left unset (`None`).
#[derive(Clone, Debug, Default)]
pub struct OptionalDeleteOnEmptyConfig {
    /// Minimum age, if set.
    pub min_age: Option<Duration>,
}
212
213impl OptionalDeleteOnEmptyConfig {
214 pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
215 if let Maybe::Specified(min_age) = reconfiguration.min_age {
216 self.min_age = min_age;
217 }
218 self
219 }
220
221 pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
222 let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
223 DeleteOnEmptyConfig { min_age }
224 }
225}
226
227impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
228 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
229 Self {
230 min_age: value.min_age.unwrap_or_default(),
231 }
232 }
233}
234
235impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
236 fn from(value: DeleteOnEmptyConfig) -> Self {
237 Self {
238 min_age: Some(value.min_age),
239 }
240 }
241}
242
243impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
244 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
245 Self {
246 min_age: value.min_age.into(),
247 }
248 }
249}
250
251#[derive(Debug, Clone, Default)]
252pub struct OptionalEncryptionConfig {
253 pub allowed_modes: EnumSet<EncryptionMode>,
254}
255
256impl OptionalEncryptionConfig {
257 pub fn reconfigure(mut self, reconfiguration: EncryptionReconfiguration) -> Self {
258 if let Maybe::Specified(allowed_modes) = reconfiguration.allowed_modes {
259 self.allowed_modes = allowed_modes;
260 }
261 self
262 }
263
264 pub fn merge(self, basin_defaults: Self) -> EncryptionConfig {
265 let allowed_modes = if !self.allowed_modes.is_empty() {
266 self.allowed_modes
267 } else if !basin_defaults.allowed_modes.is_empty() {
268 basin_defaults.allowed_modes
269 } else {
270 DEFAULT_ALLOWED_ENCRYPTION_MODES
271 };
272 EncryptionConfig { allowed_modes }
273 }
274}
275
276impl From<OptionalEncryptionConfig> for EncryptionConfig {
277 fn from(value: OptionalEncryptionConfig) -> Self {
278 Self {
279 allowed_modes: value.allowed_modes,
280 }
281 }
282}
283
284impl From<EncryptionConfig> for OptionalEncryptionConfig {
285 fn from(value: EncryptionConfig) -> Self {
286 Self {
287 allowed_modes: value.allowed_modes,
288 }
289 }
290}
291
292impl From<OptionalEncryptionConfig> for EncryptionReconfiguration {
293 fn from(value: OptionalEncryptionConfig) -> Self {
294 Self {
295 allowed_modes: Maybe::Specified(value.allowed_modes),
296 }
297 }
298}
299
300#[derive(Debug, Clone, Default)]
301pub struct OptionalStreamConfig {
302 pub storage_class: Option<StorageClass>,
303 pub retention_policy: Option<RetentionPolicy>,
304 pub timestamping: OptionalTimestampingConfig,
305 pub delete_on_empty: OptionalDeleteOnEmptyConfig,
306 pub encryption: OptionalEncryptionConfig,
307}
308
309impl OptionalStreamConfig {
310 pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
311 let StreamReconfiguration {
312 storage_class,
313 retention_policy,
314 timestamping,
315 delete_on_empty,
316 encryption,
317 } = reconfiguration;
318 if let Maybe::Specified(storage_class) = storage_class {
319 self.storage_class = storage_class;
320 }
321 if let Maybe::Specified(retention_policy) = retention_policy {
322 self.retention_policy = retention_policy;
323 }
324 if let Maybe::Specified(timestamping) = timestamping {
325 self.timestamping = timestamping
326 .map(|ts| self.timestamping.reconfigure(ts))
327 .unwrap_or_default();
328 }
329 if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
330 self.delete_on_empty = delete_on_empty_reconfig
331 .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
332 .unwrap_or_default();
333 }
334 if let Maybe::Specified(encryption) = encryption {
335 self.encryption = encryption
336 .map(|enc| self.encryption.reconfigure(enc))
337 .unwrap_or_default();
338 }
339 self
340 }
341
342 pub fn merge(self, basin_defaults: Self) -> StreamConfig {
343 let storage_class = self
344 .storage_class
345 .or(basin_defaults.storage_class)
346 .unwrap_or_default();
347
348 let retention_policy = self
349 .retention_policy
350 .or(basin_defaults.retention_policy)
351 .unwrap_or_default();
352
353 let timestamping = self.timestamping.merge(basin_defaults.timestamping);
354
355 let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
356
357 let encryption = self.encryption.merge(basin_defaults.encryption);
358
359 StreamConfig {
360 storage_class,
361 retention_policy,
362 timestamping,
363 delete_on_empty,
364 encryption,
365 }
366 }
367}
368
369impl From<OptionalStreamConfig> for StreamReconfiguration {
370 fn from(value: OptionalStreamConfig) -> Self {
371 let OptionalStreamConfig {
372 storage_class,
373 retention_policy,
374 timestamping,
375 delete_on_empty,
376 encryption,
377 } = value;
378
379 Self {
380 storage_class: storage_class.into(),
381 retention_policy: retention_policy.into(),
382 timestamping: Some(timestamping.into()).into(),
383 delete_on_empty: Some(delete_on_empty.into()).into(),
384 encryption: Some(encryption.into()).into(),
385 }
386 }
387}
388
389impl From<OptionalStreamConfig> for StreamConfig {
390 fn from(value: OptionalStreamConfig) -> Self {
391 let OptionalStreamConfig {
392 storage_class,
393 retention_policy,
394 timestamping,
395 delete_on_empty,
396 encryption,
397 } = value;
398
399 Self {
400 storage_class: storage_class.unwrap_or_default(),
401 retention_policy: retention_policy.unwrap_or_default(),
402 timestamping: timestamping.into(),
403 delete_on_empty: delete_on_empty.into(),
404 encryption: encryption.into(),
405 }
406 }
407}
408
409impl From<StreamConfig> for OptionalStreamConfig {
410 fn from(value: StreamConfig) -> Self {
411 let StreamConfig {
412 storage_class,
413 retention_policy,
414 timestamping,
415 delete_on_empty,
416 encryption,
417 } = value;
418
419 Self {
420 storage_class: Some(storage_class),
421 retention_policy: Some(retention_policy),
422 timestamping: timestamping.into(),
423 delete_on_empty: delete_on_empty.into(),
424 encryption: encryption.into(),
425 }
426 }
427}
428
429#[derive(Debug, Clone, Default)]
430pub struct BasinConfig {
431 pub default_stream_config: OptionalStreamConfig,
432 pub create_stream_on_append: bool,
433 pub create_stream_on_read: bool,
434}
435
436impl BasinConfig {
437 pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
438 let BasinReconfiguration {
439 default_stream_config,
440 create_stream_on_append,
441 create_stream_on_read,
442 } = reconfiguration;
443
444 if let Maybe::Specified(default_stream_config) = default_stream_config {
445 self.default_stream_config = default_stream_config
446 .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
447 .unwrap_or_default();
448 }
449
450 if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
451 self.create_stream_on_append = create_stream_on_append;
452 }
453
454 if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
455 self.create_stream_on_read = create_stream_on_read;
456 }
457
458 self
459 }
460}
461
462impl From<BasinConfig> for BasinReconfiguration {
463 fn from(value: BasinConfig) -> Self {
464 let BasinConfig {
465 default_stream_config,
466 create_stream_on_append,
467 create_stream_on_read,
468 } = value;
469
470 Self {
471 default_stream_config: Some(default_stream_config.into()).into(),
472 create_stream_on_append: create_stream_on_append.into(),
473 create_stream_on_read: create_stream_on_read.into(),
474 }
475 }
476}
477
478#[derive(Debug, Clone, Default)]
479pub struct BasinReconfiguration {
480 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
481 pub create_stream_on_append: Maybe<bool>,
482 pub create_stream_on_read: Maybe<bool>,
483}