Skip to main content

s2_api/v1/
config.rs

1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
// API-facing storage class. Variants are serialized in kebab-case
// (`standard`, `express`) and convert one-to-one to and from
// `types::config::StorageClass`.
#[rustfmt::skip]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum StorageClass {
    /// Append tail latency under 400 milliseconds with s2.dev.
    Standard,
    /// Append tail latency under 40 milliseconds with s2.dev.
    Express,
}
16
17impl From<StorageClass> for types::config::StorageClass {
18    fn from(value: StorageClass) -> Self {
19        match value {
20            StorageClass::Express => Self::Express,
21            StorageClass::Standard => Self::Standard,
22        }
23    }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27    fn from(value: types::config::StorageClass) -> Self {
28        match value {
29            types::config::StorageClass::Express => Self::Express,
30            types::config::StorageClass::Standard => Self::Standard,
31        }
32    }
33}
34
// API-facing retention policy. Variant names are serialized in kebab-case
// (`age`, `infinite`). `Age` carries whole seconds; a value of 0 is rejected
// by the `TryFrom` conversion into `types::config::RetentionPolicy`.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum RetentionPolicy {
    /// Age in seconds for automatic trimming of records older than this threshold.
    /// This must be set to a value greater than 0 seconds.
    Age(u64),
    /// Retain records unless explicitly trimmed.
    Infinite(InfiniteRetention)
}
46
// Field-less payload carried by `RetentionPolicy::Infinite`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53    type Error = types::ValidationError;
54
55    fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56        match value {
57            RetentionPolicy::Age(0) => Err(types::ValidationError(
58                "age must be greater than 0 seconds".to_string(),
59            )),
60            RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61            RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62        }
63    }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67    fn from(value: types::config::RetentionPolicy) -> Self {
68        match value {
69            types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70            types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71        }
72    }
73}
74
// API-facing timestamping mode. Variants are serialized in kebab-case
// (`client-prefer`, `client-require`, `arrival`); `ClientPrefer` is the
// `Default`.
#[rustfmt::skip]
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum TimestampingMode {
    /// Prefer client-specified timestamp if present otherwise use arrival time.
    #[default]
    ClientPrefer,
    /// Require a client-specified timestamp and reject the append if it is missing.
    ClientRequire,
    /// Use the arrival time and ignore any client-specified timestamp.
    Arrival,
}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90    fn from(value: TimestampingMode) -> Self {
91        match value {
92            TimestampingMode::ClientPrefer => Self::ClientPrefer,
93            TimestampingMode::ClientRequire => Self::ClientRequire,
94            TimestampingMode::Arrival => Self::Arrival,
95        }
96    }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100    fn from(value: types::config::TimestampingMode) -> Self {
101        match value {
102            types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103            types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104            types::config::TimestampingMode::Arrival => Self::Arrival,
105        }
106    }
107}
108
// API-facing timestamping configuration. Both fields are optional; the
// derived `Default` has both set to `None`, which `TimestampingConfig::to_opt`
// treats as "nothing configured".
#[rustfmt::skip]
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TimestampingConfig {
    /// Timestamping mode for appends that influences how timestamps are handled.
    pub mode: Option<TimestampingMode>,
    /// Allow client-specified timestamps to exceed the arrival time.
    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
    pub uncapped: Option<bool>,
}
119
120impl TimestampingConfig {
121    pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122        let config = TimestampingConfig {
123            mode: config.mode.map(Into::into),
124            uncapped: config.uncapped,
125        };
126        if config == Self::default() {
127            None
128        } else {
129            Some(config)
130        }
131    }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135    fn from(value: types::config::TimestampingConfig) -> Self {
136        Self {
137            mode: Some(value.mode.into()),
138            uncapped: Some(value.uncapped),
139        }
140    }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144    fn from(value: TimestampingConfig) -> Self {
145        Self {
146            mode: value.mode.map(Into::into),
147            uncapped: value.uncapped,
148        }
149    }
150}
151
// Partial update for timestamping settings. Each field is a
// `Maybe<Option<_>>`: it falls back to its `Default` when absent from the
// input and is omitted from serialized output when `Maybe::is_unspecified`
// returns true.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TimestampingReconfiguration {
    /// Timestamping mode for appends that influences how timestamps are handled.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Allow client-specified timestamps to exceed the arrival time.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub uncapped: Maybe<Option<bool>>,
}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167    fn from(value: TimestampingReconfiguration) -> Self {
168        Self {
169            mode: value.mode.map_opt(Into::into),
170            uncapped: value.uncapped,
171        }
172    }
173}
174
175impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
176    fn from(value: types::config::TimestampingReconfiguration) -> Self {
177        Self {
178            mode: value.mode.map_opt(Into::into),
179            uncapped: value.uncapped,
180        }
181    }
182}
183
// API-facing delete-on-empty configuration. The derived `Default` has
// `min_age_secs == 0`, which the field documentation defines as "disabled".
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
    #[serde(default)]
    pub min_age_secs: u64,
}
193
194impl DeleteOnEmptyConfig {
195    pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196        let min_age = config.min_age.unwrap_or_default();
197        if min_age > Duration::ZERO {
198            Some(DeleteOnEmptyConfig {
199                min_age_secs: min_age.as_secs(),
200            })
201        } else {
202            None
203        }
204    }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208    fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209        Self {
210            min_age_secs: value.min_age.as_secs(),
211        }
212    }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216    fn from(value: DeleteOnEmptyConfig) -> Self {
217        Self {
218            min_age: Duration::from_secs(value.min_age_secs),
219        }
220    }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224    fn from(value: DeleteOnEmptyConfig) -> Self {
225        Self {
226            min_age: Some(Duration::from_secs(value.min_age_secs)),
227        }
228    }
229}
230
// Partial update for delete-on-empty settings. The single field is a
// `Maybe<Option<u64>>`: it falls back to its `Default` when absent from the
// input and is omitted from serialized output when `Maybe::is_unspecified`
// returns true.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DeleteOnEmptyReconfiguration {
    /// Minimum age in seconds before an empty stream can be deleted.
    /// Set to 0 to disable delete-on-empty (don't delete automatically).
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
    pub min_age_secs: Maybe<Option<u64>>,
}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244        Self {
245            min_age: value.min_age_secs.map_opt(Duration::from_secs),
246        }
247    }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251    fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252        Self {
253            min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254        }
255    }
256}
257
// API-facing encryption algorithm. Wire names are set explicitly on each
// variant (`aegis-256`, `aes-256-gcm`) rather than through `rename_all`.
// `PartialOrd`/`Ord` are derived, so the variant order below is significant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub enum EncryptionAlgorithm {
    /// AEGIS-256 authenticated encryption.
    #[serde(rename = "aegis-256")]
    Aegis256,
    /// AES-256-GCM authenticated encryption.
    #[serde(rename = "aes-256-gcm")]
    Aes256Gcm,
}
268
269impl From<EncryptionAlgorithm> for encryption::EncryptionAlgorithm {
270    fn from(value: EncryptionAlgorithm) -> Self {
271        match value {
272            EncryptionAlgorithm::Aegis256 => Self::Aegis256,
273            EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
274        }
275    }
276}
277
278impl From<encryption::EncryptionAlgorithm> for EncryptionAlgorithm {
279    fn from(value: encryption::EncryptionAlgorithm) -> Self {
280        match value {
281            encryption::EncryptionAlgorithm::Aegis256 => Self::Aegis256,
282            encryption::EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
283        }
284    }
285}
286
// API-facing stream configuration. Every field is optional; the derived
// `Default` has all of them `None`, which `StreamConfig::to_opt` treats as
// "nothing configured".
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamConfig {
    /// Storage class for recent writes.
    pub storage_class: Option<StorageClass>,
    /// Retention policy for the stream.
    /// If unspecified, the default is to retain records for 7 days.
    pub retention_policy: Option<RetentionPolicy>,
    /// Timestamping behavior.
    pub timestamping: Option<TimestampingConfig>,
    /// Delete-on-empty configuration.
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
}
302
303impl StreamConfig {
304    pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
305        let types::config::OptionalStreamConfig {
306            storage_class,
307            retention_policy,
308            timestamping,
309            delete_on_empty,
310        } = config;
311
312        let config = StreamConfig {
313            storage_class: storage_class.map(Into::into),
314            retention_policy: retention_policy.map(Into::into),
315            timestamping: TimestampingConfig::to_opt(timestamping),
316            delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
317        };
318        if config == Self::default() {
319            None
320        } else {
321            Some(config)
322        }
323    }
324}
325
326impl From<types::config::StreamConfig> for StreamConfig {
327    fn from(value: types::config::StreamConfig) -> Self {
328        let types::config::StreamConfig {
329            storage_class,
330            retention_policy,
331            timestamping,
332            delete_on_empty,
333        } = value;
334
335        Self {
336            storage_class: Some(storage_class.into()),
337            retention_policy: Some(retention_policy.into()),
338            timestamping: Some(timestamping.into()),
339            delete_on_empty: Some(delete_on_empty.into()),
340        }
341    }
342}
343
344impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
345    type Error = types::ValidationError;
346
347    fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
348        let StreamConfig {
349            storage_class,
350            retention_policy,
351            timestamping,
352            delete_on_empty,
353        } = value;
354
355        let retention_policy = match retention_policy {
356            None => None,
357            Some(policy) => Some(policy.try_into()?),
358        };
359
360        Ok(Self {
361            storage_class: storage_class.map(Into::into),
362            retention_policy,
363            timestamping: timestamping.map(Into::into).unwrap_or_default(),
364            delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
365        })
366    }
367}
368
// Partial update for a stream configuration. Each field is a
// `Maybe<Option<_>>`: it falls back to its `Default` when absent from the
// input and is omitted from serialized output when `Maybe::is_unspecified`
// returns true.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamReconfiguration {
    /// Storage class for recent writes.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Retention policy for the stream.
    /// If unspecified, the default is to retain records for 7 days.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Timestamping behavior.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Delete-on-empty configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
}
391
392impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
393    type Error = types::ValidationError;
394
395    fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
396        let StreamReconfiguration {
397            storage_class,
398            retention_policy,
399            timestamping,
400            delete_on_empty,
401        } = value;
402
403        Ok(Self {
404            storage_class: storage_class.map_opt(Into::into),
405            retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
406            timestamping: timestamping.map_opt(Into::into),
407            delete_on_empty: delete_on_empty.map_opt(Into::into),
408        })
409    }
410}
411
412impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
413    fn from(value: types::config::StreamReconfiguration) -> Self {
414        let types::config::StreamReconfiguration {
415            storage_class,
416            retention_policy,
417            timestamping,
418            delete_on_empty,
419        } = value;
420
421        Self {
422            storage_class: storage_class.map_opt(Into::into),
423            retention_policy: retention_policy.map_opt(Into::into),
424            timestamping: timestamping.map_opt(Into::into),
425            delete_on_empty: delete_on_empty.map_opt(Into::into),
426        }
427    }
428}
429
// API-facing basin configuration. The two `create_stream_on_*` flags fall
// back to `false` (`bool`'s default) when absent from the input.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct BasinConfig {
    /// Default stream configuration.
    pub default_stream_config: Option<StreamConfig>,
    /// Encryption algorithm to apply to newly created streams in the basin.
    pub stream_cipher: Option<EncryptionAlgorithm>,
    /// Create stream on append if it doesn't exist, using the default stream configuration.
    #[serde(default)]
    #[cfg_attr(feature = "utoipa", schema(default = false))]
    pub create_stream_on_append: bool,
    /// Create stream on read if it doesn't exist, using the default stream configuration.
    #[serde(default)]
    #[cfg_attr(feature = "utoipa", schema(default = false))]
    pub create_stream_on_read: bool,
}
447
448impl TryFrom<BasinConfig> for types::config::BasinConfig {
449    type Error = types::ValidationError;
450
451    fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
452        let BasinConfig {
453            default_stream_config,
454            stream_cipher,
455            create_stream_on_append,
456            create_stream_on_read,
457        } = value;
458
459        Ok(Self {
460            default_stream_config: match default_stream_config {
461                Some(config) => config.try_into()?,
462                None => Default::default(),
463            },
464            stream_cipher: stream_cipher.map(Into::into),
465            create_stream_on_append,
466            create_stream_on_read,
467        })
468    }
469}
470
471impl From<types::config::BasinConfig> for BasinConfig {
472    fn from(value: types::config::BasinConfig) -> Self {
473        let types::config::BasinConfig {
474            default_stream_config,
475            stream_cipher,
476            create_stream_on_append,
477            create_stream_on_read,
478        } = value;
479
480        Self {
481            default_stream_config: StreamConfig::to_opt(default_stream_config),
482            stream_cipher: stream_cipher.map(Into::into),
483            create_stream_on_append,
484            create_stream_on_read,
485        }
486    }
487}
488
// Partial update for a basin configuration. Each field falls back to its
// `Default` when absent from the input and is omitted from serialized output
// when `Maybe::is_unspecified` returns true. The two flags are `Maybe<bool>`
// rather than `Maybe<Option<bool>>`.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct BasinReconfiguration {
    /// Default stream configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Encryption algorithm to apply to newly created streams in the basin.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionAlgorithm>))]
    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
    /// Create a stream on append.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub create_stream_on_append: Maybe<bool>,
    /// Create a stream on read.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub create_stream_on_read: Maybe<bool>,
}
510
511impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
512    type Error = types::ValidationError;
513
514    fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
515        let BasinReconfiguration {
516            default_stream_config,
517            stream_cipher,
518            create_stream_on_append,
519            create_stream_on_read,
520        } = value;
521
522        Ok(Self {
523            default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
524            stream_cipher: stream_cipher.map_opt(Into::into),
525            create_stream_on_append: create_stream_on_append.map(Into::into),
526            create_stream_on_read: create_stream_on_read.map(Into::into),
527        })
528    }
529}
530
531impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
532    fn from(value: types::config::BasinReconfiguration) -> Self {
533        let types::config::BasinReconfiguration {
534            default_stream_config,
535            stream_cipher,
536            create_stream_on_append,
537            create_stream_on_read,
538        } = value;
539
540        Self {
541            default_stream_config: default_stream_config.map_opt(Into::into),
542            stream_cipher: stream_cipher.map_opt(Into::into),
543            create_stream_on_append: create_stream_on_append.map(Into::into),
544            create_stream_on_read: create_stream_on_read.map(Into::into),
545        }
546    }
547}
548
549#[cfg(test)]
550mod tests {
551    use proptest::prelude::*;
552
553    use super::*;
554
    /// Strategy yielding either API `StorageClass` variant.
    fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
        prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
    }
