1use std::time::Duration;
2
3use s2_common::{maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
6#[rustfmt::skip]
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
9#[serde(rename_all = "kebab-case")]
10pub enum StorageClass {
11 Standard,
13 Express,
15}
16
17impl From<StorageClass> for types::config::StorageClass {
18 fn from(value: StorageClass) -> Self {
19 match value {
20 StorageClass::Express => Self::Express,
21 StorageClass::Standard => Self::Standard,
22 }
23 }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27 fn from(value: types::config::StorageClass) -> Self {
28 match value {
29 types::config::StorageClass::Express => Self::Express,
30 types::config::StorageClass::Standard => Self::Standard,
31 }
32 }
33}
34
35#[rustfmt::skip]
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
38#[serde(rename_all = "kebab-case")]
39pub enum RetentionPolicy {
40 Age(u64),
43 Infinite(InfiniteRetention)
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
49#[serde(rename_all = "kebab-case")]
50pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53 type Error = types::ValidationError;
54
55 fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56 match value {
57 RetentionPolicy::Age(0) => Err(types::ValidationError(
58 "age must be greater than 0 seconds".to_string(),
59 )),
60 RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61 RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62 }
63 }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67 fn from(value: types::config::RetentionPolicy) -> Self {
68 match value {
69 types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70 types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71 }
72 }
73}
74
75#[rustfmt::skip]
76#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
78#[serde(rename_all = "kebab-case")]
79pub enum TimestampingMode {
80 #[default]
82 ClientPrefer,
83 ClientRequire,
85 Arrival,
87}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90 fn from(value: TimestampingMode) -> Self {
91 match value {
92 TimestampingMode::ClientPrefer => Self::ClientPrefer,
93 TimestampingMode::ClientRequire => Self::ClientRequire,
94 TimestampingMode::Arrival => Self::Arrival,
95 }
96 }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100 fn from(value: types::config::TimestampingMode) -> Self {
101 match value {
102 types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103 types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104 types::config::TimestampingMode::Arrival => Self::Arrival,
105 }
106 }
107}
108
109#[rustfmt::skip]
110#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
112pub struct TimestampingConfig {
113 pub mode: Option<TimestampingMode>,
115 pub uncapped: Option<bool>,
118}
119
120impl TimestampingConfig {
121 pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122 let config = TimestampingConfig {
123 mode: config.mode.map(Into::into),
124 uncapped: config.uncapped,
125 };
126 if config == Self::default() {
127 None
128 } else {
129 Some(config)
130 }
131 }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135 fn from(value: types::config::TimestampingConfig) -> Self {
136 Self {
137 mode: Some(value.mode.into()),
138 uncapped: Some(value.uncapped),
139 }
140 }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144 fn from(value: TimestampingConfig) -> Self {
145 Self {
146 mode: value.mode.map(Into::into),
147 uncapped: value.uncapped,
148 }
149 }
150}
151
152#[rustfmt::skip]
153#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
154#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
155pub struct TimestampingReconfiguration {
156 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
158 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
159 pub mode: Maybe<Option<TimestampingMode>>,
160 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
162 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
163 pub uncapped: Maybe<Option<bool>>,
164}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167 fn from(value: TimestampingReconfiguration) -> Self {
168 Self {
169 mode: value.mode.map_opt(Into::into),
170 uncapped: value.uncapped,
171 }
172 }
173}
174
175impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
176 fn from(value: types::config::TimestampingReconfiguration) -> Self {
177 Self {
178 mode: value.mode.map_opt(Into::into),
179 uncapped: value.uncapped,
180 }
181 }
182}
183
184#[rustfmt::skip]
185#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
186#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
187pub struct DeleteOnEmptyConfig {
188 #[serde(default)]
191 pub min_age_secs: u64,
192}
193
194impl DeleteOnEmptyConfig {
195 pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196 let min_age = config.min_age.unwrap_or_default();
197 if min_age > Duration::ZERO {
198 Some(DeleteOnEmptyConfig {
199 min_age_secs: min_age.as_secs(),
200 })
201 } else {
202 None
203 }
204 }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208 fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209 Self {
210 min_age_secs: value.min_age.as_secs(),
211 }
212 }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216 fn from(value: DeleteOnEmptyConfig) -> Self {
217 Self {
218 min_age: Duration::from_secs(value.min_age_secs),
219 }
220 }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224 fn from(value: DeleteOnEmptyConfig) -> Self {
225 Self {
226 min_age: Some(Duration::from_secs(value.min_age_secs)),
227 }
228 }
229}
230
231#[rustfmt::skip]
232#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
233#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
234pub struct DeleteOnEmptyReconfiguration {
235 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
238 #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
239 pub min_age_secs: Maybe<Option<u64>>,
240}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244 Self {
245 min_age: value.min_age_secs.map_opt(Duration::from_secs),
246 }
247 }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251 fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252 Self {
253 min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254 }
255 }
256}
257
258#[rustfmt::skip]
259#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
260#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
261pub struct StreamConfig {
262 pub storage_class: Option<StorageClass>,
264 pub retention_policy: Option<RetentionPolicy>,
267 pub timestamping: Option<TimestampingConfig>,
269 #[serde(default)]
271 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
272}
273
274impl StreamConfig {
275 pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
276 let types::config::OptionalStreamConfig {
277 storage_class,
278 retention_policy,
279 timestamping,
280 delete_on_empty,
281 } = config;
282
283 let config = StreamConfig {
284 storage_class: storage_class.map(Into::into),
285 retention_policy: retention_policy.map(Into::into),
286 timestamping: TimestampingConfig::to_opt(timestamping),
287 delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
288 };
289 if config == Self::default() {
290 None
291 } else {
292 Some(config)
293 }
294 }
295}
296
297impl From<types::config::StreamConfig> for StreamConfig {
298 fn from(value: types::config::StreamConfig) -> Self {
299 let types::config::StreamConfig {
300 storage_class,
301 retention_policy,
302 timestamping,
303 delete_on_empty,
304 } = value;
305
306 Self {
307 storage_class: Some(storage_class.into()),
308 retention_policy: Some(retention_policy.into()),
309 timestamping: Some(timestamping.into()),
310 delete_on_empty: Some(delete_on_empty.into()),
311 }
312 }
313}
314
315impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
316 type Error = types::ValidationError;
317
318 fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
319 let StreamConfig {
320 storage_class,
321 retention_policy,
322 timestamping,
323 delete_on_empty,
324 } = value;
325
326 let retention_policy = match retention_policy {
327 None => None,
328 Some(policy) => Some(policy.try_into()?),
329 };
330
331 Ok(Self {
332 storage_class: storage_class.map(Into::into),
333 retention_policy,
334 timestamping: timestamping.map(Into::into).unwrap_or_default(),
335 delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
336 })
337 }
338}
339
340#[rustfmt::skip]
341#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
342#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
343pub struct StreamReconfiguration {
344 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
346 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
347 pub storage_class: Maybe<Option<StorageClass>>,
348 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
351 #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
352 pub retention_policy: Maybe<Option<RetentionPolicy>>,
353 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
355 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
356 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
357 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
359 #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
360 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
361}
362
363impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
364 type Error = types::ValidationError;
365
366 fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
367 let StreamReconfiguration {
368 storage_class,
369 retention_policy,
370 timestamping,
371 delete_on_empty,
372 } = value;
373
374 Ok(Self {
375 storage_class: storage_class.map_opt(Into::into),
376 retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
377 timestamping: timestamping.map_opt(Into::into),
378 delete_on_empty: delete_on_empty.map_opt(Into::into),
379 })
380 }
381}
382
383impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
384 fn from(value: types::config::StreamReconfiguration) -> Self {
385 let types::config::StreamReconfiguration {
386 storage_class,
387 retention_policy,
388 timestamping,
389 delete_on_empty,
390 } = value;
391
392 Self {
393 storage_class: storage_class.map_opt(Into::into),
394 retention_policy: retention_policy.map_opt(Into::into),
395 timestamping: timestamping.map_opt(Into::into),
396 delete_on_empty: delete_on_empty.map_opt(Into::into),
397 }
398 }
399}
400
401#[rustfmt::skip]
402#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
403#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
404pub struct BasinConfig {
405 pub default_stream_config: Option<StreamConfig>,
407 #[serde(default)]
409 #[cfg_attr(feature = "utoipa", schema(default = false))]
410 pub create_stream_on_append: bool,
411 #[serde(default)]
413 #[cfg_attr(feature = "utoipa", schema(default = false))]
414 pub create_stream_on_read: bool,
415}
416
417impl TryFrom<BasinConfig> for types::config::BasinConfig {
418 type Error = types::ValidationError;
419
420 fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
421 let BasinConfig {
422 default_stream_config,
423 create_stream_on_append,
424 create_stream_on_read,
425 } = value;
426
427 Ok(Self {
428 default_stream_config: match default_stream_config {
429 Some(config) => config.try_into()?,
430 None => Default::default(),
431 },
432 create_stream_on_append,
433 create_stream_on_read,
434 })
435 }
436}
437
438impl From<types::config::BasinConfig> for BasinConfig {
439 fn from(value: types::config::BasinConfig) -> Self {
440 let types::config::BasinConfig {
441 default_stream_config,
442 create_stream_on_append,
443 create_stream_on_read,
444 } = value;
445
446 Self {
447 default_stream_config: StreamConfig::to_opt(default_stream_config),
448 create_stream_on_append,
449 create_stream_on_read,
450 }
451 }
452}
453
454#[rustfmt::skip]
455#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
456#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
457pub struct BasinReconfiguration {
458 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
460 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
461 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
462 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
464 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
465 pub create_stream_on_append: Maybe<bool>,
466 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
468 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
469 pub create_stream_on_read: Maybe<bool>,
470}
471
472impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
473 type Error = types::ValidationError;
474
475 fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
476 let BasinReconfiguration {
477 default_stream_config,
478 create_stream_on_append,
479 create_stream_on_read,
480 } = value;
481
482 Ok(Self {
483 default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
484 create_stream_on_append: create_stream_on_append.map(Into::into),
485 create_stream_on_read: create_stream_on_read.map(Into::into),
486 })
487 }
488}
489
490impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
491 fn from(value: types::config::BasinReconfiguration) -> Self {
492 let types::config::BasinReconfiguration {
493 default_stream_config,
494 create_stream_on_append,
495 create_stream_on_read,
496 } = value;
497
498 Self {
499 default_stream_config: default_stream_config.map_opt(Into::into),
500 create_stream_on_append: create_stream_on_append.map(Into::into),
501 create_stream_on_read: create_stream_on_read.map(Into::into),
502 }
503 }
504}
505
506#[cfg(test)]
507mod tests {
508 use proptest::prelude::*;
509
510 use super::*;
511
512 fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
513 prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
514 }
515
516 fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
517 prop_oneof![
518 Just(TimestampingMode::ClientPrefer),
519 Just(TimestampingMode::ClientRequire),
520 Just(TimestampingMode::Arrival),
521 ]
522 }
523
524 fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
525 prop_oneof![
526 any::<u64>().prop_map(RetentionPolicy::Age),
527 Just(RetentionPolicy::Infinite(InfiniteRetention {})),
528 ]
529 }
530
531 fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
532 (
533 proptest::option::of(gen_timestamping_mode()),
534 proptest::option::of(any::<bool>()),
535 )
536 .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
537 }
538
539 fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
540 any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
541 }
542
543 fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
544 (
545 proptest::option::of(gen_storage_class()),
546 proptest::option::of(gen_retention_policy()),
547 proptest::option::of(gen_timestamping_config()),
548 proptest::option::of(gen_delete_on_empty_config()),
549 )
550 .prop_map(
551 |(storage_class, retention_policy, timestamping, delete_on_empty)| StreamConfig {
552 storage_class,
553 retention_policy,
554 timestamping,
555 delete_on_empty,
556 },
557 )
558 }
559
560 fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
561 (
562 proptest::option::of(gen_stream_config()),
563 any::<bool>(),
564 any::<bool>(),
565 )
566 .prop_map(
567 |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
568 BasinConfig {
569 default_stream_config,
570 create_stream_on_append,
571 create_stream_on_read,
572 }
573 },
574 )
575 }
576
577 fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
578 inner: impl Strategy<Value = T>,
579 ) -> impl Strategy<Value = Maybe<Option<T>>> {
580 prop_oneof![
581 Just(Maybe::Unspecified),
582 Just(Maybe::Specified(None)),
583 inner.prop_map(|v| Maybe::Specified(Some(v))),
584 ]
585 }
586
587 fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
588 (
589 gen_maybe(gen_storage_class()),
590 gen_maybe(gen_retention_policy()),
591 gen_maybe(gen_timestamping_reconfiguration()),
592 gen_maybe(gen_delete_on_empty_reconfiguration()),
593 )
594 .prop_map(
595 |(storage_class, retention_policy, timestamping, delete_on_empty)| {
596 StreamReconfiguration {
597 storage_class,
598 retention_policy,
599 timestamping,
600 delete_on_empty,
601 }
602 },
603 )
604 }
605
606 fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
607 (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
608 .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
609 }
610
611 fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
612 {
613 gen_maybe(any::<u64>())
614 .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
615 }
616
617 fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
618 (
619 gen_maybe(gen_stream_reconfiguration()),
620 prop_oneof![
621 Just(Maybe::Unspecified),
622 any::<bool>().prop_map(Maybe::Specified),
623 ],
624 prop_oneof![
625 Just(Maybe::Unspecified),
626 any::<bool>().prop_map(Maybe::Specified),
627 ],
628 )
629 .prop_map(
630 |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
631 BasinReconfiguration {
632 default_stream_config,
633 create_stream_on_append,
634 create_stream_on_read,
635 }
636 },
637 )
638 }
639
640 fn gen_internal_optional_stream_config()
641 -> impl Strategy<Value = types::config::OptionalStreamConfig> {
642 (
643 proptest::option::of(gen_storage_class()),
644 proptest::option::of(gen_retention_policy()),
645 proptest::option::of(gen_timestamping_mode()),
646 proptest::option::of(any::<bool>()),
647 proptest::option::of(any::<u64>()),
648 )
649 .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe)| {
650 types::config::OptionalStreamConfig {
651 storage_class: sc.map(Into::into),
652 retention_policy: rp.map(|rp| match rp {
653 RetentionPolicy::Age(secs) => {
654 types::config::RetentionPolicy::Age(Duration::from_secs(secs))
655 }
656 RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
657 }),
658 timestamping: types::config::OptionalTimestampingConfig {
659 mode: ts_mode.map(Into::into),
660 uncapped: ts_uncapped,
661 },
662 delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
663 min_age: doe.map(Duration::from_secs),
664 },
665 }
666 })
667 }
668
669 proptest! {
670 #[test]
671 fn stream_config_conversion_validates(config in gen_stream_config()) {
672 let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
673 let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
674
675 if has_zero_age {
676 prop_assert!(result.is_err());
677 } else {
678 prop_assert!(result.is_ok());
679 }
680 }
681
682 #[test]
683 fn basin_config_conversion_validates(config in gen_basin_config()) {
684 let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
685 matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
686 });
687
688 let result: Result<types::config::BasinConfig, _> = config.try_into();
689
690 if has_invalid_config {
691 prop_assert!(result.is_err());
692 } else {
693 prop_assert!(result.is_ok());
694 }
695 }
696
697 #[test]
698 fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
699 let has_zero_age = matches!(
700 reconfig.retention_policy,
701 Maybe::Specified(Some(RetentionPolicy::Age(0)))
702 );
703 let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
704
705 if has_zero_age {
706 prop_assert!(result.is_err());
707 } else {
708 prop_assert!(result.is_ok());
709 }
710 }
711
712 #[test]
713 fn merge_stream_or_basin_or_default(
714 stream in gen_internal_optional_stream_config(),
715 basin in gen_internal_optional_stream_config(),
716 ) {
717 let merged = stream.clone().merge(basin.clone());
718
719 prop_assert_eq!(
720 merged.storage_class,
721 stream.storage_class.or(basin.storage_class).unwrap_or_default()
722 );
723 prop_assert_eq!(
724 merged.retention_policy,
725 stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
726 );
727 prop_assert_eq!(
728 merged.timestamping.mode,
729 stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
730 );
731 prop_assert_eq!(
732 merged.timestamping.uncapped,
733 stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
734 );
735 prop_assert_eq!(
736 merged.delete_on_empty.min_age,
737 stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
738 );
739 }
740
741 #[test]
742 fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
743 let reconfig = types::config::StreamReconfiguration::default();
744 let result = base.clone().reconfigure(reconfig);
745
746 prop_assert_eq!(result.storage_class, base.storage_class);
747 prop_assert_eq!(result.retention_policy, base.retention_policy);
748 prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
749 prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
750 prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
751 }
752
753 #[test]
754 fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
755 let reconfig = types::config::StreamReconfiguration {
756 storage_class: Maybe::Specified(None),
757 retention_policy: Maybe::Specified(None),
758 timestamping: Maybe::Specified(None),
759 delete_on_empty: Maybe::Specified(None),
760 };
761 let result = base.reconfigure(reconfig);
762
763 prop_assert!(result.storage_class.is_none());
764 prop_assert!(result.retention_policy.is_none());
765 prop_assert!(result.timestamping.mode.is_none());
766 prop_assert!(result.timestamping.uncapped.is_none());
767 prop_assert!(result.delete_on_empty.min_age.is_none());
768 }
769
770 #[test]
771 fn reconfigure_specified_some_sets_value(
772 base in gen_internal_optional_stream_config(),
773 new_sc in gen_storage_class(),
774 new_rp_secs in 1u64..u64::MAX,
775 ) {
776 let reconfig = types::config::StreamReconfiguration {
777 storage_class: Maybe::Specified(Some(new_sc.into())),
778 retention_policy: Maybe::Specified(Some(
779 types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
780 )),
781 ..Default::default()
782 };
783 let result = base.reconfigure(reconfig);
784
785 prop_assert_eq!(result.storage_class, Some(new_sc.into()));
786 prop_assert_eq!(
787 result.retention_policy,
788 Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
789 );
790 }
791
792 #[test]
793 fn to_opt_returns_some_for_non_defaults(
794 sc in gen_storage_class(),
795 doe_secs in 1u64..u64::MAX,
796 ts_mode in gen_timestamping_mode(),
797 ) {
798 let internal = types::config::OptionalStreamConfig {
800 storage_class: Some(sc.into()),
801 ..Default::default()
802 };
803 prop_assert!(StreamConfig::to_opt(internal).is_some());
804
805 let internal = types::config::OptionalDeleteOnEmptyConfig {
807 min_age: Some(Duration::from_secs(doe_secs)),
808 };
809 let api = DeleteOnEmptyConfig::to_opt(internal);
810 prop_assert!(api.is_some());
811 prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
812
813 let internal = types::config::OptionalTimestampingConfig {
815 mode: Some(ts_mode.into()),
816 uncapped: None,
817 };
818 prop_assert!(TimestampingConfig::to_opt(internal).is_some());
819 }
820
821 #[test]
822 fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
823 let has_zero_age = matches!(
824 &reconfig.default_stream_config,
825 Maybe::Specified(Some(sr)) if matches!(
826 sr.retention_policy,
827 Maybe::Specified(Some(RetentionPolicy::Age(0)))
828 )
829 );
830 let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
831
832 if has_zero_age {
833 prop_assert!(result.is_err());
834 } else {
835 prop_assert!(result.is_ok());
836 }
837 }
838
839 #[test]
840 fn reconfigure_basin_unspecified_preserves(
841 base_sc in proptest::option::of(gen_storage_class()),
842 base_on_append in any::<bool>(),
843 base_on_read in any::<bool>(),
844 ) {
845 let base = types::config::BasinConfig {
846 default_stream_config: types::config::OptionalStreamConfig {
847 storage_class: base_sc.map(Into::into),
848 ..Default::default()
849 },
850 create_stream_on_append: base_on_append,
851 create_stream_on_read: base_on_read,
852 };
853
854 let reconfig = types::config::BasinReconfiguration::default();
855 let result = base.clone().reconfigure(reconfig);
856
857 prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
858 prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
859 prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
860 }
861
862 #[test]
863 fn reconfigure_basin_specified_updates(
864 base_on_append in any::<bool>(),
865 new_on_append in any::<bool>(),
866 new_sc in gen_storage_class(),
867 ) {
868 let base = types::config::BasinConfig {
869 create_stream_on_append: base_on_append,
870 ..Default::default()
871 };
872
873 let reconfig = types::config::BasinReconfiguration {
874 default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
875 storage_class: Maybe::Specified(Some(new_sc.into())),
876 ..Default::default()
877 })),
878 create_stream_on_append: Maybe::Specified(new_on_append),
879 ..Default::default()
880 };
881 let result = base.reconfigure(reconfig);
882
883 prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
884 prop_assert_eq!(result.create_stream_on_append, new_on_append);
885 }
886
887 #[test]
888 fn reconfigure_nested_partial_update(
889 base_mode in gen_timestamping_mode(),
890 base_uncapped in any::<bool>(),
891 new_mode in gen_timestamping_mode(),
892 ) {
893 let base = types::config::OptionalStreamConfig {
894 timestamping: types::config::OptionalTimestampingConfig {
895 mode: Some(base_mode.into()),
896 uncapped: Some(base_uncapped),
897 },
898 ..Default::default()
899 };
900
901 let expected_mode: types::config::TimestampingMode = new_mode.into();
902
903 let reconfig = types::config::StreamReconfiguration {
904 timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
905 mode: Maybe::Specified(Some(expected_mode)),
906 uncapped: Maybe::Unspecified,
907 })),
908 ..Default::default()
909 };
910 let result = base.reconfigure(reconfig);
911
912 prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
913 prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
914 }
915 }
916
917 #[test]
918 fn to_opt_returns_none_for_defaults() {
919 assert!(StreamConfig::to_opt(types::config::OptionalStreamConfig::default()).is_none());
921
922 let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
924 let doe_zero = types::config::OptionalDeleteOnEmptyConfig {
925 min_age: Some(Duration::ZERO),
926 };
927 assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
928 assert!(DeleteOnEmptyConfig::to_opt(doe_zero).is_none());
929
930 assert!(
932 TimestampingConfig::to_opt(types::config::OptionalTimestampingConfig::default())
933 .is_none()
934 );
935 }
936
937 #[test]
938 fn empty_json_converts_to_all_none() {
939 let json = serde_json::json!({});
940 let parsed: StreamConfig = serde_json::from_value(json).unwrap();
941 let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();
942
943 assert!(
944 internal.storage_class.is_none(),
945 "storage_class should be None"
946 );
947 assert!(
948 internal.retention_policy.is_none(),
949 "retention_policy should be None"
950 );
951 assert!(
952 internal.timestamping.mode.is_none(),
953 "timestamping.mode should be None"
954 );
955 assert!(
956 internal.timestamping.uncapped.is_none(),
957 "timestamping.uncapped should be None"
958 );
959 assert!(
960 internal.delete_on_empty.min_age.is_none(),
961 "delete_on_empty.min_age should be None"
962 );
963 }
964}