// s2_api/v1/config.rs

1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
// API-facing storage class. With `rename_all = "kebab-case"` the variants are
// serialized as `"standard"` and `"express"`.
#[rustfmt::skip]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum StorageClass {
    /// Append tail latency under 400 milliseconds with s2.dev.
    Standard,
    /// Append tail latency under 40 milliseconds with s2.dev.
    Express,
}
16
17impl From<StorageClass> for types::config::StorageClass {
18    fn from(value: StorageClass) -> Self {
19        match value {
20            StorageClass::Express => Self::Express,
21            StorageClass::Standard => Self::Standard,
22        }
23    }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27    fn from(value: types::config::StorageClass) -> Self {
28        match value {
29            types::config::StorageClass::Express => Self::Express,
30            types::config::StorageClass::Standard => Self::Standard,
31        }
32    }
33}
34
// API-facing retention policy. Variant names are serialized in kebab-case
// (`age`, `infinite`). A zero `Age` is representable here and is only rejected
// when converting into `types::config::RetentionPolicy`.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum RetentionPolicy {
    /// Age in seconds for automatic trimming of records older than this threshold.
    /// This must be set to a value greater than 0 seconds.
    Age(u64),
    /// Retain records unless explicitly trimmed.
    Infinite(InfiniteRetention)
}
46
// Field-less payload carried by `RetentionPolicy::Infinite`.
// NOTE(review): presumably exists so the variant serializes as an empty object
// rather than a bare string — confirm against the published API schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53    type Error = types::ValidationError;
54
55    fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56        match value {
57            RetentionPolicy::Age(0) => Err(types::ValidationError(
58                "age must be greater than 0 seconds".to_string(),
59            )),
60            RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61            RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62        }
63    }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67    fn from(value: types::config::RetentionPolicy) -> Self {
68        match value {
69            types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70            types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71        }
72    }
73}
74
// API-facing timestamping mode. Variants are serialized in kebab-case
// (`client-prefer`, `client-require`, `arrival`); `ClientPrefer` is the `Default`.
#[rustfmt::skip]
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum TimestampingMode {
    /// Prefer client-specified timestamp if present otherwise use arrival time.
    #[default]
    ClientPrefer,
    /// Require a client-specified timestamp and reject the append if it is missing.
    ClientRequire,
    /// Use the arrival time and ignore any client-specified timestamp.
    Arrival,
}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90    fn from(value: TimestampingMode) -> Self {
91        match value {
92            TimestampingMode::ClientPrefer => Self::ClientPrefer,
93            TimestampingMode::ClientRequire => Self::ClientRequire,
94            TimestampingMode::Arrival => Self::Arrival,
95        }
96    }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100    fn from(value: types::config::TimestampingMode) -> Self {
101        match value {
102            types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103            types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104            types::config::TimestampingMode::Arrival => Self::Arrival,
105        }
106    }
107}
108
// API view of a stream's timestamping settings. Both fields are optional, and the
// derived `Default` leaves both as `None`.
#[rustfmt::skip]
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TimestampingConfig {
    /// Timestamping mode for appends that influences how timestamps are handled.
    pub mode: Option<TimestampingMode>,
    /// Allow client-specified timestamps to exceed the arrival time.
    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
    pub uncapped: Option<bool>,
}
119
120impl TimestampingConfig {
121    pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122        let config = TimestampingConfig {
123            mode: config.mode.map(Into::into),
124            uncapped: config.uncapped,
125        };
126        if config == Self::default() {
127            None
128        } else {
129            Some(config)
130        }
131    }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135    fn from(value: types::config::TimestampingConfig) -> Self {
136        Self {
137            mode: Some(value.mode.into()),
138            uncapped: Some(value.uncapped),
139        }
140    }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144    fn from(value: TimestampingConfig) -> Self {
145        Self {
146            mode: value.mode.map(Into::into),
147            uncapped: value.uncapped,
148        }
149    }
150}
151
// Partial update for timestamping settings. Each field is a `Maybe<Option<_>>`;
// a field for which `Maybe::is_unspecified` holds is left out when serializing,
// and a field missing from the input is filled with `Maybe::default()`.
// NOTE(review): presumably `Maybe::default()` is the unspecified state — confirm
// in `s2_common::maybe`.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TimestampingReconfiguration {
    /// Timestamping mode for appends that influences how timestamps are handled.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Allow client-specified timestamps to exceed the arrival time.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub uncapped: Maybe<Option<bool>>,
}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167    fn from(value: TimestampingReconfiguration) -> Self {
168        Self {
169            mode: value.mode.map_opt(Into::into),
170            uncapped: value.uncapped,
171        }
172    }
173}
174
/// Converts an internal timestamping reconfiguration back into the API type.
impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
    fn from(value: types::config::TimestampingReconfiguration) -> Self {
        Self {
            // Convert the mode inside the `Maybe<Option<_>>` wrapper; the target
            // type is inferred from the field.
            mode: value.mode.map_opt(Into::into),
            uncapped: value.uncapped,
        }
    }
}
183
// API view of the delete-on-empty setting. `#[serde(default)]` lets the field be
// omitted on input, in which case it deserializes as 0.
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
    #[serde(default)]
    pub min_age_secs: u64,
}
193
194impl DeleteOnEmptyConfig {
195    pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196        let min_age = config.min_age.unwrap_or_default();
197        if min_age > Duration::ZERO {
198            Some(DeleteOnEmptyConfig {
199                min_age_secs: min_age.as_secs(),
200            })
201        } else {
202            None
203        }
204    }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208    fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209        Self {
210            min_age_secs: value.min_age.as_secs(),
211        }
212    }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216    fn from(value: DeleteOnEmptyConfig) -> Self {
217        Self {
218            min_age: Duration::from_secs(value.min_age_secs),
219        }
220    }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224    fn from(value: DeleteOnEmptyConfig) -> Self {
225        Self {
226            min_age: Some(Duration::from_secs(value.min_age_secs)),
227        }
228    }
229}
230
// Partial update for the delete-on-empty setting. The field is left out when
// serializing if `Maybe::is_unspecified` holds, and filled with `Maybe::default()`
// when missing from the input.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DeleteOnEmptyReconfiguration {
    /// Minimum age in seconds before an empty stream can be deleted.
    /// Set to 0 to disable delete-on-empty (don't delete automatically).
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
    pub min_age_secs: Maybe<Option<u64>>,
}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244        Self {
245            min_age: value.min_age_secs.map_opt(Duration::from_secs),
246        }
247    }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251    fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252        Self {
253            min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254        }
255    }
256}
257
// API-facing encryption mode. Wire names are set per variant with `rename`
// rather than through a container-level `rename_all`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub enum EncryptionMode {
    /// Plaintext (no encryption).
    #[serde(rename = "plain")]
    Plain,
    /// AEGIS-256 authenticated encryption.
    #[serde(rename = "aegis-256")]
    Aegis256,
    /// AES-256-GCM authenticated encryption.
    #[serde(rename = "aes-256-gcm")]
    Aes256Gcm,
}
271
272impl From<EncryptionMode> for encryption::EncryptionMode {
273    fn from(value: EncryptionMode) -> Self {
274        match value {
275            EncryptionMode::Plain => Self::Plain,
276            EncryptionMode::Aegis256 => Self::Aegis256,
277            EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
278        }
279    }
280}
281
282impl From<encryption::EncryptionMode> for EncryptionMode {
283    fn from(value: encryption::EncryptionMode) -> Self {
284        match value {
285            encryption::EncryptionMode::Plain => Self::Plain,
286            encryption::EncryptionMode::Aegis256 => Self::Aegis256,
287            encryption::EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
288        }
289    }
290}
291
// API view of a stream's encryption settings. An empty `allowed_modes` list is
// omitted when serializing, and a missing list deserializes as empty.
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct EncryptionConfig {
    /// Allowed encryption modes for the stream.
    /// If empty, use defaults. If no default is configured, only plaintext is allowed.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub allowed_modes: Vec<EncryptionMode>,
}
301
302impl EncryptionConfig {
303    pub fn to_opt(config: types::config::OptionalEncryptionConfig) -> Option<Self> {
304        let config = EncryptionConfig {
305            allowed_modes: config.allowed_modes.into_iter().map(Into::into).collect(),
306        };
307        if config == Self::default() {
308            None
309        } else {
310            Some(config)
311        }
312    }
313}
314
315impl From<types::config::EncryptionConfig> for EncryptionConfig {
316    fn from(value: types::config::EncryptionConfig) -> Self {
317        Self {
318            allowed_modes: value.allowed_modes.into_iter().map(Into::into).collect(),
319        }
320    }
321}
322
323impl From<EncryptionConfig> for types::config::OptionalEncryptionConfig {
324    fn from(value: EncryptionConfig) -> Self {
325        Self {
326            allowed_modes: value
327                .allowed_modes
328                .into_iter()
329                .map(encryption::EncryptionMode::from)
330                .collect(),
331        }
332    }
333}
334
// Partial update for encryption settings. Unlike the other reconfiguration
// fields in this file, this one is `Maybe<Vec<_>>` with no inner `Option`; per
// the field doc, an empty list is what resets to defaults.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct EncryptionReconfiguration {
    /// Allowed encryption modes for the stream.
    /// Specify an empty list to reset to defaults.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<Vec<EncryptionMode>>))]
    pub allowed_modes: Maybe<Vec<EncryptionMode>>,
}
345
346impl From<EncryptionReconfiguration> for types::config::EncryptionReconfiguration {
347    fn from(value: EncryptionReconfiguration) -> Self {
348        Self {
349            allowed_modes: value.allowed_modes.map(|modes| {
350                modes
351                    .into_iter()
352                    .map(encryption::EncryptionMode::from)
353                    .collect()
354            }),
355        }
356    }
357}
358
359impl From<types::config::EncryptionReconfiguration> for EncryptionReconfiguration {
360    fn from(value: types::config::EncryptionReconfiguration) -> Self {
361        Self {
362            allowed_modes: value
363                .allowed_modes
364                .map(|modes| modes.into_iter().map(Into::into).collect()),
365        }
366    }
367}
368
// API view of a stream configuration. Every field is optional and the derived
// `Default` leaves them all `None`. There is no `rename_all` on this struct, so
// field names are serialized as written.
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamConfig {
    /// Storage class for recent writes.
    pub storage_class: Option<StorageClass>,
    /// Retention policy for the stream.
    /// If unspecified, the default is to retain records for 7 days.
    pub retention_policy: Option<RetentionPolicy>,
    /// Timestamping behavior.
    pub timestamping: Option<TimestampingConfig>,
    /// Delete-on-empty configuration.
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
    /// Encryption configuration.
    #[serde(default)]
    pub encryption: Option<EncryptionConfig>,
}
387
388impl StreamConfig {
389    pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
390        let types::config::OptionalStreamConfig {
391            storage_class,
392            retention_policy,
393            timestamping,
394            delete_on_empty,
395            encryption,
396        } = config;
397
398        let config = StreamConfig {
399            storage_class: storage_class.map(Into::into),
400            retention_policy: retention_policy.map(Into::into),
401            timestamping: TimestampingConfig::to_opt(timestamping),
402            delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
403            encryption: EncryptionConfig::to_opt(encryption),
404        };
405        if config == Self::default() {
406            None
407        } else {
408            Some(config)
409        }
410    }
411}
412
413impl From<types::config::StreamConfig> for StreamConfig {
414    fn from(value: types::config::StreamConfig) -> Self {
415        let types::config::StreamConfig {
416            storage_class,
417            retention_policy,
418            timestamping,
419            delete_on_empty,
420            encryption,
421        } = value;
422
423        Self {
424            storage_class: Some(storage_class.into()),
425            retention_policy: Some(retention_policy.into()),
426            timestamping: Some(timestamping.into()),
427            delete_on_empty: Some(delete_on_empty.into()),
428            encryption: Some(encryption.into()),
429        }
430    }
431}
432
433impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
434    type Error = types::ValidationError;
435
436    fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
437        let StreamConfig {
438            storage_class,
439            retention_policy,
440            timestamping,
441            delete_on_empty,
442            encryption,
443        } = value;
444
445        let retention_policy = match retention_policy {
446            None => None,
447            Some(policy) => Some(policy.try_into()?),
448        };
449
450        Ok(Self {
451            storage_class: storage_class.map(Into::into),
452            retention_policy,
453            timestamping: timestamping.map(Into::into).unwrap_or_default(),
454            delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
455            encryption: encryption.map(Into::into).unwrap_or_default(),
456        })
457    }
458}
459
// Partial update for a stream configuration. Every field is a `Maybe<Option<_>>`;
// a field for which `Maybe::is_unspecified` holds is left out when serializing,
// and a field missing from the input is filled with `Maybe::default()`.
// NOTE(review): presumably `Specified(None)` clears a setting while the
// unspecified state leaves it untouched — confirm where the reconfiguration is applied.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamReconfiguration {
    /// Storage class for recent writes.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Retention policy for the stream.
    /// If unspecified, the default is to retain records for 7 days.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Timestamping behavior.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Delete-on-empty configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
    /// Encryption configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionReconfiguration>))]
    pub encryption: Maybe<Option<EncryptionReconfiguration>>,
}
486
/// Validates an API stream reconfiguration and converts it into the internal type.
///
/// # Errors
///
/// Returns a [`types::ValidationError`] when a specified retention policy fails
/// its own conversion (an `Age` of 0 seconds).
impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
    type Error = types::ValidationError;

    fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
        let StreamReconfiguration {
            storage_class,
            retention_policy,
            timestamping,
            delete_on_empty,
            encryption,
        } = value;

        Ok(Self {
            storage_class: storage_class.map_opt(Into::into),
            // The only fallible field: the retention policy is validated during conversion.
            retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
            timestamping: timestamping.map_opt(Into::into),
            delete_on_empty: delete_on_empty.map_opt(Into::into),
            encryption: encryption.map_opt(Into::into),
        })
    }
}
508
/// Converts an internal stream reconfiguration back into the API type.
/// Every field is mapped inside its `Maybe<Option<_>>` wrapper.
impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
    fn from(value: types::config::StreamReconfiguration) -> Self {
        // Exhaustive destructuring: adding an internal field forces an update here.
        let types::config::StreamReconfiguration {
            storage_class,
            retention_policy,
            timestamping,
            delete_on_empty,
            encryption,
        } = value;

        Self {
            storage_class: storage_class.map_opt(Into::into),
            retention_policy: retention_policy.map_opt(Into::into),
            timestamping: timestamping.map_opt(Into::into),
            delete_on_empty: delete_on_empty.map_opt(Into::into),
            encryption: encryption.map_opt(Into::into),
        }
    }
}
528
// API view of a basin configuration. Both boolean flags carry `#[serde(default)]`,
// so they deserialize as `false` when omitted.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct BasinConfig {
    /// Default stream configuration.
    pub default_stream_config: Option<StreamConfig>,
    /// Create stream on append if it doesn't exist, using the default stream configuration.
    #[serde(default)]
    #[cfg_attr(feature = "utoipa", schema(default = false))]
    pub create_stream_on_append: bool,
    /// Create stream on read if it doesn't exist, using the default stream configuration.
    #[serde(default)]
    #[cfg_attr(feature = "utoipa", schema(default = false))]
    pub create_stream_on_read: bool,
}
544
545impl TryFrom<BasinConfig> for types::config::BasinConfig {
546    type Error = types::ValidationError;
547
548    fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
549        let BasinConfig {
550            default_stream_config,
551            create_stream_on_append,
552            create_stream_on_read,
553        } = value;
554
555        Ok(Self {
556            default_stream_config: match default_stream_config {
557                Some(config) => config.try_into()?,
558                None => Default::default(),
559            },
560            create_stream_on_append,
561            create_stream_on_read,
562        })
563    }
564}
565
/// Converts an internal basin config into the API type.
impl From<types::config::BasinConfig> for BasinConfig {
    fn from(value: types::config::BasinConfig) -> Self {
        let types::config::BasinConfig {
            default_stream_config,
            create_stream_on_append,
            create_stream_on_read,
        } = value;

        Self {
            // `to_opt` yields `None` when the stream config has nothing set.
            default_stream_config: StreamConfig::to_opt(default_stream_config),
            create_stream_on_append,
            create_stream_on_read,
        }
    }
}
581
582#[rustfmt::skip]
583#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
584#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
585pub struct BasinReconfiguration {
586    /// Basin configuration.
587    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
588    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
589    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
590    /// Create a stream on append.
591    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
592    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
593    pub create_stream_on_append: Maybe<bool>,
594    /// Create a stream on read.
595    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
596    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
597    pub create_stream_on_read: Maybe<bool>,
598}
599
/// Validates an API basin reconfiguration and converts it into the internal type.
///
/// # Errors
///
/// Returns a [`types::ValidationError`] when a specified default stream
/// reconfiguration fails its own conversion.
impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
    type Error = types::ValidationError;

    fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
        let BasinReconfiguration {
            default_stream_config,
            create_stream_on_append,
            create_stream_on_read,
        } = value;

        Ok(Self {
            // The only fallible field: delegates to the stream reconfiguration's `TryFrom`.
            default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
            create_stream_on_append: create_stream_on_append.map(Into::into),
            create_stream_on_read: create_stream_on_read.map(Into::into),
        })
    }
}
617
/// Converts an internal basin reconfiguration back into the API type.
impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
    fn from(value: types::config::BasinReconfiguration) -> Self {
        // Exhaustive destructuring: adding an internal field forces an update here.
        let types::config::BasinReconfiguration {
            default_stream_config,
            create_stream_on_append,
            create_stream_on_read,
        } = value;

        Self {
            default_stream_config: default_stream_config.map_opt(Into::into),
            create_stream_on_append: create_stream_on_append.map(Into::into),
            create_stream_on_read: create_stream_on_read.map(Into::into),
        }
    }
}
633
634#[cfg(test)]
635mod tests {
636    use proptest::prelude::*;
637
638    use super::*;
639
    /// Strategy yielding either API storage class.
    fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
        prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
    }
