Skip to main content

s2_api/v1/
config.rs

1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
/// Storage class of a stream.
// Serialized with kebab-case variant names (see `rename_all` below).
#[rustfmt::skip]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum StorageClass {
    /// Append tail latency under 400 milliseconds with s2.dev.
    Standard,
    /// Append tail latency under 40 milliseconds with s2.dev.
    Express,
}
16
17impl From<StorageClass> for types::config::StorageClass {
18    fn from(value: StorageClass) -> Self {
19        match value {
20            StorageClass::Express => Self::Express,
21            StorageClass::Standard => Self::Standard,
22        }
23    }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27    fn from(value: types::config::StorageClass) -> Self {
28        match value {
29            types::config::StorageClass::Express => Self::Express,
30            types::config::StorageClass::Standard => Self::Standard,
31        }
32    }
33}
34
/// Retention policy for the records of a stream: age-based trimming or
/// infinite retention.
// Serialized with kebab-case variant names (see `rename_all` below).
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum RetentionPolicy {
    /// Age in seconds for automatic trimming of records older than this threshold.
    /// This must be set to a value greater than 0 seconds.
    Age(u64),
    /// Retain records unless explicitly trimmed.
    Infinite(InfiniteRetention)
}
46
/// Payload of [`RetentionPolicy::Infinite`]; it currently has no fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53    type Error = types::ValidationError;
54
55    fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56        match value {
57            RetentionPolicy::Age(0) => Err(types::ValidationError(
58                "age must be greater than 0 seconds".to_string(),
59            )),
60            RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61            RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62        }
63    }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67    fn from(value: types::config::RetentionPolicy) -> Self {
68        match value {
69            types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70            types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71        }
72    }
73}
74
/// How the timestamp of an appended record is determined.
// `ClientPrefer` is the `Default` variant; names serialize in kebab-case.
#[rustfmt::skip]
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum TimestampingMode {
    /// Prefer client-specified timestamp if present otherwise use arrival time.
    #[default]
    ClientPrefer,
    /// Require a client-specified timestamp and reject the append if it is missing.
    ClientRequire,
    /// Use the arrival time and ignore any client-specified timestamp.
    Arrival,
}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90    fn from(value: TimestampingMode) -> Self {
91        match value {
92            TimestampingMode::ClientPrefer => Self::ClientPrefer,
93            TimestampingMode::ClientRequire => Self::ClientRequire,
94            TimestampingMode::Arrival => Self::Arrival,
95        }
96    }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100    fn from(value: types::config::TimestampingMode) -> Self {
101        match value {
102            types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103            types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104            types::config::TimestampingMode::Arrival => Self::Arrival,
105        }
106    }
107}
108
/// Timestamping settings. Both fields are optional.
// The derived `Default` leaves both fields as `None`.
#[rustfmt::skip]
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TimestampingConfig {
    /// Timestamping mode for appends that influences how timestamps are handled.
    pub mode: Option<TimestampingMode>,
    /// Allow client-specified timestamps to exceed the arrival time.
    /// If this is `false` or not set, client timestamps will be capped at the arrival time.
    pub uncapped: Option<bool>,
}
119
120impl TimestampingConfig {
121    pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122        let config = TimestampingConfig {
123            mode: config.mode.map(Into::into),
124            uncapped: config.uncapped,
125        };
126        if config == Self::default() {
127            None
128        } else {
129            Some(config)
130        }
131    }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135    fn from(value: types::config::TimestampingConfig) -> Self {
136        Self {
137            mode: Some(value.mode.into()),
138            uncapped: Some(value.uncapped),
139        }
140    }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144    fn from(value: TimestampingConfig) -> Self {
145        Self {
146            mode: value.mode.map(Into::into),
147            uncapped: value.uncapped,
148        }
149    }
150}
151
/// Reconfiguration of timestamping settings.
// Each field is a `Maybe<Option<_>>`: it is omitted from the serialized form
// when `Maybe::is_unspecified` returns true, and falls back to
// `Default::default()` when absent from the input (`#[serde(default)]`).
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TimestampingReconfiguration {
    /// Timestamping mode for appends that influences how timestamps are handled.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Allow client-specified timestamps to exceed the arrival time.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub uncapped: Maybe<Option<bool>>,
}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167    fn from(value: TimestampingReconfiguration) -> Self {
168        Self {
169            mode: value.mode.map_opt(Into::into),
170            uncapped: value.uncapped,
171        }
172    }
173}
174
175impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
176    fn from(value: types::config::TimestampingReconfiguration) -> Self {
177        Self {
178            mode: value.mode.map_opt(Into::into),
179            uncapped: value.uncapped,
180        }
181    }
182}
183
/// Delete-on-empty settings for a stream.
// The derived `Default` has `min_age_secs == 0`, which the field documentation
// below defines as "disabled".
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    /// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
    #[serde(default)]
    pub min_age_secs: u64,
}
193
194impl DeleteOnEmptyConfig {
195    pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196        let min_age = config.min_age.unwrap_or_default();
197        if min_age > Duration::ZERO {
198            Some(DeleteOnEmptyConfig {
199                min_age_secs: min_age.as_secs(),
200            })
201        } else {
202            None
203        }
204    }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208    fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209        Self {
210            min_age_secs: value.min_age.as_secs(),
211        }
212    }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216    fn from(value: DeleteOnEmptyConfig) -> Self {
217        Self {
218            min_age: Duration::from_secs(value.min_age_secs),
219        }
220    }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224    fn from(value: DeleteOnEmptyConfig) -> Self {
225        Self {
226            min_age: Some(Duration::from_secs(value.min_age_secs)),
227        }
228    }
229}
230
/// Reconfiguration of delete-on-empty settings.
// `min_age_secs` is omitted from the serialized form when
// `Maybe::is_unspecified` returns true, and falls back to `Default::default()`
// when absent from the input (`#[serde(default)]`).
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DeleteOnEmptyReconfiguration {
    /// Minimum age in seconds before an empty stream can be deleted.
    /// Set to 0 to disable delete-on-empty (don't delete automatically).
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
    pub min_age_secs: Maybe<Option<u64>>,
}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244        Self {
245            min_age: value.min_age_secs.map_opt(Duration::from_secs),
246        }
247    }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251    fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252        Self {
253            min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254        }
255    }
256}
257
/// Encryption mode that can be allowed for a stream.
// Variant order matters: it defines the derived `PartialOrd`/`Ord` ordering.
// Variant names serialize in kebab-case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum EncryptionMode {
    /// Plaintext (no encryption).
    Plain,
    /// AEGIS-256 authenticated encryption.
    Aegis256,
    /// AES-256-GCM authenticated encryption.
    Aes256Gcm,
}
269
270impl From<EncryptionMode> for encryption::EncryptionMode {
271    fn from(value: EncryptionMode) -> Self {
272        match value {
273            EncryptionMode::Plain => Self::Plain,
274            EncryptionMode::Aegis256 => Self::Aegis256,
275            EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
276        }
277    }
278}
279
280impl From<encryption::EncryptionMode> for EncryptionMode {
281    fn from(value: encryption::EncryptionMode) -> Self {
282        match value {
283            encryption::EncryptionMode::Plain => Self::Plain,
284            encryption::EncryptionMode::Aegis256 => Self::Aegis256,
285            encryption::EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
286        }
287    }
288}
289
/// Encryption settings for a stream.
// The derived `Default` has an empty `allowed_modes`, and an empty list is
// omitted from the serialized form (`skip_serializing_if = "Vec::is_empty"`).
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct EncryptionConfig {
    /// Allowed encryption modes for the stream.
    /// If empty, use defaults. If no default is configured, only plaintext is allowed.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub allowed_modes: Vec<EncryptionMode>,
}
299
300impl EncryptionConfig {
301    pub fn to_opt(config: types::config::OptionalEncryptionConfig) -> Option<Self> {
302        let config = EncryptionConfig {
303            allowed_modes: config
304                .allowed_modes
305                .map(|modes| modes.into_iter().map(Into::into).collect())
306                .unwrap_or_default(),
307        };
308        if config == Self::default() {
309            None
310        } else {
311            Some(config)
312        }
313    }
314}
315
316impl From<types::config::EncryptionConfig> for EncryptionConfig {
317    fn from(value: types::config::EncryptionConfig) -> Self {
318        Self {
319            allowed_modes: value.allowed_modes.into_iter().map(Into::into).collect(),
320        }
321    }
322}
323
324impl From<EncryptionConfig> for types::config::OptionalEncryptionConfig {
325    fn from(value: EncryptionConfig) -> Self {
326        Self {
327            allowed_modes: if value.allowed_modes.is_empty() {
328                None
329            } else {
330                Some(
331                    value
332                        .allowed_modes
333                        .into_iter()
334                        .map(encryption::EncryptionMode::from)
335                        .collect(),
336                )
337            },
338        }
339    }
340}
341
/// Reconfiguration of encryption settings.
// `allowed_modes` is a `Maybe<Vec<_>>` (not `Maybe<Option<_>>` like the other
// reconfiguration fields in this file). It is omitted from the serialized form
// when `Maybe::is_unspecified` returns true.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct EncryptionReconfiguration {
    /// Allowed encryption modes for the stream.
    /// Specify an empty list to reset to defaults.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<Vec<EncryptionMode>>))]
    pub allowed_modes: Maybe<Vec<EncryptionMode>>,
}
352
353impl From<EncryptionReconfiguration> for types::config::EncryptionReconfiguration {
354    fn from(value: EncryptionReconfiguration) -> Self {
355        Self {
356            allowed_modes: value.allowed_modes.map(|modes| {
357                modes
358                    .into_iter()
359                    .map(encryption::EncryptionMode::from)
360                    .collect()
361            }),
362        }
363    }
364}
365
366impl From<types::config::EncryptionReconfiguration> for EncryptionReconfiguration {
367    fn from(value: types::config::EncryptionReconfiguration) -> Self {
368        Self {
369            allowed_modes: value
370                .allowed_modes
371                .map(|modes| modes.into_iter().map(Into::into).collect()),
372        }
373    }
374}
375
/// Stream configuration. Every field is optional.
// The derived `Default` leaves every field as `None`; `StreamConfig::to_opt`
// relies on that to detect an "all unset" configuration.
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamConfig {
    /// Storage class for recent writes.
    pub storage_class: Option<StorageClass>,
    /// Retention policy for the stream.
    /// If unspecified, the default is to retain records for 7 days.
    pub retention_policy: Option<RetentionPolicy>,
    /// Timestamping behavior.
    pub timestamping: Option<TimestampingConfig>,
    /// Delete-on-empty configuration.
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
    /// Encryption configuration.
    #[serde(default)]
    pub encryption: Option<EncryptionConfig>,
}
394
395impl StreamConfig {
396    pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
397        let types::config::OptionalStreamConfig {
398            storage_class,
399            retention_policy,
400            timestamping,
401            delete_on_empty,
402            encryption,
403        } = config;
404
405        let config = StreamConfig {
406            storage_class: storage_class.map(Into::into),
407            retention_policy: retention_policy.map(Into::into),
408            timestamping: TimestampingConfig::to_opt(timestamping),
409            delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
410            encryption: EncryptionConfig::to_opt(encryption),
411        };
412        if config == Self::default() {
413            None
414        } else {
415            Some(config)
416        }
417    }
418}
419
420impl From<types::config::StreamConfig> for StreamConfig {
421    fn from(value: types::config::StreamConfig) -> Self {
422        let types::config::StreamConfig {
423            storage_class,
424            retention_policy,
425            timestamping,
426            delete_on_empty,
427            encryption,
428        } = value;
429
430        Self {
431            storage_class: Some(storage_class.into()),
432            retention_policy: Some(retention_policy.into()),
433            timestamping: Some(timestamping.into()),
434            delete_on_empty: Some(delete_on_empty.into()),
435            encryption: Some(encryption.into()),
436        }
437    }
438}
439
440impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
441    type Error = types::ValidationError;
442
443    fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
444        let StreamConfig {
445            storage_class,
446            retention_policy,
447            timestamping,
448            delete_on_empty,
449            encryption,
450        } = value;
451
452        let retention_policy = match retention_policy {
453            None => None,
454            Some(policy) => Some(policy.try_into()?),
455        };
456
457        Ok(Self {
458            storage_class: storage_class.map(Into::into),
459            retention_policy,
460            timestamping: timestamping.map(Into::into).unwrap_or_default(),
461            delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
462            encryption: encryption.map(Into::into).unwrap_or_default(),
463        })
464    }
465}
466
/// Reconfiguration of a stream.
// Each field is a `Maybe<Option<_>>`: it is omitted from the serialized form
// when `Maybe::is_unspecified` returns true, and falls back to
// `Default::default()` when absent from the input (`#[serde(default)]`).
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamReconfiguration {
    /// Storage class for recent writes.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Retention policy for the stream.
    /// If unspecified, the default is to retain records for 7 days.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Timestamping behavior.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Delete-on-empty configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
    /// Encryption configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionReconfiguration>))]
    pub encryption: Maybe<Option<EncryptionReconfiguration>>,
}
493
494impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
495    type Error = types::ValidationError;
496
497    fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
498        let StreamReconfiguration {
499            storage_class,
500            retention_policy,
501            timestamping,
502            delete_on_empty,
503            encryption,
504        } = value;
505
506        Ok(Self {
507            storage_class: storage_class.map_opt(Into::into),
508            retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
509            timestamping: timestamping.map_opt(Into::into),
510            delete_on_empty: delete_on_empty.map_opt(Into::into),
511            encryption: encryption.map_opt(Into::into),
512        })
513    }
514}
515
516impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
517    fn from(value: types::config::StreamReconfiguration) -> Self {
518        let types::config::StreamReconfiguration {
519            storage_class,
520            retention_policy,
521            timestamping,
522            delete_on_empty,
523            encryption,
524        } = value;
525
526        Self {
527            storage_class: storage_class.map_opt(Into::into),
528            retention_policy: retention_policy.map_opt(Into::into),
529            timestamping: timestamping.map_opt(Into::into),
530            delete_on_empty: delete_on_empty.map_opt(Into::into),
531            encryption: encryption.map_opt(Into::into),
532        }
533    }
534}
535
/// Basin configuration.
// Both `create_stream_on_*` flags fall back to `false` when absent from the
// input (`#[serde(default)]` on a `bool`).
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct BasinConfig {
    /// Default stream configuration.
    pub default_stream_config: Option<StreamConfig>,
    /// Create stream on append if it doesn't exist, using the default stream configuration.
    #[serde(default)]
    #[cfg_attr(feature = "utoipa", schema(default = false))]
    pub create_stream_on_append: bool,
    /// Create stream on read if it doesn't exist, using the default stream configuration.
    #[serde(default)]
    #[cfg_attr(feature = "utoipa", schema(default = false))]
    pub create_stream_on_read: bool,
}
551
552impl TryFrom<BasinConfig> for types::config::BasinConfig {
553    type Error = types::ValidationError;
554
555    fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
556        let BasinConfig {
557            default_stream_config,
558            create_stream_on_append,
559            create_stream_on_read,
560        } = value;
561
562        Ok(Self {
563            default_stream_config: match default_stream_config {
564                Some(config) => config.try_into()?,
565                None => Default::default(),
566            },
567            create_stream_on_append,
568            create_stream_on_read,
569        })
570    }
571}
572
573impl From<types::config::BasinConfig> for BasinConfig {
574    fn from(value: types::config::BasinConfig) -> Self {
575        let types::config::BasinConfig {
576            default_stream_config,
577            create_stream_on_append,
578            create_stream_on_read,
579        } = value;
580
581        Self {
582            default_stream_config: StreamConfig::to_opt(default_stream_config),
583            create_stream_on_append,
584            create_stream_on_read,
585        }
586    }
587}
588
/// Reconfiguration of a basin.
// Each field is omitted from the serialized form when `Maybe::is_unspecified`
// returns true, and falls back to `Default::default()` when absent from the
// input (`#[serde(default)]`).
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct BasinReconfiguration {
    /// Default stream configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Create a stream on append.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub create_stream_on_append: Maybe<bool>,
    /// Create a stream on read.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub create_stream_on_read: Maybe<bool>,
}
606
607impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
608    type Error = types::ValidationError;
609
610    fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
611        let BasinReconfiguration {
612            default_stream_config,
613            create_stream_on_append,
614            create_stream_on_read,
615        } = value;
616
617        Ok(Self {
618            default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
619            create_stream_on_append: create_stream_on_append.map(Into::into),
620            create_stream_on_read: create_stream_on_read.map(Into::into),
621        })
622    }
623}
624
625impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
626    fn from(value: types::config::BasinReconfiguration) -> Self {
627        let types::config::BasinReconfiguration {
628            default_stream_config,
629            create_stream_on_append,
630            create_stream_on_read,
631        } = value;
632
633        Self {
634            default_stream_config: default_stream_config.map_opt(Into::into),
635            create_stream_on_append: create_stream_on_append.map(Into::into),
636            create_stream_on_read: create_stream_on_read.map(Into::into),
637        }
638    }
639}
640
641#[cfg(test)]
642mod tests {
643    use proptest::prelude::*;
644
645    use super::*;
646
    /// Strategy yielding either variant of the API [`StorageClass`].
    fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
        prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
    }
