1use std::time::Duration;
27
28use enum_ordinalize::Ordinalize;
29
30use crate::{encryption::EncryptionAlgorithm, maybe::Maybe};
31
32#[derive(
33 Debug,
34 Default,
35 Clone,
36 Copy,
37 strum::Display,
38 strum::IntoStaticStr,
39 strum::EnumIter,
40 strum::FromRepr,
41 strum::EnumString,
42 PartialEq,
43 Eq,
44 Ordinalize,
45 Hash,
46)]
47#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
48#[repr(u8)]
49pub enum StorageClass {
50 #[strum(serialize = "standard")]
51 Standard = 1,
52 #[default]
53 #[strum(serialize = "express")]
54 Express = 2,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum RetentionPolicy {
59 Age(Duration),
60 Infinite(),
61}
62
63impl RetentionPolicy {
64 pub fn age(&self) -> Option<Duration> {
65 match self {
66 Self::Age(duration) => Some(*duration),
67 Self::Infinite() => None,
68 }
69 }
70}
71
72impl Default for RetentionPolicy {
73 fn default() -> Self {
74 const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
75
76 Self::Age(ONE_WEEK)
77 }
78}
79
80#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
81pub enum TimestampingMode {
82 #[default]
83 ClientPrefer,
84 ClientRequire,
85 Arrival,
86}
87
88#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
89pub struct TimestampingConfig {
90 pub mode: TimestampingMode,
91 pub uncapped: bool,
92}
93
94#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
95pub struct DeleteOnEmptyConfig {
96 pub min_age: Duration,
97}
98
99#[derive(Debug, Clone, Default, PartialEq, Eq)]
100pub struct StreamConfig {
101 pub storage_class: StorageClass,
102 pub retention_policy: RetentionPolicy,
103 pub timestamping: TimestampingConfig,
104 pub delete_on_empty: DeleteOnEmptyConfig,
105}
106
107#[derive(Debug, Clone, Default)]
108pub struct TimestampingReconfiguration {
109 pub mode: Maybe<Option<TimestampingMode>>,
110 pub uncapped: Maybe<Option<bool>>,
111}
112
113#[derive(Debug, Clone, Default)]
114pub struct DeleteOnEmptyReconfiguration {
115 pub min_age: Maybe<Option<Duration>>,
116}
117
118#[derive(Debug, Clone, Default)]
119pub struct StreamReconfiguration {
120 pub storage_class: Maybe<Option<StorageClass>>,
121 pub retention_policy: Maybe<Option<RetentionPolicy>>,
122 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
123 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
124}
125
126#[derive(Debug, Clone, Copy, Default)]
127pub struct OptionalTimestampingConfig {
128 pub mode: Option<TimestampingMode>,
129 pub uncapped: Option<bool>,
130}
131
132impl OptionalTimestampingConfig {
133 pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
134 if let Maybe::Specified(mode) = reconfiguration.mode {
135 self.mode = mode;
136 }
137 if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
138 self.uncapped = uncapped;
139 }
140 self
141 }
142
143 pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
144 let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
145 let uncapped = self
146 .uncapped
147 .or(basin_defaults.uncapped)
148 .unwrap_or_default();
149 TimestampingConfig { mode, uncapped }
150 }
151}
152
153impl From<OptionalTimestampingConfig> for TimestampingConfig {
154 fn from(value: OptionalTimestampingConfig) -> Self {
155 Self {
156 mode: value.mode.unwrap_or_default(),
157 uncapped: value.uncapped.unwrap_or_default(),
158 }
159 }
160}
161
162impl From<TimestampingConfig> for OptionalTimestampingConfig {
163 fn from(value: TimestampingConfig) -> Self {
164 Self {
165 mode: Some(value.mode),
166 uncapped: Some(value.uncapped),
167 }
168 }
169}
170
171impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
172 fn from(value: OptionalTimestampingConfig) -> Self {
173 Self {
174 mode: value.mode.into(),
175 uncapped: value.uncapped.into(),
176 }
177 }
178}
179
180#[derive(Debug, Clone, Default)]
181pub struct OptionalDeleteOnEmptyConfig {
182 pub min_age: Option<Duration>,
183}
184
185impl OptionalDeleteOnEmptyConfig {
186 pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
187 if let Maybe::Specified(min_age) = reconfiguration.min_age {
188 self.min_age = min_age;
189 }
190 self
191 }
192
193 pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
194 let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
195 DeleteOnEmptyConfig { min_age }
196 }
197}
198
199impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
200 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
201 Self {
202 min_age: value.min_age.unwrap_or_default(),
203 }
204 }
205}
206
207impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
208 fn from(value: DeleteOnEmptyConfig) -> Self {
209 Self {
210 min_age: Some(value.min_age),
211 }
212 }
213}
214
215impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
216 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
217 Self {
218 min_age: value.min_age.into(),
219 }
220 }
221}
222
223#[derive(Debug, Clone, Default)]
224pub struct OptionalStreamConfig {
225 pub storage_class: Option<StorageClass>,
226 pub retention_policy: Option<RetentionPolicy>,
227 pub timestamping: OptionalTimestampingConfig,
228 pub delete_on_empty: OptionalDeleteOnEmptyConfig,
229}
230
231impl OptionalStreamConfig {
232 pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
233 let StreamReconfiguration {
234 storage_class,
235 retention_policy,
236 timestamping,
237 delete_on_empty,
238 } = reconfiguration;
239 if let Maybe::Specified(storage_class) = storage_class {
240 self.storage_class = storage_class;
241 }
242 if let Maybe::Specified(retention_policy) = retention_policy {
243 self.retention_policy = retention_policy;
244 }
245 if let Maybe::Specified(timestamping) = timestamping {
246 self.timestamping = timestamping
247 .map(|ts| self.timestamping.reconfigure(ts))
248 .unwrap_or_default();
249 }
250 if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
251 self.delete_on_empty = delete_on_empty_reconfig
252 .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
253 .unwrap_or_default();
254 }
255 self
256 }
257
258 pub fn merge(self, basin_defaults: Self) -> StreamConfig {
259 let storage_class = self
260 .storage_class
261 .or(basin_defaults.storage_class)
262 .unwrap_or_default();
263
264 let retention_policy = self
265 .retention_policy
266 .or(basin_defaults.retention_policy)
267 .unwrap_or_default();
268
269 let timestamping = self.timestamping.merge(basin_defaults.timestamping);
270
271 let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
272
273 StreamConfig {
274 storage_class,
275 retention_policy,
276 timestamping,
277 delete_on_empty,
278 }
279 }
280}
281
282impl From<OptionalStreamConfig> for StreamReconfiguration {
283 fn from(value: OptionalStreamConfig) -> Self {
284 let OptionalStreamConfig {
285 storage_class,
286 retention_policy,
287 timestamping,
288 delete_on_empty,
289 } = value;
290
291 Self {
292 storage_class: storage_class.into(),
293 retention_policy: retention_policy.into(),
294 timestamping: Some(timestamping.into()).into(),
295 delete_on_empty: Some(delete_on_empty.into()).into(),
296 }
297 }
298}
299
300impl From<OptionalStreamConfig> for StreamConfig {
301 fn from(value: OptionalStreamConfig) -> Self {
302 let OptionalStreamConfig {
303 storage_class,
304 retention_policy,
305 timestamping,
306 delete_on_empty,
307 } = value;
308
309 Self {
310 storage_class: storage_class.unwrap_or_default(),
311 retention_policy: retention_policy.unwrap_or_default(),
312 timestamping: timestamping.into(),
313 delete_on_empty: delete_on_empty.into(),
314 }
315 }
316}
317
318impl From<StreamConfig> for OptionalStreamConfig {
319 fn from(value: StreamConfig) -> Self {
320 let StreamConfig {
321 storage_class,
322 retention_policy,
323 timestamping,
324 delete_on_empty,
325 } = value;
326
327 Self {
328 storage_class: Some(storage_class),
329 retention_policy: Some(retention_policy),
330 timestamping: timestamping.into(),
331 delete_on_empty: delete_on_empty.into(),
332 }
333 }
334}
335
336#[derive(Debug, Clone, Default)]
337pub struct BasinConfig {
338 pub default_stream_config: OptionalStreamConfig,
339 pub stream_cipher: Option<EncryptionAlgorithm>,
340 pub create_stream_on_append: bool,
341 pub create_stream_on_read: bool,
342}
343
344impl BasinConfig {
345 pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
346 let BasinReconfiguration {
347 default_stream_config,
348 stream_cipher,
349 create_stream_on_append,
350 create_stream_on_read,
351 } = reconfiguration;
352
353 if let Maybe::Specified(default_stream_config) = default_stream_config {
354 self.default_stream_config = default_stream_config
355 .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
356 .unwrap_or_default();
357 }
358
359 if let Maybe::Specified(stream_cipher) = stream_cipher {
360 self.stream_cipher = stream_cipher;
361 }
362
363 if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
364 self.create_stream_on_append = create_stream_on_append;
365 }
366
367 if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
368 self.create_stream_on_read = create_stream_on_read;
369 }
370
371 self
372 }
373}
374
375impl From<BasinConfig> for BasinReconfiguration {
376 fn from(value: BasinConfig) -> Self {
377 let BasinConfig {
378 default_stream_config,
379 stream_cipher,
380 create_stream_on_append,
381 create_stream_on_read,
382 } = value;
383
384 Self {
385 default_stream_config: Some(default_stream_config.into()).into(),
386 stream_cipher: stream_cipher.into(),
387 create_stream_on_append: create_stream_on_append.into(),
388 create_stream_on_read: create_stream_on_read.into(),
389 }
390 }
391}
392
393#[derive(Debug, Clone, Default)]
394pub struct BasinReconfiguration {
395 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
396 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
397 pub create_stream_on_append: Maybe<bool>,
398 pub create_stream_on_read: Maybe<bool>,
399}