558
    /// Strategy yielding any of the three API `TimestampingMode` variants.
    fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
        prop_oneof![
            Just(TimestampingMode::ClientPrefer),
            Just(TimestampingMode::ClientRequire),
            Just(TimestampingMode::Arrival),
        ]
    }
566
    /// Strategy yielding an API `RetentionPolicy`: either `Age` over the full
    /// `u64` range or `Infinite`.
    ///
    /// The age range includes 0 on purpose; the validation tests rely on
    /// `Age(0)` being generated and rejected.
    fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
        prop_oneof![
            any::<u64>().prop_map(RetentionPolicy::Age),
            Just(RetentionPolicy::Infinite(InfiniteRetention {})),
        ]
    }
573
    /// Strategy yielding an API `TimestampingConfig` whose `mode` and
    /// `uncapped` fields are each independently `None` or `Some`.
    fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
        (
            proptest::option::of(gen_timestamping_mode()),
            proptest::option::of(any::<bool>()),
        )
            .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
    }
581
    /// Strategy yielding an API `DeleteOnEmptyConfig` with `min_age_secs`
    /// drawn from the full `u64` range (0 included).
    fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
        any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
    }
585
    /// Strategy yielding either API `EncryptionAlgorithm` variant.
    fn gen_encryption_algorithm() -> impl Strategy<Value = EncryptionAlgorithm> {
        prop_oneof![
            Just(EncryptionAlgorithm::Aegis256),
            Just(EncryptionAlgorithm::Aes256Gcm),
        ]
    }
