1use std::time::Duration;
27
28use crate::{encryption::EncryptionAlgorithm, maybe::Maybe};
29
30#[derive(
31 Debug,
32 Default,
33 Clone,
34 Copy,
35 strum::Display,
36 strum::IntoStaticStr,
37 strum::EnumIter,
38 strum::FromRepr,
39 strum::EnumString,
40 PartialEq,
41 Eq,
42 Hash,
43)]
44#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
45#[repr(u8)]
46pub enum StorageClass {
47 #[strum(serialize = "standard")]
48 Standard = 1,
49 #[default]
50 #[strum(serialize = "express")]
51 Express = 2,
52}
53
/// Retention policy: either bounded by an age or unbounded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Bounded by the contained age.
    Age(Duration),
    /// No age bound.
    Infinite(),
}

impl RetentionPolicy {
    /// Returns the age bound of the policy.
    ///
    /// Yields `Some(duration)` for [`RetentionPolicy::Age`] and `None` for
    /// [`RetentionPolicy::Infinite`].
    pub fn age(&self) -> Option<Duration> {
        match *self {
            Self::Infinite() => None,
            Self::Age(limit) => Some(limit),
        }
    }
}

impl Default for RetentionPolicy {
    /// The default policy is an age bound of seven days.
    fn default() -> Self {
        const SECS_PER_DAY: u64 = 24 * 60 * 60;
        const SEVEN_DAYS: Duration = Duration::from_secs(7 * SECS_PER_DAY);

        Self::Age(SEVEN_DAYS)
    }
}
76
/// Timestamping mode selector.
///
/// `ClientPrefer` is the value produced by `Default`.
// NOTE(review): the precise semantics of each mode are not visible in this
// file; confirm against the code that consumes `TimestampingMode`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    /// Default variant.
    #[default]
    ClientPrefer,
    ClientRequire,
    Arrival,
}
84
85#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
86pub struct TimestampingConfig {
87 pub mode: TimestampingMode,
88 pub uncapped: bool,
89}
90
/// Fully resolved delete-on-empty settings.
///
/// `Default` yields a zero `min_age`.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct DeleteOnEmptyConfig {
    /// Minimum age threshold.
    // NOTE(review): how a zero value is interpreted by consumers is not
    // visible in this file; confirm.
    pub min_age: Duration,
}
95
96#[derive(Debug, Clone, Default, PartialEq, Eq)]
97pub struct StreamConfig {
98 pub storage_class: StorageClass,
99 pub retention_policy: RetentionPolicy,
100 pub timestamping: TimestampingConfig,
101 pub delete_on_empty: DeleteOnEmptyConfig,
102}
103
104#[derive(Debug, Clone, Default)]
105pub struct TimestampingReconfiguration {
106 pub mode: Maybe<Option<TimestampingMode>>,
107 pub uncapped: Maybe<Option<bool>>,
108}
109
110#[derive(Debug, Clone, Default)]
111pub struct DeleteOnEmptyReconfiguration {
112 pub min_age: Maybe<Option<Duration>>,
113}
114
115#[derive(Debug, Clone, Default)]
116pub struct StreamReconfiguration {
117 pub storage_class: Maybe<Option<StorageClass>>,
118 pub retention_policy: Maybe<Option<RetentionPolicy>>,
119 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
120 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
121}
122
123#[derive(Debug, Clone, Copy, Default)]
124pub struct OptionalTimestampingConfig {
125 pub mode: Option<TimestampingMode>,
126 pub uncapped: Option<bool>,
127}
128
129impl OptionalTimestampingConfig {
130 pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
131 if let Maybe::Specified(mode) = reconfiguration.mode {
132 self.mode = mode;
133 }
134 if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
135 self.uncapped = uncapped;
136 }
137 self
138 }
139
140 pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
141 let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
142 let uncapped = self
143 .uncapped
144 .or(basin_defaults.uncapped)
145 .unwrap_or_default();
146 TimestampingConfig { mode, uncapped }
147 }
148}
149
150impl From<OptionalTimestampingConfig> for TimestampingConfig {
151 fn from(value: OptionalTimestampingConfig) -> Self {
152 Self {
153 mode: value.mode.unwrap_or_default(),
154 uncapped: value.uncapped.unwrap_or_default(),
155 }
156 }
157}
158
159impl From<TimestampingConfig> for OptionalTimestampingConfig {
160 fn from(value: TimestampingConfig) -> Self {
161 Self {
162 mode: Some(value.mode),
163 uncapped: Some(value.uncapped),
164 }
165 }
166}
167
168impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
169 fn from(value: OptionalTimestampingConfig) -> Self {
170 Self {
171 mode: value.mode.into(),
172 uncapped: value.uncapped.into(),
173 }
174 }
175}
176
/// Delete-on-empty settings in which the field may be left unset (`None`).
///
/// `Default` yields an unset `min_age`.
///
/// The type is `Copy`, matching its siblings `OptionalTimestampingConfig` and
/// `DeleteOnEmptyConfig`; its only field is an `Option<Duration>`, which is
/// itself `Copy`.
#[derive(Debug, Clone, Copy, Default)]
pub struct OptionalDeleteOnEmptyConfig {
    /// Minimum age, if set.
    pub min_age: Option<Duration>,
}
181
182impl OptionalDeleteOnEmptyConfig {
183 pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
184 if let Maybe::Specified(min_age) = reconfiguration.min_age {
185 self.min_age = min_age;
186 }
187 self
188 }
189
190 pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
191 let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
192 DeleteOnEmptyConfig { min_age }
193 }
194}
195
196impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
197 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
198 Self {
199 min_age: value.min_age.unwrap_or_default(),
200 }
201 }
202}
203
204impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
205 fn from(value: DeleteOnEmptyConfig) -> Self {
206 Self {
207 min_age: Some(value.min_age),
208 }
209 }
210}
211
212impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
213 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
214 Self {
215 min_age: value.min_age.into(),
216 }
217 }
218}
219
220#[derive(Debug, Clone, Default)]
221pub struct OptionalStreamConfig {
222 pub storage_class: Option<StorageClass>,
223 pub retention_policy: Option<RetentionPolicy>,
224 pub timestamping: OptionalTimestampingConfig,
225 pub delete_on_empty: OptionalDeleteOnEmptyConfig,
226}
227
228impl OptionalStreamConfig {
229 pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
230 let StreamReconfiguration {
231 storage_class,
232 retention_policy,
233 timestamping,
234 delete_on_empty,
235 } = reconfiguration;
236 if let Maybe::Specified(storage_class) = storage_class {
237 self.storage_class = storage_class;
238 }
239 if let Maybe::Specified(retention_policy) = retention_policy {
240 self.retention_policy = retention_policy;
241 }
242 if let Maybe::Specified(timestamping) = timestamping {
243 self.timestamping = timestamping
244 .map(|ts| self.timestamping.reconfigure(ts))
245 .unwrap_or_default();
246 }
247 if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
248 self.delete_on_empty = delete_on_empty_reconfig
249 .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
250 .unwrap_or_default();
251 }
252 self
253 }
254
255 pub fn merge(self, basin_defaults: Self) -> StreamConfig {
256 let storage_class = self
257 .storage_class
258 .or(basin_defaults.storage_class)
259 .unwrap_or_default();
260
261 let retention_policy = self
262 .retention_policy
263 .or(basin_defaults.retention_policy)
264 .unwrap_or_default();
265
266 let timestamping = self.timestamping.merge(basin_defaults.timestamping);
267
268 let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
269
270 StreamConfig {
271 storage_class,
272 retention_policy,
273 timestamping,
274 delete_on_empty,
275 }
276 }
277}
278
279impl From<OptionalStreamConfig> for StreamReconfiguration {
280 fn from(value: OptionalStreamConfig) -> Self {
281 let OptionalStreamConfig {
282 storage_class,
283 retention_policy,
284 timestamping,
285 delete_on_empty,
286 } = value;
287
288 Self {
289 storage_class: storage_class.into(),
290 retention_policy: retention_policy.into(),
291 timestamping: Some(timestamping.into()).into(),
292 delete_on_empty: Some(delete_on_empty.into()).into(),
293 }
294 }
295}
296
297impl From<OptionalStreamConfig> for StreamConfig {
298 fn from(value: OptionalStreamConfig) -> Self {
299 let OptionalStreamConfig {
300 storage_class,
301 retention_policy,
302 timestamping,
303 delete_on_empty,
304 } = value;
305
306 Self {
307 storage_class: storage_class.unwrap_or_default(),
308 retention_policy: retention_policy.unwrap_or_default(),
309 timestamping: timestamping.into(),
310 delete_on_empty: delete_on_empty.into(),
311 }
312 }
313}
314
315impl From<StreamConfig> for OptionalStreamConfig {
316 fn from(value: StreamConfig) -> Self {
317 let StreamConfig {
318 storage_class,
319 retention_policy,
320 timestamping,
321 delete_on_empty,
322 } = value;
323
324 Self {
325 storage_class: Some(storage_class),
326 retention_policy: Some(retention_policy),
327 timestamping: timestamping.into(),
328 delete_on_empty: delete_on_empty.into(),
329 }
330 }
331}
332
333#[derive(Debug, Clone, Default)]
334pub struct BasinConfig {
335 pub default_stream_config: OptionalStreamConfig,
336 pub stream_cipher: Option<EncryptionAlgorithm>,
337 pub create_stream_on_append: bool,
338 pub create_stream_on_read: bool,
339}
340
341impl BasinConfig {
342 pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
343 let BasinReconfiguration {
344 default_stream_config,
345 stream_cipher,
346 create_stream_on_append,
347 create_stream_on_read,
348 } = reconfiguration;
349
350 if let Maybe::Specified(default_stream_config) = default_stream_config {
351 self.default_stream_config = default_stream_config
352 .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
353 .unwrap_or_default();
354 }
355
356 if let Maybe::Specified(stream_cipher) = stream_cipher {
357 self.stream_cipher = stream_cipher;
358 }
359
360 if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
361 self.create_stream_on_append = create_stream_on_append;
362 }
363
364 if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
365 self.create_stream_on_read = create_stream_on_read;
366 }
367
368 self
369 }
370}
371
372impl From<BasinConfig> for BasinReconfiguration {
373 fn from(value: BasinConfig) -> Self {
374 let BasinConfig {
375 default_stream_config,
376 stream_cipher,
377 create_stream_on_append,
378 create_stream_on_read,
379 } = value;
380
381 Self {
382 default_stream_config: Some(default_stream_config.into()).into(),
383 stream_cipher: stream_cipher.into(),
384 create_stream_on_append: create_stream_on_append.into(),
385 create_stream_on_read: create_stream_on_read.into(),
386 }
387 }
388}
389
390#[derive(Debug, Clone, Default)]
391pub struct BasinReconfiguration {
392 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
393 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
394 pub create_stream_on_append: Maybe<bool>,
395 pub create_stream_on_read: Maybe<bool>,
396}