643
    /// Strategy yielding any of the three API timestamping modes.
    fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
        prop_oneof![
            Just(TimestampingMode::ClientPrefer),
            Just(TimestampingMode::ClientRequire),
            Just(TimestampingMode::Arrival),
        ]
    }
651
    /// Strategy yielding API retention policies: `Age` with an arbitrary `u64`
    /// (the invalid value 0 is not excluded) or `Infinite`.
    fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
        prop_oneof![
            any::<u64>().prop_map(RetentionPolicy::Age),
            Just(RetentionPolicy::Infinite(InfiniteRetention {})),
        ]
    }
658
    /// Strategy yielding API timestamping configs with each field independently
    /// set or unset.
    fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
        (
            proptest::option::of(gen_timestamping_mode()),
            proptest::option::of(any::<bool>()),
        )
            .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
    }
666
    /// Strategy yielding API delete-on-empty configs with an arbitrary `u64` age.
    fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
        any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
    }
670
    /// Strategy yielding any of the three API encryption modes.
    fn gen_encryption_mode() -> impl Strategy<Value = EncryptionMode> {
        prop_oneof![
            Just(EncryptionMode::Plain),
            Just(EncryptionMode::Aegis256),
            Just(EncryptionMode::Aes256Gcm),
        ]
    }