592
    /// Strategy yielding an API `StreamConfig` in which every field is
    /// independently `None` or a generated value.
    ///
    /// Because the retention policy comes from `gen_retention_policy`, the
    /// result may contain the invalid `Age(0)`.
    fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
        (
            proptest::option::of(gen_storage_class()),
            proptest::option::of(gen_retention_policy()),
            proptest::option::of(gen_timestamping_config()),
            proptest::option::of(gen_delete_on_empty_config()),
        )
            .prop_map(
                |(storage_class, retention_policy, timestamping, delete_on_empty)| StreamConfig {
                    storage_class,
                    retention_policy,
                    timestamping,
                    delete_on_empty,
                },
            )
    }
609
    /// Strategy yielding an API `BasinConfig`: an optional generated stream
    /// config, an optional cipher, and two arbitrary booleans for the
    /// create-on-append / create-on-read flags.
    fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
        (
            proptest::option::of(gen_stream_config()),
            proptest::option::of(gen_encryption_algorithm()),
            any::<bool>(),
            any::<bool>(),
        )
            .prop_map(
                |(
                    default_stream_config,
                    stream_cipher,
                    create_stream_on_append,
                    create_stream_on_read,
                )| {
                    BasinConfig {
                        default_stream_config,
                        stream_cipher,
                        create_stream_on_append,
                        create_stream_on_read,
                    }
                },
            )
    }
