Skip to main content

s2_api/v1/
config.rs

1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
6#[rustfmt::skip]
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
9#[serde(rename_all = "kebab-case")]
10pub enum StorageClass {
11    /// Append tail latency under 400 milliseconds with s2.dev.
12    Standard,
13    /// Append tail latency under 40 milliseconds with s2.dev.
14    Express,
15}
16
17impl From<StorageClass> for types::config::StorageClass {
18    fn from(value: StorageClass) -> Self {
19        match value {
20            StorageClass::Express => Self::Express,
21            StorageClass::Standard => Self::Standard,
22        }
23    }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27    fn from(value: types::config::StorageClass) -> Self {
28        match value {
29            types::config::StorageClass::Express => Self::Express,
30            types::config::StorageClass::Standard => Self::Standard,
31        }
32    }
33}
34
35#[rustfmt::skip]
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
38#[serde(rename_all = "kebab-case")]
39pub enum RetentionPolicy {
40    /// Age in seconds for automatic trimming of records older than this threshold.
41    /// This must be set to a value greater than 0 seconds.
42    Age(u64),
43    /// Retain records unless explicitly trimmed.
44    Infinite(InfiniteRetention)
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
49#[serde(rename_all = "kebab-case")]
50pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53    type Error = types::ValidationError;
54
55    fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56        match value {
57            RetentionPolicy::Age(0) => Err(types::ValidationError(
58                "age must be greater than 0 seconds".to_string(),
59            )),
60            RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61            RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62        }
63    }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67    fn from(value: types::config::RetentionPolicy) -> Self {
68        match value {
69            types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70            types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71        }
72    }
73}
74
75#[rustfmt::skip]
76#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
78#[serde(rename_all = "kebab-case")]
79pub enum TimestampingMode {
80    /// Prefer client-specified timestamp if present otherwise use arrival time.
81    #[default]
82    ClientPrefer,
83    /// Require a client-specified timestamp and reject the append if it is missing.
84    ClientRequire,
85    /// Use the arrival time and ignore any client-specified timestamp.
86    Arrival,
87}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90    fn from(value: TimestampingMode) -> Self {
91        match value {
92            TimestampingMode::ClientPrefer => Self::ClientPrefer,
93            TimestampingMode::ClientRequire => Self::ClientRequire,
94            TimestampingMode::Arrival => Self::Arrival,
95        }
96    }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100    fn from(value: types::config::TimestampingMode) -> Self {
101        match value {
102            types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103            types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104            types::config::TimestampingMode::Arrival => Self::Arrival,
105        }
106    }
107}
108
109#[rustfmt::skip]
110#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
112pub struct TimestampingConfig {
113    /// Timestamping mode for appends that influences how timestamps are handled.
114    pub mode: Option<TimestampingMode>,
115    /// Allow client-specified timestamps to exceed the arrival time.
116    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
117    pub uncapped: Option<bool>,
118}
119
120impl TimestampingConfig {
121    pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122        let config = TimestampingConfig {
123            mode: config.mode.map(Into::into),
124            uncapped: config.uncapped,
125        };
126        if config == Self::default() {
127            None
128        } else {
129            Some(config)
130        }
131    }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135    fn from(value: types::config::TimestampingConfig) -> Self {
136        Self {
137            mode: Some(value.mode.into()),
138            uncapped: Some(value.uncapped),
139        }
140    }
141}
142
143impl From<types::config::OptionalTimestampingConfig> for TimestampingConfig {
144    fn from(value: types::config::OptionalTimestampingConfig) -> Self {
145        Self {
146            mode: value.mode.map(Into::into),
147            uncapped: value.uncapped,
148        }
149    }
150}
151
152impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
153    fn from(value: TimestampingConfig) -> Self {
154        Self {
155            mode: value.mode.map(Into::into),
156            uncapped: value.uncapped,
157        }
158    }
159}
160
161#[rustfmt::skip]
162#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
163#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
164pub struct TimestampingReconfiguration {
165    /// Timestamping mode for appends that influences how timestamps are handled.
166    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
167    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
168    pub mode: Maybe<Option<TimestampingMode>>,
169    /// Allow client-specified timestamps to exceed the arrival time.
170    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
171    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
172    pub uncapped: Maybe<Option<bool>>,
173}
174
175impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
176    fn from(value: TimestampingReconfiguration) -> Self {
177        Self {
178            mode: value.mode.map_opt(Into::into),
179            uncapped: value.uncapped,
180        }
181    }
182}
183
184impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
185    fn from(value: types::config::TimestampingReconfiguration) -> Self {
186        Self {
187            mode: value.mode.map_opt(Into::into),
188            uncapped: value.uncapped,
189        }
190    }
191}
192
193#[rustfmt::skip]
194#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
195#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
196pub struct DeleteOnEmptyConfig {
197    /// Minimum age in seconds before an empty stream can be deleted.
198    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
199    #[serde(default)]
200    pub min_age_secs: u64,
201}
202
203impl DeleteOnEmptyConfig {
204    pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
205        let min_age = config.min_age.unwrap_or_default();
206        if min_age > Duration::ZERO {
207            Some(DeleteOnEmptyConfig {
208                min_age_secs: min_age.as_secs(),
209            })
210        } else {
211            None
212        }
213    }
214}
215
216impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
217    fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
218        Self {
219            min_age_secs: value.min_age.as_secs(),
220        }
221    }
222}
223
224impl From<types::config::OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
225    fn from(value: types::config::OptionalDeleteOnEmptyConfig) -> Self {
226        Self {
227            min_age_secs: value.min_age.unwrap_or_default().as_secs(),
228        }
229    }
230}
231
232impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
233    fn from(value: DeleteOnEmptyConfig) -> Self {
234        Self {
235            min_age: Duration::from_secs(value.min_age_secs),
236        }
237    }
238}
239
240impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
241    fn from(value: DeleteOnEmptyConfig) -> Self {
242        Self {
243            min_age: Some(Duration::from_secs(value.min_age_secs)),
244        }
245    }
246}
247
248#[rustfmt::skip]
249#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
250#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
251pub struct DeleteOnEmptyReconfiguration {
252    /// Minimum age in seconds before an empty stream can be deleted.
253    /// Set to 0 to disable delete-on-empty (don't delete automatically).
254    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
255    #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
256    pub min_age_secs: Maybe<Option<u64>>,
257}
258
259impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
260    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
261        Self {
262            min_age: value.min_age_secs.map_opt(Duration::from_secs),
263        }
264    }
265}
266
267impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
268    fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
269        Self {
270            min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
271        }
272    }
273}
274
275#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
276#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
277pub enum EncryptionAlgorithm {
278    /// AEGIS-256 authenticated encryption.
279    #[serde(rename = "aegis-256")]
280    Aegis256,
281    /// AES-256-GCM authenticated encryption.
282    #[serde(rename = "aes-256-gcm")]
283    Aes256Gcm,
284}
285
286impl From<EncryptionAlgorithm> for encryption::EncryptionAlgorithm {
287    fn from(value: EncryptionAlgorithm) -> Self {
288        match value {
289            EncryptionAlgorithm::Aegis256 => Self::Aegis256,
290            EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
291        }
292    }
293}
294
295impl From<encryption::EncryptionAlgorithm> for EncryptionAlgorithm {
296    fn from(value: encryption::EncryptionAlgorithm) -> Self {
297        match value {
298            encryption::EncryptionAlgorithm::Aegis256 => Self::Aegis256,
299            encryption::EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
300        }
301    }
302}
303
304#[rustfmt::skip]
305#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
306#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
307pub struct StreamConfig {
308    /// Storage class for recent writes.
309    pub storage_class: Option<StorageClass>,
310    /// Retention policy for the stream.
311    /// If unspecified, the default is to retain records for 7 days.
312    pub retention_policy: Option<RetentionPolicy>,
313    /// Timestamping behavior.
314    pub timestamping: Option<TimestampingConfig>,
315    /// Delete-on-empty configuration.
316    #[serde(default)]
317    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
318}
319
320impl StreamConfig {
321    pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
322        let types::config::OptionalStreamConfig {
323            storage_class,
324            retention_policy,
325            timestamping,
326            delete_on_empty,
327        } = config;
328
329        let config = StreamConfig {
330            storage_class: storage_class.map(Into::into),
331            retention_policy: retention_policy.map(Into::into),
332            timestamping: TimestampingConfig::to_opt(timestamping),
333            delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
334        };
335        if config == Self::default() {
336            None
337        } else {
338            Some(config)
339        }
340    }
341}
342
343impl From<types::config::StreamConfig> for StreamConfig {
344    fn from(value: types::config::StreamConfig) -> Self {
345        let types::config::StreamConfig {
346            storage_class,
347            retention_policy,
348            timestamping,
349            delete_on_empty,
350        } = value;
351
352        Self {
353            storage_class: Some(storage_class.into()),
354            retention_policy: Some(retention_policy.into()),
355            timestamping: Some(timestamping.into()),
356            delete_on_empty: Some(delete_on_empty.into()),
357        }
358    }
359}
360
361impl From<types::config::OptionalStreamConfig> for StreamConfig {
362    fn from(value: types::config::OptionalStreamConfig) -> Self {
363        let types::config::OptionalStreamConfig {
364            storage_class,
365            retention_policy,
366            timestamping,
367            delete_on_empty,
368        } = value;
369
370        let timestamping = (timestamping.mode.is_some() || timestamping.uncapped.is_some())
371            .then(|| timestamping.into());
372        let delete_on_empty = delete_on_empty.min_age.map(|_| delete_on_empty.into());
373
374        Self {
375            storage_class: storage_class.map(Into::into),
376            retention_policy: retention_policy.map(Into::into),
377            timestamping,
378            delete_on_empty,
379        }
380    }
381}
382
383impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
384    type Error = types::ValidationError;
385
386    fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
387        let StreamConfig {
388            storage_class,
389            retention_policy,
390            timestamping,
391            delete_on_empty,
392        } = value;
393
394        let retention_policy = match retention_policy {
395            None => None,
396            Some(policy) => Some(policy.try_into()?),
397        };
398
399        Ok(Self {
400            storage_class: storage_class.map(Into::into),
401            retention_policy,
402            timestamping: timestamping.map(Into::into).unwrap_or_default(),
403            delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
404        })
405    }
406}
407
408#[rustfmt::skip]
409#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
410#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
411pub struct StreamReconfiguration {
412    /// Storage class for recent writes.
413    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
414    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
415    pub storage_class: Maybe<Option<StorageClass>>,
416    /// Retention policy for the stream.
417    /// If unspecified, the default is to retain records for 7 days.
418    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
419    #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
420    pub retention_policy: Maybe<Option<RetentionPolicy>>,
421    /// Timestamping behavior.
422    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
423    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
424    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
425    /// Delete-on-empty configuration.
426    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
427    #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
428    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
429}
430
431impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
432    type Error = types::ValidationError;
433
434    fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
435        let StreamReconfiguration {
436            storage_class,
437            retention_policy,
438            timestamping,
439            delete_on_empty,
440        } = value;
441
442        Ok(Self {
443            storage_class: storage_class.map_opt(Into::into),
444            retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
445            timestamping: timestamping.map_opt(Into::into),
446            delete_on_empty: delete_on_empty.map_opt(Into::into),
447        })
448    }
449}
450
451impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
452    fn from(value: types::config::StreamReconfiguration) -> Self {
453        let types::config::StreamReconfiguration {
454            storage_class,
455            retention_policy,
456            timestamping,
457            delete_on_empty,
458        } = value;
459
460        Self {
461            storage_class: storage_class.map_opt(Into::into),
462            retention_policy: retention_policy.map_opt(Into::into),
463            timestamping: timestamping.map_opt(Into::into),
464            delete_on_empty: delete_on_empty.map_opt(Into::into),
465        }
466    }
467}
468
469#[rustfmt::skip]
470#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
471#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
472pub struct BasinConfig {
473    /// Default stream configuration.
474    pub default_stream_config: Option<StreamConfig>,
475    /// Encryption algorithm to apply to newly created streams in the basin.
476    pub stream_cipher: Option<EncryptionAlgorithm>,
477    /// Create stream on append if it doesn't exist, using the default stream configuration.
478    #[serde(default)]
479    #[cfg_attr(feature = "utoipa", schema(default = false))]
480    pub create_stream_on_append: bool,
481    /// Create stream on read if it doesn't exist, using the default stream configuration.
482    #[serde(default)]
483    #[cfg_attr(feature = "utoipa", schema(default = false))]
484    pub create_stream_on_read: bool,
485}
486
487impl TryFrom<BasinConfig> for types::config::BasinConfig {
488    type Error = types::ValidationError;
489
490    fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
491        let BasinConfig {
492            default_stream_config,
493            stream_cipher,
494            create_stream_on_append,
495            create_stream_on_read,
496        } = value;
497
498        Ok(Self {
499            default_stream_config: match default_stream_config {
500                Some(config) => config.try_into()?,
501                None => Default::default(),
502            },
503            stream_cipher: stream_cipher.map(Into::into),
504            create_stream_on_append,
505            create_stream_on_read,
506        })
507    }
508}
509
510impl From<types::config::BasinConfig> for BasinConfig {
511    fn from(value: types::config::BasinConfig) -> Self {
512        let types::config::BasinConfig {
513            default_stream_config,
514            stream_cipher,
515            create_stream_on_append,
516            create_stream_on_read,
517        } = value;
518
519        Self {
520            default_stream_config: StreamConfig::to_opt(default_stream_config),
521            stream_cipher: stream_cipher.map(Into::into),
522            create_stream_on_append,
523            create_stream_on_read,
524        }
525    }
526}
527
528#[rustfmt::skip]
529#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
530#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
531pub struct BasinReconfiguration {
532    /// Basin configuration.
533    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
534    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
535    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
536    /// Encryption algorithm to apply to newly created streams in the basin.
537    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
538    #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionAlgorithm>))]
539    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
540    /// Create a stream on append.
541    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
542    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
543    pub create_stream_on_append: Maybe<bool>,
544    /// Create a stream on read.
545    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
546    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
547    pub create_stream_on_read: Maybe<bool>,
548}
549
550impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
551    type Error = types::ValidationError;
552
553    fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
554        let BasinReconfiguration {
555            default_stream_config,
556            stream_cipher,
557            create_stream_on_append,
558            create_stream_on_read,
559        } = value;
560
561        Ok(Self {
562            default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
563            stream_cipher: stream_cipher.map_opt(Into::into),
564            create_stream_on_append: create_stream_on_append.map(Into::into),
565            create_stream_on_read: create_stream_on_read.map(Into::into),
566        })
567    }
568}
569
570impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
571    fn from(value: types::config::BasinReconfiguration) -> Self {
572        let types::config::BasinReconfiguration {
573            default_stream_config,
574            stream_cipher,
575            create_stream_on_append,
576            create_stream_on_read,
577        } = value;
578
579        Self {
580            default_stream_config: default_stream_config.map_opt(Into::into),
581            stream_cipher: stream_cipher.map_opt(Into::into),
582            create_stream_on_append: create_stream_on_append.map(Into::into),
583            create_stream_on_read: create_stream_on_read.map(Into::into),
584        }
585    }
586}
587
588#[cfg(test)]
589mod tests {
590    use proptest::prelude::*;
591
592    use super::*;
593
594    fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
595        prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
596    }
597
598    fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
599        prop_oneof![
600            Just(TimestampingMode::ClientPrefer),
601            Just(TimestampingMode::ClientRequire),
602            Just(TimestampingMode::Arrival),
603        ]
604    }
605
606    fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
607        prop_oneof![
608            any::<u64>().prop_map(RetentionPolicy::Age),
609            Just(RetentionPolicy::Infinite(InfiniteRetention {})),
610        ]
611    }
612
613    fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
614        (
615            proptest::option::of(gen_timestamping_mode()),
616            proptest::option::of(any::<bool>()),
617        )
618            .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
619    }
620
621    fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
622        any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
623    }
624
625    fn gen_encryption_algorithm() -> impl Strategy<Value = EncryptionAlgorithm> {
626        prop_oneof![
627            Just(EncryptionAlgorithm::Aegis256),
628            Just(EncryptionAlgorithm::Aes256Gcm),
629        ]
630    }
631
632    fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
633        (
634            proptest::option::of(gen_storage_class()),
635            proptest::option::of(gen_retention_policy()),
636            proptest::option::of(gen_timestamping_config()),
637            proptest::option::of(gen_delete_on_empty_config()),
638        )
639            .prop_map(
640                |(storage_class, retention_policy, timestamping, delete_on_empty)| StreamConfig {
641                    storage_class,
642                    retention_policy,
643                    timestamping,
644                    delete_on_empty,
645                },
646            )
647    }
648
649    fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
650        (
651            proptest::option::of(gen_stream_config()),
652            proptest::option::of(gen_encryption_algorithm()),
653            any::<bool>(),
654            any::<bool>(),
655        )
656            .prop_map(
657                |(
658                    default_stream_config,
659                    stream_cipher,
660                    create_stream_on_append,
661                    create_stream_on_read,
662                )| {
663                    BasinConfig {
664                        default_stream_config,
665                        stream_cipher,
666                        create_stream_on_append,
667                        create_stream_on_read,
668                    }
669                },
670            )
671    }
672
673    fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
674        inner: impl Strategy<Value = T>,
675    ) -> impl Strategy<Value = Maybe<Option<T>>> {
676        prop_oneof![
677            Just(Maybe::Unspecified),
678            Just(Maybe::Specified(None)),
679            inner.prop_map(|v| Maybe::Specified(Some(v))),
680        ]
681    }
682
683    fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
684        (
685            gen_maybe(gen_storage_class()),
686            gen_maybe(gen_retention_policy()),
687            gen_maybe(gen_timestamping_reconfiguration()),
688            gen_maybe(gen_delete_on_empty_reconfiguration()),
689        )
690            .prop_map(
691                |(storage_class, retention_policy, timestamping, delete_on_empty)| {
692                    StreamReconfiguration {
693                        storage_class,
694                        retention_policy,
695                        timestamping,
696                        delete_on_empty,
697                    }
698                },
699            )
700    }
701
702    fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
703        (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
704            .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
705    }
706
707    fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
708    {
709        gen_maybe(any::<u64>())
710            .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
711    }
712
713    fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
714        (
715            gen_maybe(gen_stream_reconfiguration()),
716            gen_maybe(gen_encryption_algorithm()),
717            prop_oneof![
718                Just(Maybe::Unspecified),
719                any::<bool>().prop_map(Maybe::Specified),
720            ],
721            prop_oneof![
722                Just(Maybe::Unspecified),
723                any::<bool>().prop_map(Maybe::Specified),
724            ],
725        )
726            .prop_map(
727                |(
728                    default_stream_config,
729                    stream_cipher,
730                    create_stream_on_append,
731                    create_stream_on_read,
732                )| BasinReconfiguration {
733                    default_stream_config,
734                    stream_cipher,
735                    create_stream_on_append,
736                    create_stream_on_read,
737                },
738            )
739    }
740
741    fn gen_internal_optional_stream_config()
742    -> impl Strategy<Value = types::config::OptionalStreamConfig> {
743        (
744            proptest::option::of(gen_storage_class()),
745            proptest::option::of(gen_retention_policy()),
746            proptest::option::of(gen_timestamping_mode()),
747            proptest::option::of(any::<bool>()),
748            proptest::option::of(any::<u64>()),
749        )
750            .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe)| {
751                types::config::OptionalStreamConfig {
752                    storage_class: sc.map(Into::into),
753                    retention_policy: rp.map(|rp| match rp {
754                        RetentionPolicy::Age(secs) => {
755                            types::config::RetentionPolicy::Age(Duration::from_secs(secs))
756                        }
757                        RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
758                    }),
759                    timestamping: types::config::OptionalTimestampingConfig {
760                        mode: ts_mode.map(Into::into),
761                        uncapped: ts_uncapped,
762                    },
763                    delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
764                        min_age: doe.map(Duration::from_secs),
765                    },
766                }
767            })
768    }
769
770    proptest! {
771        #[test]
772        fn stream_config_conversion_validates(config in gen_stream_config()) {
773            let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
774            let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
775
776            if has_zero_age {
777                prop_assert!(result.is_err());
778            } else {
779                prop_assert!(result.is_ok());
780            }
781        }
782
783        #[test]
784        fn basin_config_conversion_validates(config in gen_basin_config()) {
785            let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
786                matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
787            });
788
789            let result: Result<types::config::BasinConfig, _> = config.try_into();
790
791            if has_invalid_config {
792                prop_assert!(result.is_err());
793            } else {
794                prop_assert!(result.is_ok());
795            }
796        }
797
798        #[test]
799        fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
800            let has_zero_age = matches!(
801                reconfig.retention_policy,
802                Maybe::Specified(Some(RetentionPolicy::Age(0)))
803            );
804            let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
805
806            if has_zero_age {
807                prop_assert!(result.is_err());
808            } else {
809                prop_assert!(result.is_ok());
810            }
811        }
812
813        #[test]
814        fn merge_stream_or_basin_or_default(
815            stream in gen_internal_optional_stream_config(),
816            basin in gen_internal_optional_stream_config(),
817        ) {
818            let merged = stream.clone().merge(basin.clone());
819
820            prop_assert_eq!(
821                merged.storage_class,
822                stream.storage_class.or(basin.storage_class).unwrap_or_default()
823            );
824            prop_assert_eq!(
825                merged.retention_policy,
826                stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
827            );
828            prop_assert_eq!(
829                merged.timestamping.mode,
830                stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
831            );
832            prop_assert_eq!(
833                merged.timestamping.uncapped,
834                stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
835            );
836            prop_assert_eq!(
837                merged.delete_on_empty.min_age,
838                stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
839            );
840        }
841
842        #[test]
843        fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
844            let reconfig = types::config::StreamReconfiguration::default();
845            let result = base.clone().reconfigure(reconfig);
846
847            prop_assert_eq!(result.storage_class, base.storage_class);
848            prop_assert_eq!(result.retention_policy, base.retention_policy);
849            prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
850            prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
851            prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
852        }
853
854        #[test]
855        fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
856            let reconfig = types::config::StreamReconfiguration {
857                storage_class: Maybe::Specified(None),
858                retention_policy: Maybe::Specified(None),
859                timestamping: Maybe::Specified(None),
860                delete_on_empty: Maybe::Specified(None),
861            };
862            let result = base.reconfigure(reconfig);
863
864            prop_assert!(result.storage_class.is_none());
865            prop_assert!(result.retention_policy.is_none());
866            prop_assert!(result.timestamping.mode.is_none());
867            prop_assert!(result.timestamping.uncapped.is_none());
868            prop_assert!(result.delete_on_empty.min_age.is_none());
869        }
870
871        #[test]
872        fn reconfigure_specified_some_sets_value(
873            base in gen_internal_optional_stream_config(),
874            new_sc in gen_storage_class(),
875            new_rp_secs in 1u64..u64::MAX,
876        ) {
877            let reconfig = types::config::StreamReconfiguration {
878                storage_class: Maybe::Specified(Some(new_sc.into())),
879                retention_policy: Maybe::Specified(Some(
880                    types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
881                )),
882                ..Default::default()
883            };
884            let result = base.reconfigure(reconfig);
885
886            prop_assert_eq!(result.storage_class, Some(new_sc.into()));
887            prop_assert_eq!(
888                result.retention_policy,
889                Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
890            );
891        }
892
893        #[test]
894        fn to_opt_returns_some_for_non_defaults(
895            sc in gen_storage_class(),
896            doe_secs in 1u64..u64::MAX,
897            ts_mode in gen_timestamping_mode(),
898        ) {
899            // non-default storage class -> Some
900            let internal = types::config::OptionalStreamConfig {
901                storage_class: Some(sc.into()),
902                ..Default::default()
903            };
904            prop_assert!(StreamConfig::to_opt(internal).is_some());
905
906            // non-zero delete_on_empty -> Some
907            let internal = types::config::OptionalDeleteOnEmptyConfig {
908                min_age: Some(Duration::from_secs(doe_secs)),
909            };
910            let api = DeleteOnEmptyConfig::to_opt(internal);
911            prop_assert!(api.is_some());
912            prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
913
914            // non-default timestamping -> Some
915            let internal = types::config::OptionalTimestampingConfig {
916                mode: Some(ts_mode.into()),
917                uncapped: None,
918            };
919            prop_assert!(TimestampingConfig::to_opt(internal).is_some());
920        }
921
922        #[test]
923        fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
924            let has_zero_age = matches!(
925                &reconfig.default_stream_config,
926                Maybe::Specified(Some(sr)) if matches!(
927                    sr.retention_policy,
928                    Maybe::Specified(Some(RetentionPolicy::Age(0)))
929                )
930            );
931            let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
932
933            if has_zero_age {
934                prop_assert!(result.is_err());
935            } else {
936                prop_assert!(result.is_ok());
937            }
938        }
939
940        #[test]
941        fn reconfigure_basin_unspecified_preserves(
942            base_sc in proptest::option::of(gen_storage_class()),
943            base_algorithm in proptest::option::of(gen_encryption_algorithm()),
944            base_on_append in any::<bool>(),
945            base_on_read in any::<bool>(),
946        ) {
947            let base = types::config::BasinConfig {
948                default_stream_config: types::config::OptionalStreamConfig {
949                    storage_class: base_sc.map(Into::into),
950                    ..Default::default()
951                },
952                stream_cipher: base_algorithm.map(Into::into),
953                create_stream_on_append: base_on_append,
954                create_stream_on_read: base_on_read,
955            };
956
957            let reconfig = types::config::BasinReconfiguration::default();
958            let result = base.clone().reconfigure(reconfig);
959
960            prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
961            prop_assert_eq!(result.stream_cipher, base.stream_cipher);
962            prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
963            prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
964        }
965
966        #[test]
967        fn reconfigure_basin_specified_updates(
968            base_on_append in any::<bool>(),
969            new_on_append in any::<bool>(),
970            new_sc in gen_storage_class(),
971            new_algorithm in gen_encryption_algorithm(),
972        ) {
973            let base = types::config::BasinConfig {
974                create_stream_on_append: base_on_append,
975                ..Default::default()
976            };
977
978            let reconfig = types::config::BasinReconfiguration {
979                default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
980                    storage_class: Maybe::Specified(Some(new_sc.into())),
981                    ..Default::default()
982                })),
983                stream_cipher: Maybe::Specified(Some(new_algorithm.into())),
984                create_stream_on_append: Maybe::Specified(new_on_append),
985                ..Default::default()
986            };
987            let result = base.reconfigure(reconfig);
988
989            prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
990            prop_assert_eq!(result.stream_cipher, Some(new_algorithm.into()));
991            prop_assert_eq!(result.create_stream_on_append, new_on_append);
992        }
993
994        #[test]
995        fn reconfigure_nested_partial_update(
996            base_mode in gen_timestamping_mode(),
997            base_uncapped in any::<bool>(),
998            new_mode in gen_timestamping_mode(),
999        ) {
1000            let base = types::config::OptionalStreamConfig {
1001                timestamping: types::config::OptionalTimestampingConfig {
1002                    mode: Some(base_mode.into()),
1003                    uncapped: Some(base_uncapped),
1004                },
1005                ..Default::default()
1006            };
1007
1008            let expected_mode: types::config::TimestampingMode = new_mode.into();
1009
1010            let reconfig = types::config::StreamReconfiguration {
1011                timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
1012                    mode: Maybe::Specified(Some(expected_mode)),
1013                    uncapped: Maybe::Unspecified,
1014                })),
1015                ..Default::default()
1016            };
1017            let result = base.reconfigure(reconfig);
1018
1019            prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
1020            prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
1021        }
1022    }
1023
1024    #[test]
1025    fn to_opt_returns_none_for_defaults() {
1026        // default stream config -> None
1027        assert!(StreamConfig::to_opt(types::config::OptionalStreamConfig::default()).is_none());
1028
1029        // delete_on_empty: None or Some(ZERO) -> None
1030        let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
1031        let doe_zero = types::config::OptionalDeleteOnEmptyConfig {
1032            min_age: Some(Duration::ZERO),
1033        };
1034        assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
1035        assert!(DeleteOnEmptyConfig::to_opt(doe_zero).is_none());
1036
1037        // default timestamping -> None
1038        assert!(
1039            TimestampingConfig::to_opt(types::config::OptionalTimestampingConfig::default())
1040                .is_none()
1041        );
1042    }
1043
1044    #[test]
1045    fn optional_stream_config_into_api_preserves_explicit_zero_delete_on_empty() {
1046        let api: StreamConfig = types::config::OptionalStreamConfig {
1047            delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
1048                min_age: Some(Duration::ZERO),
1049            },
1050            ..Default::default()
1051        }
1052        .into();
1053
1054        assert_eq!(
1055            api.delete_on_empty,
1056            Some(DeleteOnEmptyConfig { min_age_secs: 0 })
1057        );
1058    }
1059
1060    #[test]
1061    fn optional_stream_config_into_api_preserves_nested_timestamping_omission() {
1062        let api: StreamConfig = types::config::OptionalStreamConfig {
1063            timestamping: types::config::OptionalTimestampingConfig {
1064                mode: Some(types::config::TimestampingMode::Arrival),
1065                uncapped: None,
1066            },
1067            ..Default::default()
1068        }
1069        .into();
1070
1071        assert_eq!(
1072            api.timestamping,
1073            Some(TimestampingConfig {
1074                mode: Some(TimestampingMode::Arrival),
1075                uncapped: None
1076            })
1077        );
1078    }
1079
1080    #[test]
1081    fn empty_json_converts_to_all_none() {
1082        let json = serde_json::json!({});
1083        let parsed: StreamConfig = serde_json::from_value(json).unwrap();
1084        let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();
1085
1086        assert!(
1087            internal.storage_class.is_none(),
1088            "storage_class should be None"
1089        );
1090        assert!(
1091            internal.retention_policy.is_none(),
1092            "retention_policy should be None"
1093        );
1094        assert!(
1095            internal.timestamping.mode.is_none(),
1096            "timestamping.mode should be None"
1097        );
1098        assert!(
1099            internal.timestamping.uncapped.is_none(),
1100            "timestamping.uncapped should be None"
1101        );
1102        assert!(
1103            internal.delete_on_empty.min_age.is_none(),
1104            "delete_on_empty.min_age should be None"
1105        );
1106    }
1107}