1use std::time::Duration;
2
3use s2_common::{maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
/// Storage class of a stream, as exposed by the API.
///
/// Serialized in kebab-case (`"standard"` / `"express"`) and converted
/// variant-for-variant to and from `types::config::StorageClass`.
#[rustfmt::skip]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum StorageClass {
    /// The `standard` storage class.
    Standard,
    /// The `express` storage class.
    Express,
}
16
17impl From<StorageClass> for types::config::StorageClass {
18 fn from(value: StorageClass) -> Self {
19 match value {
20 StorageClass::Express => Self::Express,
21 StorageClass::Standard => Self::Standard,
22 }
23 }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27 fn from(value: types::config::StorageClass) -> Self {
28 match value {
29 types::config::StorageClass::Express => Self::Express,
30 types::config::StorageClass::Standard => Self::Standard,
31 }
32 }
33}
34
/// Retention policy for a stream, as exposed by the API.
///
/// Uses serde's default externally tagged enum representation with
/// kebab-case variant names, e.g. `{"age": 3600}` or `{"infinite": {}}`.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum RetentionPolicy {
    /// Age threshold in seconds.
    ///
    /// A value of `0` is rejected with a validation error when converting to
    /// `types::config::RetentionPolicy`.
    Age(u64),
    /// Infinite retention; the payload is an empty object.
    Infinite(InfiniteRetention)
}
46
/// Empty payload carried by [`RetentionPolicy::Infinite`].
///
/// It has no fields, so it serializes as an empty object.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53 type Error = types::ValidationError;
54
55 fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56 match value {
57 RetentionPolicy::Age(0) => Err(types::ValidationError(
58 "age must be greater than 0 seconds".to_string(),
59 )),
60 RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61 RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62 }
63 }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67 fn from(value: types::config::RetentionPolicy) -> Self {
68 match value {
69 types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70 types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71 }
72 }
73}
74
/// Timestamping mode of a stream, as exposed by the API.
///
/// Serialized in kebab-case (`"client-prefer"`, `"client-require"`,
/// `"arrival"`). The default is [`TimestampingMode::ClientPrefer`].
#[rustfmt::skip]
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum TimestampingMode {
    /// The `client-prefer` mode (default).
    #[default]
    ClientPrefer,
    /// The `client-require` mode.
    ClientRequire,
    /// The `arrival` mode.
    Arrival,
}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90 fn from(value: TimestampingMode) -> Self {
91 match value {
92 TimestampingMode::ClientPrefer => Self::ClientPrefer,
93 TimestampingMode::ClientRequire => Self::ClientRequire,
94 TimestampingMode::Arrival => Self::Arrival,
95 }
96 }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100 fn from(value: types::config::TimestampingMode) -> Self {
101 match value {
102 types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103 types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104 types::config::TimestampingMode::Arrival => Self::Arrival,
105 }
106 }
107}
108
/// Timestamping settings of a stream, as exposed by the API.
///
/// Both fields are optional; the derived default has both set to `None`.
#[rustfmt::skip]
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TimestampingConfig {
    /// Timestamping mode, if set.
    pub mode: Option<TimestampingMode>,
    /// The `uncapped` flag, if set.
    pub uncapped: Option<bool>,
}
119
120impl TimestampingConfig {
121 pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122 let config = TimestampingConfig {
123 mode: config.mode.map(Into::into),
124 uncapped: config.uncapped,
125 };
126 if config == Self::default() {
127 None
128 } else {
129 Some(config)
130 }
131 }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135 fn from(value: types::config::TimestampingConfig) -> Self {
136 Self {
137 mode: Some(value.mode.into()),
138 uncapped: Some(value.uncapped),
139 }
140 }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144 fn from(value: TimestampingConfig) -> Self {
145 Self {
146 mode: value.mode.map(Into::into),
147 uncapped: value.uncapped,
148 }
149 }
150}
151
/// Partial update of a stream's timestamping settings, as exposed by the API.
///
/// Each field is a `Maybe<Option<_>>`: a field absent from the input is
/// filled with `Maybe::default()`, and a field for which
/// `Maybe::is_unspecified` holds is omitted from serialized output.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TimestampingReconfiguration {
    /// Requested change to the timestamping mode.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Requested change to the `uncapped` flag.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub uncapped: Maybe<Option<bool>>,
}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167 fn from(value: TimestampingReconfiguration) -> Self {
168 Self {
169 mode: value.mode.map_opt(Into::into),
170 uncapped: value.uncapped,
171 }
172 }
173}
174
175impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
176 fn from(value: types::config::TimestampingReconfiguration) -> Self {
177 Self {
178 mode: value.mode.map_opt(Into::into),
179 uncapped: value.uncapped,
180 }
181 }
182}
183
/// Delete-on-empty settings of a stream, as exposed by the API.
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds.
    ///
    /// Defaults to `0` when absent from the input. `DeleteOnEmptyConfig::to_opt`
    /// treats a zero minimum age the same as an unset one.
    #[serde(default)]
    pub min_age_secs: u64,
}
193
194impl DeleteOnEmptyConfig {
195 pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196 let min_age = config.min_age.unwrap_or_default();
197 if min_age > Duration::ZERO {
198 Some(DeleteOnEmptyConfig {
199 min_age_secs: min_age.as_secs(),
200 })
201 } else {
202 None
203 }
204 }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208 fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209 Self {
210 min_age_secs: value.min_age.as_secs(),
211 }
212 }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216 fn from(value: DeleteOnEmptyConfig) -> Self {
217 Self {
218 min_age: Duration::from_secs(value.min_age_secs),
219 }
220 }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224 fn from(value: DeleteOnEmptyConfig) -> Self {
225 Self {
226 min_age: Some(Duration::from_secs(value.min_age_secs)),
227 }
228 }
229}
230
/// Partial update of a stream's delete-on-empty settings, as exposed by the API.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DeleteOnEmptyReconfiguration {
    /// Requested change to the minimum age, in seconds.
    ///
    /// Filled with `Maybe::default()` when absent from the input and omitted
    /// from serialized output when `Maybe::is_unspecified` holds.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
    pub min_age_secs: Maybe<Option<u64>>,
}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244 Self {
245 min_age: value.min_age_secs.map_opt(Duration::from_secs),
246 }
247 }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251 fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252 Self {
253 min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254 }
255 }
256}
257
/// Stream configuration, as exposed by the API.
///
/// Every field is optional; the derived default has all of them `None`.
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamConfig {
    /// Storage class, if set.
    pub storage_class: Option<StorageClass>,
    /// Retention policy, if set.
    pub retention_policy: Option<RetentionPolicy>,
    /// Timestamping settings, if set.
    pub timestamping: Option<TimestampingConfig>,
    /// Delete-on-empty settings, if set.
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
}
273
274impl StreamConfig {
275 pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
276 let types::config::OptionalStreamConfig {
277 storage_class,
278 retention_policy,
279 timestamping,
280 delete_on_empty,
281 } = config;
282
283 let config = StreamConfig {
284 storage_class: storage_class.map(Into::into),
285 retention_policy: retention_policy.map(Into::into),
286 timestamping: TimestampingConfig::to_opt(timestamping),
287 delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
288 };
289 if config == Self::default() {
290 None
291 } else {
292 Some(config)
293 }
294 }
295}
296
297impl From<types::config::StreamConfig> for StreamConfig {
298 fn from(value: types::config::StreamConfig) -> Self {
299 let types::config::StreamConfig {
300 storage_class,
301 retention_policy,
302 timestamping,
303 delete_on_empty,
304 } = value;
305
306 Self {
307 storage_class: Some(storage_class.into()),
308 retention_policy: Some(retention_policy.into()),
309 timestamping: Some(timestamping.into()),
310 delete_on_empty: Some(delete_on_empty.into()),
311 }
312 }
313}
314
315impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
316 type Error = types::ValidationError;
317
318 fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
319 let StreamConfig {
320 storage_class,
321 retention_policy,
322 timestamping,
323 delete_on_empty,
324 } = value;
325
326 let retention_policy = match retention_policy {
327 None => None,
328 Some(policy) => Some(policy.try_into()?),
329 };
330
331 Ok(Self {
332 storage_class: storage_class.map(Into::into),
333 retention_policy,
334 timestamping: timestamping.map(Into::into).unwrap_or_default(),
335 delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
336 })
337 }
338}
339
/// Partial update of a stream configuration, as exposed by the API.
///
/// Each field is a `Maybe<Option<_>>`: a field absent from the input is
/// filled with `Maybe::default()`, and a field for which
/// `Maybe::is_unspecified` holds is omitted from serialized output.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamReconfiguration {
    /// Requested change to the storage class.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Requested change to the retention policy.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Requested change to the timestamping settings.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Requested change to the delete-on-empty settings.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
}
362
363impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
364 type Error = types::ValidationError;
365
366 fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
367 let StreamReconfiguration {
368 storage_class,
369 retention_policy,
370 timestamping,
371 delete_on_empty,
372 } = value;
373
374 Ok(Self {
375 storage_class: storage_class.map_opt(Into::into),
376 retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
377 timestamping: timestamping.map_opt(Into::into),
378 delete_on_empty: delete_on_empty.map_opt(Into::into),
379 })
380 }
381}
382
383impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
384 fn from(value: types::config::StreamReconfiguration) -> Self {
385 let types::config::StreamReconfiguration {
386 storage_class,
387 retention_policy,
388 timestamping,
389 delete_on_empty,
390 } = value;
391
392 Self {
393 storage_class: storage_class.map_opt(Into::into),
394 retention_policy: retention_policy.map_opt(Into::into),
395 timestamping: timestamping.map_opt(Into::into),
396 delete_on_empty: delete_on_empty.map_opt(Into::into),
397 }
398 }
399}
400
/// Basin configuration, as exposed by the API.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct BasinConfig {
    /// Default stream configuration for the basin, if any.
    pub default_stream_config: Option<StreamConfig>,
    /// The `create_stream_on_append` flag; `false` when absent from the input.
    #[serde(default)]
    pub create_stream_on_append: bool,
    /// The `create_stream_on_read` flag; `false` when absent from the input.
    #[serde(default)]
    pub create_stream_on_read: bool,
}
414
415impl TryFrom<BasinConfig> for types::config::BasinConfig {
416 type Error = types::ValidationError;
417
418 fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
419 let BasinConfig {
420 default_stream_config,
421 create_stream_on_append,
422 create_stream_on_read,
423 } = value;
424
425 Ok(Self {
426 default_stream_config: match default_stream_config {
427 Some(config) => config.try_into()?,
428 None => Default::default(),
429 },
430 create_stream_on_append,
431 create_stream_on_read,
432 })
433 }
434}
435
436impl From<types::config::BasinConfig> for BasinConfig {
437 fn from(value: types::config::BasinConfig) -> Self {
438 let types::config::BasinConfig {
439 default_stream_config,
440 create_stream_on_append,
441 create_stream_on_read,
442 } = value;
443
444 Self {
445 default_stream_config: StreamConfig::to_opt(default_stream_config),
446 create_stream_on_append,
447 create_stream_on_read,
448 }
449 }
450}
451
/// Partial update of a basin configuration, as exposed by the API.
///
/// A field absent from the input is filled with `Maybe::default()`, and a
/// field for which `Maybe::is_unspecified` holds is omitted from serialized
/// output.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct BasinReconfiguration {
    /// Requested change to the basin's default stream configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Requested change to the `create_stream_on_append` flag.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub create_stream_on_append: Maybe<bool>,
    /// Requested change to the `create_stream_on_read` flag.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub create_stream_on_read: Maybe<bool>,
}
469
470impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
471 type Error = types::ValidationError;
472
473 fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
474 let BasinReconfiguration {
475 default_stream_config,
476 create_stream_on_append,
477 create_stream_on_read,
478 } = value;
479
480 Ok(Self {
481 default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
482 create_stream_on_append: create_stream_on_append.map(Into::into),
483 create_stream_on_read: create_stream_on_read.map(Into::into),
484 })
485 }
486}
487
488impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
489 fn from(value: types::config::BasinReconfiguration) -> Self {
490 let types::config::BasinReconfiguration {
491 default_stream_config,
492 create_stream_on_append,
493 create_stream_on_read,
494 } = value;
495
496 Self {
497 default_stream_config: default_stream_config.map_opt(Into::into),
498 create_stream_on_append: create_stream_on_append.map(Into::into),
499 create_stream_on_read: create_stream_on_read.map(Into::into),
500 }
501 }
502}
503
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    // ---------------------------------------------------------------------
    // Strategies for API-level types
    // ---------------------------------------------------------------------

    /// Picks either API storage class.
    fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
        prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
    }

    /// Picks any API timestamping mode.
    fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
        prop_oneof![
            Just(TimestampingMode::ClientPrefer),
            Just(TimestampingMode::ClientRequire),
            Just(TimestampingMode::Arrival),
        ]
    }

    /// Produces an age-based policy with an arbitrary age (zero included) or
    /// the infinite policy.
    fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
        prop_oneof![
            any::<u64>().prop_map(RetentionPolicy::Age),
            Just(RetentionPolicy::Infinite(InfiniteRetention {})),
        ]
    }

    /// Produces a timestamping config with independently optional fields.
    fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
        let mode = proptest::option::of(gen_timestamping_mode());
        let uncapped = proptest::option::of(any::<bool>());
        (mode, uncapped).prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
    }

    /// Produces a delete-on-empty config with an arbitrary minimum age.
    fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
        any::<u64>().prop_map(|secs| DeleteOnEmptyConfig { min_age_secs: secs })
    }

    /// Produces a stream config in which every section is independently optional.
    fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
        (
            proptest::option::of(gen_storage_class()),
            proptest::option::of(gen_retention_policy()),
            proptest::option::of(gen_timestamping_config()),
            proptest::option::of(gen_delete_on_empty_config()),
        )
            .prop_map(|(sc, rp, ts, doe)| StreamConfig {
                storage_class: sc,
                retention_policy: rp,
                timestamping: ts,
                delete_on_empty: doe,
            })
    }

    /// Produces a basin config with an optional default stream config.
    fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
        (
            proptest::option::of(gen_stream_config()),
            any::<bool>(),
            any::<bool>(),
        )
            .prop_map(|(stream, on_append, on_read)| BasinConfig {
                default_stream_config: stream,
                create_stream_on_append: on_append,
                create_stream_on_read: on_read,
            })
    }

    /// Lifts a strategy into the three `Maybe<Option<T>>` states: unspecified,
    /// specified-as-`None`, and specified with a generated value.
    fn gen_maybe<T>(inner: impl Strategy<Value = T>) -> impl Strategy<Value = Maybe<Option<T>>>
    where
        T: std::fmt::Debug + Clone + 'static,
    {
        prop_oneof![
            Just(Maybe::Unspecified),
            Just(Maybe::Specified(None)),
            inner.prop_map(|value| Maybe::Specified(Some(value))),
        ]
    }

    /// Produces an unspecified or specified boolean flag.
    fn gen_maybe_bool() -> impl Strategy<Value = Maybe<bool>> {
        prop_oneof![
            Just(Maybe::Unspecified),
            any::<bool>().prop_map(Maybe::Specified),
        ]
    }

    /// Produces a stream reconfiguration covering all `Maybe` states per field.
    fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
        (
            gen_maybe(gen_storage_class()),
            gen_maybe(gen_retention_policy()),
            gen_maybe(gen_timestamping_reconfiguration()),
            gen_maybe(gen_delete_on_empty_reconfiguration()),
        )
            .prop_map(|(sc, rp, ts, doe)| StreamReconfiguration {
                storage_class: sc,
                retention_policy: rp,
                timestamping: ts,
                delete_on_empty: doe,
            })
    }

    /// Produces a timestamping reconfiguration covering all `Maybe` states per field.
    fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
        let mode = gen_maybe(gen_timestamping_mode());
        let uncapped = gen_maybe(any::<bool>());
        (mode, uncapped)
            .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
    }

    /// Produces a delete-on-empty reconfiguration covering all `Maybe` states.
    fn gen_delete_on_empty_reconfiguration(
    ) -> impl Strategy<Value = DeleteOnEmptyReconfiguration> {
        gen_maybe(any::<u64>()).prop_map(|secs| DeleteOnEmptyReconfiguration { min_age_secs: secs })
    }

    /// Produces a basin reconfiguration covering all `Maybe` states per field.
    fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
        (
            gen_maybe(gen_stream_reconfiguration()),
            gen_maybe_bool(),
            gen_maybe_bool(),
        )
            .prop_map(|(stream, on_append, on_read)| BasinReconfiguration {
                default_stream_config: stream,
                create_stream_on_append: on_append,
                create_stream_on_read: on_read,
            })
    }

    // ---------------------------------------------------------------------
    // Strategies for internal types
    // ---------------------------------------------------------------------

    /// Builds an internal optional stream config directly, bypassing the API
    /// validation (so a zero-second retention age can occur here).
    fn gen_internal_optional_stream_config(
    ) -> impl Strategy<Value = types::config::OptionalStreamConfig> {
        (
            proptest::option::of(gen_storage_class()),
            proptest::option::of(gen_retention_policy()),
            proptest::option::of(gen_timestamping_mode()),
            proptest::option::of(any::<bool>()),
            proptest::option::of(any::<u64>()),
        )
            .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe_secs)| {
                let retention_policy = rp.map(|policy| match policy {
                    RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
                    RetentionPolicy::Age(secs) => {
                        types::config::RetentionPolicy::Age(Duration::from_secs(secs))
                    }
                });

                types::config::OptionalStreamConfig {
                    storage_class: sc.map(Into::into),
                    retention_policy,
                    timestamping: types::config::OptionalTimestampingConfig {
                        mode: ts_mode.map(Into::into),
                        uncapped: ts_uncapped,
                    },
                    delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
                        min_age: doe_secs.map(Duration::from_secs),
                    },
                }
            })
    }

    proptest! {
        /// Converting a stream config fails exactly when the retention age is zero.
        #[test]
        fn stream_config_conversion_validates(config in gen_stream_config()) {
            let expect_rejection =
                matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
            let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();

            prop_assert_eq!(result.is_err(), expect_rejection);
        }

        /// Converting a basin config fails exactly when its default stream
        /// config has a zero retention age.
        #[test]
        fn basin_config_conversion_validates(config in gen_basin_config()) {
            let expect_rejection = matches!(
                &config.default_stream_config,
                Some(StreamConfig {
                    retention_policy: Some(RetentionPolicy::Age(0)),
                    ..
                })
            );

            let result: Result<types::config::BasinConfig, _> = config.try_into();

            prop_assert_eq!(result.is_err(), expect_rejection);
        }

        /// Converting a stream reconfiguration fails exactly when a zero
        /// retention age is specified.
        #[test]
        fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
            let expect_rejection = matches!(
                reconfig.retention_policy,
                Maybe::Specified(Some(RetentionPolicy::Age(0)))
            );
            let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();

            prop_assert_eq!(result.is_err(), expect_rejection);
        }

        /// Merging resolves each field as: stream value, else basin value, else default.
        #[test]
        fn merge_stream_or_basin_or_default(
            stream in gen_internal_optional_stream_config(),
            basin in gen_internal_optional_stream_config(),
        ) {
            let merged = stream.clone().merge(basin.clone());

            prop_assert_eq!(
                merged.storage_class,
                stream.storage_class.or(basin.storage_class).unwrap_or_default()
            );
            prop_assert_eq!(
                merged.retention_policy,
                stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
            );
            prop_assert_eq!(
                merged.timestamping.mode,
                stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
            );
            prop_assert_eq!(
                merged.timestamping.uncapped,
                stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
            );
            prop_assert_eq!(
                merged.delete_on_empty.min_age,
                stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
            );
        }

        /// A default (fully unspecified) reconfiguration leaves the base untouched.
        #[test]
        fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
            let noop = types::config::StreamReconfiguration::default();
            let updated = base.clone().reconfigure(noop);

            prop_assert_eq!(updated.storage_class, base.storage_class);
            prop_assert_eq!(updated.retention_policy, base.retention_policy);
            prop_assert_eq!(updated.timestamping.mode, base.timestamping.mode);
            prop_assert_eq!(updated.timestamping.uncapped, base.timestamping.uncapped);
            prop_assert_eq!(updated.delete_on_empty.min_age, base.delete_on_empty.min_age);
        }

        /// Specifying `None` for every field clears the whole configuration.
        #[test]
        fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
            let clear_all = types::config::StreamReconfiguration {
                storage_class: Maybe::Specified(None),
                retention_policy: Maybe::Specified(None),
                timestamping: Maybe::Specified(None),
                delete_on_empty: Maybe::Specified(None),
            };
            let updated = base.reconfigure(clear_all);

            prop_assert!(updated.storage_class.is_none());
            prop_assert!(updated.retention_policy.is_none());
            prop_assert!(updated.timestamping.mode.is_none());
            prop_assert!(updated.timestamping.uncapped.is_none());
            prop_assert!(updated.delete_on_empty.min_age.is_none());
        }

        /// Specifying a value overrides whatever the base had for that field.
        #[test]
        fn reconfigure_specified_some_sets_value(
            base in gen_internal_optional_stream_config(),
            new_sc in gen_storage_class(),
            new_rp_secs in 1u64..u64::MAX,
        ) {
            let reconfig = types::config::StreamReconfiguration {
                storage_class: Maybe::Specified(Some(new_sc.into())),
                retention_policy: Maybe::Specified(Some(
                    types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
                )),
                ..Default::default()
            };
            let updated = base.reconfigure(reconfig);

            prop_assert_eq!(updated.storage_class, Some(new_sc.into()));
            prop_assert_eq!(
                updated.retention_policy,
                Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
            );
        }

        /// `to_opt` yields `Some` as soon as any non-default value is present.
        #[test]
        fn to_opt_returns_some_for_non_defaults(
            sc in gen_storage_class(),
            doe_secs in 1u64..u64::MAX,
            ts_mode in gen_timestamping_mode(),
        ) {
            let stream = types::config::OptionalStreamConfig {
                storage_class: Some(sc.into()),
                ..Default::default()
            };
            prop_assert!(StreamConfig::to_opt(stream).is_some());

            let delete_on_empty = types::config::OptionalDeleteOnEmptyConfig {
                min_age: Some(Duration::from_secs(doe_secs)),
            };
            let api = DeleteOnEmptyConfig::to_opt(delete_on_empty);
            prop_assert_eq!(api.map(|config| config.min_age_secs), Some(doe_secs));

            let timestamping = types::config::OptionalTimestampingConfig {
                mode: Some(ts_mode.into()),
                uncapped: None,
            };
            prop_assert!(TimestampingConfig::to_opt(timestamping).is_some());
        }

        /// Converting a basin reconfiguration fails exactly when the nested
        /// stream reconfiguration specifies a zero retention age.
        #[test]
        fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
            let expect_rejection = matches!(
                &reconfig.default_stream_config,
                Maybe::Specified(Some(StreamReconfiguration {
                    retention_policy: Maybe::Specified(Some(RetentionPolicy::Age(0))),
                    ..
                }))
            );
            let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();

            prop_assert_eq!(result.is_err(), expect_rejection);
        }

        /// A default (fully unspecified) basin reconfiguration leaves the base untouched.
        #[test]
        fn reconfigure_basin_unspecified_preserves(
            base_sc in proptest::option::of(gen_storage_class()),
            base_on_append in any::<bool>(),
            base_on_read in any::<bool>(),
        ) {
            let base = types::config::BasinConfig {
                default_stream_config: types::config::OptionalStreamConfig {
                    storage_class: base_sc.map(Into::into),
                    ..Default::default()
                },
                create_stream_on_append: base_on_append,
                create_stream_on_read: base_on_read,
            };

            let noop = types::config::BasinReconfiguration::default();
            let updated = base.clone().reconfigure(noop);

            prop_assert_eq!(
                updated.default_stream_config.storage_class,
                base.default_stream_config.storage_class
            );
            prop_assert_eq!(updated.create_stream_on_append, base.create_stream_on_append);
            prop_assert_eq!(updated.create_stream_on_read, base.create_stream_on_read);
        }

        /// Specified basin fields (including nested stream fields) are applied.
        #[test]
        fn reconfigure_basin_specified_updates(
            base_on_append in any::<bool>(),
            new_on_append in any::<bool>(),
            new_sc in gen_storage_class(),
        ) {
            let base = types::config::BasinConfig {
                create_stream_on_append: base_on_append,
                ..Default::default()
            };

            let stream_reconfig = types::config::StreamReconfiguration {
                storage_class: Maybe::Specified(Some(new_sc.into())),
                ..Default::default()
            };
            let reconfig = types::config::BasinReconfiguration {
                default_stream_config: Maybe::Specified(Some(stream_reconfig)),
                create_stream_on_append: Maybe::Specified(new_on_append),
                ..Default::default()
            };
            let updated = base.reconfigure(reconfig);

            prop_assert_eq!(updated.default_stream_config.storage_class, Some(new_sc.into()));
            prop_assert_eq!(updated.create_stream_on_append, new_on_append);
        }

        /// Updating one nested timestamping field keeps the sibling field as it was.
        #[test]
        fn reconfigure_nested_partial_update(
            base_mode in gen_timestamping_mode(),
            base_uncapped in any::<bool>(),
            new_mode in gen_timestamping_mode(),
        ) {
            let base = types::config::OptionalStreamConfig {
                timestamping: types::config::OptionalTimestampingConfig {
                    mode: Some(base_mode.into()),
                    uncapped: Some(base_uncapped),
                },
                ..Default::default()
            };

            let expected_mode: types::config::TimestampingMode = new_mode.into();

            let timestamping = types::config::TimestampingReconfiguration {
                mode: Maybe::Specified(Some(expected_mode)),
                uncapped: Maybe::Unspecified,
            };
            let reconfig = types::config::StreamReconfiguration {
                timestamping: Maybe::Specified(Some(timestamping)),
                ..Default::default()
            };
            let updated = base.reconfigure(reconfig);

            prop_assert_eq!(updated.timestamping.mode, Some(expected_mode));
            prop_assert_eq!(updated.timestamping.uncapped, Some(base_uncapped));
        }
    }

    /// `to_opt` collapses default / empty internal configs to `None`.
    #[test]
    fn to_opt_returns_none_for_defaults() {
        let empty_stream = types::config::OptionalStreamConfig::default();
        assert!(StreamConfig::to_opt(empty_stream).is_none());

        // Both an unset and a zero minimum age count as "not configured".
        let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
        let doe_zero = types::config::OptionalDeleteOnEmptyConfig {
            min_age: Some(Duration::ZERO),
        };
        assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
        assert!(DeleteOnEmptyConfig::to_opt(doe_zero).is_none());

        let empty_timestamping = types::config::OptionalTimestampingConfig::default();
        assert!(TimestampingConfig::to_opt(empty_timestamping).is_none());
    }

    /// An empty JSON object deserializes to a config whose internal form has
    /// nothing set.
    #[test]
    fn empty_json_converts_to_all_none() {
        let parsed: StreamConfig = serde_json::from_value(serde_json::json!({})).unwrap();
        let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();

        assert!(
            internal.storage_class.is_none(),
            "storage_class should be None"
        );
        assert!(
            internal.retention_policy.is_none(),
            "retention_policy should be None"
        );
        assert!(
            internal.timestamping.mode.is_none(),
            "timestamping.mode should be None"
        );
        assert!(
            internal.timestamping.uncapped.is_none(),
            "timestamping.uncapped should be None"
        );
        assert!(
            internal.delete_on_empty.min_age.is_none(),
            "delete_on_empty.min_age should be None"
        );
    }
}