633
    /// Lifts a strategy for `T` into one for `Maybe<Option<T>>`, covering all
    /// three shapes: `Unspecified`, `Specified(None)`, and `Specified(Some(_))`
    /// with the inner value drawn from `inner`.
    fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
        inner: impl Strategy<Value = T>,
    ) -> impl Strategy<Value = Maybe<Option<T>>> {
        prop_oneof![
            Just(Maybe::Unspecified),
            Just(Maybe::Specified(None)),
            inner.prop_map(|v| Maybe::Specified(Some(v))),
        ]
    }
643
    /// Strategy yielding an API `StreamReconfiguration` with each field
    /// produced through `gen_maybe`, so every field independently covers the
    /// unspecified / cleared / set cases.
    fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
        (
            gen_maybe(gen_storage_class()),
            gen_maybe(gen_retention_policy()),
            gen_maybe(gen_timestamping_reconfiguration()),
            gen_maybe(gen_delete_on_empty_reconfiguration()),
        )
            .prop_map(
                |(storage_class, retention_policy, timestamping, delete_on_empty)| {
                    StreamReconfiguration {
                        storage_class,
                        retention_policy,
                        timestamping,
                        delete_on_empty,
                    }
                },
            )
    }
662
    /// Strategy yielding an API `TimestampingReconfiguration` with both
    /// fields produced through `gen_maybe`.
    fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
        (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
            .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
    }