678
679    fn gen_encryption_config() -> impl Strategy<Value = EncryptionConfig> {
680        (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
681            let allowed_modes = [
682                (plain, EncryptionMode::Plain),
683                (aegis, EncryptionMode::Aegis256),
684                (aes, EncryptionMode::Aes256Gcm),
685            ]
686            .into_iter()
687            .filter_map(|(include, mode)| include.then_some(mode))
688            .collect();
689            EncryptionConfig { allowed_modes }
690        })
691    }
692
    /// Strategy yielding encryption reconfigurations: either unspecified, or a
    /// specified list of 0 to 3 modes (duplicates are possible).
    fn gen_encryption_reconfiguration() -> impl Strategy<Value = EncryptionReconfiguration> {
        prop_oneof![
            Just(Maybe::Unspecified),
            proptest::collection::vec(gen_encryption_mode(), 0..=3).prop_map(Maybe::Specified),
        ]
        .prop_map(|allowed_modes| EncryptionReconfiguration { allowed_modes })
    }
700
701    fn gen_internal_encryption_modes()
702    -> impl Strategy<Value = enumset::EnumSet<encryption::EncryptionMode>> {
703        (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
704            let mut allowed_modes = enumset::EnumSet::new();
705            if plain {
706                allowed_modes.insert(encryption::EncryptionMode::Plain);
707            }
708            if aegis {
709                allowed_modes.insert(encryption::EncryptionMode::Aegis256);
710            }
711            if aes {
712                allowed_modes.insert(encryption::EncryptionMode::Aes256Gcm);
713            }
714            allowed_modes
715        })
716    }
717
    /// Strategy yielding API stream configs with every section independently
    /// present or absent. Retention policies may include the invalid `Age(0)`.
    fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
        (
            proptest::option::of(gen_storage_class()),
            proptest::option::of(gen_retention_policy()),
            proptest::option::of(gen_timestamping_config()),
            proptest::option::of(gen_delete_on_empty_config()),
            proptest::option::of(gen_encryption_config()),
        )
            .prop_map(
                |(storage_class, retention_policy, timestamping, delete_on_empty, encryption)| {
                    StreamConfig {
                        storage_class,
                        retention_policy,
                        timestamping,
                        delete_on_empty,
                        encryption,
                    }
                },
            )
    }
