Skip to main content

s2_api/v1/
config.rs

1use std::time::Duration;
2
3use s2_common::{maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
6#[rustfmt::skip]
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
9#[serde(rename_all = "kebab-case")]
10pub enum StorageClass {
11    /// Append tail latency under 400 milliseconds with s2.dev.
12    Standard,
13    /// Append tail latency under 40 milliseconds with s2.dev.
14    Express,
15}
16
17impl From<StorageClass> for types::config::StorageClass {
18    fn from(value: StorageClass) -> Self {
19        match value {
20            StorageClass::Express => Self::Express,
21            StorageClass::Standard => Self::Standard,
22        }
23    }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27    fn from(value: types::config::StorageClass) -> Self {
28        match value {
29            types::config::StorageClass::Express => Self::Express,
30            types::config::StorageClass::Standard => Self::Standard,
31        }
32    }
33}
34
35#[rustfmt::skip]
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
38#[serde(rename_all = "kebab-case")]
39pub enum RetentionPolicy {
40    /// Age in seconds for automatic trimming of records older than this threshold.
41    /// This must be set to a value greater than 0 seconds.
42    Age(u64),
43    /// Retain records unless explicitly trimmed.
44    Infinite(InfiniteRetention)
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
49#[serde(rename_all = "kebab-case")]
50pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53    type Error = types::ValidationError;
54
55    fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56        match value {
57            RetentionPolicy::Age(0) => Err(types::ValidationError(
58                "age must be greater than 0 seconds".to_string(),
59            )),
60            RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61            RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62        }
63    }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67    fn from(value: types::config::RetentionPolicy) -> Self {
68        match value {
69            types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70            types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71        }
72    }
73}
74
75#[rustfmt::skip]
76#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
78#[serde(rename_all = "kebab-case")]
79pub enum TimestampingMode {
80    /// Prefer client-specified timestamp if present otherwise use arrival time.
81    #[default]
82    ClientPrefer,
83    /// Require a client-specified timestamp and reject the append if it is missing.
84    ClientRequire,
85    /// Use the arrival time and ignore any client-specified timestamp.
86    Arrival,
87}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90    fn from(value: TimestampingMode) -> Self {
91        match value {
92            TimestampingMode::ClientPrefer => Self::ClientPrefer,
93            TimestampingMode::ClientRequire => Self::ClientRequire,
94            TimestampingMode::Arrival => Self::Arrival,
95        }
96    }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100    fn from(value: types::config::TimestampingMode) -> Self {
101        match value {
102            types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103            types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104            types::config::TimestampingMode::Arrival => Self::Arrival,
105        }
106    }
107}
108
109#[rustfmt::skip]
110#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
112pub struct TimestampingConfig {
113    /// Timestamping mode for appends that influences how timestamps are handled.
114    pub mode: Option<TimestampingMode>,
115    /// Allow client-specified timestamps to exceed the arrival time.
116    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
117    pub uncapped: Option<bool>,
118}
119
120impl TimestampingConfig {
121    pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122        let config = TimestampingConfig {
123            mode: config.mode.map(Into::into),
124            uncapped: config.uncapped,
125        };
126        if config == Self::default() {
127            None
128        } else {
129            Some(config)
130        }
131    }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135    fn from(value: types::config::TimestampingConfig) -> Self {
136        Self {
137            mode: Some(value.mode.into()),
138            uncapped: Some(value.uncapped),
139        }
140    }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144    fn from(value: TimestampingConfig) -> Self {
145        Self {
146            mode: value.mode.map(Into::into),
147            uncapped: value.uncapped,
148        }
149    }
150}
151
152#[rustfmt::skip]
153#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
154#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
155pub struct TimestampingReconfiguration {
156    /// Timestamping mode for appends that influences how timestamps are handled.
157    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
158    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
159    pub mode: Maybe<Option<TimestampingMode>>,
160    /// Allow client-specified timestamps to exceed the arrival time.
161    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
162    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
163    pub uncapped: Maybe<Option<bool>>,
164}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167    fn from(value: TimestampingReconfiguration) -> Self {
168        Self {
169            mode: value.mode.map_opt(Into::into),
170            uncapped: value.uncapped,
171        }
172    }
173}
174
175impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
176    fn from(value: types::config::TimestampingReconfiguration) -> Self {
177        Self {
178            mode: value.mode.map_opt(Into::into),
179            uncapped: value.uncapped,
180        }
181    }
182}
183
184#[rustfmt::skip]
185#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
186#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
187pub struct DeleteOnEmptyConfig {
188    /// Minimum age in seconds before an empty stream can be deleted.
189    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
190    #[serde(default)]
191    pub min_age_secs: u64,
192}
193
194impl DeleteOnEmptyConfig {
195    pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196        let min_age = config.min_age.unwrap_or_default();
197        if min_age > Duration::ZERO {
198            Some(DeleteOnEmptyConfig {
199                min_age_secs: min_age.as_secs(),
200            })
201        } else {
202            None
203        }
204    }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208    fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209        Self {
210            min_age_secs: value.min_age.as_secs(),
211        }
212    }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216    fn from(value: DeleteOnEmptyConfig) -> Self {
217        Self {
218            min_age: Duration::from_secs(value.min_age_secs),
219        }
220    }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224    fn from(value: DeleteOnEmptyConfig) -> Self {
225        Self {
226            min_age: Some(Duration::from_secs(value.min_age_secs)),
227        }
228    }
229}
230
231#[rustfmt::skip]
232#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
233#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
234pub struct DeleteOnEmptyReconfiguration {
235    /// Minimum age in seconds before an empty stream can be deleted.
236    /// Set to 0 to disable delete-on-empty (don't delete automatically).
237    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
238    #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
239    pub min_age_secs: Maybe<Option<u64>>,
240}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244        Self {
245            min_age: value.min_age_secs.map_opt(Duration::from_secs),
246        }
247    }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251    fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252        Self {
253            min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254        }
255    }
256}
257
258#[rustfmt::skip]
259#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
260#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
261pub struct StreamConfig {
262    /// Storage class for recent writes.
263    pub storage_class: Option<StorageClass>,
264    /// Retention policy for the stream.
265    /// If unspecified, the default is to retain records for 7 days.
266    pub retention_policy: Option<RetentionPolicy>,
267    /// Timestamping behavior.
268    pub timestamping: Option<TimestampingConfig>,
269    /// Delete-on-empty configuration.
270    #[serde(default)]
271    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
272}
273
274impl StreamConfig {
275    pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
276        let types::config::OptionalStreamConfig {
277            storage_class,
278            retention_policy,
279            timestamping,
280            delete_on_empty,
281        } = config;
282
283        let config = StreamConfig {
284            storage_class: storage_class.map(Into::into),
285            retention_policy: retention_policy.map(Into::into),
286            timestamping: TimestampingConfig::to_opt(timestamping),
287            delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
288        };
289        if config == Self::default() {
290            None
291        } else {
292            Some(config)
293        }
294    }
295}
296
297impl From<types::config::StreamConfig> for StreamConfig {
298    fn from(value: types::config::StreamConfig) -> Self {
299        let types::config::StreamConfig {
300            storage_class,
301            retention_policy,
302            timestamping,
303            delete_on_empty,
304        } = value;
305
306        Self {
307            storage_class: Some(storage_class.into()),
308            retention_policy: Some(retention_policy.into()),
309            timestamping: Some(timestamping.into()),
310            delete_on_empty: Some(delete_on_empty.into()),
311        }
312    }
313}
314
315impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
316    type Error = types::ValidationError;
317
318    fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
319        let StreamConfig {
320            storage_class,
321            retention_policy,
322            timestamping,
323            delete_on_empty,
324        } = value;
325
326        let retention_policy = match retention_policy {
327            None => None,
328            Some(policy) => Some(policy.try_into()?),
329        };
330
331        Ok(Self {
332            storage_class: storage_class.map(Into::into),
333            retention_policy,
334            timestamping: timestamping.map(Into::into).unwrap_or_default(),
335            delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
336        })
337    }
338}
339
340#[rustfmt::skip]
341#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
342#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
343pub struct StreamReconfiguration {
344    /// Storage class for recent writes.
345    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
346    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
347    pub storage_class: Maybe<Option<StorageClass>>,
348    /// Retention policy for the stream.
349    /// If unspecified, the default is to retain records for 7 days.
350    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
351    #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
352    pub retention_policy: Maybe<Option<RetentionPolicy>>,
353    /// Timestamping behavior.
354    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
355    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
356    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
357    /// Delete-on-empty configuration.
358    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
359    #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
360    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
361}
362
363impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
364    type Error = types::ValidationError;
365
366    fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
367        let StreamReconfiguration {
368            storage_class,
369            retention_policy,
370            timestamping,
371            delete_on_empty,
372        } = value;
373
374        Ok(Self {
375            storage_class: storage_class.map_opt(Into::into),
376            retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
377            timestamping: timestamping.map_opt(Into::into),
378            delete_on_empty: delete_on_empty.map_opt(Into::into),
379        })
380    }
381}
382
383impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
384    fn from(value: types::config::StreamReconfiguration) -> Self {
385        let types::config::StreamReconfiguration {
386            storage_class,
387            retention_policy,
388            timestamping,
389            delete_on_empty,
390        } = value;
391
392        Self {
393            storage_class: storage_class.map_opt(Into::into),
394            retention_policy: retention_policy.map_opt(Into::into),
395            timestamping: timestamping.map_opt(Into::into),
396            delete_on_empty: delete_on_empty.map_opt(Into::into),
397        }
398    }
399}
400
401#[rustfmt::skip]
402#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
403#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
404pub struct BasinConfig {
405    /// Default stream configuration.
406    pub default_stream_config: Option<StreamConfig>,
407    /// Create stream on append if it doesn't exist, using the default stream configuration.
408    #[serde(default)]
409    #[cfg_attr(feature = "utoipa", schema(default = false))]
410    pub create_stream_on_append: bool,
411    /// Create stream on read if it doesn't exist, using the default stream configuration.
412    #[serde(default)]
413    #[cfg_attr(feature = "utoipa", schema(default = false))]
414    pub create_stream_on_read: bool,
415}
416
417impl TryFrom<BasinConfig> for types::config::BasinConfig {
418    type Error = types::ValidationError;
419
420    fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
421        let BasinConfig {
422            default_stream_config,
423            create_stream_on_append,
424            create_stream_on_read,
425        } = value;
426
427        Ok(Self {
428            default_stream_config: match default_stream_config {
429                Some(config) => config.try_into()?,
430                None => Default::default(),
431            },
432            create_stream_on_append,
433            create_stream_on_read,
434        })
435    }
436}
437
438impl From<types::config::BasinConfig> for BasinConfig {
439    fn from(value: types::config::BasinConfig) -> Self {
440        let types::config::BasinConfig {
441            default_stream_config,
442            create_stream_on_append,
443            create_stream_on_read,
444        } = value;
445
446        Self {
447            default_stream_config: StreamConfig::to_opt(default_stream_config),
448            create_stream_on_append,
449            create_stream_on_read,
450        }
451    }
452}
453
454#[rustfmt::skip]
455#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
456#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
457pub struct BasinReconfiguration {
458    /// Basin configuration.
459    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
460    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
461    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
462    /// Create a stream on append.
463    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
464    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
465    pub create_stream_on_append: Maybe<bool>,
466    /// Create a stream on read.
467    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
468    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
469    pub create_stream_on_read: Maybe<bool>,
470}
471
472impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
473    type Error = types::ValidationError;
474
475    fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
476        let BasinReconfiguration {
477            default_stream_config,
478            create_stream_on_append,
479            create_stream_on_read,
480        } = value;
481
482        Ok(Self {
483            default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
484            create_stream_on_append: create_stream_on_append.map(Into::into),
485            create_stream_on_read: create_stream_on_read.map(Into::into),
486        })
487    }
488}
489
490impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
491    fn from(value: types::config::BasinReconfiguration) -> Self {
492        let types::config::BasinReconfiguration {
493            default_stream_config,
494            create_stream_on_append,
495            create_stream_on_read,
496        } = value;
497
498        Self {
499            default_stream_config: default_stream_config.map_opt(Into::into),
500            create_stream_on_append: create_stream_on_append.map(Into::into),
501            create_stream_on_read: create_stream_on_read.map(Into::into),
502        }
503    }
504}
505
506#[cfg(test)]
507mod tests {
508    use proptest::prelude::*;
509
510    use super::*;
511
512    fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
513        prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
514    }
515
516    fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
517        prop_oneof![
518            Just(TimestampingMode::ClientPrefer),
519            Just(TimestampingMode::ClientRequire),
520            Just(TimestampingMode::Arrival),
521        ]
522    }
523
524    fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
525        prop_oneof![
526            any::<u64>().prop_map(RetentionPolicy::Age),
527            Just(RetentionPolicy::Infinite(InfiniteRetention {})),
528        ]
529    }
530
531    fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
532        (
533            proptest::option::of(gen_timestamping_mode()),
534            proptest::option::of(any::<bool>()),
535        )
536            .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
537    }
538
539    fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
540        any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
541    }
542
543    fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
544        (
545            proptest::option::of(gen_storage_class()),
546            proptest::option::of(gen_retention_policy()),
547            proptest::option::of(gen_timestamping_config()),
548            proptest::option::of(gen_delete_on_empty_config()),
549        )
550            .prop_map(
551                |(storage_class, retention_policy, timestamping, delete_on_empty)| StreamConfig {
552                    storage_class,
553                    retention_policy,
554                    timestamping,
555                    delete_on_empty,
556                },
557            )
558    }
559
560    fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
561        (
562            proptest::option::of(gen_stream_config()),
563            any::<bool>(),
564            any::<bool>(),
565        )
566            .prop_map(
567                |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
568                    BasinConfig {
569                        default_stream_config,
570                        create_stream_on_append,
571                        create_stream_on_read,
572                    }
573                },
574            )
575    }
576
577    fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
578        inner: impl Strategy<Value = T>,
579    ) -> impl Strategy<Value = Maybe<Option<T>>> {
580        prop_oneof![
581            Just(Maybe::Unspecified),
582            Just(Maybe::Specified(None)),
583            inner.prop_map(|v| Maybe::Specified(Some(v))),
584        ]
585    }
586
587    fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
588        (
589            gen_maybe(gen_storage_class()),
590            gen_maybe(gen_retention_policy()),
591            gen_maybe(gen_timestamping_reconfiguration()),
592            gen_maybe(gen_delete_on_empty_reconfiguration()),
593        )
594            .prop_map(
595                |(storage_class, retention_policy, timestamping, delete_on_empty)| {
596                    StreamReconfiguration {
597                        storage_class,
598                        retention_policy,
599                        timestamping,
600                        delete_on_empty,
601                    }
602                },
603            )
604    }
605
606    fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
607        (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
608            .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
609    }
610
611    fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
612    {
613        gen_maybe(any::<u64>())
614            .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
615    }
616
617    fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
618        (
619            gen_maybe(gen_stream_reconfiguration()),
620            prop_oneof![
621                Just(Maybe::Unspecified),
622                any::<bool>().prop_map(Maybe::Specified),
623            ],
624            prop_oneof![
625                Just(Maybe::Unspecified),
626                any::<bool>().prop_map(Maybe::Specified),
627            ],
628        )
629            .prop_map(
630                |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
631                    BasinReconfiguration {
632                        default_stream_config,
633                        create_stream_on_append,
634                        create_stream_on_read,
635                    }
636                },
637            )
638    }
639
640    fn gen_internal_optional_stream_config()
641    -> impl Strategy<Value = types::config::OptionalStreamConfig> {
642        (
643            proptest::option::of(gen_storage_class()),
644            proptest::option::of(gen_retention_policy()),
645            proptest::option::of(gen_timestamping_mode()),
646            proptest::option::of(any::<bool>()),
647            proptest::option::of(any::<u64>()),
648        )
649            .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe)| {
650                types::config::OptionalStreamConfig {
651                    storage_class: sc.map(Into::into),
652                    retention_policy: rp.map(|rp| match rp {
653                        RetentionPolicy::Age(secs) => {
654                            types::config::RetentionPolicy::Age(Duration::from_secs(secs))
655                        }
656                        RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
657                    }),
658                    timestamping: types::config::OptionalTimestampingConfig {
659                        mode: ts_mode.map(Into::into),
660                        uncapped: ts_uncapped,
661                    },
662                    delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
663                        min_age: doe.map(Duration::from_secs),
664                    },
665                }
666            })
667    }
668
669    proptest! {
670        #[test]
671        fn stream_config_conversion_validates(config in gen_stream_config()) {
672            let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
673            let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
674
675            if has_zero_age {
676                prop_assert!(result.is_err());
677            } else {
678                prop_assert!(result.is_ok());
679            }
680        }
681
682        #[test]
683        fn basin_config_conversion_validates(config in gen_basin_config()) {
684            let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
685                matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
686            });
687
688            let result: Result<types::config::BasinConfig, _> = config.try_into();
689
690            if has_invalid_config {
691                prop_assert!(result.is_err());
692            } else {
693                prop_assert!(result.is_ok());
694            }
695        }
696
697        #[test]
698        fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
699            let has_zero_age = matches!(
700                reconfig.retention_policy,
701                Maybe::Specified(Some(RetentionPolicy::Age(0)))
702            );
703            let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
704
705            if has_zero_age {
706                prop_assert!(result.is_err());
707            } else {
708                prop_assert!(result.is_ok());
709            }
710        }
711
712        #[test]
713        fn merge_stream_or_basin_or_default(
714            stream in gen_internal_optional_stream_config(),
715            basin in gen_internal_optional_stream_config(),
716        ) {
717            let merged = stream.clone().merge(basin.clone());
718
719            prop_assert_eq!(
720                merged.storage_class,
721                stream.storage_class.or(basin.storage_class).unwrap_or_default()
722            );
723            prop_assert_eq!(
724                merged.retention_policy,
725                stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
726            );
727            prop_assert_eq!(
728                merged.timestamping.mode,
729                stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
730            );
731            prop_assert_eq!(
732                merged.timestamping.uncapped,
733                stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
734            );
735            prop_assert_eq!(
736                merged.delete_on_empty.min_age,
737                stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
738            );
739        }
740
741        #[test]
742        fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
743            let reconfig = types::config::StreamReconfiguration::default();
744            let result = base.clone().reconfigure(reconfig);
745
746            prop_assert_eq!(result.storage_class, base.storage_class);
747            prop_assert_eq!(result.retention_policy, base.retention_policy);
748            prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
749            prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
750            prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
751        }
752
753        #[test]
754        fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
755            let reconfig = types::config::StreamReconfiguration {
756                storage_class: Maybe::Specified(None),
757                retention_policy: Maybe::Specified(None),
758                timestamping: Maybe::Specified(None),
759                delete_on_empty: Maybe::Specified(None),
760            };
761            let result = base.reconfigure(reconfig);
762
763            prop_assert!(result.storage_class.is_none());
764            prop_assert!(result.retention_policy.is_none());
765            prop_assert!(result.timestamping.mode.is_none());
766            prop_assert!(result.timestamping.uncapped.is_none());
767            prop_assert!(result.delete_on_empty.min_age.is_none());
768        }
769
770        #[test]
771        fn reconfigure_specified_some_sets_value(
772            base in gen_internal_optional_stream_config(),
773            new_sc in gen_storage_class(),
774            new_rp_secs in 1u64..u64::MAX,
775        ) {
776            let reconfig = types::config::StreamReconfiguration {
777                storage_class: Maybe::Specified(Some(new_sc.into())),
778                retention_policy: Maybe::Specified(Some(
779                    types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
780                )),
781                ..Default::default()
782            };
783            let result = base.reconfigure(reconfig);
784
785            prop_assert_eq!(result.storage_class, Some(new_sc.into()));
786            prop_assert_eq!(
787                result.retention_policy,
788                Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
789            );
790        }
791
792        #[test]
793        fn to_opt_returns_some_for_non_defaults(
794            sc in gen_storage_class(),
795            doe_secs in 1u64..u64::MAX,
796            ts_mode in gen_timestamping_mode(),
797        ) {
798            // non-default storage class -> Some
799            let internal = types::config::OptionalStreamConfig {
800                storage_class: Some(sc.into()),
801                ..Default::default()
802            };
803            prop_assert!(StreamConfig::to_opt(internal).is_some());
804
805            // non-zero delete_on_empty -> Some
806            let internal = types::config::OptionalDeleteOnEmptyConfig {
807                min_age: Some(Duration::from_secs(doe_secs)),
808            };
809            let api = DeleteOnEmptyConfig::to_opt(internal);
810            prop_assert!(api.is_some());
811            prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
812
813            // non-default timestamping -> Some
814            let internal = types::config::OptionalTimestampingConfig {
815                mode: Some(ts_mode.into()),
816                uncapped: None,
817            };
818            prop_assert!(TimestampingConfig::to_opt(internal).is_some());
819        }
820
821        #[test]
822        fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
823            let has_zero_age = matches!(
824                &reconfig.default_stream_config,
825                Maybe::Specified(Some(sr)) if matches!(
826                    sr.retention_policy,
827                    Maybe::Specified(Some(RetentionPolicy::Age(0)))
828                )
829            );
830            let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
831
832            if has_zero_age {
833                prop_assert!(result.is_err());
834            } else {
835                prop_assert!(result.is_ok());
836            }
837        }
838
839        #[test]
840        fn reconfigure_basin_unspecified_preserves(
841            base_sc in proptest::option::of(gen_storage_class()),
842            base_on_append in any::<bool>(),
843            base_on_read in any::<bool>(),
844        ) {
845            let base = types::config::BasinConfig {
846                default_stream_config: types::config::OptionalStreamConfig {
847                    storage_class: base_sc.map(Into::into),
848                    ..Default::default()
849                },
850                create_stream_on_append: base_on_append,
851                create_stream_on_read: base_on_read,
852            };
853
854            let reconfig = types::config::BasinReconfiguration::default();
855            let result = base.clone().reconfigure(reconfig);
856
857            prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
858            prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
859            prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
860        }
861
862        #[test]
863        fn reconfigure_basin_specified_updates(
864            base_on_append in any::<bool>(),
865            new_on_append in any::<bool>(),
866            new_sc in gen_storage_class(),
867        ) {
868            let base = types::config::BasinConfig {
869                create_stream_on_append: base_on_append,
870                ..Default::default()
871            };
872
873            let reconfig = types::config::BasinReconfiguration {
874                default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
875                    storage_class: Maybe::Specified(Some(new_sc.into())),
876                    ..Default::default()
877                })),
878                create_stream_on_append: Maybe::Specified(new_on_append),
879                ..Default::default()
880            };
881            let result = base.reconfigure(reconfig);
882
883            prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
884            prop_assert_eq!(result.create_stream_on_append, new_on_append);
885        }
886
887        #[test]
888        fn reconfigure_nested_partial_update(
889            base_mode in gen_timestamping_mode(),
890            base_uncapped in any::<bool>(),
891            new_mode in gen_timestamping_mode(),
892        ) {
893            let base = types::config::OptionalStreamConfig {
894                timestamping: types::config::OptionalTimestampingConfig {
895                    mode: Some(base_mode.into()),
896                    uncapped: Some(base_uncapped),
897                },
898                ..Default::default()
899            };
900
901            let expected_mode: types::config::TimestampingMode = new_mode.into();
902
903            let reconfig = types::config::StreamReconfiguration {
904                timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
905                    mode: Maybe::Specified(Some(expected_mode)),
906                    uncapped: Maybe::Unspecified,
907                })),
908                ..Default::default()
909            };
910            let result = base.reconfigure(reconfig);
911
912            prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
913            prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
914        }
915    }
916
917    #[test]
918    fn to_opt_returns_none_for_defaults() {
919        // default stream config -> None
920        assert!(StreamConfig::to_opt(types::config::OptionalStreamConfig::default()).is_none());
921
922        // delete_on_empty: None or Some(ZERO) -> None
923        let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
924        let doe_zero = types::config::OptionalDeleteOnEmptyConfig {
925            min_age: Some(Duration::ZERO),
926        };
927        assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
928        assert!(DeleteOnEmptyConfig::to_opt(doe_zero).is_none());
929
930        // default timestamping -> None
931        assert!(
932            TimestampingConfig::to_opt(types::config::OptionalTimestampingConfig::default())
933                .is_none()
934        );
935    }
936
937    #[test]
938    fn empty_json_converts_to_all_none() {
939        let json = serde_json::json!({});
940        let parsed: StreamConfig = serde_json::from_value(json).unwrap();
941        let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();
942
943        assert!(
944            internal.storage_class.is_none(),
945            "storage_class should be None"
946        );
947        assert!(
948            internal.retention_policy.is_none(),
949            "retention_policy should be None"
950        );
951        assert!(
952            internal.timestamping.mode.is_none(),
953            "timestamping.mode should be None"
954        );
955        assert!(
956            internal.timestamping.uncapped.is_none(),
957            "timestamping.uncapped should be None"
958        );
959        assert!(
960            internal.delete_on_empty.min_age.is_none(),
961            "delete_on_empty.min_age should be None"
962        );
963    }
964}