1use std::time::Duration;
27
28use crate::{ValidationError, encryption::EncryptionAlgorithm, maybe::Maybe};
29
30#[derive(
31 Debug,
32 Default,
33 Clone,
34 Copy,
35 strum::Display,
36 strum::IntoStaticStr,
37 strum::EnumIter,
38 strum::FromRepr,
39 strum::EnumString,
40 PartialEq,
41 Eq,
42 Hash,
43)]
44#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
45#[repr(u8)]
46pub enum StorageClass {
47 #[strum(serialize = "standard")]
48 Standard = 1,
49 #[default]
50 #[strum(serialize = "express")]
51 Express = 2,
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum RetentionPolicy {
56 Age(Duration),
57 Infinite(),
58}
59
60impl RetentionPolicy {
61 pub fn age(&self) -> Option<Duration> {
62 match self {
63 Self::Age(duration) => Some(*duration),
64 Self::Infinite() => None,
65 }
66 }
67
68 pub fn validate(self) -> Result<Self, ValidationError> {
69 match self {
70 Self::Age(duration) if duration.is_zero() => Err(ValidationError(
71 "age must be greater than 0 seconds".to_string(),
72 )),
73 policy => Ok(policy),
74 }
75 }
76}
77
78impl Default for RetentionPolicy {
79 fn default() -> Self {
80 const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
81
82 Self::Age(ONE_WEEK)
83 }
84}
85
86#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
87pub enum TimestampingMode {
88 #[default]
89 ClientPrefer,
90 ClientRequire,
91 Arrival,
92}
93
94#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
95pub struct TimestampingConfig {
96 pub mode: TimestampingMode,
97 pub uncapped: bool,
98}
99
100#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
101pub struct DeleteOnEmptyConfig {
102 pub min_age: Duration,
103}
104
105impl DeleteOnEmptyConfig {
106 pub fn min_age(&self) -> Option<Duration> {
107 Some(self.min_age).filter(|age| !age.is_zero())
108 }
109}
110
111#[derive(Debug, Clone, Default, PartialEq, Eq)]
112pub struct StreamConfig {
113 pub storage_class: StorageClass,
114 pub retention_policy: RetentionPolicy,
115 pub timestamping: TimestampingConfig,
116 pub delete_on_empty: DeleteOnEmptyConfig,
117}
118
119#[derive(Debug, Clone, Default)]
120pub struct TimestampingReconfiguration {
121 pub mode: Maybe<Option<TimestampingMode>>,
122 pub uncapped: Maybe<Option<bool>>,
123}
124
125#[derive(Debug, Clone, Default)]
126pub struct DeleteOnEmptyReconfiguration {
127 pub min_age: Maybe<Option<Duration>>,
128}
129
130#[derive(Debug, Clone, Default)]
131pub struct StreamReconfiguration {
132 pub storage_class: Maybe<Option<StorageClass>>,
133 pub retention_policy: Maybe<Option<RetentionPolicy>>,
134 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
135 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
136}
137
138#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
139pub struct OptionalTimestampingConfig {
140 pub mode: Option<TimestampingMode>,
141 pub uncapped: Option<bool>,
142}
143
144impl OptionalTimestampingConfig {
145 pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
146 if let Maybe::Specified(mode) = reconfiguration.mode {
147 self.mode = mode;
148 }
149 if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
150 self.uncapped = uncapped;
151 }
152 self
153 }
154
155 pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
156 let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
157 let uncapped = self
158 .uncapped
159 .or(basin_defaults.uncapped)
160 .unwrap_or_default();
161 TimestampingConfig { mode, uncapped }
162 }
163}
164
165impl From<OptionalTimestampingConfig> for TimestampingConfig {
166 fn from(value: OptionalTimestampingConfig) -> Self {
167 Self {
168 mode: value.mode.unwrap_or_default(),
169 uncapped: value.uncapped.unwrap_or_default(),
170 }
171 }
172}
173
174impl From<TimestampingConfig> for OptionalTimestampingConfig {
175 fn from(value: TimestampingConfig) -> Self {
176 Self {
177 mode: Some(value.mode),
178 uncapped: Some(value.uncapped),
179 }
180 }
181}
182
183#[derive(Debug, Clone, Default, PartialEq, Eq)]
184pub struct OptionalDeleteOnEmptyConfig {
185 pub min_age: Option<Duration>,
186}
187
188impl OptionalDeleteOnEmptyConfig {
189 pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
190 if let Maybe::Specified(min_age) = reconfiguration.min_age {
191 self.min_age = min_age;
192 }
193 self
194 }
195
196 pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
197 let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
198 DeleteOnEmptyConfig { min_age }
199 }
200}
201
202impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
203 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
204 Self {
205 min_age: value.min_age.unwrap_or_default(),
206 }
207 }
208}
209
210impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
211 fn from(value: DeleteOnEmptyConfig) -> Self {
212 Self {
213 min_age: Some(value.min_age),
214 }
215 }
216}
217
218#[derive(Debug, Clone, Default, PartialEq, Eq)]
219pub struct OptionalStreamConfig {
220 pub storage_class: Option<StorageClass>,
221 pub retention_policy: Option<RetentionPolicy>,
222 pub timestamping: OptionalTimestampingConfig,
223 pub delete_on_empty: OptionalDeleteOnEmptyConfig,
224}
225
226impl OptionalStreamConfig {
227 pub fn validate(&self) -> Result<(), ValidationError> {
228 if let Some(retention_policy) = self.retention_policy {
229 retention_policy.validate()?;
230 }
231 Ok(())
232 }
233
234 pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
235 let StreamReconfiguration {
236 storage_class,
237 retention_policy,
238 timestamping,
239 delete_on_empty,
240 } = reconfiguration;
241 if let Maybe::Specified(storage_class) = storage_class {
242 self.storage_class = storage_class;
243 }
244 if let Maybe::Specified(retention_policy) = retention_policy {
245 self.retention_policy = retention_policy;
246 }
247 if let Maybe::Specified(timestamping) = timestamping {
248 self.timestamping = timestamping
249 .map(|ts| self.timestamping.reconfigure(ts))
250 .unwrap_or_default();
251 }
252 if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
253 self.delete_on_empty = delete_on_empty_reconfig
254 .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
255 .unwrap_or_default();
256 }
257 self
258 }
259
260 pub fn merge(self, basin_defaults: Self) -> StreamConfig {
261 let storage_class = self
262 .storage_class
263 .or(basin_defaults.storage_class)
264 .unwrap_or_default();
265
266 let retention_policy = self
267 .retention_policy
268 .or(basin_defaults.retention_policy)
269 .unwrap_or_default();
270
271 let timestamping = self.timestamping.merge(basin_defaults.timestamping);
272
273 let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
274
275 StreamConfig {
276 storage_class,
277 retention_policy,
278 timestamping,
279 delete_on_empty,
280 }
281 }
282}
283
284impl From<OptionalStreamConfig> for StreamConfig {
285 fn from(value: OptionalStreamConfig) -> Self {
286 let OptionalStreamConfig {
287 storage_class,
288 retention_policy,
289 timestamping,
290 delete_on_empty,
291 } = value;
292
293 Self {
294 storage_class: storage_class.unwrap_or_default(),
295 retention_policy: retention_policy.unwrap_or_default(),
296 timestamping: timestamping.into(),
297 delete_on_empty: delete_on_empty.into(),
298 }
299 }
300}
301
302impl From<StreamConfig> for OptionalStreamConfig {
303 fn from(value: StreamConfig) -> Self {
304 let StreamConfig {
305 storage_class,
306 retention_policy,
307 timestamping,
308 delete_on_empty,
309 } = value;
310
311 Self {
312 storage_class: Some(storage_class),
313 retention_policy: Some(retention_policy),
314 timestamping: timestamping.into(),
315 delete_on_empty: delete_on_empty.into(),
316 }
317 }
318}
319
320#[derive(Debug, Clone, Default, PartialEq, Eq)]
321pub struct BasinConfig {
322 pub default_stream_config: OptionalStreamConfig,
323 pub stream_cipher: Option<EncryptionAlgorithm>,
324 pub create_stream_on_append: bool,
325 pub create_stream_on_read: bool,
326}
327
328impl BasinConfig {
329 pub fn validate(&self) -> Result<(), ValidationError> {
330 self.default_stream_config.validate()
331 }
332
333 pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
334 let BasinReconfiguration {
335 default_stream_config,
336 stream_cipher,
337 create_stream_on_append,
338 create_stream_on_read,
339 } = reconfiguration;
340
341 if let Maybe::Specified(default_stream_config) = default_stream_config {
342 self.default_stream_config = default_stream_config
343 .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
344 .unwrap_or_default();
345 }
346
347 if let Maybe::Specified(stream_cipher) = stream_cipher {
348 self.stream_cipher = stream_cipher;
349 }
350
351 if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
352 self.create_stream_on_append = create_stream_on_append;
353 }
354
355 if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
356 self.create_stream_on_read = create_stream_on_read;
357 }
358
359 self
360 }
361}
362
363#[derive(Debug, Clone, Default)]
364pub struct BasinReconfiguration {
365 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
366 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
367 pub create_stream_on_append: Maybe<bool>,
368 pub create_stream_on_read: Maybe<bool>,
369}