738
    /// Strategy yielding API basin configs: an optional default stream config plus
    /// two arbitrary boolean flags.
    fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
        (
            proptest::option::of(gen_stream_config()),
            any::<bool>(),
            any::<bool>(),
        )
            .prop_map(
                |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
                    BasinConfig {
                        default_stream_config,
                        create_stream_on_append,
                        create_stream_on_read,
                    }
                },
            )
    }
755
    /// Lifts `inner` into a strategy over `Maybe<Option<T>>` that covers all three
    /// states: `Unspecified`, `Specified(None)` and `Specified(Some(_))`.
    fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
        inner: impl Strategy<Value = T>,
    ) -> impl Strategy<Value = Maybe<Option<T>>> {
        prop_oneof![
            Just(Maybe::Unspecified),
            Just(Maybe::Specified(None)),
            inner.prop_map(|v| Maybe::Specified(Some(v))),
        ]
    }
765
    /// Strategy yielding API stream reconfigurations, with every field drawn
    /// through `gen_maybe`. Retention policies may include the invalid `Age(0)`.
    fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
        (
            gen_maybe(gen_storage_class()),
            gen_maybe(gen_retention_policy()),
            gen_maybe(gen_timestamping_reconfiguration()),
            gen_maybe(gen_delete_on_empty_reconfiguration()),
            gen_maybe(gen_encryption_reconfiguration()),
        )
            .prop_map(
                |(storage_class, retention_policy, timestamping, delete_on_empty, encryption)| {
                    StreamReconfiguration {
                        storage_class,
                        retention_policy,
                        timestamping,
                        delete_on_empty,
                        encryption,
                    }
                },
            )
    }
