Skip to main content

s2_api/v1/
config.rs

1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
6#[rustfmt::skip]
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
9#[serde(rename_all = "kebab-case")]
10pub enum StorageClass {
11    /// Append tail latency under 400 milliseconds with s2.dev.
12    Standard,
13    /// Append tail latency under 40 milliseconds with s2.dev.
14    Express,
15}
16
17impl From<StorageClass> for types::config::StorageClass {
18    fn from(value: StorageClass) -> Self {
19        match value {
20            StorageClass::Express => Self::Express,
21            StorageClass::Standard => Self::Standard,
22        }
23    }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27    fn from(value: types::config::StorageClass) -> Self {
28        match value {
29            types::config::StorageClass::Express => Self::Express,
30            types::config::StorageClass::Standard => Self::Standard,
31        }
32    }
33}
34
35#[rustfmt::skip]
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
38#[serde(rename_all = "kebab-case")]
39pub enum RetentionPolicy {
40    /// Age in seconds for automatic trimming of records older than this threshold.
41    /// This must be set to a value greater than 0 seconds.
42    Age(u64),
43    /// Retain records unless explicitly trimmed.
44    Infinite(InfiniteRetention)
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
49#[serde(rename_all = "kebab-case")]
50pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53    type Error = types::ValidationError;
54
55    fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56        match value {
57            RetentionPolicy::Age(0) => Err(types::ValidationError(
58                "age must be greater than 0 seconds".to_string(),
59            )),
60            RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61            RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62        }
63    }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67    fn from(value: types::config::RetentionPolicy) -> Self {
68        match value {
69            types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70            types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71        }
72    }
73}
74
75#[rustfmt::skip]
76#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
78#[serde(rename_all = "kebab-case")]
79pub enum TimestampingMode {
80    /// Prefer client-specified timestamp if present otherwise use arrival time.
81    #[default]
82    ClientPrefer,
83    /// Require a client-specified timestamp and reject the append if it is missing.
84    ClientRequire,
85    /// Use the arrival time and ignore any client-specified timestamp.
86    Arrival,
87}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90    fn from(value: TimestampingMode) -> Self {
91        match value {
92            TimestampingMode::ClientPrefer => Self::ClientPrefer,
93            TimestampingMode::ClientRequire => Self::ClientRequire,
94            TimestampingMode::Arrival => Self::Arrival,
95        }
96    }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100    fn from(value: types::config::TimestampingMode) -> Self {
101        match value {
102            types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103            types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104            types::config::TimestampingMode::Arrival => Self::Arrival,
105        }
106    }
107}
108
109#[rustfmt::skip]
110#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
112pub struct TimestampingConfig {
113    /// Timestamping mode for appends that influences how timestamps are handled.
114    pub mode: Option<TimestampingMode>,
115    /// Allow client-specified timestamps to exceed the arrival time.
116    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
117    pub uncapped: Option<bool>,
118}
119
120impl TimestampingConfig {
121    pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122        let config = TimestampingConfig {
123            mode: config.mode.map(Into::into),
124            uncapped: config.uncapped,
125        };
126        if config == Self::default() {
127            None
128        } else {
129            Some(config)
130        }
131    }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135    fn from(value: types::config::TimestampingConfig) -> Self {
136        Self {
137            mode: Some(value.mode.into()),
138            uncapped: Some(value.uncapped),
139        }
140    }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144    fn from(value: TimestampingConfig) -> Self {
145        Self {
146            mode: value.mode.map(Into::into),
147            uncapped: value.uncapped,
148        }
149    }
150}
151
152#[rustfmt::skip]
153#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
154#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
155pub struct TimestampingReconfiguration {
156    /// Timestamping mode for appends that influences how timestamps are handled.
157    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
158    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
159    pub mode: Maybe<Option<TimestampingMode>>,
160    /// Allow client-specified timestamps to exceed the arrival time.
161    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
162    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
163    pub uncapped: Maybe<Option<bool>>,
164}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167    fn from(value: TimestampingReconfiguration) -> Self {
168        Self {
169            mode: value.mode.map_opt(Into::into),
170            uncapped: value.uncapped,
171        }
172    }
173}
174
175impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
176    fn from(value: types::config::TimestampingReconfiguration) -> Self {
177        Self {
178            mode: value.mode.map_opt(Into::into),
179            uncapped: value.uncapped,
180        }
181    }
182}
183
184#[rustfmt::skip]
185#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
186#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
187pub struct DeleteOnEmptyConfig {
188    /// Minimum age in seconds before an empty stream can be deleted.
189    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
190    #[serde(default)]
191    pub min_age_secs: u64,
192}
193
194impl DeleteOnEmptyConfig {
195    pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196        let min_age = config.min_age.unwrap_or_default();
197        if min_age > Duration::ZERO {
198            Some(DeleteOnEmptyConfig {
199                min_age_secs: min_age.as_secs(),
200            })
201        } else {
202            None
203        }
204    }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208    fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209        Self {
210            min_age_secs: value.min_age.as_secs(),
211        }
212    }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216    fn from(value: DeleteOnEmptyConfig) -> Self {
217        Self {
218            min_age: Duration::from_secs(value.min_age_secs),
219        }
220    }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224    fn from(value: DeleteOnEmptyConfig) -> Self {
225        Self {
226            min_age: Some(Duration::from_secs(value.min_age_secs)),
227        }
228    }
229}
230
231#[rustfmt::skip]
232#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
233#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
234pub struct DeleteOnEmptyReconfiguration {
235    /// Minimum age in seconds before an empty stream can be deleted.
236    /// Set to 0 to disable delete-on-empty (don't delete automatically).
237    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
238    #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
239    pub min_age_secs: Maybe<Option<u64>>,
240}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244        Self {
245            min_age: value.min_age_secs.map_opt(Duration::from_secs),
246        }
247    }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251    fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252        Self {
253            min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254        }
255    }
256}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
259#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
260#[serde(rename_all = "kebab-case")]
261pub enum EncryptionMode {
262    /// Plaintext (no encryption).
263    Plain,
264    /// AEGIS-256 authenticated encryption.
265    Aegis256,
266    /// AES-256-GCM authenticated encryption.
267    Aes256Gcm,
268}
269
270impl From<EncryptionMode> for encryption::EncryptionMode {
271    fn from(value: EncryptionMode) -> Self {
272        match value {
273            EncryptionMode::Plain => Self::Plain,
274            EncryptionMode::Aegis256 => Self::Aegis256,
275            EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
276        }
277    }
278}
279
280impl From<encryption::EncryptionMode> for EncryptionMode {
281    fn from(value: encryption::EncryptionMode) -> Self {
282        match value {
283            encryption::EncryptionMode::Plain => Self::Plain,
284            encryption::EncryptionMode::Aegis256 => Self::Aegis256,
285            encryption::EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
286        }
287    }
288}
289
290#[rustfmt::skip]
291#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
292#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
293pub struct EncryptionConfig {
294    /// Allowed encryption modes for the stream.
295    /// If empty, use defaults. If no default is configured, only plaintext is allowed.
296    #[serde(default, skip_serializing_if = "Vec::is_empty")]
297    pub allowed_modes: Vec<EncryptionMode>,
298}
299
300impl EncryptionConfig {
301    pub fn to_opt(config: types::config::OptionalEncryptionConfig) -> Option<Self> {
302        let config = EncryptionConfig {
303            allowed_modes: config.allowed_modes.into_iter().map(Into::into).collect(),
304        };
305        if config == Self::default() {
306            None
307        } else {
308            Some(config)
309        }
310    }
311}
312
313impl From<types::config::EncryptionConfig> for EncryptionConfig {
314    fn from(value: types::config::EncryptionConfig) -> Self {
315        Self {
316            allowed_modes: value.allowed_modes.into_iter().map(Into::into).collect(),
317        }
318    }
319}
320
321impl From<EncryptionConfig> for types::config::OptionalEncryptionConfig {
322    fn from(value: EncryptionConfig) -> Self {
323        Self {
324            allowed_modes: value
325                .allowed_modes
326                .into_iter()
327                .map(encryption::EncryptionMode::from)
328                .collect(),
329        }
330    }
331}
332
333#[rustfmt::skip]
334#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
335#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
336pub struct EncryptionReconfiguration {
337    /// Allowed encryption modes for the stream.
338    /// Specify an empty list to reset to defaults.
339    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
340    #[cfg_attr(feature = "utoipa", schema(value_type = Option<Vec<EncryptionMode>>))]
341    pub allowed_modes: Maybe<Vec<EncryptionMode>>,
342}
343
344impl From<EncryptionReconfiguration> for types::config::EncryptionReconfiguration {
345    fn from(value: EncryptionReconfiguration) -> Self {
346        Self {
347            allowed_modes: value.allowed_modes.map(|modes| {
348                modes
349                    .into_iter()
350                    .map(encryption::EncryptionMode::from)
351                    .collect()
352            }),
353        }
354    }
355}
356
357impl From<types::config::EncryptionReconfiguration> for EncryptionReconfiguration {
358    fn from(value: types::config::EncryptionReconfiguration) -> Self {
359        Self {
360            allowed_modes: value
361                .allowed_modes
362                .map(|modes| modes.into_iter().map(Into::into).collect()),
363        }
364    }
365}
366
367#[rustfmt::skip]
368#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
369#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
370pub struct StreamConfig {
371    /// Storage class for recent writes.
372    pub storage_class: Option<StorageClass>,
373    /// Retention policy for the stream.
374    /// If unspecified, the default is to retain records for 7 days.
375    pub retention_policy: Option<RetentionPolicy>,
376    /// Timestamping behavior.
377    pub timestamping: Option<TimestampingConfig>,
378    /// Delete-on-empty configuration.
379    #[serde(default)]
380    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
381    /// Encryption configuration.
382    #[serde(default)]
383    pub encryption: Option<EncryptionConfig>,
384}
385
386impl StreamConfig {
387    pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
388        let types::config::OptionalStreamConfig {
389            storage_class,
390            retention_policy,
391            timestamping,
392            delete_on_empty,
393            encryption,
394        } = config;
395
396        let config = StreamConfig {
397            storage_class: storage_class.map(Into::into),
398            retention_policy: retention_policy.map(Into::into),
399            timestamping: TimestampingConfig::to_opt(timestamping),
400            delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
401            encryption: EncryptionConfig::to_opt(encryption),
402        };
403        if config == Self::default() {
404            None
405        } else {
406            Some(config)
407        }
408    }
409}
410
411impl From<types::config::StreamConfig> for StreamConfig {
412    fn from(value: types::config::StreamConfig) -> Self {
413        let types::config::StreamConfig {
414            storage_class,
415            retention_policy,
416            timestamping,
417            delete_on_empty,
418            encryption,
419        } = value;
420
421        Self {
422            storage_class: Some(storage_class.into()),
423            retention_policy: Some(retention_policy.into()),
424            timestamping: Some(timestamping.into()),
425            delete_on_empty: Some(delete_on_empty.into()),
426            encryption: Some(encryption.into()),
427        }
428    }
429}
430
431impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
432    type Error = types::ValidationError;
433
434    fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
435        let StreamConfig {
436            storage_class,
437            retention_policy,
438            timestamping,
439            delete_on_empty,
440            encryption,
441        } = value;
442
443        let retention_policy = match retention_policy {
444            None => None,
445            Some(policy) => Some(policy.try_into()?),
446        };
447
448        Ok(Self {
449            storage_class: storage_class.map(Into::into),
450            retention_policy,
451            timestamping: timestamping.map(Into::into).unwrap_or_default(),
452            delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
453            encryption: encryption.map(Into::into).unwrap_or_default(),
454        })
455    }
456}
457
458#[rustfmt::skip]
459#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
460#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
461pub struct StreamReconfiguration {
462    /// Storage class for recent writes.
463    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
464    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
465    pub storage_class: Maybe<Option<StorageClass>>,
466    /// Retention policy for the stream.
467    /// If unspecified, the default is to retain records for 7 days.
468    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
469    #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
470    pub retention_policy: Maybe<Option<RetentionPolicy>>,
471    /// Timestamping behavior.
472    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
473    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
474    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
475    /// Delete-on-empty configuration.
476    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
477    #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
478    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
479    /// Encryption configuration.
480    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
481    #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionReconfiguration>))]
482    pub encryption: Maybe<Option<EncryptionReconfiguration>>,
483}
484
485impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
486    type Error = types::ValidationError;
487
488    fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
489        let StreamReconfiguration {
490            storage_class,
491            retention_policy,
492            timestamping,
493            delete_on_empty,
494            encryption,
495        } = value;
496
497        Ok(Self {
498            storage_class: storage_class.map_opt(Into::into),
499            retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
500            timestamping: timestamping.map_opt(Into::into),
501            delete_on_empty: delete_on_empty.map_opt(Into::into),
502            encryption: encryption.map_opt(Into::into),
503        })
504    }
505}
506
507impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
508    fn from(value: types::config::StreamReconfiguration) -> Self {
509        let types::config::StreamReconfiguration {
510            storage_class,
511            retention_policy,
512            timestamping,
513            delete_on_empty,
514            encryption,
515        } = value;
516
517        Self {
518            storage_class: storage_class.map_opt(Into::into),
519            retention_policy: retention_policy.map_opt(Into::into),
520            timestamping: timestamping.map_opt(Into::into),
521            delete_on_empty: delete_on_empty.map_opt(Into::into),
522            encryption: encryption.map_opt(Into::into),
523        }
524    }
525}
526
527#[rustfmt::skip]
528#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
529#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
530pub struct BasinConfig {
531    /// Default stream configuration.
532    pub default_stream_config: Option<StreamConfig>,
533    /// Create stream on append if it doesn't exist, using the default stream configuration.
534    #[serde(default)]
535    #[cfg_attr(feature = "utoipa", schema(default = false))]
536    pub create_stream_on_append: bool,
537    /// Create stream on read if it doesn't exist, using the default stream configuration.
538    #[serde(default)]
539    #[cfg_attr(feature = "utoipa", schema(default = false))]
540    pub create_stream_on_read: bool,
541}
542
543impl TryFrom<BasinConfig> for types::config::BasinConfig {
544    type Error = types::ValidationError;
545
546    fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
547        let BasinConfig {
548            default_stream_config,
549            create_stream_on_append,
550            create_stream_on_read,
551        } = value;
552
553        Ok(Self {
554            default_stream_config: match default_stream_config {
555                Some(config) => config.try_into()?,
556                None => Default::default(),
557            },
558            create_stream_on_append,
559            create_stream_on_read,
560        })
561    }
562}
563
564impl From<types::config::BasinConfig> for BasinConfig {
565    fn from(value: types::config::BasinConfig) -> Self {
566        let types::config::BasinConfig {
567            default_stream_config,
568            create_stream_on_append,
569            create_stream_on_read,
570        } = value;
571
572        Self {
573            default_stream_config: StreamConfig::to_opt(default_stream_config),
574            create_stream_on_append,
575            create_stream_on_read,
576        }
577    }
578}
579
580#[rustfmt::skip]
581#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
582#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
583pub struct BasinReconfiguration {
584    /// Basin configuration.
585    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
586    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
587    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
588    /// Create a stream on append.
589    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
590    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
591    pub create_stream_on_append: Maybe<bool>,
592    /// Create a stream on read.
593    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
594    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
595    pub create_stream_on_read: Maybe<bool>,
596}
597
598impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
599    type Error = types::ValidationError;
600
601    fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
602        let BasinReconfiguration {
603            default_stream_config,
604            create_stream_on_append,
605            create_stream_on_read,
606        } = value;
607
608        Ok(Self {
609            default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
610            create_stream_on_append: create_stream_on_append.map(Into::into),
611            create_stream_on_read: create_stream_on_read.map(Into::into),
612        })
613    }
614}
615
616impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
617    fn from(value: types::config::BasinReconfiguration) -> Self {
618        let types::config::BasinReconfiguration {
619            default_stream_config,
620            create_stream_on_append,
621            create_stream_on_read,
622        } = value;
623
624        Self {
625            default_stream_config: default_stream_config.map_opt(Into::into),
626            create_stream_on_append: create_stream_on_append.map(Into::into),
627            create_stream_on_read: create_stream_on_read.map(Into::into),
628        }
629    }
630}
631
632#[cfg(test)]
633mod tests {
634    use proptest::prelude::*;
635
636    use super::*;
637
638    fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
639        prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
640    }
641
642    fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
643        prop_oneof![
644            Just(TimestampingMode::ClientPrefer),
645            Just(TimestampingMode::ClientRequire),
646            Just(TimestampingMode::Arrival),
647        ]
648    }
649
650    fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
651        prop_oneof![
652            any::<u64>().prop_map(RetentionPolicy::Age),
653            Just(RetentionPolicy::Infinite(InfiniteRetention {})),
654        ]
655    }
656
657    fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
658        (
659            proptest::option::of(gen_timestamping_mode()),
660            proptest::option::of(any::<bool>()),
661        )
662            .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
663    }
664
665    fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
666        any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
667    }
668
669    fn gen_encryption_mode() -> impl Strategy<Value = EncryptionMode> {
670        prop_oneof![
671            Just(EncryptionMode::Plain),
672            Just(EncryptionMode::Aegis256),
673            Just(EncryptionMode::Aes256Gcm),
674        ]
675    }
676
677    fn gen_encryption_config() -> impl Strategy<Value = EncryptionConfig> {
678        (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
679            let allowed_modes = [
680                (plain, EncryptionMode::Plain),
681                (aegis, EncryptionMode::Aegis256),
682                (aes, EncryptionMode::Aes256Gcm),
683            ]
684            .into_iter()
685            .filter_map(|(include, mode)| include.then_some(mode))
686            .collect();
687            EncryptionConfig { allowed_modes }
688        })
689    }
690
691    fn gen_encryption_reconfiguration() -> impl Strategy<Value = EncryptionReconfiguration> {
692        prop_oneof![
693            Just(Maybe::Unspecified),
694            proptest::collection::vec(gen_encryption_mode(), 0..=3).prop_map(Maybe::Specified),
695        ]
696        .prop_map(|allowed_modes| EncryptionReconfiguration { allowed_modes })
697    }
698
699    fn gen_internal_encryption_modes()
700    -> impl Strategy<Value = enumset::EnumSet<encryption::EncryptionMode>> {
701        (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
702            let mut allowed_modes = enumset::EnumSet::new();
703            if plain {
704                allowed_modes.insert(encryption::EncryptionMode::Plain);
705            }
706            if aegis {
707                allowed_modes.insert(encryption::EncryptionMode::Aegis256);
708            }
709            if aes {
710                allowed_modes.insert(encryption::EncryptionMode::Aes256Gcm);
711            }
712            allowed_modes
713        })
714    }
715
716    fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
717        (
718            proptest::option::of(gen_storage_class()),
719            proptest::option::of(gen_retention_policy()),
720            proptest::option::of(gen_timestamping_config()),
721            proptest::option::of(gen_delete_on_empty_config()),
722            proptest::option::of(gen_encryption_config()),
723        )
724            .prop_map(
725                |(storage_class, retention_policy, timestamping, delete_on_empty, encryption)| {
726                    StreamConfig {
727                        storage_class,
728                        retention_policy,
729                        timestamping,
730                        delete_on_empty,
731                        encryption,
732                    }
733                },
734            )
735    }
736
737    fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
738        (
739            proptest::option::of(gen_stream_config()),
740            any::<bool>(),
741            any::<bool>(),
742        )
743            .prop_map(
744                |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
745                    BasinConfig {
746                        default_stream_config,
747                        create_stream_on_append,
748                        create_stream_on_read,
749                    }
750                },
751            )
752    }
753
754    fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
755        inner: impl Strategy<Value = T>,
756    ) -> impl Strategy<Value = Maybe<Option<T>>> {
757        prop_oneof![
758            Just(Maybe::Unspecified),
759            Just(Maybe::Specified(None)),
760            inner.prop_map(|v| Maybe::Specified(Some(v))),
761        ]
762    }
763
764    fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
765        (
766            gen_maybe(gen_storage_class()),
767            gen_maybe(gen_retention_policy()),
768            gen_maybe(gen_timestamping_reconfiguration()),
769            gen_maybe(gen_delete_on_empty_reconfiguration()),
770            gen_maybe(gen_encryption_reconfiguration()),
771        )
772            .prop_map(
773                |(storage_class, retention_policy, timestamping, delete_on_empty, encryption)| {
774                    StreamReconfiguration {
775                        storage_class,
776                        retention_policy,
777                        timestamping,
778                        delete_on_empty,
779                        encryption,
780                    }
781                },
782            )
783    }
784
785    fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
786        (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
787            .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
788    }
789
790    fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
791    {
792        gen_maybe(any::<u64>())
793            .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
794    }
795
796    fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
797        (
798            gen_maybe(gen_stream_reconfiguration()),
799            prop_oneof![
800                Just(Maybe::Unspecified),
801                any::<bool>().prop_map(Maybe::Specified),
802            ],
803            prop_oneof![
804                Just(Maybe::Unspecified),
805                any::<bool>().prop_map(Maybe::Specified),
806            ],
807        )
808            .prop_map(
809                |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
810                    BasinReconfiguration {
811                        default_stream_config,
812                        create_stream_on_append,
813                        create_stream_on_read,
814                    }
815                },
816            )
817    }
818
819    fn gen_internal_optional_stream_config()
820    -> impl Strategy<Value = types::config::OptionalStreamConfig> {
821        (
822            proptest::option::of(gen_storage_class()),
823            proptest::option::of(gen_retention_policy()),
824            proptest::option::of(gen_timestamping_mode()),
825            proptest::option::of(any::<bool>()),
826            proptest::option::of(any::<u64>()),
827            gen_internal_encryption_modes(),
828        )
829            .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe, encryption)| {
830                types::config::OptionalStreamConfig {
831                    storage_class: sc.map(Into::into),
832                    retention_policy: rp.map(|rp| match rp {
833                        RetentionPolicy::Age(secs) => {
834                            types::config::RetentionPolicy::Age(Duration::from_secs(secs))
835                        }
836                        RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
837                    }),
838                    timestamping: types::config::OptionalTimestampingConfig {
839                        mode: ts_mode.map(Into::into),
840                        uncapped: ts_uncapped,
841                    },
842                    delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
843                        min_age: doe.map(Duration::from_secs),
844                    },
845                    encryption: types::config::OptionalEncryptionConfig {
846                        allowed_modes: encryption,
847                    },
848                }
849            })
850    }
851
852    proptest! {
853        #[test]
854        fn stream_config_conversion_validates(config in gen_stream_config()) {
855            let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
856            let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
857
858            if has_zero_age {
859                prop_assert!(result.is_err());
860            } else {
861                prop_assert!(result.is_ok());
862            }
863        }
864
865        #[test]
866        fn basin_config_conversion_validates(config in gen_basin_config()) {
867            let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
868                matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
869            });
870
871            let result: Result<types::config::BasinConfig, _> = config.try_into();
872
873            if has_invalid_config {
874                prop_assert!(result.is_err());
875            } else {
876                prop_assert!(result.is_ok());
877            }
878        }
879
880        #[test]
881        fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
882            let has_zero_age = matches!(
883                reconfig.retention_policy,
884                Maybe::Specified(Some(RetentionPolicy::Age(0)))
885            );
886            let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
887
888            if has_zero_age {
889                prop_assert!(result.is_err());
890            } else {
891                prop_assert!(result.is_ok());
892            }
893        }
894
895        #[test]
896        fn merge_stream_or_basin_or_default(
897            stream in gen_internal_optional_stream_config(),
898            basin in gen_internal_optional_stream_config(),
899        ) {
900            let merged = stream.clone().merge(basin.clone());
901
902            prop_assert_eq!(
903                merged.storage_class,
904                stream.storage_class.or(basin.storage_class).unwrap_or_default()
905            );
906            prop_assert_eq!(
907                merged.retention_policy,
908                stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
909            );
910            prop_assert_eq!(
911                merged.timestamping.mode,
912                stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
913            );
914            prop_assert_eq!(
915                merged.timestamping.uncapped,
916                stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
917            );
918            prop_assert_eq!(
919                merged.delete_on_empty.min_age,
920                stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
921            );
922            prop_assert_eq!(
923                merged.encryption.allowed_modes,
924                if !stream.encryption.allowed_modes.is_empty() {
925                    stream.encryption.allowed_modes
926                } else if !basin.encryption.allowed_modes.is_empty() {
927                    basin.encryption.allowed_modes
928                } else {
929                    types::config::DEFAULT_ALLOWED_ENCRYPTION_MODES
930                }
931            );
932        }
933
934        #[test]
935        fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
936            let reconfig = types::config::StreamReconfiguration::default();
937            let result = base.clone().reconfigure(reconfig);
938
939            prop_assert_eq!(result.storage_class, base.storage_class);
940            prop_assert_eq!(result.retention_policy, base.retention_policy);
941            prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
942            prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
943            prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
944            prop_assert_eq!(result.encryption.allowed_modes, base.encryption.allowed_modes);
945        }
946
947        #[test]
948        fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
949            let reconfig = types::config::StreamReconfiguration {
950                storage_class: Maybe::Specified(None),
951                retention_policy: Maybe::Specified(None),
952                timestamping: Maybe::Specified(None),
953                delete_on_empty: Maybe::Specified(None),
954                encryption: Maybe::Specified(None),
955            };
956            let result = base.reconfigure(reconfig);
957
958            prop_assert!(result.storage_class.is_none());
959            prop_assert!(result.retention_policy.is_none());
960            prop_assert!(result.timestamping.mode.is_none());
961            prop_assert!(result.timestamping.uncapped.is_none());
962            prop_assert!(result.delete_on_empty.min_age.is_none());
963            prop_assert!(result.encryption.allowed_modes.is_empty());
964        }
965
966        #[test]
967        fn reconfigure_specified_some_sets_value(
968            base in gen_internal_optional_stream_config(),
969            new_sc in gen_storage_class(),
970            new_rp_secs in 1u64..u64::MAX,
971        ) {
972            let reconfig = types::config::StreamReconfiguration {
973                storage_class: Maybe::Specified(Some(new_sc.into())),
974                retention_policy: Maybe::Specified(Some(
975                    types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
976                )),
977                ..Default::default()
978            };
979            let result = base.reconfigure(reconfig);
980
981            prop_assert_eq!(result.storage_class, Some(new_sc.into()));
982            prop_assert_eq!(
983                result.retention_policy,
984                Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
985            );
986        }
987
988        #[test]
989        fn to_opt_returns_some_for_non_defaults(
990            sc in gen_storage_class(),
991            doe_secs in 1u64..u64::MAX,
992            ts_mode in gen_timestamping_mode(),
993        ) {
994            // non-default storage class -> Some
995            let internal = types::config::OptionalStreamConfig {
996                storage_class: Some(sc.into()),
997                ..Default::default()
998            };
999            prop_assert!(StreamConfig::to_opt(internal).is_some());
1000
1001            // non-zero delete_on_empty -> Some
1002            let internal = types::config::OptionalDeleteOnEmptyConfig {
1003                min_age: Some(Duration::from_secs(doe_secs)),
1004            };
1005            let api = DeleteOnEmptyConfig::to_opt(internal);
1006            prop_assert!(api.is_some());
1007            prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
1008
1009            // non-default timestamping -> Some
1010            let internal = types::config::OptionalTimestampingConfig {
1011                mode: Some(ts_mode.into()),
1012                uncapped: None,
1013            };
1014            prop_assert!(TimestampingConfig::to_opt(internal).is_some());
1015        }
1016
1017        #[test]
1018        fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
1019            let has_zero_age = matches!(
1020                &reconfig.default_stream_config,
1021                Maybe::Specified(Some(sr)) if matches!(
1022                    sr.retention_policy,
1023                    Maybe::Specified(Some(RetentionPolicy::Age(0)))
1024                )
1025            );
1026            let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
1027
1028            if has_zero_age {
1029                prop_assert!(result.is_err());
1030            } else {
1031                prop_assert!(result.is_ok());
1032            }
1033        }
1034
1035        #[test]
1036        fn reconfigure_basin_unspecified_preserves(
1037            base_sc in proptest::option::of(gen_storage_class()),
1038            base_on_append in any::<bool>(),
1039            base_on_read in any::<bool>(),
1040        ) {
1041            let base = types::config::BasinConfig {
1042                default_stream_config: types::config::OptionalStreamConfig {
1043                    storage_class: base_sc.map(Into::into),
1044                    ..Default::default()
1045                },
1046                create_stream_on_append: base_on_append,
1047                create_stream_on_read: base_on_read,
1048            };
1049
1050            let reconfig = types::config::BasinReconfiguration::default();
1051            let result = base.clone().reconfigure(reconfig);
1052
1053            prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
1054            prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
1055            prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
1056        }
1057
1058        #[test]
1059        fn reconfigure_basin_specified_updates(
1060            base_on_append in any::<bool>(),
1061            new_on_append in any::<bool>(),
1062            new_sc in gen_storage_class(),
1063        ) {
1064            let base = types::config::BasinConfig {
1065                create_stream_on_append: base_on_append,
1066                ..Default::default()
1067            };
1068
1069            let reconfig = types::config::BasinReconfiguration {
1070                default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
1071                    storage_class: Maybe::Specified(Some(new_sc.into())),
1072                    ..Default::default()
1073                })),
1074                create_stream_on_append: Maybe::Specified(new_on_append),
1075                ..Default::default()
1076            };
1077            let result = base.reconfigure(reconfig);
1078
1079            prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
1080            prop_assert_eq!(result.create_stream_on_append, new_on_append);
1081        }
1082
1083        #[test]
1084        fn reconfigure_nested_partial_update(
1085            base_mode in gen_timestamping_mode(),
1086            base_uncapped in any::<bool>(),
1087            new_mode in gen_timestamping_mode(),
1088        ) {
1089            let base = types::config::OptionalStreamConfig {
1090                timestamping: types::config::OptionalTimestampingConfig {
1091                    mode: Some(base_mode.into()),
1092                    uncapped: Some(base_uncapped),
1093                },
1094                ..Default::default()
1095            };
1096
1097            let expected_mode: types::config::TimestampingMode = new_mode.into();
1098
1099            let reconfig = types::config::StreamReconfiguration {
1100                timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
1101                    mode: Maybe::Specified(Some(expected_mode)),
1102                    uncapped: Maybe::Unspecified,
1103                })),
1104                ..Default::default()
1105            };
1106            let result = base.reconfigure(reconfig);
1107
1108            prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
1109            prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
1110        }
1111    }
1112
1113    #[test]
1114    fn to_opt_returns_none_for_defaults() {
1115        // default stream config -> None
1116        assert!(StreamConfig::to_opt(types::config::OptionalStreamConfig::default()).is_none());
1117
1118        // delete_on_empty: None or Some(ZERO) -> None
1119        let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
1120        let doe_zero = types::config::OptionalDeleteOnEmptyConfig {
1121            min_age: Some(Duration::ZERO),
1122        };
1123        assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
1124        assert!(DeleteOnEmptyConfig::to_opt(doe_zero).is_none());
1125
1126        // default timestamping -> None
1127        assert!(
1128            TimestampingConfig::to_opt(types::config::OptionalTimestampingConfig::default())
1129                .is_none()
1130        );
1131    }
1132
1133    #[test]
1134    fn empty_json_converts_to_all_none() {
1135        let json = serde_json::json!({});
1136        let parsed: StreamConfig = serde_json::from_value(json).unwrap();
1137        let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();
1138
1139        assert!(
1140            internal.storage_class.is_none(),
1141            "storage_class should be None"
1142        );
1143        assert!(
1144            internal.retention_policy.is_none(),
1145            "retention_policy should be None"
1146        );
1147        assert!(
1148            internal.timestamping.mode.is_none(),
1149            "timestamping.mode should be None"
1150        );
1151        assert!(
1152            internal.timestamping.uncapped.is_none(),
1153            "timestamping.uncapped should be None"
1154        );
1155        assert!(
1156            internal.delete_on_empty.min_age.is_none(),
1157            "delete_on_empty.min_age should be None"
1158        );
1159        assert!(
1160            internal.encryption.allowed_modes.is_empty(),
1161            "encryption.allowed_modes should be empty"
1162        );
1163    }
1164}