667
    /// Strategy yielding an API `DeleteOnEmptyReconfiguration` whose
    /// `min_age_secs` is produced through `gen_maybe` over the full `u64` range.
    fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
    {
        gen_maybe(any::<u64>())
            .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
    }
673
    /// Strategy yielding an API `BasinReconfiguration`.
    ///
    /// The stream config and cipher go through `gen_maybe`. The two boolean
    /// flags are `Maybe<bool>` (no inner `Option`), so they are generated
    /// inline as either `Unspecified` or `Specified(bool)`.
    fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
        (
            gen_maybe(gen_stream_reconfiguration()),
            gen_maybe(gen_encryption_algorithm()),
            prop_oneof![
                Just(Maybe::Unspecified),
                any::<bool>().prop_map(Maybe::Specified),
            ],
            prop_oneof![
                Just(Maybe::Unspecified),
                any::<bool>().prop_map(Maybe::Specified),
            ],
        )
            .prop_map(
                |(
                    default_stream_config,
                    stream_cipher,
                    create_stream_on_append,
                    create_stream_on_read,
                )| BasinReconfiguration {
                    default_stream_config,
                    stream_cipher,
                    create_stream_on_append,
                    create_stream_on_read,
                },
            )
    }
701
    /// Strategy yielding an `s2_common` `OptionalStreamConfig` built field by
    /// field from the API-level generators.
    ///
    /// The retention policy is mapped by hand instead of through the
    /// validating `TryFrom`, so `Age(0)` is carried over as a zero `Duration`
    /// rather than rejected.
    fn gen_internal_optional_stream_config()
    -> impl Strategy<Value = types::config::OptionalStreamConfig> {
        (
            proptest::option::of(gen_storage_class()),
            proptest::option::of(gen_retention_policy()),
            proptest::option::of(gen_timestamping_mode()),
            proptest::option::of(any::<bool>()),
            proptest::option::of(any::<u64>()),
        )
            .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe)| {
                types::config::OptionalStreamConfig {
                    storage_class: sc.map(Into::into),
                    retention_policy: rp.map(|rp| match rp {
                        RetentionPolicy::Age(secs) => {
                            types::config::RetentionPolicy::Age(Duration::from_secs(secs))
                        }
                        RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
                    }),
                    timestamping: types::config::OptionalTimestampingConfig {
                        mode: ts_mode.map(Into::into),
                        uncapped: ts_uncapped,
                    },
                    delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
                        // `doe` is the delete-on-empty minimum age in seconds.
                        min_age: doe.map(Duration::from_secs),
                    },
                }
            })
    }
