1use std::time::Duration;
2
3use enum_ordinalize::Ordinalize;
4
5use crate::maybe::Maybe;
6
7#[derive(
8 Debug,
9 Default,
10 Clone,
11 Copy,
12 strum::Display,
13 strum::IntoStaticStr,
14 strum::EnumIter,
15 strum::FromRepr,
16 strum::EnumString,
17 PartialEq,
18 Eq,
19 Ordinalize,
20)]
21#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
22#[repr(u8)]
23pub enum StorageClass {
24 #[strum(serialize = "standard")]
25 Standard = 1,
26 #[default]
27 #[strum(serialize = "express")]
28 Express = 2,
29}
30
/// Retention policy: either an age bound ([`RetentionPolicy::Age`]) or no
/// bound at all ([`RetentionPolicy::Infinite`]).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RetentionPolicy {
    /// Retention bounded by the wrapped duration.
    Age(Duration),
    /// No age bound. This is a zero-field tuple variant, so it is written
    /// `Infinite()` in both expressions and patterns.
    Infinite(),
}

impl RetentionPolicy {
    /// Returns the age bound, or `None` when the policy is
    /// [`RetentionPolicy::Infinite`].
    pub fn age(&self) -> Option<Duration> {
        match *self {
            Self::Infinite() => None,
            Self::Age(limit) => Some(limit),
        }
    }
}

