1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
6#[rustfmt::skip]
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
9#[serde(rename_all = "kebab-case")]
10pub enum StorageClass {
11 Standard,
13 Express,
15}
16
17impl From<StorageClass> for types::config::StorageClass {
18 fn from(value: StorageClass) -> Self {
19 match value {
20 StorageClass::Express => Self::Express,
21 StorageClass::Standard => Self::Standard,
22 }
23 }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27 fn from(value: types::config::StorageClass) -> Self {
28 match value {
29 types::config::StorageClass::Express => Self::Express,
30 types::config::StorageClass::Standard => Self::Standard,
31 }
32 }
33}
34
35#[rustfmt::skip]
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
38#[serde(rename_all = "kebab-case")]
39pub enum RetentionPolicy {
40 Age(u64),
43 Infinite(InfiniteRetention)
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
49#[serde(rename_all = "kebab-case")]
50pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53 type Error = types::ValidationError;
54
55 fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56 match value {
57 RetentionPolicy::Age(0) => Err(types::ValidationError(
58 "age must be greater than 0 seconds".to_string(),
59 )),
60 RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61 RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62 }
63 }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67 fn from(value: types::config::RetentionPolicy) -> Self {
68 match value {
69 types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70 types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71 }
72 }
73}
74
75#[rustfmt::skip]
76#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
78#[serde(rename_all = "kebab-case")]
79pub enum TimestampingMode {
80 #[default]
82 ClientPrefer,
83 ClientRequire,
85 Arrival,
87}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90 fn from(value: TimestampingMode) -> Self {
91 match value {
92 TimestampingMode::ClientPrefer => Self::ClientPrefer,
93 TimestampingMode::ClientRequire => Self::ClientRequire,
94 TimestampingMode::Arrival => Self::Arrival,
95 }
96 }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100 fn from(value: types::config::TimestampingMode) -> Self {
101 match value {
102 types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103 types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104 types::config::TimestampingMode::Arrival => Self::Arrival,
105 }
106 }
107}
108
109#[rustfmt::skip]
110#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
112pub struct TimestampingConfig {
113 pub mode: Option<TimestampingMode>,
115 pub uncapped: Option<bool>,
118}
119
120impl TimestampingConfig {
121 pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122 let config = TimestampingConfig {
123 mode: config.mode.map(Into::into),
124 uncapped: config.uncapped,
125 };
126 if config == Self::default() {
127 None
128 } else {
129 Some(config)
130 }
131 }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135 fn from(value: types::config::TimestampingConfig) -> Self {
136 Self {
137 mode: Some(value.mode.into()),
138 uncapped: Some(value.uncapped),
139 }
140 }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144 fn from(value: TimestampingConfig) -> Self {
145 Self {
146 mode: value.mode.map(Into::into),
147 uncapped: value.uncapped,
148 }
149 }
150}
151
152#[rustfmt::skip]
153#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
154#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
155pub struct TimestampingReconfiguration {
156 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
158 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
159 pub mode: Maybe<Option<TimestampingMode>>,
160 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
162 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
163 pub uncapped: Maybe<Option<bool>>,
164}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167 fn from(value: TimestampingReconfiguration) -> Self {
168 Self {
169 mode: value.mode.map_opt(Into::into),
170 uncapped: value.uncapped,
171 }
172 }
173}
174
175impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
176 fn from(value: types::config::TimestampingReconfiguration) -> Self {
177 Self {
178 mode: value.mode.map_opt(Into::into),
179 uncapped: value.uncapped,
180 }
181 }
182}
183
184#[rustfmt::skip]
185#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
186#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
187pub struct DeleteOnEmptyConfig {
188 #[serde(default)]
191 pub min_age_secs: u64,
192}
193
194impl DeleteOnEmptyConfig {
195 pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196 let min_age = config.min_age.unwrap_or_default();
197 if min_age > Duration::ZERO {
198 Some(DeleteOnEmptyConfig {
199 min_age_secs: min_age.as_secs(),
200 })
201 } else {
202 None
203 }
204 }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208 fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209 Self {
210 min_age_secs: value.min_age.as_secs(),
211 }
212 }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216 fn from(value: DeleteOnEmptyConfig) -> Self {
217 Self {
218 min_age: Duration::from_secs(value.min_age_secs),
219 }
220 }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224 fn from(value: DeleteOnEmptyConfig) -> Self {
225 Self {
226 min_age: Some(Duration::from_secs(value.min_age_secs)),
227 }
228 }
229}
230
231#[rustfmt::skip]
232#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
233#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
234pub struct DeleteOnEmptyReconfiguration {
235 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
238 #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
239 pub min_age_secs: Maybe<Option<u64>>,
240}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244 Self {
245 min_age: value.min_age_secs.map_opt(Duration::from_secs),
246 }
247 }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251 fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252 Self {
253 min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254 }
255 }
256}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
259#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
260#[serde(rename_all = "kebab-case")]
261pub enum EncryptionMode {
262 Plain,
264 Aegis256,
266 Aes256Gcm,
268}
269
270impl From<EncryptionMode> for encryption::EncryptionMode {
271 fn from(value: EncryptionMode) -> Self {
272 match value {
273 EncryptionMode::Plain => Self::Plain,
274 EncryptionMode::Aegis256 => Self::Aegis256,
275 EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
276 }
277 }
278}
279
280impl From<encryption::EncryptionMode> for EncryptionMode {
281 fn from(value: encryption::EncryptionMode) -> Self {
282 match value {
283 encryption::EncryptionMode::Plain => Self::Plain,
284 encryption::EncryptionMode::Aegis256 => Self::Aegis256,
285 encryption::EncryptionMode::Aes256Gcm => Self::Aes256Gcm,
286 }
287 }
288}
289
290#[rustfmt::skip]
291#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
292#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
293pub struct EncryptionConfig {
294 #[serde(default, skip_serializing_if = "Vec::is_empty")]
297 pub allowed_modes: Vec<EncryptionMode>,
298}
299
300impl EncryptionConfig {
301 pub fn to_opt(config: types::config::OptionalEncryptionConfig) -> Option<Self> {
302 let config = EncryptionConfig {
303 allowed_modes: config
304 .allowed_modes
305 .map(|modes| modes.into_iter().map(Into::into).collect())
306 .unwrap_or_default(),
307 };
308 if config == Self::default() {
309 None
310 } else {
311 Some(config)
312 }
313 }
314}
315
316impl From<types::config::EncryptionConfig> for EncryptionConfig {
317 fn from(value: types::config::EncryptionConfig) -> Self {
318 Self {
319 allowed_modes: value.allowed_modes.into_iter().map(Into::into).collect(),
320 }
321 }
322}
323
324impl From<EncryptionConfig> for types::config::OptionalEncryptionConfig {
325 fn from(value: EncryptionConfig) -> Self {
326 Self {
327 allowed_modes: if value.allowed_modes.is_empty() {
328 None
329 } else {
330 Some(
331 value
332 .allowed_modes
333 .into_iter()
334 .map(encryption::EncryptionMode::from)
335 .collect(),
336 )
337 },
338 }
339 }
340}
341
342#[rustfmt::skip]
343#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
344#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
345pub struct EncryptionReconfiguration {
346 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
349 #[cfg_attr(feature = "utoipa", schema(value_type = Option<Vec<EncryptionMode>>))]
350 pub allowed_modes: Maybe<Vec<EncryptionMode>>,
351}
352
353impl From<EncryptionReconfiguration> for types::config::EncryptionReconfiguration {
354 fn from(value: EncryptionReconfiguration) -> Self {
355 Self {
356 allowed_modes: value.allowed_modes.map(|modes| {
357 modes
358 .into_iter()
359 .map(encryption::EncryptionMode::from)
360 .collect()
361 }),
362 }
363 }
364}
365
366impl From<types::config::EncryptionReconfiguration> for EncryptionReconfiguration {
367 fn from(value: types::config::EncryptionReconfiguration) -> Self {
368 Self {
369 allowed_modes: value
370 .allowed_modes
371 .map(|modes| modes.into_iter().map(Into::into).collect()),
372 }
373 }
374}
375
376#[rustfmt::skip]
377#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
378#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
379pub struct StreamConfig {
380 pub storage_class: Option<StorageClass>,
382 pub retention_policy: Option<RetentionPolicy>,
385 pub timestamping: Option<TimestampingConfig>,
387 #[serde(default)]
389 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
390 #[serde(default)]
392 pub encryption: Option<EncryptionConfig>,
393}
394
395impl StreamConfig {
396 pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
397 let types::config::OptionalStreamConfig {
398 storage_class,
399 retention_policy,
400 timestamping,
401 delete_on_empty,
402 encryption,
403 } = config;
404
405 let config = StreamConfig {
406 storage_class: storage_class.map(Into::into),
407 retention_policy: retention_policy.map(Into::into),
408 timestamping: TimestampingConfig::to_opt(timestamping),
409 delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
410 encryption: EncryptionConfig::to_opt(encryption),
411 };
412 if config == Self::default() {
413 None
414 } else {
415 Some(config)
416 }
417 }
418}
419
420impl From<types::config::StreamConfig> for StreamConfig {
421 fn from(value: types::config::StreamConfig) -> Self {
422 let types::config::StreamConfig {
423 storage_class,
424 retention_policy,
425 timestamping,
426 delete_on_empty,
427 encryption,
428 } = value;
429
430 Self {
431 storage_class: Some(storage_class.into()),
432 retention_policy: Some(retention_policy.into()),
433 timestamping: Some(timestamping.into()),
434 delete_on_empty: Some(delete_on_empty.into()),
435 encryption: Some(encryption.into()),
436 }
437 }
438}
439
440impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
441 type Error = types::ValidationError;
442
443 fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
444 let StreamConfig {
445 storage_class,
446 retention_policy,
447 timestamping,
448 delete_on_empty,
449 encryption,
450 } = value;
451
452 let retention_policy = match retention_policy {
453 None => None,
454 Some(policy) => Some(policy.try_into()?),
455 };
456
457 Ok(Self {
458 storage_class: storage_class.map(Into::into),
459 retention_policy,
460 timestamping: timestamping.map(Into::into).unwrap_or_default(),
461 delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
462 encryption: encryption.map(Into::into).unwrap_or_default(),
463 })
464 }
465}
466
467#[rustfmt::skip]
468#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
469#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
470pub struct StreamReconfiguration {
471 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
473 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
474 pub storage_class: Maybe<Option<StorageClass>>,
475 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
478 #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
479 pub retention_policy: Maybe<Option<RetentionPolicy>>,
480 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
482 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
483 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
484 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
486 #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
487 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
488 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
490 #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionReconfiguration>))]
491 pub encryption: Maybe<Option<EncryptionReconfiguration>>,
492}
493
494impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
495 type Error = types::ValidationError;
496
497 fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
498 let StreamReconfiguration {
499 storage_class,
500 retention_policy,
501 timestamping,
502 delete_on_empty,
503 encryption,
504 } = value;
505
506 Ok(Self {
507 storage_class: storage_class.map_opt(Into::into),
508 retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
509 timestamping: timestamping.map_opt(Into::into),
510 delete_on_empty: delete_on_empty.map_opt(Into::into),
511 encryption: encryption.map_opt(Into::into),
512 })
513 }
514}
515
516impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
517 fn from(value: types::config::StreamReconfiguration) -> Self {
518 let types::config::StreamReconfiguration {
519 storage_class,
520 retention_policy,
521 timestamping,
522 delete_on_empty,
523 encryption,
524 } = value;
525
526 Self {
527 storage_class: storage_class.map_opt(Into::into),
528 retention_policy: retention_policy.map_opt(Into::into),
529 timestamping: timestamping.map_opt(Into::into),
530 delete_on_empty: delete_on_empty.map_opt(Into::into),
531 encryption: encryption.map_opt(Into::into),
532 }
533 }
534}
535
536#[rustfmt::skip]
537#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
538#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
539pub struct BasinConfig {
540 pub default_stream_config: Option<StreamConfig>,
542 #[serde(default)]
544 #[cfg_attr(feature = "utoipa", schema(default = false))]
545 pub create_stream_on_append: bool,
546 #[serde(default)]
548 #[cfg_attr(feature = "utoipa", schema(default = false))]
549 pub create_stream_on_read: bool,
550}
551
552impl TryFrom<BasinConfig> for types::config::BasinConfig {
553 type Error = types::ValidationError;
554
555 fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
556 let BasinConfig {
557 default_stream_config,
558 create_stream_on_append,
559 create_stream_on_read,
560 } = value;
561
562 Ok(Self {
563 default_stream_config: match default_stream_config {
564 Some(config) => config.try_into()?,
565 None => Default::default(),
566 },
567 create_stream_on_append,
568 create_stream_on_read,
569 })
570 }
571}
572
573impl From<types::config::BasinConfig> for BasinConfig {
574 fn from(value: types::config::BasinConfig) -> Self {
575 let types::config::BasinConfig {
576 default_stream_config,
577 create_stream_on_append,
578 create_stream_on_read,
579 } = value;
580
581 Self {
582 default_stream_config: StreamConfig::to_opt(default_stream_config),
583 create_stream_on_append,
584 create_stream_on_read,
585 }
586 }
587}
588
589#[rustfmt::skip]
590#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
591#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
592pub struct BasinReconfiguration {
593 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
595 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
596 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
597 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
599 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
600 pub create_stream_on_append: Maybe<bool>,
601 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
603 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
604 pub create_stream_on_read: Maybe<bool>,
605}
606
607impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
608 type Error = types::ValidationError;
609
610 fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
611 let BasinReconfiguration {
612 default_stream_config,
613 create_stream_on_append,
614 create_stream_on_read,
615 } = value;
616
617 Ok(Self {
618 default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
619 create_stream_on_append: create_stream_on_append.map(Into::into),
620 create_stream_on_read: create_stream_on_read.map(Into::into),
621 })
622 }
623}
624
625impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
626 fn from(value: types::config::BasinReconfiguration) -> Self {
627 let types::config::BasinReconfiguration {
628 default_stream_config,
629 create_stream_on_append,
630 create_stream_on_read,
631 } = value;
632
633 Self {
634 default_stream_config: default_stream_config.map_opt(Into::into),
635 create_stream_on_append: create_stream_on_append.map(Into::into),
636 create_stream_on_read: create_stream_on_read.map(Into::into),
637 }
638 }
639}
640
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    /// Strategy over both API storage classes.
    fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
        prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
    }

    /// Strategy over all API timestamping modes.
    fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
        prop_oneof![
            Just(TimestampingMode::ClientPrefer),
            Just(TimestampingMode::ClientRequire),
            Just(TimestampingMode::Arrival),
        ]
    }

    /// Strategy over API retention policies; ages cover all of `u64`, including the
    /// invalid value `0`.
    fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
        prop_oneof![
            any::<u64>().prop_map(RetentionPolicy::Age),
            Just(RetentionPolicy::Infinite(InfiniteRetention {})),
        ]
    }

    /// Strategy over API timestamping configs with independently optional fields.
    fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
        let mode = proptest::option::of(gen_timestamping_mode());
        let uncapped = proptest::option::of(any::<bool>());
        (mode, uncapped).prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
    }

    /// Strategy over API delete-on-empty configs with an arbitrary `min_age_secs`.
    fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
        any::<u64>().prop_map(|secs| DeleteOnEmptyConfig { min_age_secs: secs })
    }

    /// Strategy over all API encryption modes.
    fn gen_encryption_mode() -> impl Strategy<Value = EncryptionMode> {
        prop_oneof![
            Just(EncryptionMode::Plain),
            Just(EncryptionMode::Aegis256),
            Just(EncryptionMode::Aes256Gcm),
        ]
    }

    /// Strategy over API encryption configs: each mode is independently included or not,
    /// always in declaration order and without duplicates.
    fn gen_encryption_config() -> impl Strategy<Value = EncryptionConfig> {
        (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
            let mut allowed_modes = Vec::new();
            if plain {
                allowed_modes.push(EncryptionMode::Plain);
            }
            if aegis {
                allowed_modes.push(EncryptionMode::Aegis256);
            }
            if aes {
                allowed_modes.push(EncryptionMode::Aes256Gcm);
            }
            EncryptionConfig { allowed_modes }
        })
    }

    /// Strategy over API encryption reconfigurations: unspecified, or a list of up to
    /// three modes (possibly empty, possibly with repeats).
    fn gen_encryption_reconfiguration() -> impl Strategy<Value = EncryptionReconfiguration> {
        prop_oneof![
            Just(Maybe::Unspecified),
            proptest::collection::vec(gen_encryption_mode(), 0..=3).prop_map(Maybe::Specified),
        ]
        .prop_map(|allowed_modes| EncryptionReconfiguration { allowed_modes })
    }

    /// Strategy over internal encryption mode sets; each mode is independently present.
    fn gen_internal_encryption_modes()
    -> impl Strategy<Value = enumset::EnumSet<encryption::EncryptionMode>> {
        (any::<bool>(), any::<bool>(), any::<bool>()).prop_map(|(plain, aegis, aes)| {
            [
                (plain, encryption::EncryptionMode::Plain),
                (aegis, encryption::EncryptionMode::Aegis256),
                (aes, encryption::EncryptionMode::Aes256Gcm),
            ]
            .into_iter()
            .filter_map(|(include, mode)| include.then_some(mode))
            .collect::<enumset::EnumSet<_>>()
        })
    }

    /// Strategy over API stream configs with every section independently optional.
    fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
        (
            proptest::option::of(gen_storage_class()),
            proptest::option::of(gen_retention_policy()),
            proptest::option::of(gen_timestamping_config()),
            proptest::option::of(gen_delete_on_empty_config()),
            proptest::option::of(gen_encryption_config()),
        )
            .prop_map(|(sc, rp, ts, doe, enc)| StreamConfig {
                storage_class: sc,
                retention_policy: rp,
                timestamping: ts,
                delete_on_empty: doe,
                encryption: enc,
            })
    }

    /// Strategy over API basin configs.
    fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
        (
            proptest::option::of(gen_stream_config()),
            any::<bool>(),
            any::<bool>(),
        )
            .prop_map(|(stream_config, on_append, on_read)| BasinConfig {
                default_stream_config: stream_config,
                create_stream_on_append: on_append,
                create_stream_on_read: on_read,
            })
    }

    /// Lifts a strategy for `T` into one for `Maybe<Option<T>>`, covering the three
    /// shapes: unspecified, specified-`None`, and specified-`Some`.
    fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
        inner: impl Strategy<Value = T>,
    ) -> impl Strategy<Value = Maybe<Option<T>>> {
        prop_oneof![
            Just(Maybe::Unspecified),
            Just(Maybe::Specified(None)),
            inner.prop_map(|value| Maybe::Specified(Some(value))),
        ]
    }

    /// Strategy over API stream reconfigurations.
    fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
        (
            gen_maybe(gen_storage_class()),
            gen_maybe(gen_retention_policy()),
            gen_maybe(gen_timestamping_reconfiguration()),
            gen_maybe(gen_delete_on_empty_reconfiguration()),
            gen_maybe(gen_encryption_reconfiguration()),
        )
            .prop_map(|(sc, rp, ts, doe, enc)| StreamReconfiguration {
                storage_class: sc,
                retention_policy: rp,
                timestamping: ts,
                delete_on_empty: doe,
                encryption: enc,
            })
    }

    /// Strategy over API timestamping reconfigurations.
    fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
        let mode = gen_maybe(gen_timestamping_mode());
        let uncapped = gen_maybe(any::<bool>());
        (mode, uncapped)
            .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
    }

    /// Strategy over API delete-on-empty reconfigurations.
    fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
    {
        gen_maybe(any::<u64>()).prop_map(|secs| DeleteOnEmptyReconfiguration { min_age_secs: secs })
    }

    /// Strategy over API basin reconfigurations.
    fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
        (
            gen_maybe(gen_stream_reconfiguration()),
            prop_oneof![
                Just(Maybe::Unspecified),
                any::<bool>().prop_map(Maybe::Specified),
            ],
            prop_oneof![
                Just(Maybe::Unspecified),
                any::<bool>().prop_map(Maybe::Specified),
            ],
        )
            .prop_map(|(stream_reconfig, on_append, on_read)| BasinReconfiguration {
                default_stream_config: stream_reconfig,
                create_stream_on_append: on_append,
                create_stream_on_read: on_read,
            })
    }

    /// Strategy over internal optional stream configs.
    ///
    /// Built directly from parts rather than through the validating `TryFrom`, so a
    /// zero-second retention age can appear here.
    fn gen_internal_optional_stream_config()
    -> impl Strategy<Value = types::config::OptionalStreamConfig> {
        (
            proptest::option::of(gen_storage_class()),
            proptest::option::of(gen_retention_policy()),
            proptest::option::of(gen_timestamping_mode()),
            proptest::option::of(any::<bool>()),
            proptest::option::of(any::<u64>()),
            proptest::option::of(gen_internal_encryption_modes()),
        )
            .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe, encryption)| {
                let retention_policy = rp.map(|policy| match policy {
                    RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
                    RetentionPolicy::Age(secs) => {
                        types::config::RetentionPolicy::Age(Duration::from_secs(secs))
                    }
                });
                let timestamping = types::config::OptionalTimestampingConfig {
                    mode: ts_mode.map(Into::into),
                    uncapped: ts_uncapped,
                };
                let delete_on_empty = types::config::OptionalDeleteOnEmptyConfig {
                    min_age: doe.map(Duration::from_secs),
                };
                let encryption = types::config::OptionalEncryptionConfig {
                    allowed_modes: encryption,
                };

                types::config::OptionalStreamConfig {
                    storage_class: sc.map(Into::into),
                    retention_policy,
                    timestamping,
                    delete_on_empty,
                    encryption,
                }
            })
    }

    proptest! {
        // API -> internal stream config conversion fails exactly for a zero retention age.
        #[test]
        fn stream_config_conversion_validates(config in gen_stream_config()) {
            let expect_err = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
            let converted: Result<types::config::OptionalStreamConfig, _> = config.try_into();

            prop_assert_eq!(converted.is_err(), expect_err);
        }

        // API -> internal basin config conversion fails exactly when the default stream
        // config carries a zero retention age.
        #[test]
        fn basin_config_conversion_validates(config in gen_basin_config()) {
            let expect_err = match config.default_stream_config.as_ref() {
                Some(sc) => matches!(sc.retention_policy, Some(RetentionPolicy::Age(0))),
                None => false,
            };

            let converted: Result<types::config::BasinConfig, _> = config.try_into();

            prop_assert_eq!(converted.is_err(), expect_err);
        }

        // API -> internal stream reconfiguration fails exactly for a specified zero age.
        #[test]
        fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
            let expect_err = matches!(
                reconfig.retention_policy,
                Maybe::Specified(Some(RetentionPolicy::Age(0)))
            );
            let converted: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();

            prop_assert_eq!(converted.is_err(), expect_err);
        }

        // Merging resolves each field as: stream value, else basin value, else default.
        #[test]
        fn merge_stream_or_basin_or_default(
            stream in gen_internal_optional_stream_config(),
            basin in gen_internal_optional_stream_config(),
        ) {
            let merged = stream.clone().merge(basin.clone());

            let want_storage_class =
                stream.storage_class.or(basin.storage_class).unwrap_or_default();
            prop_assert_eq!(merged.storage_class, want_storage_class);

            let want_retention_policy =
                stream.retention_policy.or(basin.retention_policy).unwrap_or_default();
            prop_assert_eq!(merged.retention_policy, want_retention_policy);

            let want_mode =
                stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default();
            prop_assert_eq!(merged.timestamping.mode, want_mode);

            let want_uncapped =
                stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default();
            prop_assert_eq!(merged.timestamping.uncapped, want_uncapped);

            let want_min_age =
                stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default();
            prop_assert_eq!(merged.delete_on_empty.min_age, want_min_age);

            let want_allowed_modes = stream
                .encryption
                .allowed_modes
                .or(basin.encryption.allowed_modes)
                .unwrap_or(types::config::DEFAULT_ALLOWED_ENCRYPTION_MODES);
            prop_assert_eq!(merged.encryption.allowed_modes, want_allowed_modes);
        }

        // A default (all-unspecified) reconfiguration leaves the base untouched.
        #[test]
        fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
            let noop = types::config::StreamReconfiguration::default();
            let updated = base.clone().reconfigure(noop);

            prop_assert_eq!(updated.storage_class, base.storage_class);
            prop_assert_eq!(updated.retention_policy, base.retention_policy);
            prop_assert_eq!(updated.timestamping.mode, base.timestamping.mode);
            prop_assert_eq!(updated.timestamping.uncapped, base.timestamping.uncapped);
            prop_assert_eq!(updated.delete_on_empty.min_age, base.delete_on_empty.min_age);
            prop_assert_eq!(updated.encryption.allowed_modes, base.encryption.allowed_modes);
        }

        // Specifying `None` for every section clears every field of the base.
        #[test]
        fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
            let clear_all = types::config::StreamReconfiguration {
                storage_class: Maybe::Specified(None),
                retention_policy: Maybe::Specified(None),
                timestamping: Maybe::Specified(None),
                delete_on_empty: Maybe::Specified(None),
                encryption: Maybe::Specified(None),
            };
            let updated = base.reconfigure(clear_all);

            prop_assert!(updated.storage_class.is_none());
            prop_assert!(updated.retention_policy.is_none());
            prop_assert!(updated.timestamping.mode.is_none());
            prop_assert!(updated.timestamping.uncapped.is_none());
            prop_assert!(updated.delete_on_empty.min_age.is_none());
            prop_assert!(updated.encryption.allowed_modes.is_none());
        }

        // Specifying `Some(value)` overwrites whatever the base had.
        #[test]
        fn reconfigure_specified_some_sets_value(
            base in gen_internal_optional_stream_config(),
            new_sc in gen_storage_class(),
            new_rp_secs in 1u64..u64::MAX,
        ) {
            let new_policy = types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs));
            let reconfig = types::config::StreamReconfiguration {
                storage_class: Maybe::Specified(Some(new_sc.into())),
                retention_policy: Maybe::Specified(Some(new_policy)),
                ..Default::default()
            };
            let updated = base.reconfigure(reconfig);

            prop_assert_eq!(updated.storage_class, Some(new_sc.into()));
            prop_assert_eq!(updated.retention_policy, Some(new_policy));
        }

        // `to_opt` keeps anything that differs from the default.
        #[test]
        fn to_opt_returns_some_for_non_defaults(
            sc in gen_storage_class(),
            doe_secs in 1u64..u64::MAX,
            ts_mode in gen_timestamping_mode(),
        ) {
            let stream = types::config::OptionalStreamConfig {
                storage_class: Some(sc.into()),
                ..Default::default()
            };
            prop_assert!(StreamConfig::to_opt(stream).is_some());

            let delete_on_empty = types::config::OptionalDeleteOnEmptyConfig {
                min_age: Some(Duration::from_secs(doe_secs)),
            };
            let api = DeleteOnEmptyConfig::to_opt(delete_on_empty);
            prop_assert!(api.is_some());
            prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);

            let timestamping = types::config::OptionalTimestampingConfig {
                mode: Some(ts_mode.into()),
                uncapped: None,
            };
            prop_assert!(TimestampingConfig::to_opt(timestamping).is_some());
        }

        // API -> internal basin reconfiguration fails exactly when the nested stream
        // reconfiguration specifies a zero retention age.
        #[test]
        fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
            let expect_err = matches!(
                &reconfig.default_stream_config,
                Maybe::Specified(Some(sr)) if matches!(
                    sr.retention_policy,
                    Maybe::Specified(Some(RetentionPolicy::Age(0)))
                )
            );
            let converted: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();

            prop_assert_eq!(converted.is_err(), expect_err);
        }

        // A default (all-unspecified) basin reconfiguration leaves the base untouched.
        #[test]
        fn reconfigure_basin_unspecified_preserves(
            base_sc in proptest::option::of(gen_storage_class()),
            base_on_append in any::<bool>(),
            base_on_read in any::<bool>(),
        ) {
            let base = types::config::BasinConfig {
                default_stream_config: types::config::OptionalStreamConfig {
                    storage_class: base_sc.map(Into::into),
                    ..Default::default()
                },
                create_stream_on_append: base_on_append,
                create_stream_on_read: base_on_read,
            };

            let noop = types::config::BasinReconfiguration::default();
            let updated = base.clone().reconfigure(noop);

            prop_assert_eq!(
                updated.default_stream_config.storage_class,
                base.default_stream_config.storage_class
            );
            prop_assert_eq!(updated.create_stream_on_append, base.create_stream_on_append);
            prop_assert_eq!(updated.create_stream_on_read, base.create_stream_on_read);
        }

        // Specified basin fields, including a nested stream field, overwrite the base.
        #[test]
        fn reconfigure_basin_specified_updates(
            base_on_append in any::<bool>(),
            new_on_append in any::<bool>(),
            new_sc in gen_storage_class(),
        ) {
            let base = types::config::BasinConfig {
                create_stream_on_append: base_on_append,
                ..Default::default()
            };

            let stream_reconfig = types::config::StreamReconfiguration {
                storage_class: Maybe::Specified(Some(new_sc.into())),
                ..Default::default()
            };
            let reconfig = types::config::BasinReconfiguration {
                default_stream_config: Maybe::Specified(Some(stream_reconfig)),
                create_stream_on_append: Maybe::Specified(new_on_append),
                ..Default::default()
            };
            let updated = base.reconfigure(reconfig);

            prop_assert_eq!(updated.default_stream_config.storage_class, Some(new_sc.into()));
            prop_assert_eq!(updated.create_stream_on_append, new_on_append);
        }

        // Updating one nested field leaves its unspecified sibling as it was.
        #[test]
        fn reconfigure_nested_partial_update(
            base_mode in gen_timestamping_mode(),
            base_uncapped in any::<bool>(),
            new_mode in gen_timestamping_mode(),
        ) {
            let base = types::config::OptionalStreamConfig {
                timestamping: types::config::OptionalTimestampingConfig {
                    mode: Some(base_mode.into()),
                    uncapped: Some(base_uncapped),
                },
                ..Default::default()
            };

            let expected_mode: types::config::TimestampingMode = new_mode.into();

            let timestamping = types::config::TimestampingReconfiguration {
                mode: Maybe::Specified(Some(expected_mode)),
                uncapped: Maybe::Unspecified,
            };
            let reconfig = types::config::StreamReconfiguration {
                timestamping: Maybe::Specified(Some(timestamping)),
                ..Default::default()
            };
            let updated = base.reconfigure(reconfig);

            prop_assert_eq!(updated.timestamping.mode, Some(expected_mode));
            prop_assert_eq!(updated.timestamping.uncapped, Some(base_uncapped));
        }
    }

    /// `to_opt` collapses default-valued internal configs to `None`.
    #[test]
    fn to_opt_returns_none_for_defaults() {
        let stream = types::config::OptionalStreamConfig::default();
        assert!(StreamConfig::to_opt(stream).is_none());

        // Both an unset and an explicitly zero minimum age collapse to `None`.
        let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
        let doe_zero = types::config::OptionalDeleteOnEmptyConfig {
            min_age: Some(Duration::ZERO),
        };
        assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
        assert!(DeleteOnEmptyConfig::to_opt(doe_zero).is_none());

        let timestamping = types::config::OptionalTimestampingConfig::default();
        assert!(TimestampingConfig::to_opt(timestamping).is_none());
    }

    /// An empty JSON object parses to a stream config whose internal form has nothing set.
    #[test]
    fn empty_json_converts_to_all_none() {
        let parsed: StreamConfig = serde_json::from_value(serde_json::json!({})).unwrap();
        let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();

        assert!(
            internal.storage_class.is_none(),
            "storage_class should be None"
        );
        assert!(
            internal.retention_policy.is_none(),
            "retention_policy should be None"
        );
        assert!(
            internal.timestamping.mode.is_none(),
            "timestamping.mode should be None"
        );
        assert!(
            internal.timestamping.uncapped.is_none(),
            "timestamping.uncapped should be None"
        );
        assert!(
            internal.delete_on_empty.min_age.is_none(),
            "delete_on_empty.min_age should be None"
        );
        assert!(
            internal.encryption.allowed_modes.is_none(),
            "encryption.allowed_modes should be None"
        );
    }
}