650
    /// Strategy yielding any of the three API [`TimestampingMode`] variants.
    fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
        prop_oneof![
            Just(TimestampingMode::ClientPrefer),
            Just(TimestampingMode::ClientRequire),
            Just(TimestampingMode::Arrival),
        ]
    }
658
    /// Strategy yielding an API [`RetentionPolicy`]: an age drawn from the full
    /// `u64` range, or infinite retention.
    // The age may be 0, which the `TryFrom` conversion to the internal type rejects.
    fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
        prop_oneof![
            any::<u64>().prop_map(RetentionPolicy::Age),
            Just(RetentionPolicy::Infinite(InfiniteRetention {})),
        ]
    }
665
666    fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
667        (
668            proptest::option::of(gen_timestamping_mode()),
669            proptest::option::of(any::<bool>()),
670        )
671            .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
672    }
673
674    fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
675        any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
676    }
677
    /// Strategy yielding any of the three API [`EncryptionMode`] variants.
    fn gen_encryption_mode() -> impl Strategy<Value = EncryptionMode> {
        prop_oneof![
            Just(EncryptionMode::Plain),
            Just(EncryptionMode::Aegis256),
            Just(EncryptionMode::Aes256Gcm),
        ]
    }
685
686    fn gen_encryption_config() -> impl Strategy<Value = EncryptionConfig> {
687        (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
688            let allowed_modes = [
689                (plain, EncryptionMode::Plain),
690                (aegis, EncryptionMode::Aegis256),
691                (aes, EncryptionMode::Aes256Gcm),
692            ]
693            .into_iter()
694            .filter_map(|(include, mode)| include.then_some(mode))
695            .collect();
696            EncryptionConfig { allowed_modes }
697        })
698    }
699
700    fn gen_encryption_reconfiguration() -> impl Strategy<Value = EncryptionReconfiguration> {
701        prop_oneof![
702            Just(Maybe::Unspecified),
703            proptest::collection::vec(gen_encryption_mode(), 0..=3).prop_map(Maybe::Specified),
704        ]
705        .prop_map(|allowed_modes| EncryptionReconfiguration { allowed_modes })
706    }
707
708    fn gen_internal_encryption_modes()
709    -> impl Strategy<Value = enumset::EnumSet<encryption::EncryptionMode>> {
710        (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
711            let mut allowed_modes = enumset::EnumSet::new();
712            if plain {
713                allowed_modes.insert(encryption::EncryptionMode::Plain);
714            }
715            if aegis {
716                allowed_modes.insert(encryption::EncryptionMode::Aegis256);
717            }
718            if aes {
719                allowed_modes.insert(encryption::EncryptionMode::Aes256Gcm);
720            }
721            allowed_modes
722        })
723    }
724
725    fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
726        (
727            proptest::option::of(gen_storage_class()),
728            proptest::option::of(gen_retention_policy()),
729            proptest::option::of(gen_timestamping_config()),
730            proptest::option::of(gen_delete_on_empty_config()),
731            proptest::option::of(gen_encryption_config()),
732        )
733            .prop_map(
734                |(storage_class, retention_policy, timestamping, delete_on_empty, encryption)| {
735                    StreamConfig {
736                        storage_class,
737                        retention_policy,
738                        timestamping,
739                        delete_on_empty,
740                        encryption,
741                    }
742                },
743            )
744    }
745
746    fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
747        (
748            proptest::option::of(gen_stream_config()),
749            any::<bool>(),
750            any::<bool>(),
751        )
752            .prop_map(
753                |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
754                    BasinConfig {
755                        default_stream_config,
756                        create_stream_on_append,
757                        create_stream_on_read,
758                    }
759                },
760            )
761    }
762
763    fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
764        inner: impl Strategy<Value = T>,
765    ) -> impl Strategy<Value = Maybe<Option<T>>> {
766        prop_oneof![
767            Just(Maybe::Unspecified),
768            Just(Maybe::Specified(None)),
769            inner.prop_map(|v| Maybe::Specified(Some(v))),
770        ]
771    }
772
773    fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
774        (
775            gen_maybe(gen_storage_class()),
776            gen_maybe(gen_retention_policy()),
777            gen_maybe(gen_timestamping_reconfiguration()),
778            gen_maybe(gen_delete_on_empty_reconfiguration()),
779            gen_maybe(gen_encryption_reconfiguration()),
780        )
781            .prop_map(
782                |(storage_class, retention_policy, timestamping, delete_on_empty, encryption)| {
783                    StreamReconfiguration {
784                        storage_class,
785                        retention_policy,
786                        timestamping,
787                        delete_on_empty,
788                        encryption,
789                    }
790                },
791            )
792    }
793
794    fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
795        (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
796            .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
797    }
798
799    fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
800    {
801        gen_maybe(any::<u64>())
802            .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
803    }
804
805    fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
806        (
807            gen_maybe(gen_stream_reconfiguration()),
808            prop_oneof![
809                Just(Maybe::Unspecified),
810                any::<bool>().prop_map(Maybe::Specified),
811            ],
812            prop_oneof![
813                Just(Maybe::Unspecified),
814                any::<bool>().prop_map(Maybe::Specified),
815            ],
816        )
817            .prop_map(
818                |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
819                    BasinReconfiguration {
820                        default_stream_config,
821                        create_stream_on_append,
822                        create_stream_on_read,
823                    }
824                },
825            )
826    }
827
828    fn gen_internal_optional_stream_config()
829    -> impl Strategy<Value = types::config::OptionalStreamConfig> {
830        (
831            proptest::option::of(gen_storage_class()),
832            proptest::option::of(gen_retention_policy()),
833            proptest::option::of(gen_timestamping_mode()),
834            proptest::option::of(any::<bool>()),
835            proptest::option::of(any::<u64>()),
836            proptest::option::of(gen_internal_encryption_modes()),
837        )
838            .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe, encryption)| {
839                types::config::OptionalStreamConfig {
840                    storage_class: sc.map(Into::into),
841                    retention_policy: rp.map(|rp| match rp {
842                        RetentionPolicy::Age(secs) => {
843                            types::config::RetentionPolicy::Age(Duration::from_secs(secs))
844                        }
845                        RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
846                    }),
847                    timestamping: types::config::OptionalTimestampingConfig {
848                        mode: ts_mode.map(Into::into),
849                        uncapped: ts_uncapped,
850                    },
851                    delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
852                        min_age: doe.map(Duration::from_secs),
853                    },
854                    encryption: types::config::OptionalEncryptionConfig {
855                        allowed_modes: encryption,
856                    },
857                }
858            })
859    }
860
861    proptest! {
862        #[test]
863        fn stream_config_conversion_validates(config in gen_stream_config()) {
864            let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
865            let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
866
867            if has_zero_age {
868                prop_assert!(result.is_err());
869            } else {
870                prop_assert!(result.is_ok());
871            }
872        }
873
874        #[test]
875        fn basin_config_conversion_validates(config in gen_basin_config()) {
876            let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
877                matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
878            });
879
880            let result: Result<types::config::BasinConfig, _> = config.try_into();
881
882            if has_invalid_config {
883                prop_assert!(result.is_err());
884            } else {
885                prop_assert!(result.is_ok());
886            }
887        }
888
889        #[test]
890        fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
891            let has_zero_age = matches!(
892                reconfig.retention_policy,
893                Maybe::Specified(Some(RetentionPolicy::Age(0)))
894            );
895            let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
896
897            if has_zero_age {
898                prop_assert!(result.is_err());
899            } else {
900                prop_assert!(result.is_ok());
901            }
902        }
903
904        #[test]
905        fn merge_stream_or_basin_or_default(
906            stream in gen_internal_optional_stream_config(),
907            basin in gen_internal_optional_stream_config(),
908        ) {
909            let merged = stream.clone().merge(basin.clone());
910
911            prop_assert_eq!(
912                merged.storage_class,
913                stream.storage_class.or(basin.storage_class).unwrap_or_default()
914            );
915            prop_assert_eq!(
916                merged.retention_policy,
917                stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
918            );
919            prop_assert_eq!(
920                merged.timestamping.mode,
921                stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
922            );
923            prop_assert_eq!(
924                merged.timestamping.uncapped,
925                stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
926            );
927            prop_assert_eq!(
928                merged.delete_on_empty.min_age,
929                stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
930            );
931            prop_assert_eq!(
932                merged.encryption.allowed_modes,
933                stream
934                    .encryption
935                    .allowed_modes
936                    .or(basin.encryption.allowed_modes)
937                    .unwrap_or(types::config::DEFAULT_ALLOWED_ENCRYPTION_MODES)
938            );
939        }
940
941        #[test]
942        fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
943            let reconfig = types::config::StreamReconfiguration::default();
944            let result = base.clone().reconfigure(reconfig);
945
946            prop_assert_eq!(result.storage_class, base.storage_class);
947            prop_assert_eq!(result.retention_policy, base.retention_policy);
948            prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
949            prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
950            prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
951            prop_assert_eq!(result.encryption.allowed_modes, base.encryption.allowed_modes);
952        }
953
954        #[test]
955        fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
956            let reconfig = types::config::StreamReconfiguration {
957                storage_class: Maybe::Specified(None),
958                retention_policy: Maybe::Specified(None),
959                timestamping: Maybe::Specified(None),
960                delete_on_empty: Maybe::Specified(None),
961                encryption: Maybe::Specified(None),
962            };
963            let result = base.reconfigure(reconfig);
964
965            prop_assert!(result.storage_class.is_none());
966            prop_assert!(result.retention_policy.is_none());
967            prop_assert!(result.timestamping.mode.is_none());
968            prop_assert!(result.timestamping.uncapped.is_none());
969            prop_assert!(result.delete_on_empty.min_age.is_none());
970            prop_assert!(result.encryption.allowed_modes.is_none());
971        }
972
973        #[test]
974        fn reconfigure_specified_some_sets_value(
975            base in gen_internal_optional_stream_config(),
976            new_sc in gen_storage_class(),
977            new_rp_secs in 1u64..u64::MAX,
978        ) {
979            let reconfig = types::config::StreamReconfiguration {
980                storage_class: Maybe::Specified(Some(new_sc.into())),
981                retention_policy: Maybe::Specified(Some(
982                    types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
983                )),
984                ..Default::default()
985            };
986            let result = base.reconfigure(reconfig);
987
988            prop_assert_eq!(result.storage_class, Some(new_sc.into()));
989            prop_assert_eq!(
990                result.retention_policy,
991                Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
992            );
993        }
994
995        #[test]
996        fn to_opt_returns_some_for_non_defaults(
997            sc in gen_storage_class(),
998            doe_secs in 1u64..u64::MAX,
999            ts_mode in gen_timestamping_mode(),
1000        ) {
1001            // non-default storage class -> Some
1002            let internal = types::config::OptionalStreamConfig {
1003                storage_class: Some(sc.into()),
1004                ..Default::default()
1005            };
1006            prop_assert!(StreamConfig::to_opt(internal).is_some());
1007
1008            // non-zero delete_on_empty -> Some
1009            let internal = types::config::OptionalDeleteOnEmptyConfig {
1010                min_age: Some(Duration::from_secs(doe_secs)),
1011            };
1012            let api = DeleteOnEmptyConfig::to_opt(internal);
1013            prop_assert!(api.is_some());
1014            prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
1015
1016            // non-default timestamping -> Some
1017            let internal = types::config::OptionalTimestampingConfig {
1018                mode: Some(ts_mode.into()),
1019                uncapped: None,
1020            };
1021            prop_assert!(TimestampingConfig::to_opt(internal).is_some());
1022        }
1023
1024        #[test]
1025        fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
1026            let has_zero_age = matches!(
1027                &reconfig.default_stream_config,
1028                Maybe::Specified(Some(sr)) if matches!(
1029                    sr.retention_policy,
1030                    Maybe::Specified(Some(RetentionPolicy::Age(0)))
1031                )
1032            );
1033            let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
1034
1035            if has_zero_age {
1036                prop_assert!(result.is_err());
1037            } else {
1038                prop_assert!(result.is_ok());
1039            }
1040        }
1041
1042        #[test]
1043        fn reconfigure_basin_unspecified_preserves(
1044            base_sc in proptest::option::of(gen_storage_class()),
1045            base_on_append in any::<bool>(),
1046            base_on_read in any::<bool>(),
1047        ) {
1048            let base = types::config::BasinConfig {
1049                default_stream_config: types::config::OptionalStreamConfig {
1050                    storage_class: base_sc.map(Into::into),
1051                    ..Default::default()
1052                },
1053                create_stream_on_append: base_on_append,
1054                create_stream_on_read: base_on_read,
1055            };
1056
1057            let reconfig = types::config::BasinReconfiguration::default();
1058            let result = base.clone().reconfigure(reconfig);
1059
1060            prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
1061            prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
1062            prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
1063        }
1064
1065        #[test]
1066        fn reconfigure_basin_specified_updates(
1067            base_on_append in any::<bool>(),
1068            new_on_append in any::<bool>(),
1069            new_sc in gen_storage_class(),
1070        ) {
1071            let base = types::config::BasinConfig {
1072                create_stream_on_append: base_on_append,
1073                ..Default::default()
1074            };
1075
1076            let reconfig = types::config::BasinReconfiguration {
1077                default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
1078                    storage_class: Maybe::Specified(Some(new_sc.into())),
1079                    ..Default::default()
1080                })),
1081                create_stream_on_append: Maybe::Specified(new_on_append),
1082                ..Default::default()
1083            };
1084            let result = base.reconfigure(reconfig);
1085
1086            prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
1087            prop_assert_eq!(result.create_stream_on_append, new_on_append);
1088        }
1089
1090        #[test]
1091        fn reconfigure_nested_partial_update(
1092            base_mode in gen_timestamping_mode(),
1093            base_uncapped in any::<bool>(),
1094            new_mode in gen_timestamping_mode(),
1095        ) {
1096            let base = types::config::OptionalStreamConfig {
1097                timestamping: types::config::OptionalTimestampingConfig {
1098                    mode: Some(base_mode.into()),
1099                    uncapped: Some(base_uncapped),
1100                },
1101                ..Default::default()
1102            };
1103
1104            let expected_mode: types::config::TimestampingMode = new_mode.into();
1105
1106            let reconfig = types::config::StreamReconfiguration {
1107                timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
1108                    mode: Maybe::Specified(Some(expected_mode)),
1109                    uncapped: Maybe::Unspecified,
1110                })),
1111                ..Default::default()
1112            };
1113            let result = base.reconfigure(reconfig);
1114
1115            prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
1116            prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
1117        }
1118    }
1119
// `to_opt` must collapse default-valued internal configs to `None`.
#[test]
fn to_opt_returns_none_for_defaults() {
    // Default stream config -> None.
    let default_stream = types::config::OptionalStreamConfig::default();
    assert!(StreamConfig::to_opt(default_stream).is_none());

    // Delete-on-empty with no minimum age, or a zero minimum age -> None.
    for min_age in [None, Some(Duration::ZERO)] {
        let delete_on_empty = types::config::OptionalDeleteOnEmptyConfig { min_age };
        assert!(DeleteOnEmptyConfig::to_opt(delete_on_empty).is_none());
    }

    // Default timestamping -> None.
    let default_timestamping = types::config::OptionalTimestampingConfig::default();
    assert!(TimestampingConfig::to_opt(default_timestamping).is_none());
}
1139
// Deserializing `{}` as a `StreamConfig` and converting it to the internal
// representation must leave every optional field unset.
#[test]
fn empty_json_converts_to_all_none() {
    let parsed: StreamConfig = serde_json::from_value(serde_json::json!({})).unwrap();
    let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();

    // Each entry pairs "is this field unset?" with the message reported on failure.
    let checks = [
        (
            internal.storage_class.is_none(),
            "storage_class should be None",
        ),
        (
            internal.retention_policy.is_none(),
            "retention_policy should be None",
        ),
        (
            internal.timestamping.mode.is_none(),
            "timestamping.mode should be None",
        ),
        (
            internal.timestamping.uncapped.is_none(),
            "timestamping.uncapped should be None",
        ),
        (
            internal.delete_on_empty.min_age.is_none(),
            "delete_on_empty.min_age should be None",
        ),
        (
            internal.encryption.allowed_modes.is_none(),
            "encryption.allowed_modes should be None",
        ),
    ];

    for (is_unset, message) in checks {
        assert!(is_unset, "{message}");
    }
}
1171}