impl Default for RetentionPolicy {
    /// The default policy is an age bound of seven days.
    fn default() -> Self {
        const SECONDS_PER_DAY: u64 = 24 * 60 * 60;
        const DEFAULT_RETENTION: Duration = Duration::from_secs(7 * SECONDS_PER_DAY);

        Self::Age(DEFAULT_RETENTION)
    }
}
53
/// Timestamping mode of a stream.
///
/// `ClientPrefer` is the variant produced by `Default`.
// NOTE(review): the precise meaning of each mode is defined by the code that
// consumes this enum, which is not part of this file.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum TimestampingMode {
    /// Default mode.
    #[default]
    ClientPrefer,
    ClientRequire,
    Arrival,
}
61
62#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
63pub struct TimestampingConfig {
64 pub mode: TimestampingMode,
65 pub uncapped: bool,
66}
67
/// Fully resolved delete-on-empty settings.
///
/// The derived `Default` yields a zero `min_age`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteOnEmptyConfig {
    /// Minimum age, as a [`Duration`].
    pub min_age: Duration,
}
72
73#[derive(Debug, Clone, Default, PartialEq, Eq)]
74pub struct StreamConfig {
75 pub storage_class: StorageClass,
76 pub retention_policy: RetentionPolicy,
77 pub timestamping: TimestampingConfig,
78 pub delete_on_empty: DeleteOnEmptyConfig,
79}
80
81#[derive(Debug, Clone, Default)]
82pub struct TimestampingReconfiguration {
83 pub mode: Maybe<Option<TimestampingMode>>,
84 pub uncapped: Maybe<Option<bool>>,
85}
86
87#[derive(Debug, Clone, Default)]
88pub struct DeleteOnEmptyReconfiguration {
89 pub min_age: Maybe<Option<Duration>>,
90}
91
92#[derive(Debug, Clone, Default)]
93pub struct StreamReconfiguration {
94 pub storage_class: Maybe<Option<StorageClass>>,
95 pub retention_policy: Maybe<Option<RetentionPolicy>>,
96 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
97 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
98}
99
100#[derive(Debug, Clone, Copy, Default)]
101pub struct OptionalTimestampingConfig {
102 pub mode: Option<TimestampingMode>,
103 pub uncapped: Option<bool>,
104}
105
106impl OptionalTimestampingConfig {
107 pub fn reconfigure(mut self, reconfiguration: TimestampingReconfiguration) -> Self {
108 if let Maybe::Specified(mode) = reconfiguration.mode {
109 self.mode = mode;
110 }
111 if let Maybe::Specified(uncapped) = reconfiguration.uncapped {
112 self.uncapped = uncapped;
113 }
114 self
115 }
116
117 pub fn merge(self, basin_defaults: Self) -> TimestampingConfig {
118 let mode = self.mode.or(basin_defaults.mode).unwrap_or_default();
119 let uncapped = self
120 .uncapped
121 .or(basin_defaults.uncapped)
122 .unwrap_or_default();
123 TimestampingConfig { mode, uncapped }
124 }
125}
126
127impl From<OptionalTimestampingConfig> for TimestampingConfig {
128 fn from(value: OptionalTimestampingConfig) -> Self {
129 Self {
130 mode: value.mode.unwrap_or_default(),
131 uncapped: value.uncapped.unwrap_or_default(),
132 }
133 }
134}
135
136impl From<TimestampingConfig> for OptionalTimestampingConfig {
137 fn from(value: TimestampingConfig) -> Self {
138 Self {
139 mode: Some(value.mode),
140 uncapped: Some(value.uncapped),
141 }
142 }
143}
144
145impl From<OptionalTimestampingConfig> for TimestampingReconfiguration {
146 fn from(value: OptionalTimestampingConfig) -> Self {
147 Self {
148 mode: value.mode.into(),
149 uncapped: value.uncapped.into(),
150 }
151 }
152}
153
/// Delete-on-empty settings in which the field may be unset (`None`).
///
/// The derived `Default` leaves `min_age` unset. The type is `Copy` because
/// its only field, `Option<Duration>`, is `Copy`; this matches the derive set
/// of `OptionalTimestampingConfig`.
#[derive(Debug, Clone, Copy, Default)]
pub struct OptionalDeleteOnEmptyConfig {
    /// Minimum age, if set.
    pub min_age: Option<Duration>,
}
158
159impl OptionalDeleteOnEmptyConfig {
160 pub fn reconfigure(mut self, reconfiguration: DeleteOnEmptyReconfiguration) -> Self {
161 if let Maybe::Specified(min_age) = reconfiguration.min_age {
162 self.min_age = min_age;
163 }
164 self
165 }
166
167 pub fn merge(self, basin_defaults: Self) -> DeleteOnEmptyConfig {
168 let min_age = self.min_age.or(basin_defaults.min_age).unwrap_or_default();
169 DeleteOnEmptyConfig { min_age }
170 }
171}
172
173impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
174 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
175 Self {
176 min_age: value.min_age.unwrap_or_default(),
177 }
178 }
179}
180
181impl From<DeleteOnEmptyConfig> for OptionalDeleteOnEmptyConfig {
182 fn from(value: DeleteOnEmptyConfig) -> Self {
183 Self {
184 min_age: Some(value.min_age),
185 }
186 }
187}
188
189impl From<OptionalDeleteOnEmptyConfig> for DeleteOnEmptyReconfiguration {
190 fn from(value: OptionalDeleteOnEmptyConfig) -> Self {
191 Self {
192 min_age: value.min_age.into(),
193 }
194 }
195}
196
197#[derive(Debug, Clone, Default)]
198pub struct OptionalStreamConfig {
199 pub storage_class: Option<StorageClass>,
200 pub retention_policy: Option<RetentionPolicy>,
201 pub timestamping: OptionalTimestampingConfig,
202 pub delete_on_empty: OptionalDeleteOnEmptyConfig,
203}
204
205impl OptionalStreamConfig {
206 pub fn reconfigure(mut self, reconfiguration: StreamReconfiguration) -> Self {
207 let StreamReconfiguration {
208 storage_class,
209 retention_policy,
210 timestamping,
211 delete_on_empty,
212 } = reconfiguration;
213 if let Maybe::Specified(storage_class) = storage_class {
214 self.storage_class = storage_class;
215 }
216 if let Maybe::Specified(retention_policy) = retention_policy {
217 self.retention_policy = retention_policy;
218 }
219 if let Maybe::Specified(timestamping) = timestamping {
220 self.timestamping = timestamping
221 .map(|ts| self.timestamping.reconfigure(ts))
222 .unwrap_or_default();
223 }
224 if let Maybe::Specified(delete_on_empty_reconfig) = delete_on_empty {
225 self.delete_on_empty = delete_on_empty_reconfig
226 .map(|reconfig| self.delete_on_empty.reconfigure(reconfig))
227 .unwrap_or_default();
228 }
229 self
230 }
231
232 pub fn merge(self, basin_defaults: Self) -> StreamConfig {
233 let storage_class = self
234 .storage_class
235 .or(basin_defaults.storage_class)
236 .unwrap_or_default();
237
238 let retention_policy = self
239 .retention_policy
240 .or(basin_defaults.retention_policy)
241 .unwrap_or_default();
242
243 let timestamping = self.timestamping.merge(basin_defaults.timestamping);
244
245 let delete_on_empty = self.delete_on_empty.merge(basin_defaults.delete_on_empty);
246
247 StreamConfig {
248 storage_class,
249 retention_policy,
250 timestamping,
251 delete_on_empty,
252 }
253 }
254}
255
256impl From<OptionalStreamConfig> for StreamReconfiguration {
257 fn from(value: OptionalStreamConfig) -> Self {
258 let OptionalStreamConfig {
259 storage_class,
260 retention_policy,
261 timestamping,
262 delete_on_empty,
263 } = value;
264
265 Self {
266 storage_class: storage_class.into(),
267 retention_policy: retention_policy.into(),
268 timestamping: Some(timestamping.into()).into(),
269 delete_on_empty: Some(delete_on_empty.into()).into(),
270 }
271 }
272}
273
274impl From<OptionalStreamConfig> for StreamConfig {
275 fn from(value: OptionalStreamConfig) -> Self {
276 let OptionalStreamConfig {
277 storage_class,
278 retention_policy,
279 timestamping,
280 delete_on_empty,
281 } = value;
282
283 Self {
284 storage_class: storage_class.unwrap_or_default(),
285 retention_policy: retention_policy.unwrap_or_default(),
286 timestamping: timestamping.into(),
287 delete_on_empty: delete_on_empty.into(),
288 }
289 }
290}
291
292impl From<StreamConfig> for OptionalStreamConfig {
293 fn from(value: StreamConfig) -> Self {
294 let StreamConfig {
295 storage_class,
296 retention_policy,
297 timestamping,
298 delete_on_empty,
299 } = value;
300
301 Self {
302 storage_class: Some(storage_class),
303 retention_policy: Some(retention_policy),
304 timestamping: timestamping.into(),
305 delete_on_empty: delete_on_empty.into(),
306 }
307 }
308}
309
310#[derive(Debug, Clone, Default)]
311pub struct BasinConfig {
312 pub default_stream_config: OptionalStreamConfig,
313 pub create_stream_on_append: bool,
314 pub create_stream_on_read: bool,
315}
316
317impl BasinConfig {
318 pub fn reconfigure(mut self, reconfiguration: BasinReconfiguration) -> Self {
319 let BasinReconfiguration {
320 default_stream_config,
321 create_stream_on_append,
322 create_stream_on_read,
323 } = reconfiguration;
324
325 if let Maybe::Specified(default_stream_config) = default_stream_config {
326 self.default_stream_config = default_stream_config
327 .map(|reconfig| self.default_stream_config.reconfigure(reconfig))
328 .unwrap_or_default();
329 }
330
331 if let Maybe::Specified(create_stream_on_append) = create_stream_on_append {
332 self.create_stream_on_append = create_stream_on_append;
333 }
334
335 if let Maybe::Specified(create_stream_on_read) = create_stream_on_read {
336 self.create_stream_on_read = create_stream_on_read;
337 }
338
339 self
340 }
341}
342
343impl From<BasinConfig> for BasinReconfiguration {
344 fn from(value: BasinConfig) -> Self {
345 let BasinConfig {
346 default_stream_config,
347 create_stream_on_append,
348 create_stream_on_read,
349 } = value;
350
351 Self {
352 default_stream_config: Some(default_stream_config.into()).into(),
353 create_stream_on_append: create_stream_on_append.into(),
354 create_stream_on_read: create_stream_on_read.into(),
355 }
356 }
357}
358
359#[derive(Debug, Clone, Default)]
360pub struct BasinReconfiguration {
361 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
362 pub create_stream_on_append: Maybe<bool>,
363 pub create_stream_on_read: Maybe<bool>,
364}