1use std::time::Duration;
2
3use enum_ordinalize::Ordinalize;
4
5use crate::maybe::Maybe;
6
7#[derive(
8 Debug,
9 Default,
10 Clone,
11 Copy,
12 strum::Display,
13 strum::IntoStaticStr,
14 strum::EnumIter,
15 strum::FromRepr,
16 strum::EnumString,
17 PartialEq,
18 Eq,
19 Ordinalize,
20 Hash,
21)]
22#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
23#[repr(u8)]
24pub enum StorageClass {
25 #[strum(serialize = "standard")]
26 Standard = 1,
27 #[default]
28 #[strum(serialize = "express")]
29 Express = 2,
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33pub enum RetentionPolicy {
34 Age(Duration),
35 Infinite(),
36}
37
38impl RetentionPolicy {
39 pub fn age(&self) -> Option<Duration> {
40 match self {
41 Self::Age(duration) => Some(*duration),
42 Self::Infinite() => None,
43 }
44 }
45}
46
47impl Default for RetentionPolicy {
48 fn default() -> Self {
49 const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);
50
51 Self::Age(ONE_WEEK)
52 }
53}
54
55#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
56pub enum TimestampingMode {
57 #[default]
58 ClientPrefer,
59 ClientRequire,
60 Arrival,
61}
62
63#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
64pub struct TimestampingConfig {
65 pub mode: TimestampingMode,
66 pub uncapped: bool,
67}
68
69#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
70pub struct DeleteOnEmptyConfig {
71 pub min_age: Duration,
72}
73
74#[derive(Debug, Clone, Default, PartialEq, Eq)]
75pub struct StreamConfig {
76 pub storage_class: StorageClass,
77 pub retention_policy: RetentionPolicy,
78 pub timestamping: TimestampingConfig,
79 pub delete_on_empty: DeleteOnEmptyConfig,
80}
81
82#[derive(Debug, Clone, Default)]
83pub struct TimestampingReconfiguration {
84 pub mode: Maybe<Option<TimestampingMode>>,
85 pub uncapped: Maybe<Option<bool>>,
86}
87
88#[derive(Debug, Clone, Default)]
89pub struct DeleteOnEmptyReconfiguration {
90 pub min_age: Maybe<Option<Duration>>,
91}
92
93#[derive(Debug, Clone, Default)]
94pub struct StreamReconfiguration {
95 pub storage_class: Maybe<Option<StorageClass>>,
96 pub retention_policy: Maybe<Option<RetentionPolicy>>,
97 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
98 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
99}
100
101#[derive(Debug, Clone, Copy, Default)]
102pub struct OptionalTimestampingConfig {
103 pub mode: Option<TimestampingMode>,
104 pub uncapped: Option<bool>,
105}
106
107impl OptionalTimestampingConfig {
108 pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
109 if let Maybe::Specified(mode) = reconfiguration.mode {
110 self.mode = mode;
111 }
112 if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
113 self.uncapped = uncapped;
114 }
115 self
116 }
117
118 pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
119 let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
120 let uncapped = self
121 .uncapped
122 .or(basin_defaults.uncapped)
123 .unwrap_or_default();
124 TimestampingConfig { mode, uncapped }
125 }
126}
127
128impl From<OptionalTimestampingConfig> for TimestampingConfig {
129 fn from(value: OptionalTimestampingConfig) -> Self {
130 Self {
131 mode: value.mode.unwrap_or_default(),
132 uncapped: value.uncapped.unwrap_or_default(),
133 }
134 }
135}
136
137impl From<TimestampingConfig> for OptionalTimestampingConfig {
138 fn from(value: TimestampingConfig) -> Self {
139 Self {
140 mode: Some(value.mode),
141 uncapped: Some(value.uncapped),
142 }
143 }
144}
145
146impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
147 fn from(value: OptionalTimestampingConfig) -> Self {
148 Self {
149 mode: value.mode.into(),
150 uncapped: value.uncapped.into(),
151 }
152 }
153}
154
155#[derive(Debug, Clone, Default)]
156pub struct OptionalDeleteOnEmptyConfig {
157 pub min_age: Option<Duration>,
158}
159
160impl OptionalDeleteOnEmptyConfig {
161 pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
162 if let Maybe::Specified(min_age) = reconfiguration.min_age {
163 self.min_age = min_age;
164 }
165 self
166 }
167
168 pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
169 let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
170 DeleteOnEmptyConfig { min_age }
171 }
172}
173
174impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
175 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
176 Self {
177 min_age: value.min_age.unwrap_or_default(),
178 }
179 }
180}
181
182impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
183 fn from(value: DeleteOnEmptyConfig) -> Self {
184 Self {
185 min_age: Some(value.min_age),
186 }
187 }
188}
189
190impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
191 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
192 Self {
193 min_age: value.min_age.into(),
194 }
195 }
196}
197
198#[derive(Debug, Clone, Default)]
199pub struct OptionalStreamConfig {
200 pub storage_class: Option<StorageClass>,
201 pub retention_policy: Option<RetentionPolicy>,
202 pub timestamping: OptionalTimestampingConfig,
203 pub delete_on_empty: OptionalDeleteOnEmptyConfig,
204}
205
206impl OptionalStreamConfig {
207 pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
208 let StreamReconfiguration {
209 storage_class,
210 retention_policy,
211 timestamping,
212 delete_on_empty,
213 } = reconfiguration;
214 if let Maybe::Specified(storage_class) = storage_class {
215 self.storage_class = storage_class;
216 }
217 if let Maybe::Specified(retention_policy) = retention_policy {
218 self.retention_policy = retention_policy;
219 }
220 if let Maybe::Specified(timestamping) = timestamping {
221 self.timestamping = timestamping
222 .map(|ts| self.timestamping.reconfigure(ts))
223 .unwrap_or_default();
224 }
225 if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
226 self.delete_on_empty = delete_on_empty_reconfig
227 .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
228 .unwrap_or_default();
229 }
230 self
231 }
232
233 pub fn merge(self, basin_defaults: Self) -> StreamConfig {
234 let storage_class = self
235 .storage_class
236 .or(basin_defaults.storage_class)
237 .unwrap_or_default();
238
239 let retention_policy = self
240 .retention_policy
241 .or(basin_defaults.retention_policy)
242 .unwrap_or_default();
243
244 let timestamping = self.timestamping.merge(basin_defaults.timestamping);
245
246 let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
247
248 StreamConfig {
249 storage_class,
250 retention_policy,
251 timestamping,
252 delete_on_empty,
253 }
254 }
255}
256
257impl From<OptionalStreamConfig> for StreamReconfiguration {
258 fn from(value: OptionalStreamConfig) -> Self {
259 let OptionalStreamConfig {
260 storage_class,
261 retention_policy,
262 timestamping,
263 delete_on_empty,
264 } = value;
265
266 Self {
267 storage_class: storage_class.into(),
268 retention_policy: retention_policy.into(),
269 timestamping: Some(timestamping.into()).into(),
270 delete_on_empty: Some(delete_on_empty.into()).into(),
271 }
272 }
273}
274
275impl From<OptionalStreamConfig> for StreamConfig {
276 fn from(value: OptionalStreamConfig) -> Self {
277 let OptionalStreamConfig {
278 storage_class,
279 retention_policy,
280 timestamping,
281 delete_on_empty,
282 } = value;
283
284 Self {
285 storage_class: storage_class.unwrap_or_default(),
286 retention_policy: retention_policy.unwrap_or_default(),
287 timestamping: timestamping.into(),
288 delete_on_empty: delete_on_empty.into(),
289 }
290 }
291}
292
293impl From<StreamConfig> for OptionalStreamConfig {
294 fn from(value: StreamConfig) -> Self {
295 let StreamConfig {
296 storage_class,
297 retention_policy,
298 timestamping,
299 delete_on_empty,
300 } = value;
301
302 Self {
303 storage_class: Some(storage_class),
304 retention_policy: Some(retention_policy),
305 timestamping: timestamping.into(),
306 delete_on_empty: delete_on_empty.into(),
307 }
308 }
309}
310
311#[derive(Debug, Clone, Default)]
312pub struct BasinConfig {
313 pub default_stream_config: OptionalStreamConfig,
314 pub create_stream_on_append: bool,
315 pub create_stream_on_read: bool,
316}
317
318impl BasinConfig {
319 pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
320 let BasinReconfiguration {
321 default_stream_config,
322 create_stream_on_append,
323 create_stream_on_read,
324 } = reconfiguration;
325
326 if let Maybe::Specified(default_stream_config) = default_stream_config {
327 self.default_stream_config = default_stream_config
328 .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
329 .unwrap_or_default();
330 }
331
332 if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
333 self.create_stream_on_append = create_stream_on_append;
334 }
335
336 if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
337 self.create_stream_on_read = create_stream_on_read;
338 }
339
340 self
341 }
342}
343
344impl From<BasinConfig> for BasinReconfiguration {
345 fn from(value: BasinConfig) -> Self {
346 let BasinConfig {
347 default_stream_config,
348 create_stream_on_append,
349 create_stream_on_read,
350 } = value;
351
352 Self {
353 default_stream_config: Some(default_stream_config.into()).into(),
354 create_stream_on_append: create_stream_on_append.into(),
355 create_stream_on_read: create_stream_on_read.into(),
356 }
357 }
358}
359
360#[derive(Debug, Clone, Default)]
361pub struct BasinReconfiguration {
362 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
363 pub create_stream_on_append: Maybe<bool>,
364 pub create_stream_on_read: Maybe<bool>,
365}