1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
6#[rustfmt::skip]
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
9#[serde(rename_all = "kebab-case")]
10pub enum StorageClass {
11 Standard,
13 Express,
15}
16
17impl From<StorageClass> for types::config::StorageClass {
18 fn from(value: StorageClass) -> Self {
19 match value {
20 StorageClass::Express => Self::Express,
21 StorageClass::Standard => Self::Standard,
22 }
23 }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27 fn from(value: types::config::StorageClass) -> Self {
28 match value {
29 types::config::StorageClass::Express => Self::Express,
30 types::config::StorageClass::Standard => Self::Standard,
31 }
32 }
33}
34
35#[rustfmt::skip]
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
38#[serde(rename_all = "kebab-case")]
39pub enum RetentionPolicy {
40 Age(u64),
43 Infinite(InfiniteRetention)
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
49#[serde(rename_all = "kebab-case")]
50pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53 type Error = types::ValidationError;
54
55 fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56 match value {
57 RetentionPolicy::Age(0) => Err(types::ValidationError(
58 "age must be greater than 0 seconds".to_string(),
59 )),
60 RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61 RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62 }
63 }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67 fn from(value: types::config::RetentionPolicy) -> Self {
68 match value {
69 types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70 types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71 }
72 }
73}
74
75#[rustfmt::skip]
76#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
78#[serde(rename_all = "kebab-case")]
79pub enum TimestampingMode {
80 #[default]
82 ClientPrefer,
83 ClientRequire,
85 Arrival,
87}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90 fn from(value: TimestampingMode) -> Self {
91 match value {
92 TimestampingMode::ClientPrefer => Self::ClientPrefer,
93 TimestampingMode::ClientRequire => Self::ClientRequire,
94 TimestampingMode::Arrival => Self::Arrival,
95 }
96 }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100 fn from(value: types::config::TimestampingMode) -> Self {
101 match value {
102 types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103 types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104 types::config::TimestampingMode::Arrival => Self::Arrival,
105 }
106 }
107}
108
109#[rustfmt::skip]
110#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
112pub struct TimestampingConfig {
113 pub mode: Option<TimestampingMode>,
115 pub uncapped: Option<bool>,
118}
119
120impl TimestampingConfig {
121 pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122 let config = TimestampingConfig {
123 mode: config.mode.map(Into::into),
124 uncapped: config.uncapped,
125 };
126 if config == Self::default() {
127 None
128 } else {
129 Some(config)
130 }
131 }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135 fn from(value: types::config::TimestampingConfig) -> Self {
136 Self {
137 mode: Some(value.mode.into()),
138 uncapped: Some(value.uncapped),
139 }
140 }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144 fn from(value: TimestampingConfig) -> Self {
145 Self {
146 mode: value.mode.map(Into::into),
147 uncapped: value.uncapped,
148 }
149 }
150}
151
152#[rustfmt::skip]
153#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
154#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
155pub struct TimestampingReconfiguration {
156 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
158 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
159 pub mode: Maybe<Option<TimestampingMode>>,
160 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
162 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
163 pub uncapped: Maybe<Option<bool>>,
164}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167 fn from(value: TimestampingReconfiguration) -> Self {
168 Self {
169 mode: value.mode.map_opt(Into::into),
170 uncapped: value.uncapped,
171 }
172 }
173}
174
175impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
176 fn from(value: types::config::TimestampingReconfiguration) -> Self {
177 Self {
178 mode: value.mode.map_opt(Into::into),
179 uncapped: value.uncapped,
180 }
181 }
182}
183
184#[rustfmt::skip]
185#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
186#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
187pub struct DeleteOnEmptyConfig {
188 #[serde(default)]
191 pub min_age_secs: u64,
192}
193
194impl DeleteOnEmptyConfig {
195 pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196 let min_age = config.min_age.unwrap_or_default();
197 if min_age > Duration::ZERO {
198 Some(DeleteOnEmptyConfig {
199 min_age_secs: min_age.as_secs(),
200 })
201 } else {
202 None
203 }
204 }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208 fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209 Self {
210 min_age_secs: value.min_age.as_secs(),
211 }
212 }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216 fn from(value: DeleteOnEmptyConfig) -> Self {
217 Self {
218 min_age: Duration::from_secs(value.min_age_secs),
219 }
220 }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224 fn from(value: DeleteOnEmptyConfig) -> Self {
225 Self {
226 min_age: Some(Duration::from_secs(value.min_age_secs)),
227 }
228 }
229}
230
231#[rustfmt::skip]
232#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
233#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
234pub struct DeleteOnEmptyReconfiguration {
235 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
238 #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
239 pub min_age_secs: Maybe<Option<u64>>,
240}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244 Self {
245 min_age: value.min_age_secs.map_opt(Duration::from_secs),
246 }
247 }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251 fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252 Self {
253 min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254 }
255 }
256}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
259#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
260#[serde(rename_all = "kebab-case")]
261pub enum EncryptionMode {
262 Plain,
264 Aegis256,
266 Aes256Gcm,
268}
269
270impl From<EncryptionMode> for encryption::EncryptionMode {
271 fn from(value: EncryptionMode) -> Self {
272 match value {
273 EncryptionMode::Plain => Self::Plain,
274 EncryptionMode::Aegis256 => Self::Aegis256,
275 EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
276 }
277 }
278}
279
280impl From<encryption::EncryptionMode> for EncryptionMode {
281 fn from(value: encryption::EncryptionMode) -> Self {
282 match value {
283 encryption::EncryptionMode::Plain => Self::Plain,
284 encryption::EncryptionMode::Aegis256 => Self::Aegis256,
285 encryption::EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
286 }
287 }
288}
289
290#[rustfmt::skip]
291#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
292#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
293pub struct EncryptionConfig {
294 #[serde(default, skip_serializing_if = "Vec::is_empty")]
297 pub allowed_modes: Vec<EncryptionMode>,
298}
299
300impl EncryptionConfig {
301 pub fn to_opt(config: types::config::OptionalEncryptionConfig) -> Option<Self> {
302 let config = EncryptionConfig {
303 allowed_modes: config.allowed_modes.into_iter().map(Into::into).collect(),
304 };
305 if config == Self::default() {
306 None
307 } else {
308 Some(config)
309 }
310 }
311}
312
313impl From<types::config::EncryptionConfig> for EncryptionConfig {
314 fn from(value: types::config::EncryptionConfig) -> Self {
315 Self {
316 allowed_modes: value.allowed_modes.into_iter().map(Into::into).collect(),
317 }
318 }
319}
320
321impl From<EncryptionConfig> for types::config::OptionalEncryptionConfig {
322 fn from(value: EncryptionConfig) -> Self {
323 Self {
324 allowed_modes: value
325 .allowed_modes
326 .into_iter()
327 .map(encryption::EncryptionMode::from)
328 .collect(),
329 }
330 }
331}
332
333#[rustfmt::skip]
334#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
335#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
336pub struct EncryptionReconfiguration {
337 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
340 #[cfg_attr(feature = "utoipa", schema(value_type = Option<Vec<EncryptionMode>>))]
341 pub allowed_modes: Maybe<Vec<EncryptionMode>>,
342}
343
344impl From<EncryptionReconfiguration> for types::config::EncryptionReconfiguration {
345 fn from(value: EncryptionReconfiguration) -> Self {
346 Self {
347 allowed_modes: value.allowed_modes.map(|modes| {
348 modes
349 .into_iter()
350 .map(encryption::EncryptionMode::from)
351 .collect()
352 }),
353 }
354 }
355}
356
357impl From<types::config::EncryptionReconfiguration> for EncryptionReconfiguration {
358 fn from(value: types::config::EncryptionReconfiguration) -> Self {
359 Self {
360 allowed_modes: value
361 .allowed_modes
362 .map(|modes| modes.into_iter().map(Into::into).collect()),
363 }
364 }
365}
366
367#[rustfmt::skip]
368#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
369#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
370pub struct StreamConfig {
371 pub storage_class: Option<StorageClass>,
373 pub retention_policy: Option<RetentionPolicy>,
376 pub timestamping: Option<TimestampingConfig>,
378 #[serde(default)]
380 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
381 #[serde(default)]
383 pub encryption: Option<EncryptionConfig>,
384}
385
386impl StreamConfig {
387 pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
388 let types::config::OptionalStreamConfig {
389 storage_class,
390 retention_policy,
391 timestamping,
392 delete_on_empty,
393 encryption,
394 } = config;
395
396 let config = StreamConfig {
397 storage_class: storage_class.map(Into::into),
398 retention_policy: retention_policy.map(Into::into),
399 timestamping: TimestampingConfig::to_opt(timestamping),
400 delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
401 encryption: EncryptionConfig::to_opt(encryption),
402 };
403 if config == Self::default() {
404 None
405 } else {
406 Some(config)
407 }
408 }
409}
410
411impl From<types::config::StreamConfig> for StreamConfig {
412 fn from(value: types::config::StreamConfig) -> Self {
413 let types::config::StreamConfig {
414 storage_class,
415 retention_policy,
416 timestamping,
417 delete_on_empty,
418 encryption,
419 } = value;
420
421 Self {
422 storage_class: Some(storage_class.into()),
423 retention_policy: Some(retention_policy.into()),
424 timestamping: Some(timestamping.into()),
425 delete_on_empty: Some(delete_on_empty.into()),
426 encryption: Some(encryption.into()),
427 }
428 }
429}
430
431impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
432 type Error = types::ValidationError;
433
434 fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
435 let StreamConfig {
436 storage_class,
437 retention_policy,
438 timestamping,
439 delete_on_empty,
440 encryption,
441 } = value;
442
443 let retention_policy = match retention_policy {
444 None => None,
445 Some(policy) => Some(policy.try_into()?),
446 };
447
448 Ok(Self {
449 storage_class: storage_class.map(Into::into),
450 retention_policy,
451 timestamping: timestamping.map(Into::into).unwrap_or_default(),
452 delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
453 encryption: encryption.map(Into::into).unwrap_or_default(),
454 })
455 }
456}
457
458#[rustfmt::skip]
459#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
460#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
461pub struct StreamReconfiguration {
462 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
464 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
465 pub storage_class: Maybe<Option<StorageClass>>,
466 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
469 #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
470 pub retention_policy: Maybe<Option<RetentionPolicy>>,
471 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
473 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
474 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
475 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
477 #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
478 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
479 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
481 #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionReconfiguration>))]
482 pub encryption: Maybe<Option<EncryptionReconfiguration>>,
483}
484
485impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
486 type Error = types::ValidationError;
487
488 fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
489 let StreamReconfiguration {
490 storage_class,
491 retention_policy,
492 timestamping,
493 delete_on_empty,
494 encryption,
495 } = value;
496
497 Ok(Self {
498 storage_class: storage_class.map_opt(Into::into),
499 retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
500 timestamping: timestamping.map_opt(Into::into),
501 delete_on_empty: delete_on_empty.map_opt(Into::into),
502 encryption: encryption.map_opt(Into::into),
503 })
504 }
505}
506
507impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
508 fn from(value: types::config::StreamReconfiguration) -> Self {
509 let types::config::StreamReconfiguration {
510 storage_class,
511 retention_policy,
512 timestamping,
513 delete_on_empty,
514 encryption,
515 } = value;
516
517 Self {
518 storage_class: storage_class.map_opt(Into::into),
519 retention_policy: retention_policy.map_opt(Into::into),
520 timestamping: timestamping.map_opt(Into::into),
521 delete_on_empty: delete_on_empty.map_opt(Into::into),
522 encryption: encryption.map_opt(Into::into),
523 }
524 }
525}
526
527#[rustfmt::skip]
528#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
529#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
530pub struct BasinConfig {
531 pub default_stream_config: Option<StreamConfig>,
533 #[serde(default)]
535 #[cfg_attr(feature = "utoipa", schema(default = false))]
536 pub create_stream_on_append: bool,
537 #[serde(default)]
539 #[cfg_attr(feature = "utoipa", schema(default = false))]
540 pub create_stream_on_read: bool,
541}
542
543impl TryFrom<BasinConfig> for types::config::BasinConfig {
544 type Error = types::ValidationError;
545
546 fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
547 let BasinConfig {
548 default_stream_config,
549 create_stream_on_append,
550 create_stream_on_read,
551 } = value;
552
553 Ok(Self {
554 default_stream_config: match default_stream_config {
555 Some(config) => config.try_into()?,
556 None => Default::default(),
557 },
558 create_stream_on_append,
559 create_stream_on_read,
560 })
561 }
562}
563
564impl From<types::config::BasinConfig> for BasinConfig {
565 fn from(value: types::config::BasinConfig) -> Self {
566 let types::config::BasinConfig {
567 default_stream_config,
568 create_stream_on_append,
569 create_stream_on_read,
570 } = value;
571
572 Self {
573 default_stream_config: StreamConfig::to_opt(default_stream_config),
574 create_stream_on_append,
575 create_stream_on_read,
576 }
577 }
578}
579
580#[rustfmt::skip]
581#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
582#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
583pub struct BasinReconfiguration {
584 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
586 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
587 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
588 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
590 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
591 pub create_stream_on_append: Maybe<bool>,
592 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
594 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
595 pub create_stream_on_read: Maybe<bool>,
596}
597
598impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
599 type Error = types::ValidationError;
600
601 fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
602 let BasinReconfiguration {
603 default_stream_config,
604 create_stream_on_append,
605 create_stream_on_read,
606 } = value;
607
608 Ok(Self {
609 default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
610 create_stream_on_append: create_stream_on_append.map(Into::into),
611 create_stream_on_read: create_stream_on_read.map(Into::into),
612 })
613 }
614}
615
616impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
617 fn from(value: types::config::BasinReconfiguration) -> Self {
618 let types::config::BasinReconfiguration {
619 default_stream_config,
620 create_stream_on_append,
621 create_stream_on_read,
622 } = value;
623
624 Self {
625 default_stream_config: default_stream_config.map_opt(Into::into),
626 create_stream_on_append: create_stream_on_append.map(Into::into),
627 create_stream_on_read: create_stream_on_read.map(Into::into),
628 }
629 }
630}
631
632#[cfg(test)]
633mod tests {
634 use proptest::prelude::*;
635
636 use super::*;
637
638 fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
639 prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
640 }
641
642 fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
643 prop_oneof![
644 Just(TimestampingMode::ClientPrefer),
645 Just(TimestampingMode::ClientRequire),
646 Just(TimestampingMode::Arrival),
647 ]
648 }
649
650 fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
651 prop_oneof![
652 any::<u64>().prop_map(RetentionPolicy::Age),
653 Just(RetentionPolicy::Infinite(InfiniteRetention {})),
654 ]
655 }
656
657 fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
658 (
659 proptest::option::of(gen_timestamping_mode()),
660 proptest::option::of(any::<bool>()),
661 )
662 .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
663 }
664
665 fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
666 any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
667 }
668
669 fn gen_encryption_mode() -> impl Strategy<Value = EncryptionMode> {
670 prop_oneof![
671 Just(EncryptionMode::Plain),
672 Just(EncryptionMode::Aegis256),
673 Just(EncryptionMode::Aes256Gcm),
674 ]
675 }
676
677 fn gen_encryption_config() -> impl Strategy<Value = EncryptionConfig> {
678 (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
679 let allowed_modes = [
680 (plain, EncryptionMode::Plain),
681 (aegis, EncryptionMode::Aegis256),
682 (aes, EncryptionMode::Aes256Gcm),
683 ]
684 .into_iter()
685 .filter_map(|(include, mode)| include.then_some(mode))
686 .collect();
687 EncryptionConfig { allowed_modes }
688 })
689 }
690
691 fn gen_encryption_reconfiguration() -> impl Strategy<Value = EncryptionReconfiguration> {
692 prop_oneof![
693 Just(Maybe::Unspecified),
694 proptest::collection::vec(gen_encryption_mode(), 0..=3).prop_map(Maybe::Specified),
695 ]
696 .prop_map(|allowed_modes| EncryptionReconfiguration { allowed_modes })
697 }
698
699 fn gen_internal_encryption_modes()
700 -> impl Strategy<Value = enumset::EnumSet<encryption::EncryptionMode>> {
701 (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
702 let mut allowed_modes = enumset::EnumSet::new();
703 if plain {
704 allowed_modes.insert(encryption::EncryptionMode::Plain);
705 }
706 if aegis {
707 allowed_modes.insert(encryption::EncryptionMode::Aegis256);
708 }
709 if aes {
710 allowed_modes.insert(encryption::EncryptionMode::Aes256Gcm);
711 }
712 allowed_modes
713 })
714 }
715
716 fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
717 (
718 proptest::option::of(gen_storage_class()),
719 proptest::option::of(gen_retention_policy()),
720 proptest::option::of(gen_timestamping_config()),
721 proptest::option::of(gen_delete_on_empty_config()),
722 proptest::option::of(gen_encryption_config()),
723 )
724 .prop_map(
725 |(storage_class, retention_policy, timestamping, delete_on_empty, encryption)| {
726 StreamConfig {
727 storage_class,
728 retention_policy,
729 timestamping,
730 delete_on_empty,
731 encryption,
732 }
733 },
734 )
735 }
736
737 fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
738 (
739 proptest::option::of(gen_stream_config()),
740 any::<bool>(),
741 any::<bool>(),
742 )
743 .prop_map(
744 |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
745 BasinConfig {
746 default_stream_config,
747 create_stream_on_append,
748 create_stream_on_read,
749 }
750 },
751 )
752 }
753
754 fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
755 inner: impl Strategy<Value = T>,
756 ) -> impl Strategy<Value = Maybe<Option<T>>> {
757 prop_oneof![
758 Just(Maybe::Unspecified),
759 Just(Maybe::Specified(None)),
760 inner.prop_map(|v| Maybe::Specified(Some(v))),
761 ]
762 }
763
764 fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
765 (
766 gen_maybe(gen_storage_class()),
767 gen_maybe(gen_retention_policy()),
768 gen_maybe(gen_timestamping_reconfiguration()),
769 gen_maybe(gen_delete_on_empty_reconfiguration()),
770 gen_maybe(gen_encryption_reconfiguration()),
771 )
772 .prop_map(
773 |(storage_class, retention_policy, timestamping, delete_on_empty, encryption)| {
774 StreamReconfiguration {
775 storage_class,
776 retention_policy,
777 timestamping,
778 delete_on_empty,
779 encryption,
780 }
781 },
782 )
783 }
784
785 fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
786 (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
787 .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
788 }
789
790 fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
791 {
792 gen_maybe(any::<u64>())
793 .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
794 }
795
796 fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
797 (
798 gen_maybe(gen_stream_reconfiguration()),
799 prop_oneof![
800 Just(Maybe::Unspecified),
801 any::<bool>().prop_map(Maybe::Specified),
802 ],
803 prop_oneof![
804 Just(Maybe::Unspecified),
805 any::<bool>().prop_map(Maybe::Specified),
806 ],
807 )
808 .prop_map(
809 |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
810 BasinReconfiguration {
811 default_stream_config,
812 create_stream_on_append,
813 create_stream_on_read,
814 }
815 },
816 )
817 }
818
819 fn gen_internal_optional_stream_config()
820 -> impl Strategy<Value = types::config::OptionalStreamConfig> {
821 (
822 proptest::option::of(gen_storage_class()),
823 proptest::option::of(gen_retention_policy()),
824 proptest::option::of(gen_timestamping_mode()),
825 proptest::option::of(any::<bool>()),
826 proptest::option::of(any::<u64>()),
827 gen_internal_encryption_modes(),
828 )
829 .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe, encryption)| {
830 types::config::OptionalStreamConfig {
831 storage_class: sc.map(Into::into),
832 retention_policy: rp.map(|rp| match rp {
833 RetentionPolicy::Age(secs) => {
834 types::config::RetentionPolicy::Age(Duration::from_secs(secs))
835 }
836 RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
837 }),
838 timestamping: types::config::OptionalTimestampingConfig {
839 mode: ts_mode.map(Into::into),
840 uncapped: ts_uncapped,
841 },
842 delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
843 min_age: doe.map(Duration::from_secs),
844 },
845 encryption: types::config::OptionalEncryptionConfig {
846 allowed_modes: encryption,
847 },
848 }
849 })
850 }
851
852 proptest! {
853 #[test]
854 fn stream_config_conversion_validates(config in gen_stream_config()) {
855 let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
856 let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
857
858 if has_zero_age {
859 prop_assert!(result.is_err());
860 } else {
861 prop_assert!(result.is_ok());
862 }
863 }
864
865 #[test]
866 fn basin_config_conversion_validates(config in gen_basin_config()) {
867 let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
868 matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
869 });
870
871 let result: Result<types::config::BasinConfig, _> = config.try_into();
872
873 if has_invalid_config {
874 prop_assert!(result.is_err());
875 } else {
876 prop_assert!(result.is_ok());
877 }
878 }
879
880 #[test]
881 fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
882 let has_zero_age = matches!(
883 reconfig.retention_policy,
884 Maybe::Specified(Some(RetentionPolicy::Age(0)))
885 );
886 let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
887
888 if has_zero_age {
889 prop_assert!(result.is_err());
890 } else {
891 prop_assert!(result.is_ok());
892 }
893 }
894
895 #[test]
896 fn merge_stream_or_basin_or_default(
897 stream in gen_internal_optional_stream_config(),
898 basin in gen_internal_optional_stream_config(),
899 ) {
900 let merged = stream.clone().merge(basin.clone());
901
902 prop_assert_eq!(
903 merged.storage_class,
904 stream.storage_class.or(basin.storage_class).unwrap_or_default()
905 );
906 prop_assert_eq!(
907 merged.retention_policy,
908 stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
909 );
910 prop_assert_eq!(
911 merged.timestamping.mode,
912 stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
913 );
914 prop_assert_eq!(
915 merged.timestamping.uncapped,
916 stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
917 );
918 prop_assert_eq!(
919 merged.delete_on_empty.min_age,
920 stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
921 );
922 prop_assert_eq!(
923 merged.encryption.allowed_modes,
924 if !stream.encryption.allowed_modes.is_empty() {
925 stream.encryption.allowed_modes
926 } else if !basin.encryption.allowed_modes.is_empty() {
927 basin.encryption.allowed_modes
928 } else {
929 types::config::DEFAULT_ALLOWED_ENCRYPTION_MODES
930 }
931 );
932 }
933
934 #[test]
935 fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
936 let reconfig = types::config::StreamReconfiguration::default();
937 let result = base.clone().reconfigure(reconfig);
938
939 prop_assert_eq!(result.storage_class, base.storage_class);
940 prop_assert_eq!(result.retention_policy, base.retention_policy);
941 prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
942 prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
943 prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
944 prop_assert_eq!(result.encryption.allowed_modes, base.encryption.allowed_modes);
945 }
946
947 #[test]
948 fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
949 let reconfig = types::config::StreamReconfiguration {
950 storage_class: Maybe::Specified(None),
951 retention_policy: Maybe::Specified(None),
952 timestamping: Maybe::Specified(None),
953 delete_on_empty: Maybe::Specified(None),
954 encryption: Maybe::Specified(None),
955 };
956 let result = base.reconfigure(reconfig);
957
958 prop_assert!(result.storage_class.is_none());
959 prop_assert!(result.retention_policy.is_none());
960 prop_assert!(result.timestamping.mode.is_none());
961 prop_assert!(result.timestamping.uncapped.is_none());
962 prop_assert!(result.delete_on_empty.min_age.is_none());
963 prop_assert!(result.encryption.allowed_modes.is_empty());
964 }
965
966 #[test]
967 fn reconfigure_specified_some_sets_value(
968 base in gen_internal_optional_stream_config(),
969 new_sc in gen_storage_class(),
970 new_rp_secs in 1u64..u64::MAX,
971 ) {
972 let reconfig = types::config::StreamReconfiguration {
973 storage_class: Maybe::Specified(Some(new_sc.into())),
974 retention_policy: Maybe::Specified(Some(
975 types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
976 )),
977 ..Default::default()
978 };
979 let result = base.reconfigure(reconfig);
980
981 prop_assert_eq!(result.storage_class, Some(new_sc.into()));
982 prop_assert_eq!(
983 result.retention_policy,
984 Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
985 );
986 }
987
988 #[test]
989 fn to_opt_returns_some_for_non_defaults(
990 sc in gen_storage_class(),
991 doe_secs in 1u64..u64::MAX,
992 ts_mode in gen_timestamping_mode(),
993 ) {
994 let internal = types::config::OptionalStreamConfig {
996 storage_class: Some(sc.into()),
997 ..Default::default()
998 };
999 prop_assert!(StreamConfig::to_opt(internal).is_some());
1000
1001 let internal = types::config::OptionalDeleteOnEmptyConfig {
1003 min_age: Some(Duration::from_secs(doe_secs)),
1004 };
1005 let api = DeleteOnEmptyConfig::to_opt(internal);
1006 prop_assert!(api.is_some());
1007 prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
1008
1009 let internal = types::config::OptionalTimestampingConfig {
1011 mode: Some(ts_mode.into()),
1012 uncapped: None,
1013 };
1014 prop_assert!(TimestampingConfig::to_opt(internal).is_some());
1015 }
1016
1017 #[test]
1018 fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
1019 let has_zero_age = matches!(
1020 &reconfig.default_stream_config,
1021 Maybe::Specified(Some(sr)) if matches!(
1022 sr.retention_policy,
1023 Maybe::Specified(Some(RetentionPolicy::Age(0)))
1024 )
1025 );
1026 let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
1027
1028 if has_zero_age {
1029 prop_assert!(result.is_err());
1030 } else {
1031 prop_assert!(result.is_ok());
1032 }
1033 }
1034
1035 #[test]
1036 fn reconfigure_basin_unspecified_preserves(
1037 base_sc in proptest::option::of(gen_storage_class()),
1038 base_on_append in any::<bool>(),
1039 base_on_read in any::<bool>(),
1040 ) {
1041 let base = types::config::BasinConfig {
1042 default_stream_config: types::config::OptionalStreamConfig {
1043 storage_class: base_sc.map(Into::into),
1044 ..Default::default()
1045 },
1046 create_stream_on_append: base_on_append,
1047 create_stream_on_read: base_on_read,
1048 };
1049
1050 let reconfig = types::config::BasinReconfiguration::default();
1051 let result = base.clone().reconfigure(reconfig);
1052
1053 prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
1054 prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
1055 prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
1056 }
1057
1058 #[test]
1059 fn reconfigure_basin_specified_updates(
1060 base_on_append in any::<bool>(),
1061 new_on_append in any::<bool>(),
1062 new_sc in gen_storage_class(),
1063 ) {
1064 let base = types::config::BasinConfig {
1065 create_stream_on_append: base_on_append,
1066 ..Default::default()
1067 };
1068
1069 let reconfig = types::config::BasinReconfiguration {
1070 default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
1071 storage_class: Maybe::Specified(Some(new_sc.into())),
1072 ..Default::default()
1073 })),
1074 create_stream_on_append: Maybe::Specified(new_on_append),
1075 ..Default::default()
1076 };
1077 let result = base.reconfigure(reconfig);
1078
1079 prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
1080 prop_assert_eq!(result.create_stream_on_append, new_on_append);
1081 }
1082
1083 #[test]
1084 fn reconfigure_nested_partial_update(
1085 base_mode in gen_timestamping_mode(),
1086 base_uncapped in any::<bool>(),
1087 new_mode in gen_timestamping_mode(),
1088 ) {
1089 let base = types::config::OptionalStreamConfig {
1090 timestamping: types::config::OptionalTimestampingConfig {
1091 mode: Some(base_mode.into()),
1092 uncapped: Some(base_uncapped),
1093 },
1094 ..Default::default()
1095 };
1096
1097 let expected_mode: types::config::TimestampingMode = new_mode.into();
1098
1099 let reconfig = types::config::StreamReconfiguration {
1100 timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
1101 mode: Maybe::Specified(Some(expected_mode)),
1102 uncapped: Maybe::Unspecified,
1103 })),
1104 ..Default::default()
1105 };
1106 let result = base.reconfigure(reconfig);
1107
1108 prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
1109 prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
1110 }
1111 }
1112
1113 #[test]
1114 fn to_opt_returns_none_for_defaults() {
1115 assert!(StreamConfig::to_opt(types::config::OptionalStreamConfig::default()).is_none());
1117
1118 let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
1120 let doe_zero = types::config::OptionalDeleteOnEmptyConfig {
1121 min_age: Some(Duration::ZERO),
1122 };
1123 assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
1124 assert!(DeleteOnEmptyConfig::to_opt(doe_zero).is_none());
1125
1126 assert!(
1128 TimestampingConfig::to_opt(types::config::OptionalTimestampingConfig::default())
1129 .is_none()
1130 );
1131 }
1132
1133 #[test]
1134 fn empty_json_converts_to_all_none() {
1135 let json = serde_json::json!({});
1136 let parsed: StreamConfig = serde_json::from_value(json).unwrap();
1137 let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();
1138
1139 assert!(
1140 internal.storage_class.is_none(),
1141 "storage_class should be None"
1142 );
1143 assert!(
1144 internal.retention_policy.is_none(),
1145 "retention_policy should be None"
1146 );
1147 assert!(
1148 internal.timestamping.mode.is_none(),
1149 "timestamping.mode should be None"
1150 );
1151 assert!(
1152 internal.timestamping.uncapped.is_none(),
1153 "timestamping.uncapped should be None"
1154 );
1155 assert!(
1156 internal.delete_on_empty.min_age.is_none(),
1157 "delete_on_empty.min_age should be None"
1158 );
1159 assert!(
1160 internal.encryption.allowed_modes.is_empty(),
1161 "encryption.allowed_modes should be empty"
1162 );
1163 }
1164}