1use std::time::Duration;
27
28use crate::{encryption::EncryptionAlgorithm, maybe::Maybe};
29
30#[derive(
31 Debug,
32 Default,
33 Clone,
34 Copy,
35 strum::Display,
36 strum::IntoStaticStr,
37 strum::EnumIter,
38 strum::FromRepr,
39 strum::EnumString,
40 PartialEq,
41 Eq,
42 Hash,
43)]
44#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
45#[repr(u8)]
46pub enum StorageClass {
47 #[strum(serialize = "standard")]
48 Standard = 1,
49 #[default]
50 #[strum(serialize = "express")]
51 Express = 2,
52}
53
/// Retention policy: either bounded by an age or unbounded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Age-based retention carrying the configured duration.
    Age(Duration),
    /// Retention without an age limit.
    Infinite(),
}

impl RetentionPolicy {
    /// Returns the configured age, or `None` when the policy is [`RetentionPolicy::Infinite`].
    pub fn age(&self) -> Option<Duration> {
        match *self {
            Self::Infinite() => None,
            Self::Age(limit) => Some(limit),
        }
    }
}

impl Default for RetentionPolicy {
    /// Defaults to age-based retention of seven days.
    fn default() -> Self {
        const SECS_PER_DAY: u64 = 24 * 60 * 60;
        const DEFAULT_AGE: Duration = Duration::from_secs(7 * SECS_PER_DAY);

        Self::Age(DEFAULT_AGE)
    }
}
76
/// Timestamping mode for a stream; `ClientPrefer` is the default.
///
/// NOTE(review): the variant names suggest a choice between client-supplied and
/// arrival-time timestamps, but the exact semantics are defined where the mode is
/// consumed, outside this definition — confirm there before relying on this reading.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    /// Returned by `TimestampingMode::default()`.
    #[default]
    ClientPrefer,
    ClientRequire,
    Arrival,
}
84
85#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
86pub struct TimestampingConfig {
87 pub mode: TimestampingMode,
88 pub uncapped: bool,
89}
90
/// Fully resolved delete-on-empty settings.
///
/// `Default` yields a zero `min_age`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DeleteOnEmptyConfig {
    /// Raw minimum age; a zero duration is treated as "not set" by the `min_age()` accessor.
    pub min_age: Duration,
}

