1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
6#[rustfmt::skip]
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
9#[serde(rename_all = "kebab-case")]
10pub enum StorageClass {
11 Standard,
13 Express,
15}
16
17impl From<StorageClass> for types::config::StorageClass {
18 fn from(value: StorageClass) -> Self {
19 match value {
20 StorageClass::Express => Self::Express,
21 StorageClass::Standard => Self::Standard,
22 }
23 }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27 fn from(value: types::config::StorageClass) -> Self {
28 match value {
29 types::config::StorageClass::Express => Self::Express,
30 types::config::StorageClass::Standard => Self::Standard,
31 }
32 }
33}
34
35#[rustfmt::skip]
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
38#[serde(rename_all = "kebab-case")]
39pub enum RetentionPolicy {
40 Age(u64),
43 Infinite(InfiniteRetention)
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
49#[serde(rename_all = "kebab-case")]
50pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53 type Error = types::ValidationError;
54
55 fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56 match value {
57 RetentionPolicy::Age(0) => Err(types::ValidationError(
58 "age must be greater than 0 seconds".to_string(),
59 )),
60 RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61 RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62 }
63 }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67 fn from(value: types::config::RetentionPolicy) -> Self {
68 match value {
69 types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70 types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71 }
72 }
73}
74
75#[rustfmt::skip]
76#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
78#[serde(rename_all = "kebab-case")]
79pub enum TimestampingMode {
80 #[default]
82 ClientPrefer,
83 ClientRequire,
85 Arrival,
87}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90 fn from(value: TimestampingMode) -> Self {
91 match value {
92 TimestampingMode::ClientPrefer => Self::ClientPrefer,
93 TimestampingMode::ClientRequire => Self::ClientRequire,
94 TimestampingMode::Arrival => Self::Arrival,
95 }
96 }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100 fn from(value: types::config::TimestampingMode) -> Self {
101 match value {
102 types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103 types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104 types::config::TimestampingMode::Arrival => Self::Arrival,
105 }
106 }
107}
108
109#[rustfmt::skip]
110#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
112pub struct TimestampingConfig {
113 pub mode: Option<TimestampingMode>,
115 pub uncapped: Option<bool>,
118}
119
120impl TimestampingConfig {
121 pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122 let config = TimestampingConfig {
123 mode: config.mode.map(Into::into),
124 uncapped: config.uncapped,
125 };
126 if config == Self::default() {
127 None
128 } else {
129 Some(config)
130 }
131 }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135 fn from(value: types::config::TimestampingConfig) -> Self {
136 Self {
137 mode: Some(value.mode.into()),
138 uncapped: Some(value.uncapped),
139 }
140 }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144 fn from(value: TimestampingConfig) -> Self {
145 Self {
146 mode: value.mode.map(Into::into),
147 uncapped: value.uncapped,
148 }
149 }
150}
151
152#[rustfmt::skip]
153#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
154#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
155pub struct TimestampingReconfiguration {
156 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
158 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
159 pub mode: Maybe<Option<TimestampingMode>>,
160 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
162 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
163 pub uncapped: Maybe<Option<bool>>,
164}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167 fn from(value: TimestampingReconfiguration) -> Self {
168 Self {
169 mode: value.mode.map_opt(Into::into),
170 uncapped: value.uncapped,
171 }
172 }
173}
174
175impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
176 fn from(value: types::config::TimestampingReconfiguration) -> Self {
177 Self {
178 mode: value.mode.map_opt(Into::into),
179 uncapped: value.uncapped,
180 }
181 }
182}
183
184#[rustfmt::skip]
185#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
186#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
187pub struct DeleteOnEmptyConfig {
188 #[serde(default)]
191 pub min_age_secs: u64,
192}
193
194impl DeleteOnEmptyConfig {
195 pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196 let min_age = config.min_age.unwrap_or_default();
197 if min_age > Duration::ZERO {
198 Some(DeleteOnEmptyConfig {
199 min_age_secs: min_age.as_secs(),
200 })
201 } else {
202 None
203 }
204 }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208 fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209 Self {
210 min_age_secs: value.min_age.as_secs(),
211 }
212 }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216 fn from(value: DeleteOnEmptyConfig) -> Self {
217 Self {
218 min_age: Duration::from_secs(value.min_age_secs),
219 }
220 }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224 fn from(value: DeleteOnEmptyConfig) -> Self {
225 Self {
226 min_age: Some(Duration::from_secs(value.min_age_secs)),
227 }
228 }
229}
230
231#[rustfmt::skip]
232#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
233#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
234pub struct DeleteOnEmptyReconfiguration {
235 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
238 #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
239 pub min_age_secs: Maybe<Option<u64>>,
240}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244 Self {
245 min_age: value.min_age_secs.map_opt(Duration::from_secs),
246 }
247 }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251 fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252 Self {
253 min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254 }
255 }
256}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
259#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
260pub enum EncryptionMode {
261 #[serde(rename = "plain")]
263 Plain,
264 #[serde(rename = "aegis-256")]
266 Aegis256,
267 #[serde(rename = "aes-256-gcm")]
269 Aes256Gcm,
270}
271
272impl From<EncryptionMode> for encryption::EncryptionMode {
273 fn from(value: EncryptionMode) -> Self {
274 match value {
275 EncryptionMode::Plain => Self::Plain,
276 EncryptionMode::Aegis256 => Self::Aegis256,
277 EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
278 }
279 }
280}
281
282impl From<encryption::EncryptionMode> for EncryptionMode {
283 fn from(value: encryption::EncryptionMode) -> Self {
284 match value {
285 encryption::EncryptionMode::Plain => Self::Plain,
286 encryption::EncryptionMode::Aegis256 => Self::Aegis256,
287 encryption::EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
288 }
289 }
290}
291
292#[rustfmt::skip]
293#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
294#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
295pub struct EncryptionConfig {
296 #[serde(default, skip_serializing_if = "Vec::is_empty")]
299 pub allowed_modes: Vec<EncryptionMode>,
300}
301
302impl EncryptionConfig {
303 pub fn to_opt(config: types::config::OptionalEncryptionConfig) -> Option<Self> {
304 let config = EncryptionConfig {
305 allowed_modes: config.allowed_modes.into_iter().map(Into::into).collect(),
306 };
307 if config == Self::default() {
308 None
309 } else {
310 Some(config)
311 }
312 }
313}
314
315impl From<types::config::EncryptionConfig> for EncryptionConfig {
316 fn from(value: types::config::EncryptionConfig) -> Self {
317 Self {
318 allowed_modes: value.allowed_modes.into_iter().map(Into::into).collect(),
319 }
320 }
321}
322
323impl From<EncryptionConfig> for types::config::OptionalEncryptionConfig {
324 fn from(value: EncryptionConfig) -> Self {
325 Self {
326 allowed_modes: value
327 .allowed_modes
328 .into_iter()
329 .map(encryption::EncryptionMode::from)
330 .collect(),
331 }
332 }
333}
334
335#[rustfmt::skip]
336#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
337#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
338pub struct EncryptionReconfiguration {
339 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
342 #[cfg_attr(feature = "utoipa", schema(value_type = Option<Vec<EncryptionMode>>))]
343 pub allowed_modes: Maybe<Vec<EncryptionMode>>,
344}
345
346impl From<EncryptionReconfiguration> for types::config::EncryptionReconfiguration {
347 fn from(value: EncryptionReconfiguration) -> Self {
348 Self {
349 allowed_modes: value.allowed_modes.map(|modes| {
350 modes
351 .into_iter()
352 .map(encryption::EncryptionMode::from)
353 .collect()
354 }),
355 }
356 }
357}
358
359impl From<types::config::EncryptionReconfiguration> for EncryptionReconfiguration {
360 fn from(value: types::config::EncryptionReconfiguration) -> Self {
361 Self {
362 allowed_modes: value
363 .allowed_modes
364 .map(|modes| modes.into_iter().map(Into::into).collect()),
365 }
366 }
367}
368
369#[rustfmt::skip]
370#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
371#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
372pub struct StreamConfig {
373 pub storage_class: Option<StorageClass>,
375 pub retention_policy: Option<RetentionPolicy>,
378 pub timestamping: Option<TimestampingConfig>,
380 #[serde(default)]
382 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
383 #[serde(default)]
385 pub encryption: Option<EncryptionConfig>,
386}
387
388impl StreamConfig {
389 pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
390 let types::config::OptionalStreamConfig {
391 storage_class,
392 retention_policy,
393 timestamping,
394 delete_on_empty,
395 encryption,
396 } = config;
397
398 let config = StreamConfig {
399 storage_class: storage_class.map(Into::into),
400 retention_policy: retention_policy.map(Into::into),
401 timestamping: TimestampingConfig::to_opt(timestamping),
402 delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
403 encryption: EncryptionConfig::to_opt(encryption),
404 };
405 if config == Self::default() {
406 None
407 } else {
408 Some(config)
409 }
410 }
411}
412
413impl From<types::config::StreamConfig> for StreamConfig {
414 fn from(value: types::config::StreamConfig) -> Self {
415 let types::config::StreamConfig {
416 storage_class,
417 retention_policy,
418 timestamping,
419 delete_on_empty,
420 encryption,
421 } = value;
422
423 Self {
424 storage_class: Some(storage_class.into()),
425 retention_policy: Some(retention_policy.into()),
426 timestamping: Some(timestamping.into()),
427 delete_on_empty: Some(delete_on_empty.into()),
428 encryption: Some(encryption.into()),
429 }
430 }
431}
432
433impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
434 type Error = types::ValidationError;
435
436 fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
437 let StreamConfig {
438 storage_class,
439 retention_policy,
440 timestamping,
441 delete_on_empty,
442 encryption,
443 } = value;
444
445 let retention_policy = match retention_policy {
446 None => None,
447 Some(policy) => Some(policy.try_into()?),
448 };
449
450 Ok(Self {
451 storage_class: storage_class.map(Into::into),
452 retention_policy,
453 timestamping: timestamping.map(Into::into).unwrap_or_default(),
454 delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
455 encryption: encryption.map(Into::into).unwrap_or_default(),
456 })
457 }
458}
459
460#[rustfmt::skip]
461#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
462#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
463pub struct StreamReconfiguration {
464 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
466 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
467 pub storage_class: Maybe<Option<StorageClass>>,
468 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
471 #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
472 pub retention_policy: Maybe<Option<RetentionPolicy>>,
473 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
475 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
476 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
477 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
479 #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
480 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
481 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
483 #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionReconfiguration>))]
484 pub encryption: Maybe<Option<EncryptionReconfiguration>>,
485}
486
487impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
488 type Error = types::ValidationError;
489
490 fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
491 let StreamReconfiguration {
492 storage_class,
493 retention_policy,
494 timestamping,
495 delete_on_empty,
496 encryption,
497 } = value;
498
499 Ok(Self {
500 storage_class: storage_class.map_opt(Into::into),
501 retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
502 timestamping: timestamping.map_opt(Into::into),
503 delete_on_empty: delete_on_empty.map_opt(Into::into),
504 encryption: encryption.map_opt(Into::into),
505 })
506 }
507}
508
509impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
510 fn from(value: types::config::StreamReconfiguration) -> Self {
511 let types::config::StreamReconfiguration {
512 storage_class,
513 retention_policy,
514 timestamping,
515 delete_on_empty,
516 encryption,
517 } = value;
518
519 Self {
520 storage_class: storage_class.map_opt(Into::into),
521 retention_policy: retention_policy.map_opt(Into::into),
522 timestamping: timestamping.map_opt(Into::into),
523 delete_on_empty: delete_on_empty.map_opt(Into::into),
524 encryption: encryption.map_opt(Into::into),
525 }
526 }
527}
528
529#[rustfmt::skip]
530#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
531#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
532pub struct BasinConfig {
533 pub default_stream_config: Option<StreamConfig>,
535 #[serde(default)]
537 #[cfg_attr(feature = "utoipa", schema(default = false))]
538 pub create_stream_on_append: bool,
539 #[serde(default)]
541 #[cfg_attr(feature = "utoipa", schema(default = false))]
542 pub create_stream_on_read: bool,
543}
544
545impl TryFrom<BasinConfig> for types::config::BasinConfig {
546 type Error = types::ValidationError;
547
548 fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
549 let BasinConfig {
550 default_stream_config,
551 create_stream_on_append,
552 create_stream_on_read,
553 } = value;
554
555 Ok(Self {
556 default_stream_config: match default_stream_config {
557 Some(config) => config.try_into()?,
558 None => Default::default(),
559 },
560 create_stream_on_append,
561 create_stream_on_read,
562 })
563 }
564}
565
566impl From<types::config::BasinConfig> for BasinConfig {
567 fn from(value: types::config::BasinConfig) -> Self {
568 let types::config::BasinConfig {
569 default_stream_config,
570 create_stream_on_append,
571 create_stream_on_read,
572 } = value;
573
574 Self {
575 default_stream_config: StreamConfig::to_opt(default_stream_config),
576 create_stream_on_append,
577 create_stream_on_read,
578 }
579 }
580}
581
582#[rustfmt::skip]
583#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
584#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
585pub struct BasinReconfiguration {
586 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
588 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
589 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
590 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
592 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
593 pub create_stream_on_append: Maybe<bool>,
594 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
596 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
597 pub create_stream_on_read: Maybe<bool>,
598}
599
600impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
601 type Error = types::ValidationError;
602
603 fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
604 let BasinReconfiguration {
605 default_stream_config,
606 create_stream_on_append,
607 create_stream_on_read,
608 } = value;
609
610 Ok(Self {
611 default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
612 create_stream_on_append: create_stream_on_append.map(Into::into),
613 create_stream_on_read: create_stream_on_read.map(Into::into),
614 })
615 }
616}
617
618impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
619 fn from(value: types::config::BasinReconfiguration) -> Self {
620 let types::config::BasinReconfiguration {
621 default_stream_config,
622 create_stream_on_append,
623 create_stream_on_read,
624 } = value;
625
626 Self {
627 default_stream_config: default_stream_config.map_opt(Into::into),
628 create_stream_on_append: create_stream_on_append.map(Into::into),
629 create_stream_on_read: create_stream_on_read.map(Into::into),
630 }
631 }
632}
633
634#[cfg(test)]
635mod tests {
636 use proptest::prelude::*;
637
638 use super::*;
639
640 fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
641 prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
642 }
643
644 fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
645 prop_oneof![
646 Just(TimestampingMode::ClientPrefer),
647 Just(TimestampingMode::ClientRequire),
648 Just(TimestampingMode::Arrival),
649 ]
650 }
651
652 fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
653 prop_oneof![
654 any::<u64>().prop_map(RetentionPolicy::Age),
655 Just(RetentionPolicy::Infinite(InfiniteRetention {})),
656 ]
657 }
658
659 fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
660 (
661 proptest::option::of(gen_timestamping_mode()),
662 proptest::option::of(any::<bool>()),
663 )
664 .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
665 }
666
667 fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
668 any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
669 }
670
671 fn gen_encryption_mode() -> impl Strategy<Value = EncryptionMode> {
672 prop_oneof![
673 Just(EncryptionMode::Plain),
674 Just(EncryptionMode::Aegis256),
675 Just(EncryptionMode::Aes256Gcm),
676 ]
677 }
678
679 fn gen_encryption_config() -> impl Strategy<Value = EncryptionConfig> {
680 (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
681 let allowed_modes = [
682 (plain, EncryptionMode::Plain),
683 (aegis, EncryptionMode::Aegis256),
684 (aes, EncryptionMode::Aes256Gcm),
685 ]
686 .into_iter()
687 .filter_map(|(include, mode)| include.then_some(mode))
688 .collect();
689 EncryptionConfig { allowed_modes }
690 })
691 }
692
693 fn gen_encryption_reconfiguration() -> impl Strategy<Value = EncryptionReconfiguration> {
694 prop_oneof![
695 Just(Maybe::Unspecified),
696 proptest::collection::vec(gen_encryption_mode(), 0..=3).prop_map(Maybe::Specified),
697 ]
698 .prop_map(|allowed_modes| EncryptionReconfiguration { allowed_modes })
699 }
700
701 fn gen_internal_encryption_modes()
702 -> impl Strategy<Value = enumset::EnumSet<encryption::EncryptionMode>> {
703 (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
704 let mut allowed_modes = enumset::EnumSet::new();
705 if plain {
706 allowed_modes.insert(encryption::EncryptionMode::Plain);
707 }
708 if aegis {
709 allowed_modes.insert(encryption::EncryptionMode::Aegis256);
710 }
711 if aes {
712 allowed_modes.insert(encryption::EncryptionMode::Aes256Gcm);
713 }
714 allowed_modes
715 })
716 }
717
718 fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
719 (
720 proptest::option::of(gen_storage_class()),
721 proptest::option::of(gen_retention_policy()),
722 proptest::option::of(gen_timestamping_config()),
723 proptest::option::of(gen_delete_on_empty_config()),
724 proptest::option::of(gen_encryption_config()),
725 )
726 .prop_map(
727 |(storage_class, retention_policy, timestamping, delete_on_empty, encryption)| {
728 StreamConfig {
729 storage_class,
730 retention_policy,
731 timestamping,
732 delete_on_empty,
733 encryption,
734 }
735 },
736 )
737 }
738
739 fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
740 (
741 proptest::option::of(gen_stream_config()),
742 any::<bool>(),
743 any::<bool>(),
744 )
745 .prop_map(
746 |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
747 BasinConfig {
748 default_stream_config,
749 create_stream_on_append,
750 create_stream_on_read,
751 }
752 },
753 )
754 }
755
756 fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
757 inner: impl Strategy<Value = T>,
758 ) -> impl Strategy<Value = Maybe<Option<T>>> {
759 prop_oneof![
760 Just(Maybe::Unspecified),
761 Just(Maybe::Specified(None)),
762 inner.prop_map(|v| Maybe::Specified(Some(v))),
763 ]
764 }
765
766 fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
767 (
768 gen_maybe(gen_storage_class()),
769 gen_maybe(gen_retention_policy()),
770 gen_maybe(gen_timestamping_reconfiguration()),
771 gen_maybe(gen_delete_on_empty_reconfiguration()),
772 gen_maybe(gen_encryption_reconfiguration()),
773 )
774 .prop_map(
775 |(storage_class, retention_policy, timestamping, delete_on_empty, encryption)| {
776 StreamReconfiguration {
777 storage_class,
778 retention_policy,
779 timestamping,
780 delete_on_empty,
781 encryption,
782 }
783 },
784 )
785 }
786
787 fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
788 (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
789 .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
790 }
791
792 fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
793 {
794 gen_maybe(any::<u64>())
795 .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
796 }
797
798 fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
799 (
800 gen_maybe(gen_stream_reconfiguration()),
801 prop_oneof![
802 Just(Maybe::Unspecified),
803 any::<bool>().prop_map(Maybe::Specified),
804 ],
805 prop_oneof![
806 Just(Maybe::Unspecified),
807 any::<bool>().prop_map(Maybe::Specified),
808 ],
809 )
810 .prop_map(
811 |(default_stream_config, create_stream_on_append, create_stream_on_read)| {
812 BasinReconfiguration {
813 default_stream_config,
814 create_stream_on_append,
815 create_stream_on_read,
816 }
817 },
818 )
819 }
820
821 fn gen_internal_optional_stream_config()
822 -> impl Strategy<Value = types::config::OptionalStreamConfig> {
823 (
824 proptest::option::of(gen_storage_class()),
825 proptest::option::of(gen_retention_policy()),
826 proptest::option::of(gen_timestamping_mode()),
827 proptest::option::of(any::<bool>()),
828 proptest::option::of(any::<u64>()),
829 gen_internal_encryption_modes(),
830 )
831 .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe, encryption)| {
832 types::config::OptionalStreamConfig {
833 storage_class: sc.map(Into::into),
834 retention_policy: rp.map(|rp| match rp {
835 RetentionPolicy::Age(secs) => {
836 types::config::RetentionPolicy::Age(Duration::from_secs(secs))
837 }
838 RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
839 }),
840 timestamping: types::config::OptionalTimestampingConfig {
841 mode: ts_mode.map(Into::into),
842 uncapped: ts_uncapped,
843 },
844 delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
845 min_age: doe.map(Duration::from_secs),
846 },
847 encryption: types::config::OptionalEncryptionConfig {
848 allowed_modes: encryption,
849 },
850 }
851 })
852 }
853
854 proptest! {
855 #[test]
856 fn stream_config_conversion_validates(config in gen_stream_config()) {
857 let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
858 let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
859
860 if has_zero_age {
861 prop_assert!(result.is_err());
862 } else {
863 prop_assert!(result.is_ok());
864 }
865 }
866
867 #[test]
868 fn basin_config_conversion_validates(config in gen_basin_config()) {
869 let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
870 matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
871 });
872
873 let result: Result<types::config::BasinConfig, _> = config.try_into();
874
875 if has_invalid_config {
876 prop_assert!(result.is_err());
877 } else {
878 prop_assert!(result.is_ok());
879 }
880 }
881
882 #[test]
883 fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
884 let has_zero_age = matches!(
885 reconfig.retention_policy,
886 Maybe::Specified(Some(RetentionPolicy::Age(0)))
887 );
888 let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
889
890 if has_zero_age {
891 prop_assert!(result.is_err());
892 } else {
893 prop_assert!(result.is_ok());
894 }
895 }
896
897 #[test]
898 fn merge_stream_or_basin_or_default(
899 stream in gen_internal_optional_stream_config(),
900 basin in gen_internal_optional_stream_config(),
901 ) {
902 let merged = stream.clone().merge(basin.clone());
903
904 prop_assert_eq!(
905 merged.storage_class,
906 stream.storage_class.or(basin.storage_class).unwrap_or_default()
907 );
908 prop_assert_eq!(
909 merged.retention_policy,
910 stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
911 );
912 prop_assert_eq!(
913 merged.timestamping.mode,
914 stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
915 );
916 prop_assert_eq!(
917 merged.timestamping.uncapped,
918 stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
919 );
920 prop_assert_eq!(
921 merged.delete_on_empty.min_age,
922 stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
923 );
924 prop_assert_eq!(
925 merged.encryption.allowed_modes,
926 if !stream.encryption.allowed_modes.is_empty() {
927 stream.encryption.allowed_modes
928 } else if !basin.encryption.allowed_modes.is_empty() {
929 basin.encryption.allowed_modes
930 } else {
931 types::config::DEFAULT_ALLOWED_ENCRYPTION_MODES
932 }
933 );
934 }
935
936 #[test]
937 fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
938 let reconfig = types::config::StreamReconfiguration::default();
939 let result = base.clone().reconfigure(reconfig);
940
941 prop_assert_eq!(result.storage_class, base.storage_class);
942 prop_assert_eq!(result.retention_policy, base.retention_policy);
943 prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
944 prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
945 prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
946 prop_assert_eq!(result.encryption.allowed_modes, base.encryption.allowed_modes);
947 }
948
949 #[test]
950 fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
951 let reconfig = types::config::StreamReconfiguration {
952 storage_class: Maybe::Specified(None),
953 retention_policy: Maybe::Specified(None),
954 timestamping: Maybe::Specified(None),
955 delete_on_empty: Maybe::Specified(None),
956 encryption: Maybe::Specified(None),
957 };
958 let result = base.reconfigure(reconfig);
959
960 prop_assert!(result.storage_class.is_none());
961 prop_assert!(result.retention_policy.is_none());
962 prop_assert!(result.timestamping.mode.is_none());
963 prop_assert!(result.timestamping.uncapped.is_none());
964 prop_assert!(result.delete_on_empty.min_age.is_none());
965 prop_assert!(result.encryption.allowed_modes.is_empty());
966 }
967
968 #[test]
969 fn reconfigure_specified_some_sets_value(
970 base in gen_internal_optional_stream_config(),
971 new_sc in gen_storage_class(),
972 new_rp_secs in 1u64..u64::MAX,
973 ) {
974 let reconfig = types::config::StreamReconfiguration {
975 storage_class: Maybe::Specified(Some(new_sc.into())),
976 retention_policy: Maybe::Specified(Some(
977 types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
978 )),
979 ..Default::default()
980 };
981 let result = base.reconfigure(reconfig);
982
983 prop_assert_eq!(result.storage_class, Some(new_sc.into()));
984 prop_assert_eq!(
985 result.retention_policy,
986 Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
987 );
988 }
989
990 #[test]
991 fn to_opt_returns_some_for_non_defaults(
992 sc in gen_storage_class(),
993 doe_secs in 1u64..u64::MAX,
994 ts_mode in gen_timestamping_mode(),
995 ) {
996 let internal = types::config::OptionalStreamConfig {
998 storage_class: Some(sc.into()),
999 ..Default::default()
1000 };
1001 prop_assert!(StreamConfig::to_opt(internal).is_some());
1002
1003 let internal = types::config::OptionalDeleteOnEmptyConfig {
1005 min_age: Some(Duration::from_secs(doe_secs)),
1006 };
1007 let api = DeleteOnEmptyConfig::to_opt(internal);
1008 prop_assert!(api.is_some());
1009 prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
1010
1011 let internal = types::config::OptionalTimestampingConfig {
1013 mode: Some(ts_mode.into()),
1014 uncapped: None,
1015 };
1016 prop_assert!(TimestampingConfig::to_opt(internal).is_some());
1017 }
1018
1019 #[test]
1020 fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
1021 let has_zero_age = matches!(
1022 &reconfig.default_stream_config,
1023 Maybe::Specified(Some(sr)) if matches!(
1024 sr.retention_policy,
1025 Maybe::Specified(Some(RetentionPolicy::Age(0)))
1026 )
1027 );
1028 let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
1029
1030 if has_zero_age {
1031 prop_assert!(result.is_err());
1032 } else {
1033 prop_assert!(result.is_ok());
1034 }
1035 }
1036
1037 #[test]
1038 fn reconfigure_basin_unspecified_preserves(
1039 base_sc in proptest::option::of(gen_storage_class()),
1040 base_on_append in any::<bool>(),
1041 base_on_read in any::<bool>(),
1042 ) {
1043 let base = types::config::BasinConfig {
1044 default_stream_config: types::config::OptionalStreamConfig {
1045 storage_class: base_sc.map(Into::into),
1046 ..Default::default()
1047 },
1048 create_stream_on_append: base_on_append,
1049 create_stream_on_read: base_on_read,
1050 };
1051
1052 let reconfig = types::config::BasinReconfiguration::default();
1053 let result = base.clone().reconfigure(reconfig);
1054
1055 prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
1056 prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
1057 prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
1058 }
1059
1060 #[test]
1061 fn reconfigure_basin_specified_updates(
1062 base_on_append in any::<bool>(),
1063 new_on_append in any::<bool>(),
1064 new_sc in gen_storage_class(),
1065 ) {
1066 let base = types::config::BasinConfig {
1067 create_stream_on_append: base_on_append,
1068 ..Default::default()
1069 };
1070
1071 let reconfig = types::config::BasinReconfiguration {
1072 default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
1073 storage_class: Maybe::Specified(Some(new_sc.into())),
1074 ..Default::default()
1075 })),
1076 create_stream_on_append: Maybe::Specified(new_on_append),
1077 ..Default::default()
1078 };
1079 let result = base.reconfigure(reconfig);
1080
1081 prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
1082 prop_assert_eq!(result.create_stream_on_append, new_on_append);
1083 }
1084
1085 #[test]
1086 fn reconfigure_nested_partial_update(
1087 base_mode in gen_timestamping_mode(),
1088 base_uncapped in any::<bool>(),
1089 new_mode in gen_timestamping_mode(),
1090 ) {
1091 let base = types::config::OptionalStreamConfig {
1092 timestamping: types::config::OptionalTimestampingConfig {
1093 mode: Some(base_mode.into()),
1094 uncapped: Some(base_uncapped),
1095 },
1096 ..Default::default()
1097 };
1098
1099 let expected_mode: types::config::TimestampingMode = new_mode.into();
1100
1101 let reconfig = types::config::StreamReconfiguration {
1102 timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
1103 mode: Maybe::Specified(Some(expected_mode)),
1104 uncapped: Maybe::Unspecified,
1105 })),
1106 ..Default::default()
1107 };
1108 let result = base.reconfigure(reconfig);
1109
1110 prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
1111 prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
1112 }
1113 }
1114
1115 #[test]
1116 fn to_opt_returns_none_for_defaults() {
1117 assert!(StreamConfig::to_opt(types::config::OptionalStreamConfig::default()).is_none());
1119
1120 let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
1122 let doe_zero = types::config::OptionalDeleteOnEmptyConfig {
1123 min_age: Some(Duration::ZERO),
1124 };
1125 assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
1126 assert!(DeleteOnEmptyConfig::to_opt(doe_zero).is_none());
1127
1128 assert!(
1130 TimestampingConfig::to_opt(types::config::OptionalTimestampingConfig::default())
1131 .is_none()
1132 );
1133 }
1134
1135 #[test]
1136 fn empty_json_converts_to_all_none() {
1137 let json = serde_json::json!({});
1138 let parsed: StreamConfig = serde_json::from_value(json).unwrap();
1139 let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();
1140
1141 assert!(
1142 internal.storage_class.is_none(),
1143 "storage_class should be None"
1144 );
1145 assert!(
1146 internal.retention_policy.is_none(),
1147 "retention_policy should be None"
1148 );
1149 assert!(
1150 internal.timestamping.mode.is_none(),
1151 "timestamping.mode should be None"
1152 );
1153 assert!(
1154 internal.timestamping.uncapped.is_none(),
1155 "timestamping.uncapped should be None"
1156 );
1157 assert!(
1158 internal.delete_on_empty.min_age.is_none(),
1159 "delete_on_empty.min_age should be None"
1160 );
1161 assert!(
1162 internal.encryption.allowed_modes.is_empty(),
1163 "encryption.allowed_modes should be empty"
1164 );
1165 }
1166}