786
    /// Strategy yielding timestamping reconfigurations with both fields drawn
    /// through `gen_maybe`.
    fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
        (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
            .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
    }
791
    /// Strategy yielding delete-on-empty reconfigurations whose age is drawn
    /// through `gen_maybe` over arbitrary `u64` values.
    fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
    {
        gen_maybe(any::<u64>())
            .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
    }
797
798    fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
799        (
800            gen_maybe(gen_stream_reconfiguration()),
801            prop_oneof![
802                Just(Maybe::Unspecified),
803                any::<bool>().prop_map(Maybe::Specified),
804            ],
805            prop_oneof![
806                Just(Maybe::Unspecified),
807                any::<bool>().prop_map(Maybe::Specified),
808            ],
809        )
810            .prop_map(
811                |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
812                    BasinReconfiguration {
813                        default_stream_config,
814                        create_stream_on_append,
815                        create_stream_on_read,
816                    }
817                },
818            )
819    }
820
821    fn gen_internal_optional_stream_config()
822    -> impl Strategy<Value = types::config::OptionalStreamConfig> {
823        (
824            proptest::option::of(gen_storage_class()),
825            proptest::option::of(gen_retention_policy()),
826            proptest::option::of(gen_timestamping_mode()),
827            proptest::option::of(any::<bool>()),
828            proptest::option::of(any::<u64>()),
829            gen_internal_encryption_modes(),
830        )
831            .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe, encryption)| {
832                types::config::OptionalStreamConfig {
833                    storage_class: sc.map(Into::into),
834                    retention_policy: rp.map(|rp| match rp {
835                        RetentionPolicy::Age(secs) => {
836                            types::config::RetentionPolicy::Age(Duration::from_secs(secs))
837                        }
838                        RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
839                    }),
840                    timestamping: types::config::OptionalTimestampingConfig {
841                        mode: ts_mode.map(Into::into),
842                        uncapped: ts_uncapped,
843                    },
844                    delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
845                        min_age: doe.map(Duration::from_secs),
846                    },
847                    encryption: types::config::OptionalEncryptionConfig {
848                        allowed_modes: encryption,
849                    },
850                }
851            })
852    }
853
854    proptest! {
855        #[test]
856        fn stream_config_conversion_validates(config in gen_stream_config()) {
857            let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
858            let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
859
860            if has_zero_age {
861                prop_assert!(result.is_err());
862            } else {
863                prop_assert!(result.is_ok());
864            }
865        }
866
867        #[test]
868        fn basin_config_conversion_validates(config in gen_basin_config()) {
869            let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
870                matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
871            });
872
873            let result: Result<types::config::BasinConfig, _> = config.try_into();
874
875            if has_invalid_config {
876                prop_assert!(result.is_err());
877            } else {
878                prop_assert!(result.is_ok());
879            }
880        }
881
882        #[test]
883        fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
884            let has_zero_age = matches!(
885                reconfig.retention_policy,
886                Maybe::Specified(Some(RetentionPolicy::Age(0)))
887            );
888            let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
889
890            if has_zero_age {
891                prop_assert!(result.is_err());
892            } else {
893                prop_assert!(result.is_ok());
894            }
895        }
896
897        #[test]
898        fn merge_stream_or_basin_or_default(
899            stream in gen_internal_optional_stream_config(),
900            basin in gen_internal_optional_stream_config(),
901        ) {
902            let merged = stream.clone().merge(basin.clone());
903
904            prop_assert_eq!(
905                merged.storage_class,
906                stream.storage_class.or(basin.storage_class).unwrap_or_default()
907            );
908            prop_assert_eq!(
909                merged.retention_policy,
910                stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
911            );
912            prop_assert_eq!(
913                merged.timestamping.mode,
914                stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
915            );
916            prop_assert_eq!(
917                merged.timestamping.uncapped,
918                stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
919            );
920            prop_assert_eq!(
921                merged.delete_on_empty.min_age,
922                stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
923            );
924            prop_assert_eq!(
925                merged.encryption.allowed_modes,
926                if !stream.encryption.allowed_modes.is_empty() {
927                    stream.encryption.allowed_modes
928                } else if !basin.encryption.allowed_modes.is_empty() {
929                    basin.encryption.allowed_modes
930                } else {
931                    types::config::DEFAULT_ALLOWED_ENCRYPTION_MODES
932                }
933            );
934        }
935
936        #[test]
937        fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
938            let reconfig = types::config::StreamReconfiguration::default();
939            let result = base.clone().reconfigure(reconfig);
940
941            prop_assert_eq!(result.storage_class, base.storage_class);
942            prop_assert_eq!(result.retention_policy, base.retention_policy);
943            prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
944            prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
945            prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
946            prop_assert_eq!(result.encryption.allowed_modes, base.encryption.allowed_modes);
947        }
948
949        #[test]
950        fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
951            let reconfig = types::config::StreamReconfiguration {
952                storage_class: Maybe::Specified(None),
953                retention_policy: Maybe::Specified(None),
954                timestamping: Maybe::Specified(None),
955                delete_on_empty: Maybe::Specified(None),
956                encryption: Maybe::Specified(None),
957            };
958            let result = base.reconfigure(reconfig);
959
960            prop_assert!(result.storage_class.is_none());
961            prop_assert!(result.retention_policy.is_none());
962            prop_assert!(result.timestamping.mode.is_none());
963            prop_assert!(result.timestamping.uncapped.is_none());
964            prop_assert!(result.delete_on_empty.min_age.is_none());
965            prop_assert!(result.encryption.allowed_modes.is_empty());
966        }
967
968        #[test]
969        fn reconfigure_specified_some_sets_value(
970            base in gen_internal_optional_stream_config(),
971            new_sc in gen_storage_class(),
972            new_rp_secs in 1u64..u64::MAX,
973        ) {
974            let reconfig = types::config::StreamReconfiguration {
975                storage_class: Maybe::Specified(Some(new_sc.into())),
976                retention_policy: Maybe::Specified(Some(
977                    types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
978                )),
979                ..Default::default()
980            };
981            let result = base.reconfigure(reconfig);
982
983            prop_assert_eq!(result.storage_class, Some(new_sc.into()));
984            prop_assert_eq!(
985                result.retention_policy,
986                Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
987            );
988        }
989
990        #[test]
991        fn to_opt_returns_some_for_non_defaults(
992            sc in gen_storage_class(),
993            doe_secs in 1u64..u64::MAX,
994            ts_mode in gen_timestamping_mode(),
995        ) {
996            // non-default storage class -> Some
997            let internal = types::config::OptionalStreamConfig {
998                storage_class: Some(sc.into()),
999                ..Default::default()
1000            };
1001            prop_assert!(StreamConfig::to_opt(internal).is_some());
1002
1003            // non-zero delete_on_empty -> Some
1004            let internal = types::config::OptionalDeleteOnEmptyConfig {
1005                min_age: Some(Duration::from_secs(doe_secs)),
1006            };
1007            let api = DeleteOnEmptyConfig::to_opt(internal);
1008            prop_assert!(api.is_some());
1009            prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
1010
1011            // non-default timestamping -> Some
1012            let internal = types::config::OptionalTimestampingConfig {
1013                mode: Some(ts_mode.into()),
1014                uncapped: None,
1015            };
1016            prop_assert!(TimestampingConfig::to_opt(internal).is_some());
1017        }
1018
1019        #[test]
1020        fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
1021            let has_zero_age = matches!(
1022                &reconfig.default_stream_config,
1023                Maybe::Specified(Some(sr)) if matches!(
1024                    sr.retention_policy,
1025                    Maybe::Specified(Some(RetentionPolicy::Age(0)))
1026                )
1027            );
1028            let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
1029
1030            if has_zero_age {
1031                prop_assert!(result.is_err());
1032            } else {
1033                prop_assert!(result.is_ok());
1034            }
1035        }
1036
1037        #[test]
1038        fn reconfigure_basin_unspecified_preserves(
1039            base_sc in proptest::option::of(gen_storage_class()),
1040            base_on_append in any::<bool>(),
1041            base_on_read in any::<bool>(),
1042        ) {
1043            let base = types::config::BasinConfig {
1044                default_stream_config: types::config::OptionalStreamConfig {
1045                    storage_class: base_sc.map(Into::into),
1046                    ..Default::default()
1047                },
1048                create_stream_on_append: base_on_append,
1049                create_stream_on_read: base_on_read,
1050            };
1051
1052            let reconfig = types::config::BasinReconfiguration::default();
1053            let result = base.clone().reconfigure(reconfig);
1054
1055            prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
1056            prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
1057            prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
1058        }
1059
1060        #[test]
1061        fn reconfigure_basin_specified_updates(
1062            base_on_append in any::<bool>(),
1063            new_on_append in any::<bool>(),
1064            new_sc in gen_storage_class(),
1065        ) {
1066            let base = types::config::BasinConfig {
1067                create_stream_on_append: base_on_append,
1068                ..Default::default()
1069            };
1070
1071            let reconfig = types::config::BasinReconfiguration {
1072                default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
1073                    storage_class: Maybe::Specified(Some(new_sc.into())),
1074                    ..Default::default()
1075                })),
1076                create_stream_on_append: Maybe::Specified(new_on_append),
1077                ..Default::default()
1078            };
1079            let result = base.reconfigure(reconfig);
1080
1081            prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
1082            prop_assert_eq!(result.create_stream_on_append, new_on_append);
1083        }
1084
1085        #[test]
1086        fn reconfigure_nested_partial_update(
1087            base_mode in gen_timestamping_mode(),
1088            base_uncapped in any::<bool>(),
1089            new_mode in gen_timestamping_mode(),
1090        ) {
1091            let base = types::config::OptionalStreamConfig {
1092                timestamping: types::config::OptionalTimestampingConfig {
1093                    mode: Some(base_mode.into()),
1094                    uncapped: Some(base_uncapped),
1095                },
1096                ..Default::default()
1097            };
1098
1099            let expected_mode: types::config::TimestampingMode = new_mode.into();
1100
1101            let reconfig = types::config::StreamReconfiguration {
1102                timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
1103                    mode: Maybe::Specified(Some(expected_mode)),
1104                    uncapped: Maybe::Unspecified,
1105                })),
1106                ..Default::default()
1107            };
1108            let result = base.reconfigure(reconfig);
1109
1110            prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
1111            prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
1112        }
1113    }
1114
/// `to_opt` collapses default-valued internal configs to `None`.
#[test]
fn to_opt_returns_none_for_defaults() {
    // A default stream config has nothing to report.
    let default_stream = types::config::OptionalStreamConfig::default();
    assert!(StreamConfig::to_opt(default_stream).is_none());

    // Both an absent and a zero minimum age count as "not configured".
    for min_age in [None, Some(Duration::ZERO)] {
        let delete_on_empty = types::config::OptionalDeleteOnEmptyConfig { min_age };
        assert!(DeleteOnEmptyConfig::to_opt(delete_on_empty).is_none());
    }

    // A default timestamping config has nothing to report either.
    let default_timestamping = types::config::OptionalTimestampingConfig::default();
    assert!(TimestampingConfig::to_opt(default_timestamping).is_none());
}
1134
/// An empty JSON object deserializes to a `StreamConfig` whose internal
/// conversion leaves every field unset.
#[test]
fn empty_json_converts_to_all_none() {
    let parsed: StreamConfig = serde_json::from_value(serde_json::json!({})).unwrap();
    let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();

    // Each entry pairs an "is unset" check with the message shown on failure.
    let checks = [
        (
            internal.storage_class.is_none(),
            "storage_class should be None",
        ),
        (
            internal.retention_policy.is_none(),
            "retention_policy should be None",
        ),
        (
            internal.timestamping.mode.is_none(),
            "timestamping.mode should be None",
        ),
        (
            internal.timestamping.uncapped.is_none(),
            "timestamping.uncapped should be None",
        ),
        (
            internal.delete_on_empty.min_age.is_none(),
            "delete_on_empty.min_age should be None",
        ),
        (
            internal.encryption.allowed_modes.is_empty(),
            "encryption.allowed_modes should be empty",
        ),
    ];

    for (is_unset, message) in checks {
        assert!(is_unset, "{message}");
    }
}
1166}