impl DeleteOnEmptyConfig {
    /// Returns the minimum age, or `None` when the stored duration is zero.
    pub fn min_age(&self) -> Option<Duration> {
        if self.min_age.is_zero() {
            None
        } else {
            Some(self.min_age)
        }
    }
}
101
102#[derive(Debug, Clone, Default, PartialEq, Eq)]
103pub struct StreamConfig {
104 pub storage_class: StorageClass,
105 pub retention_policy: RetentionPolicy,
106 pub timestamping: TimestampingConfig,
107 pub delete_on_empty: DeleteOnEmptyConfig,
108}
109
110#[derive(Debug, Clone, Default)]
111pub struct TimestampingReconfiguration {
112 pub mode: Maybe<Option<TimestampingMode>>,
113 pub uncapped: Maybe<Option<bool>>,
114}
115
116#[derive(Debug, Clone, Default)]
117pub struct DeleteOnEmptyReconfiguration {
118 pub min_age: Maybe<Option<Duration>>,
119}
120
121#[derive(Debug, Clone, Default)]
122pub struct StreamReconfiguration {
123 pub storage_class: Maybe<Option<StorageClass>>,
124 pub retention_policy: Maybe<Option<RetentionPolicy>>,
125 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
126 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
127}
128
129#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
130pub struct OptionalTimestampingConfig {
131 pub mode: Option<TimestampingMode>,
132 pub uncapped: Option<bool>,
133}
134
135impl OptionalTimestampingConfig {
136 pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
137 if let Maybe::Specified(mode) = reconfiguration.mode {
138 self.mode = mode;
139 }
140 if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
141 self.uncapped = uncapped;
142 }
143 self
144 }
145
146 pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
147 let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
148 let uncapped = self
149 .uncapped
150 .or(basin_defaults.uncapped)
151 .unwrap_or_default();
152 TimestampingConfig { mode, uncapped }
153 }
154}
155
156impl From<OptionalTimestampingConfig> for TimestampingConfig {
157 fn from(value: OptionalTimestampingConfig) -> Self {
158 Self {
159 mode: value.mode.unwrap_or_default(),
160 uncapped: value.uncapped.unwrap_or_default(),
161 }
162 }
163}
164
165impl From<TimestampingConfig> for OptionalTimestampingConfig {
166 fn from(value: TimestampingConfig) -> Self {
167 Self {
168 mode: Some(value.mode),
169 uncapped: Some(value.uncapped),
170 }
171 }
172}
173
174impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
175 fn from(value: OptionalTimestampingConfig) -> Self {
176 Self {
177 mode: value.mode.into(),
178 uncapped: value.uncapped.into(),
179 }
180 }
181}
182
/// Delete-on-empty settings in which the minimum age may be left unset (`None`).
///
/// `Default` leaves `min_age` unset.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct OptionalDeleteOnEmptyConfig {
    /// Minimum age, if set.
    pub min_age: Option<Duration>,
}
187
188impl OptionalDeleteOnEmptyConfig {
189 pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
190 if let Maybe::Specified(min_age) = reconfiguration.min_age {
191 self.min_age = min_age;
192 }
193 self
194 }
195
196 pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
197 let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
198 DeleteOnEmptyConfig { min_age }
199 }
200}
201
202impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
203 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
204 Self {
205 min_age: value.min_age.unwrap_or_default(),
206 }
207 }
208}
209
210impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
211 fn from(value: DeleteOnEmptyConfig) -> Self {
212 Self {
213 min_age: Some(value.min_age),
214 }
215 }
216}
217
218impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
219 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
220 Self {
221 min_age: value.min_age.into(),
222 }
223 }
224}
225
226#[derive(Debug, Clone, Default, PartialEq, Eq)]
227pub struct OptionalStreamConfig {
228 pub storage_class: Option<StorageClass>,
229 pub retention_policy: Option<RetentionPolicy>,
230 pub timestamping: OptionalTimestampingConfig,
231 pub delete_on_empty: OptionalDeleteOnEmptyConfig,
232}
233
234impl OptionalStreamConfig {
235 pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
236 let StreamReconfiguration {
237 storage_class,
238 retention_policy,
239 timestamping,
240 delete_on_empty,
241 } = reconfiguration;
242 if let Maybe::Specified(storage_class) = storage_class {
243 self.storage_class = storage_class;
244 }
245 if let Maybe::Specified(retention_policy) = retention_policy {
246 self.retention_policy = retention_policy;
247 }
248 if let Maybe::Specified(timestamping) = timestamping {
249 self.timestamping = timestamping
250 .map(|ts| self.timestamping.reconfigure(ts))
251 .unwrap_or_default();
252 }
253 if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
254 self.delete_on_empty = delete_on_empty_reconfig
255 .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
256 .unwrap_or_default();
257 }
258 self
259 }
260
261 pub fn merge(self, basin_defaults: Self) -> StreamConfig {
262 let storage_class = self
263 .storage_class
264 .or(basin_defaults.storage_class)
265 .unwrap_or_default();
266
267 let retention_policy = self
268 .retention_policy
269 .or(basin_defaults.retention_policy)
270 .unwrap_or_default();
271
272 let timestamping = self.timestamping.merge(basin_defaults.timestamping);
273
274 let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
275
276 StreamConfig {
277 storage_class,
278 retention_policy,
279 timestamping,
280 delete_on_empty,
281 }
282 }
283}
284
285impl From<OptionalStreamConfig> for StreamReconfiguration {
286 fn from(value: OptionalStreamConfig) -> Self {
287 let OptionalStreamConfig {
288 storage_class,
289 retention_policy,
290 timestamping,
291 delete_on_empty,
292 } = value;
293
294 Self {
295 storage_class: storage_class.into(),
296 retention_policy: retention_policy.into(),
297 timestamping: Some(timestamping.into()).into(),
298 delete_on_empty: Some(delete_on_empty.into()).into(),
299 }
300 }
301}
302
303impl From<OptionalStreamConfig> for StreamConfig {
304 fn from(value: OptionalStreamConfig) -> Self {
305 let OptionalStreamConfig {
306 storage_class,
307 retention_policy,
308 timestamping,
309 delete_on_empty,
310 } = value;
311
312 Self {
313 storage_class: storage_class.unwrap_or_default(),
314 retention_policy: retention_policy.unwrap_or_default(),
315 timestamping: timestamping.into(),
316 delete_on_empty: delete_on_empty.into(),
317 }
318 }
319}
320
321impl From<StreamConfig> for OptionalStreamConfig {
322 fn from(value: StreamConfig) -> Self {
323 let StreamConfig {
324 storage_class,
325 retention_policy,
326 timestamping,
327 delete_on_empty,
328 } = value;
329
330 Self {
331 storage_class: Some(storage_class),
332 retention_policy: Some(retention_policy),
333 timestamping: timestamping.into(),
334 delete_on_empty: delete_on_empty.into(),
335 }
336 }
337}
338
339#[derive(Debug, Clone, Default, PartialEq, Eq)]
340pub struct BasinConfig {
341 pub default_stream_config: OptionalStreamConfig,
342 pub stream_cipher: Option<EncryptionAlgorithm>,
343 pub create_stream_on_append: bool,
344 pub create_stream_on_read: bool,
345}
346
347impl BasinConfig {
348 pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
349 let BasinReconfiguration {
350 default_stream_config,
351 stream_cipher,
352 create_stream_on_append,
353 create_stream_on_read,
354 } = reconfiguration;
355
356 if let Maybe::Specified(default_stream_config) = default_stream_config {
357 self.default_stream_config = default_stream_config
358 .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
359 .unwrap_or_default();
360 }
361
362 if let Maybe::Specified(stream_cipher) = stream_cipher {
363 self.stream_cipher = stream_cipher;
364 }
365
366 if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
367 self.create_stream_on_append = create_stream_on_append;
368 }
369
370 if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
371 self.create_stream_on_read = create_stream_on_read;
372 }
373
374 self
375 }
376}
377
378impl From<BasinConfig> for BasinReconfiguration {
379 fn from(value: BasinConfig) -> Self {
380 let BasinConfig {
381 default_stream_config,
382 stream_cipher,
383 create_stream_on_append,
384 create_stream_on_read,
385 } = value;
386
387 Self {
388 default_stream_config: Some(default_stream_config.into()).into(),
389 stream_cipher: stream_cipher.into(),
390 create_stream_on_append: create_stream_on_append.into(),
391 create_stream_on_read: create_stream_on_read.into(),
392 }
393 }
394}
395
396#[derive(Debug, Clone, Default)]
397pub struct BasinReconfiguration {
398 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
399 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
400 pub create_stream_on_append: Maybe<bool>,
401 pub create_stream_on_read: Maybe<bool>,
402}