730
731    proptest! {
732        #[test]
733        fn stream_config_conversion_validates(config in gen_stream_config()) {
734            let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
735            let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
736
737            if has_zero_age {
738                prop_assert!(result.is_err());
739            } else {
740                prop_assert!(result.is_ok());
741            }
742        }
743
744        #[test]
745        fn basin_config_conversion_validates(config in gen_basin_config()) {
746            let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
747                matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
748            });
749
750            let result: Result<types::config::BasinConfig, _> = config.try_into();
751
752            if has_invalid_config {
753                prop_assert!(result.is_err());
754            } else {
755                prop_assert!(result.is_ok());
756            }
757        }
758
759        #[test]
760        fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
761            let has_zero_age = matches!(
762                reconfig.retention_policy,
763                Maybe::Specified(Some(RetentionPolicy::Age(0)))
764            );
765            let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
766
767            if has_zero_age {
768                prop_assert!(result.is_err());
769            } else {
770                prop_assert!(result.is_ok());
771            }
772        }
773
774        #[test]
775        fn merge_stream_or_basin_or_default(
776            stream in gen_internal_optional_stream_config(),
777            basin in gen_internal_optional_stream_config(),
778        ) {
779            let merged = stream.clone().merge(basin.clone());
780
781            prop_assert_eq!(
782                merged.storage_class,
783                stream.storage_class.or(basin.storage_class).unwrap_or_default()
784            );
785            prop_assert_eq!(
786                merged.retention_policy,
787                stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
788            );
789            prop_assert_eq!(
790                merged.timestamping.mode,
791                stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
792            );
793            prop_assert_eq!(
794                merged.timestamping.uncapped,
795                stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
796            );
797            prop_assert_eq!(
798                merged.delete_on_empty.min_age,
799                stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
800            );
801        }
802
803        #[test]
804        fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
805            let reconfig = types::config::StreamReconfiguration::default();
806            let result = base.clone().reconfigure(reconfig);
807
808            prop_assert_eq!(result.storage_class, base.storage_class);
809            prop_assert_eq!(result.retention_policy, base.retention_policy);
810            prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
811            prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
812            prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
813        }
814
815        #[test]
816        fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
817            let reconfig = types::config::StreamReconfiguration {
818                storage_class: Maybe::Specified(None),
819                retention_policy: Maybe::Specified(None),
820                timestamping: Maybe::Specified(None),
821                delete_on_empty: Maybe::Specified(None),
822            };
823            let result = base.reconfigure(reconfig);
824
825            prop_assert!(result.storage_class.is_none());
826            prop_assert!(result.retention_policy.is_none());
827            prop_assert!(result.timestamping.mode.is_none());
828            prop_assert!(result.timestamping.uncapped.is_none());
829            prop_assert!(result.delete_on_empty.min_age.is_none());
830        }
831
832        #[test]
833        fn reconfigure_specified_some_sets_value(
834            base in gen_internal_optional_stream_config(),
835            new_sc in gen_storage_class(),
836            new_rp_secs in 1u64..u64::MAX,
837        ) {
838            let reconfig = types::config::StreamReconfiguration {
839                storage_class: Maybe::Specified(Some(new_sc.into())),
840                retention_policy: Maybe::Specified(Some(
841                    types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
842                )),
843                ..Default::default()
844            };
845            let result = base.reconfigure(reconfig);
846
847            prop_assert_eq!(result.storage_class, Some(new_sc.into()));
848            prop_assert_eq!(
849                result.retention_policy,
850                Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
851            );
852        }
853
854        #[test]
855        fn to_opt_returns_some_for_non_defaults(
856            sc in gen_storage_class(),
857            doe_secs in 1u64..u64::MAX,
858            ts_mode in gen_timestamping_mode(),
859        ) {
860            // non-default storage class -> Some
861            let internal = types::config::OptionalStreamConfig {
862                storage_class: Some(sc.into()),
863                ..Default::default()
864            };
865            prop_assert!(StreamConfig::to_opt(internal).is_some());
866
867            // non-zero delete_on_empty -> Some
868            let internal = types::config::OptionalDeleteOnEmptyConfig {
869                min_age: Some(Duration::from_secs(doe_secs)),
870            };
871            let api = DeleteOnEmptyConfig::to_opt(internal);
872            prop_assert!(api.is_some());
873            prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
874
875            // non-default timestamping -> Some
876            let internal = types::config::OptionalTimestampingConfig {
877                mode: Some(ts_mode.into()),
878                uncapped: None,
879            };
880            prop_assert!(TimestampingConfig::to_opt(internal).is_some());
881        }
882
883        #[test]
884        fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
885            let has_zero_age = matches!(
886                &reconfig.default_stream_config,
887                Maybe::Specified(Some(sr)) if matches!(
888                    sr.retention_policy,
889                    Maybe::Specified(Some(RetentionPolicy::Age(0)))
890                )
891            );
892            let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
893
894            if has_zero_age {
895                prop_assert!(result.is_err());
896            } else {
897                prop_assert!(result.is_ok());
898            }
899        }
900
901        #[test]
902        fn reconfigure_basin_unspecified_preserves(
903            base_sc in proptest::option::of(gen_storage_class()),
904            base_algorithm in proptest::option::of(gen_encryption_algorithm()),
905            base_on_append in any::<bool>(),
906            base_on_read in any::<bool>(),
907        ) {
908            let base = types::config::BasinConfig {
909                default_stream_config: types::config::OptionalStreamConfig {
910                    storage_class: base_sc.map(Into::into),
911                    ..Default::default()
912                },
913                stream_cipher: base_algorithm.map(Into::into),
914                create_stream_on_append: base_on_append,
915                create_stream_on_read: base_on_read,
916            };
917
918            let reconfig = types::config::BasinReconfiguration::default();
919            let result = base.clone().reconfigure(reconfig);
920
921            prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
922            prop_assert_eq!(result.stream_cipher, base.stream_cipher);
923            prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
924            prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
925        }
926
927        #[test]
928        fn reconfigure_basin_specified_updates(
929            base_on_append in any::<bool>(),
930            new_on_append in any::<bool>(),
931            new_sc in gen_storage_class(),
932            new_algorithm in gen_encryption_algorithm(),
933        ) {
934            let base = types::config::BasinConfig {
935                create_stream_on_append: base_on_append,
936                ..Default::default()
937            };
938
939            let reconfig = types::config::BasinReconfiguration {
940                default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
941                    storage_class: Maybe::Specified(Some(new_sc.into())),
942                    ..Default::default()
943                })),
944                stream_cipher: Maybe::Specified(Some(new_algorithm.into())),
945                create_stream_on_append: Maybe::Specified(new_on_append),
946                ..Default::default()
947            };
948            let result = base.reconfigure(reconfig);
949
950            prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
951            prop_assert_eq!(result.stream_cipher, Some(new_algorithm.into()));
952            prop_assert_eq!(result.create_stream_on_append, new_on_append);
953        }
954
955        #[test]
956        fn reconfigure_nested_partial_update(
957            base_mode in gen_timestamping_mode(),
958            base_uncapped in any::<bool>(),
959            new_mode in gen_timestamping_mode(),
960        ) {
961            let base = types::config::OptionalStreamConfig {
962                timestamping: types::config::OptionalTimestampingConfig {
963                    mode: Some(base_mode.into()),
964                    uncapped: Some(base_uncapped),
965                },
966                ..Default::default()
967            };
968
969            let expected_mode: types::config::TimestampingMode = new_mode.into();
970
971            let reconfig = types::config::StreamReconfiguration {
972                timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
973                    mode: Maybe::Specified(Some(expected_mode)),
974                    uncapped: Maybe::Unspecified,
975                })),
976                ..Default::default()
977            };
978            let result = base.reconfigure(reconfig);
979
980            prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
981            prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
982        }
983    }
984
985    #[test]
986    fn to_opt_returns_none_for_defaults() {
987        // default stream config -> None
988        assert!(StreamConfig::to_opt(types::config::OptionalStreamConfig::default()).is_none());
989
990        // delete_on_empty: None or Some(ZERO) -> None
991        let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
992        let doe_zero = types::config::OptionalDeleteOnEmptyConfig {
993            min_age: Some(Duration::ZERO),
994        };
995        assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
996        assert!(DeleteOnEmptyConfig::to_opt(doe_zero).is_none());
997
998        // default timestamping -> None
999        assert!(
1000            TimestampingConfig::to_opt(types::config::OptionalTimestampingConfig::default())
1001                .is_none()
1002        );
1003    }
1004
1005    #[test]
1006    fn empty_json_converts_to_all_none() {
1007        let json = serde_json::json!({});
1008        let parsed: StreamConfig = serde_json::from_value(json).unwrap();
1009        let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();
1010
1011        assert!(
1012            internal.storage_class.is_none(),
1013            "storage_class should be None"
1014        );
1015        assert!(
1016            internal.retention_policy.is_none(),
1017            "retention_policy should be None"
1018        );
1019        assert!(
1020            internal.timestamping.mode.is_none(),
1021            "timestamping.mode should be None"
1022        );
1023        assert!(
1024            internal.timestamping.uncapped.is_none(),
1025            "timestamping.uncapped should be None"
1026        );
1027        assert!(
1028            internal.delete_on_empty.min_age.is_none(),
1029            "delete_on_empty.min_age should be None"
1030        );
1031    }
1032}