Skip to main content

s2_api/v1/
config.rs

1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
6#[rustfmt::skip]
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
9#[serde(rename_all = "kebab-case")]
10pub enum StorageClass {
11    /// Append tail latency under 400 milliseconds with s2.dev.
12    Standard,
13    /// Append tail latency under 40 milliseconds with s2.dev.
14    Express,
15}
16
17impl From<StorageClass> for types::config::StorageClass {
18    fn from(value: StorageClass) -> Self {
19        match value {
20            StorageClass::Express => Self::Express,
21            StorageClass::Standard => Self::Standard,
22        }
23    }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27    fn from(value: types::config::StorageClass) -> Self {
28        match value {
29            types::config::StorageClass::Express => Self::Express,
30            types::config::StorageClass::Standard => Self::Standard,
31        }
32    }
33}
34
35#[rustfmt::skip]
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
38#[serde(rename_all = "kebab-case")]
39pub enum RetentionPolicy {
40    /// Age in seconds for automatic trimming of records older than this threshold.
41    /// This must be set to a value greater than 0 seconds.
42    Age(u64),
43    /// Retain records unless explicitly trimmed.
44    Infinite(InfiniteRetention)
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
49#[serde(rename_all = "kebab-case")]
50pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53    type Error = types::ValidationError;
54
55    fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56        match value {
57            RetentionPolicy::Age(0) => Err(types::ValidationError(
58                "age must be greater than 0 seconds".to_string(),
59            )),
60            RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61            RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62        }
63    }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67    fn from(value: types::config::RetentionPolicy) -> Self {
68        match value {
69            types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70            types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71        }
72    }
73}
74
75#[rustfmt::skip]
76#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
78#[serde(rename_all = "kebab-case")]
79pub enum TimestampingMode {
80    /// Prefer client-specified timestamp if present otherwise use arrival time.
81    #[default]
82    ClientPrefer,
83    /// Require a client-specified timestamp and reject the append if it is missing.
84    ClientRequire,
85    /// Use the arrival time and ignore any client-specified timestamp.
86    Arrival,
87}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90    fn from(value: TimestampingMode) -> Self {
91        match value {
92            TimestampingMode::ClientPrefer => Self::ClientPrefer,
93            TimestampingMode::ClientRequire => Self::ClientRequire,
94            TimestampingMode::Arrival => Self::Arrival,
95        }
96    }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100    fn from(value: types::config::TimestampingMode) -> Self {
101        match value {
102            types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103            types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104            types::config::TimestampingMode::Arrival => Self::Arrival,
105        }
106    }
107}
108
109#[rustfmt::skip]
110#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
112pub struct TimestampingConfig {
113    /// Timestamping mode for appends that influences how timestamps are handled.
114    pub mode: Option<TimestampingMode>,
115    /// Allow client-specified timestamps to exceed the arrival time.
116    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
117    pub uncapped: Option<bool>,
118}
119
120impl TimestampingConfig {
121    pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122        let config = TimestampingConfig {
123            mode: config.mode.map(Into::into),
124            uncapped: config.uncapped,
125        };
126        if config == Self::default() {
127            None
128        } else {
129            Some(config)
130        }
131    }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135    fn from(value: types::config::TimestampingConfig) -> Self {
136        Self {
137            mode: Some(value.mode.into()),
138            uncapped: Some(value.uncapped),
139        }
140    }
141}
142
143impl From<types::config::OptionalTimestampingConfig> for TimestampingConfig {
144    fn from(value: types::config::OptionalTimestampingConfig) -> Self {
145        Self {
146            mode: value.mode.map(Into::into),
147            uncapped: value.uncapped,
148        }
149    }
150}
151
152impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
153    fn from(value: TimestampingConfig) -> Self {
154        Self {
155            mode: value.mode.map(Into::into),
156            uncapped: value.uncapped,
157        }
158    }
159}
160
161#[rustfmt::skip]
162#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
163#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
164pub struct TimestampingReconfiguration {
165    /// Timestamping mode for appends that influences how timestamps are handled.
166    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
167    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
168    pub mode: Maybe<Option<TimestampingMode>>,
169    /// Allow client-specified timestamps to exceed the arrival time.
170    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
171    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
172    pub uncapped: Maybe<Option<bool>>,
173}
174
175impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
176    fn from(value: TimestampingReconfiguration) -> Self {
177        Self {
178            mode: value.mode.map_opt(Into::into),
179            uncapped: value.uncapped,
180        }
181    }
182}
183
184impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
185    fn from(value: types::config::TimestampingReconfiguration) -> Self {
186        Self {
187            mode: value.mode.map_opt(Into::into),
188            uncapped: value.uncapped,
189        }
190    }
191}
192
193#[rustfmt::skip]
194#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
195#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
196pub struct DeleteOnEmptyConfig {
197    /// Minimum age in seconds before an empty stream can be deleted.
198    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
199    #[serde(default)]
200    pub min_age_secs: u64,
201}
202
203impl DeleteOnEmptyConfig {
204    pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
205        config.min_age.map(|min_age| DeleteOnEmptyConfig {
206            min_age_secs: min_age.as_secs(),
207        })
208    }
209}
210
211impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
212    fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
213        Self {
214            min_age_secs: value.min_age.as_secs(),
215        }
216    }
217}
218
219impl From<types::config::OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
220    fn from(value: types::config::OptionalDeleteOnEmptyConfig) -> Self {
221        Self {
222            min_age_secs: value.min_age.unwrap_or_default().as_secs(),
223        }
224    }
225}
226
227impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
228    fn from(value: DeleteOnEmptyConfig) -> Self {
229        Self {
230            min_age: Duration::from_secs(value.min_age_secs),
231        }
232    }
233}
234
235impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
236    fn from(value: DeleteOnEmptyConfig) -> Self {
237        Self {
238            min_age: Some(Duration::from_secs(value.min_age_secs)),
239        }
240    }
241}
242
243#[rustfmt::skip]
244#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
245#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
246pub struct DeleteOnEmptyReconfiguration {
247    /// Minimum age in seconds before an empty stream can be deleted.
248    /// Set to 0 to disable delete-on-empty (don't delete automatically).
249    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
250    #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
251    pub min_age_secs: Maybe<Option<u64>>,
252}
253
254impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
255    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
256        Self {
257            min_age: value.min_age_secs.map_opt(Duration::from_secs),
258        }
259    }
260}
261
262impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
263    fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
264        Self {
265            min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
266        }
267    }
268}
269
270#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
271#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
272pub enum EncryptionAlgorithm {
273    /// AEGIS-256 authenticated encryption.
274    #[serde(rename = "aegis-256")]
275    Aegis256,
276    /// AES-256-GCM authenticated encryption.
277    #[serde(rename = "aes-256-gcm")]
278    Aes256Gcm,
279}
280
281impl From<EncryptionAlgorithm> for encryption::EncryptionAlgorithm {
282    fn from(value: EncryptionAlgorithm) -> Self {
283        match value {
284            EncryptionAlgorithm::Aegis256 => Self::Aegis256,
285            EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
286        }
287    }
288}
289
290impl From<encryption::EncryptionAlgorithm> for EncryptionAlgorithm {
291    fn from(value: encryption::EncryptionAlgorithm) -> Self {
292        match value {
293            encryption::EncryptionAlgorithm::Aegis256 => Self::Aegis256,
294            encryption::EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
295        }
296    }
297}
298
299#[rustfmt::skip]
300#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
301#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
302pub struct StreamConfig {
303    /// Storage class for recent writes.
304    pub storage_class: Option<StorageClass>,
305    /// Retention policy for the stream.
306    /// If unspecified, the default is to retain records for 7 days.
307    pub retention_policy: Option<RetentionPolicy>,
308    /// Timestamping behavior.
309    pub timestamping: Option<TimestampingConfig>,
310    /// Delete-on-empty configuration.
311    #[serde(default)]
312    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
313}
314
315impl StreamConfig {
316    pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
317        let types::config::OptionalStreamConfig {
318            storage_class,
319            retention_policy,
320            timestamping,
321            delete_on_empty,
322        } = config;
323
324        let config = StreamConfig {
325            storage_class: storage_class.map(Into::into),
326            retention_policy: retention_policy.map(Into::into),
327            timestamping: TimestampingConfig::to_opt(timestamping),
328            delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
329        };
330        if config == Self::default() {
331            None
332        } else {
333            Some(config)
334        }
335    }
336}
337
338impl From<types::config::StreamConfig> for StreamConfig {
339    fn from(value: types::config::StreamConfig) -> Self {
340        let types::config::StreamConfig {
341            storage_class,
342            retention_policy,
343            timestamping,
344            delete_on_empty,
345        } = value;
346
347        Self {
348            storage_class: Some(storage_class.into()),
349            retention_policy: Some(retention_policy.into()),
350            timestamping: Some(timestamping.into()),
351            delete_on_empty: Some(delete_on_empty.into()),
352        }
353    }
354}
355
356impl From<types::config::OptionalStreamConfig> for StreamConfig {
357    fn from(value: types::config::OptionalStreamConfig) -> Self {
358        let types::config::OptionalStreamConfig {
359            storage_class,
360            retention_policy,
361            timestamping,
362            delete_on_empty,
363        } = value;
364
365        let timestamping = (timestamping.mode.is_some() || timestamping.uncapped.is_some())
366            .then(|| timestamping.into());
367        let delete_on_empty = delete_on_empty.min_age.map(|_| delete_on_empty.into());
368
369        Self {
370            storage_class: storage_class.map(Into::into),
371            retention_policy: retention_policy.map(Into::into),
372            timestamping,
373            delete_on_empty,
374        }
375    }
376}
377
378impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
379    type Error = types::ValidationError;
380
381    fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
382        let StreamConfig {
383            storage_class,
384            retention_policy,
385            timestamping,
386            delete_on_empty,
387        } = value;
388
389        let retention_policy = match retention_policy {
390            None => None,
391            Some(policy) => Some(policy.try_into()?),
392        };
393
394        Ok(Self {
395            storage_class: storage_class.map(Into::into),
396            retention_policy,
397            timestamping: timestamping.map(Into::into).unwrap_or_default(),
398            delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
399        })
400    }
401}
402
403#[rustfmt::skip]
404#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
405#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
406pub struct StreamReconfiguration {
407    /// Storage class for recent writes.
408    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
409    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
410    pub storage_class: Maybe<Option<StorageClass>>,
411    /// Retention policy for the stream.
412    /// If unspecified, the default is to retain records for 7 days.
413    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
414    #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
415    pub retention_policy: Maybe<Option<RetentionPolicy>>,
416    /// Timestamping behavior.
417    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
418    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
419    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
420    /// Delete-on-empty configuration.
421    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
422    #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
423    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
424}
425
426impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
427    type Error = types::ValidationError;
428
429    fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
430        let StreamReconfiguration {
431            storage_class,
432            retention_policy,
433            timestamping,
434            delete_on_empty,
435        } = value;
436
437        Ok(Self {
438            storage_class: storage_class.map_opt(Into::into),
439            retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
440            timestamping: timestamping.map_opt(Into::into),
441            delete_on_empty: delete_on_empty.map_opt(Into::into),
442        })
443    }
444}
445
446impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
447    fn from(value: types::config::StreamReconfiguration) -> Self {
448        let types::config::StreamReconfiguration {
449            storage_class,
450            retention_policy,
451            timestamping,
452            delete_on_empty,
453        } = value;
454
455        Self {
456            storage_class: storage_class.map_opt(Into::into),
457            retention_policy: retention_policy.map_opt(Into::into),
458            timestamping: timestamping.map_opt(Into::into),
459            delete_on_empty: delete_on_empty.map_opt(Into::into),
460        }
461    }
462}
463
464#[rustfmt::skip]
465#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
466#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
467pub struct BasinConfig {
468    /// Default stream configuration.
469    pub default_stream_config: Option<StreamConfig>,
470    /// Encryption algorithm to apply to newly created streams in the basin.
471    pub stream_cipher: Option<EncryptionAlgorithm>,
472    /// Create stream on append if it doesn't exist, using the default stream configuration.
473    #[serde(default)]
474    #[cfg_attr(feature = "utoipa", schema(default = false))]
475    pub create_stream_on_append: bool,
476    /// Create stream on read if it doesn't exist, using the default stream configuration.
477    #[serde(default)]
478    #[cfg_attr(feature = "utoipa", schema(default = false))]
479    pub create_stream_on_read: bool,
480}
481
482impl TryFrom<BasinConfig> for types::config::BasinConfig {
483    type Error = types::ValidationError;
484
485    fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
486        let BasinConfig {
487            default_stream_config,
488            stream_cipher,
489            create_stream_on_append,
490            create_stream_on_read,
491        } = value;
492
493        Ok(Self {
494            default_stream_config: match default_stream_config {
495                Some(config) => config.try_into()?,
496                None => Default::default(),
497            },
498            stream_cipher: stream_cipher.map(Into::into),
499            create_stream_on_append,
500            create_stream_on_read,
501        })
502    }
503}
504
505impl From<types::config::BasinConfig> for BasinConfig {
506    fn from(value: types::config::BasinConfig) -> Self {
507        let types::config::BasinConfig {
508            default_stream_config,
509            stream_cipher,
510            create_stream_on_append,
511            create_stream_on_read,
512        } = value;
513
514        Self {
515            default_stream_config: StreamConfig::to_opt(default_stream_config),
516            stream_cipher: stream_cipher.map(Into::into),
517            create_stream_on_append,
518            create_stream_on_read,
519        }
520    }
521}
522
523#[rustfmt::skip]
524#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
525#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
526pub struct BasinReconfiguration {
527    /// Basin configuration.
528    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
529    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
530    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
531    /// Encryption algorithm to apply to newly created streams in the basin.
532    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
533    #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionAlgorithm>))]
534    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
535    /// Create a stream on append.
536    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
537    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
538    pub create_stream_on_append: Maybe<bool>,
539    /// Create a stream on read.
540    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
541    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
542    pub create_stream_on_read: Maybe<bool>,
543}
544
545impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
546    type Error = types::ValidationError;
547
548    fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
549        let BasinReconfiguration {
550            default_stream_config,
551            stream_cipher,
552            create_stream_on_append,
553            create_stream_on_read,
554        } = value;
555
556        Ok(Self {
557            default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
558            stream_cipher: stream_cipher.map_opt(Into::into),
559            create_stream_on_append: create_stream_on_append.map(Into::into),
560            create_stream_on_read: create_stream_on_read.map(Into::into),
561        })
562    }
563}
564
565impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
566    fn from(value: types::config::BasinReconfiguration) -> Self {
567        let types::config::BasinReconfiguration {
568            default_stream_config,
569            stream_cipher,
570            create_stream_on_append,
571            create_stream_on_read,
572        } = value;
573
574        Self {
575            default_stream_config: default_stream_config.map_opt(Into::into),
576            stream_cipher: stream_cipher.map_opt(Into::into),
577            create_stream_on_append: create_stream_on_append.map(Into::into),
578            create_stream_on_read: create_stream_on_read.map(Into::into),
579        }
580    }
581}
582
583#[cfg(test)]
584mod tests {
585    use proptest::prelude::*;
586
587    use super::*;
588
589    fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
590        prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
591    }
592
593    fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
594        prop_oneof![
595            Just(TimestampingMode::ClientPrefer),
596            Just(TimestampingMode::ClientRequire),
597            Just(TimestampingMode::Arrival),
598        ]
599    }
600
601    fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
602        prop_oneof![
603            any::<u64>().prop_map(RetentionPolicy::Age),
604            Just(RetentionPolicy::Infinite(InfiniteRetention {})),
605        ]
606    }
607
608    fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
609        (
610            proptest::option::of(gen_timestamping_mode()),
611            proptest::option::of(any::<bool>()),
612        )
613            .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
614    }
615
616    fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
617        any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
618    }
619
620    fn gen_encryption_algorithm() -> impl Strategy<Value = EncryptionAlgorithm> {
621        prop_oneof![
622            Just(EncryptionAlgorithm::Aegis256),
623            Just(EncryptionAlgorithm::Aes256Gcm),
624        ]
625    }
626
627    fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
628        (
629            proptest::option::of(gen_storage_class()),
630            proptest::option::of(gen_retention_policy()),
631            proptest::option::of(gen_timestamping_config()),
632            proptest::option::of(gen_delete_on_empty_config()),
633        )
634            .prop_map(
635                |(storage_class, retention_policy, timestamping, delete_on_empty)| StreamConfig {
636                    storage_class,
637                    retention_policy,
638                    timestamping,
639                    delete_on_empty,
640                },
641            )
642    }
643
644    fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
645        (
646            proptest::option::of(gen_stream_config()),
647            proptest::option::of(gen_encryption_algorithm()),
648            any::<bool>(),
649            any::<bool>(),
650        )
651            .prop_map(
652                |(
653                    default_stream_config,
654                    stream_cipher,
655                    create_stream_on_append,
656                    create_stream_on_read,
657                )| {
658                    BasinConfig {
659                        default_stream_config,
660                        stream_cipher,
661                        create_stream_on_append,
662                        create_stream_on_read,
663                    }
664                },
665            )
666    }
667
668    fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
669        inner: impl Strategy<Value = T>,
670    ) -> impl Strategy<Value = Maybe<Option<T>>> {
671        prop_oneof![
672            Just(Maybe::Unspecified),
673            Just(Maybe::Specified(None)),
674            inner.prop_map(|v| Maybe::Specified(Some(v))),
675        ]
676    }
677
678    fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
679        (
680            gen_maybe(gen_storage_class()),
681            gen_maybe(gen_retention_policy()),
682            gen_maybe(gen_timestamping_reconfiguration()),
683            gen_maybe(gen_delete_on_empty_reconfiguration()),
684        )
685            .prop_map(
686                |(storage_class, retention_policy, timestamping, delete_on_empty)| {
687                    StreamReconfiguration {
688                        storage_class,
689                        retention_policy,
690                        timestamping,
691                        delete_on_empty,
692                    }
693                },
694            )
695    }
696
697    fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
698        (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
699            .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
700    }
701
702    fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
703    {
704        gen_maybe(any::<u64>())
705            .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
706    }
707
708    fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
709        (
710            gen_maybe(gen_stream_reconfiguration()),
711            gen_maybe(gen_encryption_algorithm()),
712            prop_oneof![
713                Just(Maybe::Unspecified),
714                any::<bool>().prop_map(Maybe::Specified),
715            ],
716            prop_oneof![
717                Just(Maybe::Unspecified),
718                any::<bool>().prop_map(Maybe::Specified),
719            ],
720        )
721            .prop_map(
722                |(
723                    default_stream_config,
724                    stream_cipher,
725                    create_stream_on_append,
726                    create_stream_on_read,
727                )| BasinReconfiguration {
728                    default_stream_config,
729                    stream_cipher,
730                    create_stream_on_append,
731                    create_stream_on_read,
732                },
733            )
734    }
735
736    fn gen_internal_optional_stream_config()
737    -> impl Strategy<Value = types::config::OptionalStreamConfig> {
738        (
739            proptest::option::of(gen_storage_class()),
740            proptest::option::of(gen_retention_policy()),
741            proptest::option::of(gen_timestamping_mode()),
742            proptest::option::of(any::<bool>()),
743            proptest::option::of(any::<u64>()),
744        )
745            .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe)| {
746                types::config::OptionalStreamConfig {
747                    storage_class: sc.map(Into::into),
748                    retention_policy: rp.map(|rp| match rp {
749                        RetentionPolicy::Age(secs) => {
750                            types::config::RetentionPolicy::Age(Duration::from_secs(secs))
751                        }
752                        RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
753                    }),
754                    timestamping: types::config::OptionalTimestampingConfig {
755                        mode: ts_mode.map(Into::into),
756                        uncapped: ts_uncapped,
757                    },
758                    delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
759                        min_age: doe.map(Duration::from_secs),
760                    },
761                }
762            })
763    }
764
765    proptest! {
766        #[test]
767        fn stream_config_conversion_validates(config in gen_stream_config()) {
768            let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
769            let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
770
771            if has_zero_age {
772                prop_assert!(result.is_err());
773            } else {
774                prop_assert!(result.is_ok());
775            }
776        }
777
778        #[test]
779        fn basin_config_conversion_validates(config in gen_basin_config()) {
780            let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
781                matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
782            });
783
784            let result: Result<types::config::BasinConfig, _> = config.try_into();
785
786            if has_invalid_config {
787                prop_assert!(result.is_err());
788            } else {
789                prop_assert!(result.is_ok());
790            }
791        }
792
793        #[test]
794        fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
795            let has_zero_age = matches!(
796                reconfig.retention_policy,
797                Maybe::Specified(Some(RetentionPolicy::Age(0)))
798            );
799            let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
800
801            if has_zero_age {
802                prop_assert!(result.is_err());
803            } else {
804                prop_assert!(result.is_ok());
805            }
806        }
807
808        #[test]
809        fn merge_stream_or_basin_or_default(
810            stream in gen_internal_optional_stream_config(),
811            basin in gen_internal_optional_stream_config(),
812        ) {
813            let merged = stream.clone().merge(basin.clone());
814
815            prop_assert_eq!(
816                merged.storage_class,
817                stream.storage_class.or(basin.storage_class).unwrap_or_default()
818            );
819            prop_assert_eq!(
820                merged.retention_policy,
821                stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
822            );
823            prop_assert_eq!(
824                merged.timestamping.mode,
825                stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
826            );
827            prop_assert_eq!(
828                merged.timestamping.uncapped,
829                stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
830            );
831            prop_assert_eq!(
832                merged.delete_on_empty.min_age,
833                stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
834            );
835        }
836
837        #[test]
838        fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
839            let reconfig = types::config::StreamReconfiguration::default();
840            let result = base.clone().reconfigure(reconfig);
841
842            prop_assert_eq!(result.storage_class, base.storage_class);
843            prop_assert_eq!(result.retention_policy, base.retention_policy);
844            prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
845            prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
846            prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
847        }
848
849        #[test]
850        fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
851            let reconfig = types::config::StreamReconfiguration {
852                storage_class: Maybe::Specified(None),
853                retention_policy: Maybe::Specified(None),
854                timestamping: Maybe::Specified(None),
855                delete_on_empty: Maybe::Specified(None),
856            };
857            let result = base.reconfigure(reconfig);
858
859            prop_assert!(result.storage_class.is_none());
860            prop_assert!(result.retention_policy.is_none());
861            prop_assert!(result.timestamping.mode.is_none());
862            prop_assert!(result.timestamping.uncapped.is_none());
863            prop_assert!(result.delete_on_empty.min_age.is_none());
864        }
865
866        #[test]
867        fn reconfigure_specified_some_sets_value(
868            base in gen_internal_optional_stream_config(),
869            new_sc in gen_storage_class(),
870            new_rp_secs in 1u64..u64::MAX,
871        ) {
872            let reconfig = types::config::StreamReconfiguration {
873                storage_class: Maybe::Specified(Some(new_sc.into())),
874                retention_policy: Maybe::Specified(Some(
875                    types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
876                )),
877                ..Default::default()
878            };
879            let result = base.reconfigure(reconfig);
880
881            prop_assert_eq!(result.storage_class, Some(new_sc.into()));
882            prop_assert_eq!(
883                result.retention_policy,
884                Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
885            );
886        }
887
888        #[test]
889        fn to_opt_returns_some_for_non_defaults(
890            sc in gen_storage_class(),
891            doe_secs in 1u64..u64::MAX,
892            ts_mode in gen_timestamping_mode(),
893        ) {
894            // non-default storage class -> Some
895            let internal = types::config::OptionalStreamConfig {
896                storage_class: Some(sc.into()),
897                ..Default::default()
898            };
899            prop_assert!(StreamConfig::to_opt(internal).is_some());
900
901            // non-zero delete_on_empty -> Some
902            let internal = types::config::OptionalDeleteOnEmptyConfig {
903                min_age: Some(Duration::from_secs(doe_secs)),
904            };
905            let api = DeleteOnEmptyConfig::to_opt(internal);
906            prop_assert!(api.is_some());
907            prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
908
909            // non-default timestamping -> Some
910            let internal = types::config::OptionalTimestampingConfig {
911                mode: Some(ts_mode.into()),
912                uncapped: None,
913            };
914            prop_assert!(TimestampingConfig::to_opt(internal).is_some());
915        }
916
917        #[test]
918        fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
919            let has_zero_age = matches!(
920                &reconfig.default_stream_config,
921                Maybe::Specified(Some(sr)) if matches!(
922                    sr.retention_policy,
923                    Maybe::Specified(Some(RetentionPolicy::Age(0)))
924                )
925            );
926            let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
927
928            if has_zero_age {
929                prop_assert!(result.is_err());
930            } else {
931                prop_assert!(result.is_ok());
932            }
933        }
934
935        #[test]
936        fn reconfigure_basin_unspecified_preserves(
937            base_sc in proptest::option::of(gen_storage_class()),
938            base_algorithm in proptest::option::of(gen_encryption_algorithm()),
939            base_on_append in any::<bool>(),
940            base_on_read in any::<bool>(),
941        ) {
942            let base = types::config::BasinConfig {
943                default_stream_config: types::config::OptionalStreamConfig {
944                    storage_class: base_sc.map(Into::into),
945                    ..Default::default()
946                },
947                stream_cipher: base_algorithm.map(Into::into),
948                create_stream_on_append: base_on_append,
949                create_stream_on_read: base_on_read,
950            };
951
952            let reconfig = types::config::BasinReconfiguration::default();
953            let result = base.clone().reconfigure(reconfig);
954
955            prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
956            prop_assert_eq!(result.stream_cipher, base.stream_cipher);
957            prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
958            prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
959        }
960
961        #[test]
962        fn reconfigure_basin_specified_updates(
963            base_on_append in any::<bool>(),
964            new_on_append in any::<bool>(),
965            new_sc in gen_storage_class(),
966            new_algorithm in gen_encryption_algorithm(),
967        ) {
968            let base = types::config::BasinConfig {
969                create_stream_on_append: base_on_append,
970                ..Default::default()
971            };
972
973            let reconfig = types::config::BasinReconfiguration {
974                default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
975                    storage_class: Maybe::Specified(Some(new_sc.into())),
976                    ..Default::default()
977                })),
978                stream_cipher: Maybe::Specified(Some(new_algorithm.into())),
979                create_stream_on_append: Maybe::Specified(new_on_append),
980                ..Default::default()
981            };
982            let result = base.reconfigure(reconfig);
983
984            prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
985            prop_assert_eq!(result.stream_cipher, Some(new_algorithm.into()));
986            prop_assert_eq!(result.create_stream_on_append, new_on_append);
987        }
988
989        #[test]
990        fn reconfigure_nested_partial_update(
991            base_mode in gen_timestamping_mode(),
992            base_uncapped in any::<bool>(),
993            new_mode in gen_timestamping_mode(),
994        ) {
995            let base = types::config::OptionalStreamConfig {
996                timestamping: types::config::OptionalTimestampingConfig {
997                    mode: Some(base_mode.into()),
998                    uncapped: Some(base_uncapped),
999                },
1000                ..Default::default()
1001            };
1002
1003            let expected_mode: types::config::TimestampingMode = new_mode.into();
1004
1005            let reconfig = types::config::StreamReconfiguration {
1006                timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
1007                    mode: Maybe::Specified(Some(expected_mode)),
1008                    uncapped: Maybe::Unspecified,
1009                })),
1010                ..Default::default()
1011            };
1012            let result = base.reconfigure(reconfig);
1013
1014            prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
1015            prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
1016        }
1017    }
1018
1019    #[test]
1020    fn to_opt_returns_none_for_defaults() {
1021        // default stream config -> None
1022        assert!(StreamConfig::to_opt(types::config::OptionalStreamConfig::default()).is_none());
1023
1024        // delete_on_empty: None -> None
1025        let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
1026        assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
1027
1028        // default timestamping -> None
1029        assert!(
1030            TimestampingConfig::to_opt(types::config::OptionalTimestampingConfig::default())
1031                .is_none()
1032        );
1033    }
1034
1035    #[test]
1036    fn optional_stream_config_into_api_preserves_explicit_zero_delete_on_empty() {
1037        let api: StreamConfig = types::config::OptionalStreamConfig {
1038            delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
1039                min_age: Some(Duration::ZERO),
1040            },
1041            ..Default::default()
1042        }
1043        .into();
1044
1045        assert_eq!(
1046            api.delete_on_empty,
1047            Some(DeleteOnEmptyConfig { min_age_secs: 0 })
1048        );
1049    }
1050
1051    #[test]
1052    fn optional_stream_config_to_opt_preserves_explicit_zero_delete_on_empty() {
1053        let api = StreamConfig::to_opt(types::config::OptionalStreamConfig {
1054            delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
1055                min_age: Some(Duration::ZERO),
1056            },
1057            ..Default::default()
1058        })
1059        .unwrap();
1060
1061        assert_eq!(
1062            api.delete_on_empty,
1063            Some(DeleteOnEmptyConfig { min_age_secs: 0 })
1064        );
1065    }
1066
1067    #[test]
1068    fn optional_stream_config_into_api_preserves_nested_timestamping_omission() {
1069        let api: StreamConfig = types::config::OptionalStreamConfig {
1070            timestamping: types::config::OptionalTimestampingConfig {
1071                mode: Some(types::config::TimestampingMode::Arrival),
1072                uncapped: None,
1073            },
1074            ..Default::default()
1075        }
1076        .into();
1077
1078        assert_eq!(
1079            api.timestamping,
1080            Some(TimestampingConfig {
1081                mode: Some(TimestampingMode::Arrival),
1082                uncapped: None
1083            })
1084        );
1085    }
1086
1087    #[test]
1088    fn empty_json_converts_to_all_none() {
1089        let json = serde_json::json!({});
1090        let parsed: StreamConfig = serde_json::from_value(json).unwrap();
1091        let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();
1092
1093        assert!(
1094            internal.storage_class.is_none(),
1095            "storage_class should be None"
1096        );
1097        assert!(
1098            internal.retention_policy.is_none(),
1099            "retention_policy should be None"
1100        );
1101        assert!(
1102            internal.timestamping.mode.is_none(),
1103            "timestamping.mode should be None"
1104        );
1105        assert!(
1106            internal.timestamping.uncapped.is_none(),
1107            "timestamping.uncapped should be None"
1108        );
1109        assert!(
1110            internal.delete_on_empty.min_age.is_none(),
1111            "delete_on_empty.min_age should be None"
1112        );
1113    }
1114}