1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
6#[rustfmt::skip]
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
9#[serde(rename_all = "kebab-case")]
10pub enum StorageClass {
11 Standard,
13 Express,
15}
16
17impl From<StorageClass> for types::config::StorageClass {
18 fn from(value: StorageClass) -> Self {
19 match value {
20 StorageClass::Express => Self::Express,
21 StorageClass::Standard => Self::Standard,
22 }
23 }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27 fn from(value: types::config::StorageClass) -> Self {
28 match value {
29 types::config::StorageClass::Express => Self::Express,
30 types::config::StorageClass::Standard => Self::Standard,
31 }
32 }
33}
34
35#[rustfmt::skip]
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
38#[serde(rename_all = "kebab-case")]
39pub enum RetentionPolicy {
40 Age(u64),
43 Infinite(InfiniteRetention)
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
49#[serde(rename_all = "kebab-case")]
50pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53 type Error = types::ValidationError;
54
55 fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56 match value {
57 RetentionPolicy::Age(0) => Err(types::ValidationError(
58 "age must be greater than 0 seconds".to_string(),
59 )),
60 RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61 RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62 }
63 }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67 fn from(value: types::config::RetentionPolicy) -> Self {
68 match value {
69 types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70 types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71 }
72 }
73}
74
75#[rustfmt::skip]
76#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
78#[serde(rename_all = "kebab-case")]
79pub enum TimestampingMode {
80 #[default]
82 ClientPrefer,
83 ClientRequire,
85 Arrival,
87}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90 fn from(value: TimestampingMode) -> Self {
91 match value {
92 TimestampingMode::ClientPrefer => Self::ClientPrefer,
93 TimestampingMode::ClientRequire => Self::ClientRequire,
94 TimestampingMode::Arrival => Self::Arrival,
95 }
96 }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100 fn from(value: types::config::TimestampingMode) -> Self {
101 match value {
102 types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103 types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104 types::config::TimestampingMode::Arrival => Self::Arrival,
105 }
106 }
107}
108
109#[rustfmt::skip]
110#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
112pub struct TimestampingConfig {
113 pub mode: Option<TimestampingMode>,
115 pub uncapped: Option<bool>,
118}
119
120impl TimestampingConfig {
121 pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122 let config = TimestampingConfig {
123 mode: config.mode.map(Into::into),
124 uncapped: config.uncapped,
125 };
126 if config == Self::default() {
127 None
128 } else {
129 Some(config)
130 }
131 }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135 fn from(value: types::config::TimestampingConfig) -> Self {
136 Self {
137 mode: Some(value.mode.into()),
138 uncapped: Some(value.uncapped),
139 }
140 }
141}
142
143impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
144 fn from(value: TimestampingConfig) -> Self {
145 Self {
146 mode: value.mode.map(Into::into),
147 uncapped: value.uncapped,
148 }
149 }
150}
151
152#[rustfmt::skip]
153#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
154#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
155pub struct TimestampingReconfiguration {
156 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
158 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
159 pub mode: Maybe<Option<TimestampingMode>>,
160 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
162 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
163 pub uncapped: Maybe<Option<bool>>,
164}
165
166impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
167 fn from(value: TimestampingReconfiguration) -> Self {
168 Self {
169 mode: value.mode.map_opt(Into::into),
170 uncapped: value.uncapped,
171 }
172 }
173}
174
175impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
176 fn from(value: types::config::TimestampingReconfiguration) -> Self {
177 Self {
178 mode: value.mode.map_opt(Into::into),
179 uncapped: value.uncapped,
180 }
181 }
182}
183
184#[rustfmt::skip]
185#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
186#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
187pub struct DeleteOnEmptyConfig {
188 #[serde(default)]
191 pub min_age_secs: u64,
192}
193
194impl DeleteOnEmptyConfig {
195 pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
196 let min_age = config.min_age.unwrap_or_default();
197 if min_age > Duration::ZERO {
198 Some(DeleteOnEmptyConfig {
199 min_age_secs: min_age.as_secs(),
200 })
201 } else {
202 None
203 }
204 }
205}
206
207impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
208 fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
209 Self {
210 min_age_secs: value.min_age.as_secs(),
211 }
212 }
213}
214
215impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
216 fn from(value: DeleteOnEmptyConfig) -> Self {
217 Self {
218 min_age: Duration::from_secs(value.min_age_secs),
219 }
220 }
221}
222
223impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
224 fn from(value: DeleteOnEmptyConfig) -> Self {
225 Self {
226 min_age: Some(Duration::from_secs(value.min_age_secs)),
227 }
228 }
229}
230
231#[rustfmt::skip]
232#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
233#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
234pub struct DeleteOnEmptyReconfiguration {
235 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
238 #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
239 pub min_age_secs: Maybe<Option<u64>>,
240}
241
242impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
243 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
244 Self {
245 min_age: value.min_age_secs.map_opt(Duration::from_secs),
246 }
247 }
248}
249
250impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
251 fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
252 Self {
253 min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
254 }
255 }
256}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
259#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
260pub enum EncryptionAlgorithm {
261 #[serde(rename = "aegis-256")]
263 Aegis256,
264 #[serde(rename = "aes-256-gcm")]
266 Aes256Gcm,
267}
268
269impl From<EncryptionAlgorithm> for encryption::EncryptionAlgorithm {
270 fn from(value: EncryptionAlgorithm) -> Self {
271 match value {
272 EncryptionAlgorithm::Aegis256 => Self::Aegis256,
273 EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
274 }
275 }
276}
277
278impl From<encryption::EncryptionAlgorithm> for EncryptionAlgorithm {
279 fn from(value: encryption::EncryptionAlgorithm) -> Self {
280 match value {
281 encryption::EncryptionAlgorithm::Aegis256 => Self::Aegis256,
282 encryption::EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
283 }
284 }
285}
286
287#[rustfmt::skip]
288#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
289#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
290pub struct StreamConfig {
291 pub storage_class: Option<StorageClass>,
293 pub retention_policy: Option<RetentionPolicy>,
296 pub timestamping: Option<TimestampingConfig>,
298 #[serde(default)]
300 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
301}
302
303impl StreamConfig {
304 pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
305 let types::config::OptionalStreamConfig {
306 storage_class,
307 retention_policy,
308 timestamping,
309 delete_on_empty,
310 } = config;
311
312 let config = StreamConfig {
313 storage_class: storage_class.map(Into::into),
314 retention_policy: retention_policy.map(Into::into),
315 timestamping: TimestampingConfig::to_opt(timestamping),
316 delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
317 };
318 if config == Self::default() {
319 None
320 } else {
321 Some(config)
322 }
323 }
324}
325
326impl From<types::config::StreamConfig> for StreamConfig {
327 fn from(value: types::config::StreamConfig) -> Self {
328 let types::config::StreamConfig {
329 storage_class,
330 retention_policy,
331 timestamping,
332 delete_on_empty,
333 } = value;
334
335 Self {
336 storage_class: Some(storage_class.into()),
337 retention_policy: Some(retention_policy.into()),
338 timestamping: Some(timestamping.into()),
339 delete_on_empty: Some(delete_on_empty.into()),
340 }
341 }
342}
343
344impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
345 type Error = types::ValidationError;
346
347 fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
348 let StreamConfig {
349 storage_class,
350 retention_policy,
351 timestamping,
352 delete_on_empty,
353 } = value;
354
355 let retention_policy = match retention_policy {
356 None => None,
357 Some(policy) => Some(policy.try_into()?),
358 };
359
360 Ok(Self {
361 storage_class: storage_class.map(Into::into),
362 retention_policy,
363 timestamping: timestamping.map(Into::into).unwrap_or_default(),
364 delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
365 })
366 }
367}
368
369#[rustfmt::skip]
370#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
371#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
372pub struct StreamReconfiguration {
373 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
375 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
376 pub storage_class: Maybe<Option<StorageClass>>,
377 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
380 #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
381 pub retention_policy: Maybe<Option<RetentionPolicy>>,
382 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
384 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
385 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
386 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
388 #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
389 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
390}
391
392impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
393 type Error = types::ValidationError;
394
395 fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
396 let StreamReconfiguration {
397 storage_class,
398 retention_policy,
399 timestamping,
400 delete_on_empty,
401 } = value;
402
403 Ok(Self {
404 storage_class: storage_class.map_opt(Into::into),
405 retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
406 timestamping: timestamping.map_opt(Into::into),
407 delete_on_empty: delete_on_empty.map_opt(Into::into),
408 })
409 }
410}
411
412impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
413 fn from(value: types::config::StreamReconfiguration) -> Self {
414 let types::config::StreamReconfiguration {
415 storage_class,
416 retention_policy,
417 timestamping,
418 delete_on_empty,
419 } = value;
420
421 Self {
422 storage_class: storage_class.map_opt(Into::into),
423 retention_policy: retention_policy.map_opt(Into::into),
424 timestamping: timestamping.map_opt(Into::into),
425 delete_on_empty: delete_on_empty.map_opt(Into::into),
426 }
427 }
428}
429
430#[rustfmt::skip]
431#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
432#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
433pub struct BasinConfig {
434 pub default_stream_config: Option<StreamConfig>,
436 pub stream_cipher: Option<EncryptionAlgorithm>,
438 #[serde(default)]
440 #[cfg_attr(feature = "utoipa", schema(default = false))]
441 pub create_stream_on_append: bool,
442 #[serde(default)]
444 #[cfg_attr(feature = "utoipa", schema(default = false))]
445 pub create_stream_on_read: bool,
446}
447
448impl TryFrom<BasinConfig> for types::config::BasinConfig {
449 type Error = types::ValidationError;
450
451 fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
452 let BasinConfig {
453 default_stream_config,
454 stream_cipher,
455 create_stream_on_append,
456 create_stream_on_read,
457 } = value;
458
459 Ok(Self {
460 default_stream_config: match default_stream_config {
461 Some(config) => config.try_into()?,
462 None => Default::default(),
463 },
464 stream_cipher: stream_cipher.map(Into::into),
465 create_stream_on_append,
466 create_stream_on_read,
467 })
468 }
469}
470
471impl From<types::config::BasinConfig> for BasinConfig {
472 fn from(value: types::config::BasinConfig) -> Self {
473 let types::config::BasinConfig {
474 default_stream_config,
475 stream_cipher,
476 create_stream_on_append,
477 create_stream_on_read,
478 } = value;
479
480 Self {
481 default_stream_config: StreamConfig::to_opt(default_stream_config),
482 stream_cipher: stream_cipher.map(Into::into),
483 create_stream_on_append,
484 create_stream_on_read,
485 }
486 }
487}
488
489#[rustfmt::skip]
490#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
491#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
492pub struct BasinReconfiguration {
493 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
495 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
496 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
497 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
499 #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionAlgorithm>))]
500 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
501 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
503 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
504 pub create_stream_on_append: Maybe<bool>,
505 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
507 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
508 pub create_stream_on_read: Maybe<bool>,
509}
510
511impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
512 type Error = types::ValidationError;
513
514 fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
515 let BasinReconfiguration {
516 default_stream_config,
517 stream_cipher,
518 create_stream_on_append,
519 create_stream_on_read,
520 } = value;
521
522 Ok(Self {
523 default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
524 stream_cipher: stream_cipher.map_opt(Into::into),
525 create_stream_on_append: create_stream_on_append.map(Into::into),
526 create_stream_on_read: create_stream_on_read.map(Into::into),
527 })
528 }
529}
530
531impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
532 fn from(value: types::config::BasinReconfiguration) -> Self {
533 let types::config::BasinReconfiguration {
534 default_stream_config,
535 stream_cipher,
536 create_stream_on_append,
537 create_stream_on_read,
538 } = value;
539
540 Self {
541 default_stream_config: default_stream_config.map_opt(Into::into),
542 stream_cipher: stream_cipher.map_opt(Into::into),
543 create_stream_on_append: create_stream_on_append.map(Into::into),
544 create_stream_on_read: create_stream_on_read.map(Into::into),
545 }
546 }
547}
548
#[cfg(test)]
mod tests {
    //! Property-based and unit tests for the conversions between the API
    //! config types and their internal counterparts, and for the internal
    //! `merge` / `reconfigure` operations as seen through those types.

    use proptest::prelude::*;

    use super::*;

    // ---------------------------------------------------------------------
    // Strategies for API types
    // ---------------------------------------------------------------------

    /// Any API storage class.
    fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
        prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
    }

    /// Any API timestamping mode.
    fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
        prop_oneof![
            Just(TimestampingMode::ClientPrefer),
            Just(TimestampingMode::ClientRequire),
            Just(TimestampingMode::Arrival),
        ]
    }

    /// Any API retention policy, including the invalid `Age(0)`.
    fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
        prop_oneof![
            any::<u64>().prop_map(RetentionPolicy::Age),
            Just(RetentionPolicy::Infinite(InfiniteRetention {})),
        ]
    }

    /// Any API timestamping config, with each field independently optional.
    fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
        let mode = proptest::option::of(gen_timestamping_mode());
        let uncapped = proptest::option::of(any::<bool>());
        (mode, uncapped).prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
    }

    /// Any API delete-on-empty config.
    fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
        any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
    }

    /// Any API encryption algorithm.
    fn gen_encryption_algorithm() -> impl Strategy<Value = EncryptionAlgorithm> {
        prop_oneof![
            Just(EncryptionAlgorithm::Aegis256),
            Just(EncryptionAlgorithm::Aes256Gcm),
        ]
    }

    /// Any API stream config, with each field independently optional.
    fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
        (
            proptest::option::of(gen_storage_class()),
            proptest::option::of(gen_retention_policy()),
            proptest::option::of(gen_timestamping_config()),
            proptest::option::of(gen_delete_on_empty_config()),
        )
            .prop_map(
                |(storage_class, retention_policy, timestamping, delete_on_empty)| StreamConfig {
                    storage_class,
                    retention_policy,
                    timestamping,
                    delete_on_empty,
                },
            )
    }

    /// Any API basin config.
    fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
        (
            proptest::option::of(gen_stream_config()),
            proptest::option::of(gen_encryption_algorithm()),
            any::<bool>(),
            any::<bool>(),
        )
            .prop_map(
                |(
                    default_stream_config,
                    stream_cipher,
                    create_stream_on_append,
                    create_stream_on_read,
                )| {
                    BasinConfig {
                        default_stream_config,
                        stream_cipher,
                        create_stream_on_append,
                        create_stream_on_read,
                    }
                },
            )
    }

    /// Wraps `inner` so that it yields one of the three `Maybe<Option<T>>`
    /// shapes: unspecified, specified-as-`None`, or specified-with-a-value.
    fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
        inner: impl Strategy<Value = T>,
    ) -> impl Strategy<Value = Maybe<Option<T>>> {
        prop_oneof![
            Just(Maybe::Unspecified),
            Just(Maybe::Specified(None)),
            inner.prop_map(|value| Maybe::Specified(Some(value))),
        ]
    }

    /// Any API stream reconfiguration.
    fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
        (
            gen_maybe(gen_storage_class()),
            gen_maybe(gen_retention_policy()),
            gen_maybe(gen_timestamping_reconfiguration()),
            gen_maybe(gen_delete_on_empty_reconfiguration()),
        )
            .prop_map(
                |(storage_class, retention_policy, timestamping, delete_on_empty)| {
                    StreamReconfiguration {
                        storage_class,
                        retention_policy,
                        timestamping,
                        delete_on_empty,
                    }
                },
            )
    }

    /// Any API timestamping reconfiguration.
    fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
        (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
            .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
    }

    /// Any API delete-on-empty reconfiguration.
    fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
    {
        gen_maybe(any::<u64>())
            .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
    }

    /// Any API basin reconfiguration.
    fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
        (
            gen_maybe(gen_stream_reconfiguration()),
            gen_maybe(gen_encryption_algorithm()),
            prop_oneof![
                Just(Maybe::Unspecified),
                any::<bool>().prop_map(Maybe::Specified),
            ],
            prop_oneof![
                Just(Maybe::Unspecified),
                any::<bool>().prop_map(Maybe::Specified),
            ],
        )
            .prop_map(
                |(
                    default_stream_config,
                    stream_cipher,
                    create_stream_on_append,
                    create_stream_on_read,
                )| BasinReconfiguration {
                    default_stream_config,
                    stream_cipher,
                    create_stream_on_append,
                    create_stream_on_read,
                },
            )
    }

    // ---------------------------------------------------------------------
    // Strategies for internal types
    // ---------------------------------------------------------------------

    /// Any internal optional stream config.
    ///
    /// It is assembled directly from API-level pieces, without going through
    /// the validating conversions, so a zero-second retention age can appear.
    fn gen_internal_optional_stream_config()
    -> impl Strategy<Value = types::config::OptionalStreamConfig> {
        (
            proptest::option::of(gen_storage_class()),
            proptest::option::of(gen_retention_policy()),
            proptest::option::of(gen_timestamping_mode()),
            proptest::option::of(any::<bool>()),
            proptest::option::of(any::<u64>()),
        )
            .prop_map(|(class, policy, mode, uncapped, min_age_secs)| {
                let retention_policy = policy.map(|policy| match policy {
                    RetentionPolicy::Age(secs) => {
                        types::config::RetentionPolicy::Age(Duration::from_secs(secs))
                    }
                    RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
                });

                types::config::OptionalStreamConfig {
                    storage_class: class.map(Into::into),
                    retention_policy,
                    timestamping: types::config::OptionalTimestampingConfig {
                        mode: mode.map(Into::into),
                        uncapped,
                    },
                    delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
                        min_age: min_age_secs.map(Duration::from_secs),
                    },
                }
            })
    }

    // ---------------------------------------------------------------------
    // Property tests
    // ---------------------------------------------------------------------

    proptest! {
        /// API -> internal stream config conversion fails exactly when the
        /// retention policy is `Age(0)`.
        #[test]
        fn stream_config_conversion_validates(config in gen_stream_config()) {
            let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
            let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();

            prop_assert_eq!(result.is_err(), has_zero_age);
        }

        /// API -> internal basin config conversion fails exactly when the
        /// default stream config carries an `Age(0)` retention policy.
        #[test]
        fn basin_config_conversion_validates(config in gen_basin_config()) {
            let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
                matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
            });

            let result: Result<types::config::BasinConfig, _> = config.try_into();

            prop_assert_eq!(result.is_err(), has_invalid_config);
        }

        /// API -> internal stream reconfiguration conversion fails exactly
        /// when an `Age(0)` retention policy is specified.
        #[test]
        fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
            let has_zero_age = matches!(
                reconfig.retention_policy,
                Maybe::Specified(Some(RetentionPolicy::Age(0)))
            );
            let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();

            prop_assert_eq!(result.is_err(), has_zero_age);
        }

        /// `merge` picks, per field, the stream value, else the basin value,
        /// else the type's default.
        #[test]
        fn merge_stream_or_basin_or_default(
            stream in gen_internal_optional_stream_config(),
            basin in gen_internal_optional_stream_config(),
        ) {
            let merged = stream.clone().merge(basin.clone());

            prop_assert_eq!(
                merged.storage_class,
                stream.storage_class.or(basin.storage_class).unwrap_or_default()
            );
            prop_assert_eq!(
                merged.retention_policy,
                stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
            );
            prop_assert_eq!(
                merged.timestamping.mode,
                stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
            );
            prop_assert_eq!(
                merged.timestamping.uncapped,
                stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
            );
            prop_assert_eq!(
                merged.delete_on_empty.min_age,
                stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
            );
        }

        /// Applying a default (all-unspecified) reconfiguration leaves every
        /// field of the base config untouched.
        #[test]
        fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
            let noop = types::config::StreamReconfiguration::default();
            let result = base.clone().reconfigure(noop);

            prop_assert_eq!(result.storage_class, base.storage_class);
            prop_assert_eq!(result.retention_policy, base.retention_policy);
            prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
            prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
            prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
        }

        /// Specifying `None` for every field clears every field of the base.
        #[test]
        fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
            let clear_all = types::config::StreamReconfiguration {
                storage_class: Maybe::Specified(None),
                retention_policy: Maybe::Specified(None),
                timestamping: Maybe::Specified(None),
                delete_on_empty: Maybe::Specified(None),
            };
            let result = base.reconfigure(clear_all);

            prop_assert!(result.storage_class.is_none());
            prop_assert!(result.retention_policy.is_none());
            prop_assert!(result.timestamping.mode.is_none());
            prop_assert!(result.timestamping.uncapped.is_none());
            prop_assert!(result.delete_on_empty.min_age.is_none());
        }

        /// Specifying a value overrides whatever the base held.
        #[test]
        fn reconfigure_specified_some_sets_value(
            base in gen_internal_optional_stream_config(),
            new_sc in gen_storage_class(),
            new_rp_secs in 1u64..u64::MAX,
        ) {
            let reconfig = types::config::StreamReconfiguration {
                storage_class: Maybe::Specified(Some(new_sc.into())),
                retention_policy: Maybe::Specified(Some(
                    types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
                )),
                ..Default::default()
            };
            let result = base.reconfigure(reconfig);

            prop_assert_eq!(result.storage_class, Some(new_sc.into()));
            prop_assert_eq!(
                result.retention_policy,
                Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
            );
        }

        /// The `to_opt` helpers return `Some` as soon as something is set.
        #[test]
        fn to_opt_returns_some_for_non_defaults(
            sc in gen_storage_class(),
            doe_secs in 1u64..u64::MAX,
            ts_mode in gen_timestamping_mode(),
        ) {
            // Stream config with only the storage class set.
            let internal = types::config::OptionalStreamConfig {
                storage_class: Some(sc.into()),
                ..Default::default()
            };
            prop_assert!(StreamConfig::to_opt(internal).is_some());

            // Delete-on-empty with a non-zero minimum age.
            let internal = types::config::OptionalDeleteOnEmptyConfig {
                min_age: Some(Duration::from_secs(doe_secs)),
            };
            prop_assert_eq!(
                DeleteOnEmptyConfig::to_opt(internal),
                Some(DeleteOnEmptyConfig { min_age_secs: doe_secs })
            );

            // Timestamping with only the mode set.
            let internal = types::config::OptionalTimestampingConfig {
                mode: Some(ts_mode.into()),
                uncapped: None,
            };
            prop_assert!(TimestampingConfig::to_opt(internal).is_some());
        }

        /// API -> internal basin reconfiguration conversion fails exactly
        /// when the nested stream reconfiguration specifies `Age(0)`.
        #[test]
        fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
            let has_zero_age = matches!(
                &reconfig.default_stream_config,
                Maybe::Specified(Some(sr)) if matches!(
                    sr.retention_policy,
                    Maybe::Specified(Some(RetentionPolicy::Age(0)))
                )
            );
            let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();

            prop_assert_eq!(result.is_err(), has_zero_age);
        }

        /// Applying a default (all-unspecified) basin reconfiguration leaves
        /// the base basin config untouched.
        #[test]
        fn reconfigure_basin_unspecified_preserves(
            base_sc in proptest::option::of(gen_storage_class()),
            base_algorithm in proptest::option::of(gen_encryption_algorithm()),
            base_on_append in any::<bool>(),
            base_on_read in any::<bool>(),
        ) {
            let base = types::config::BasinConfig {
                default_stream_config: types::config::OptionalStreamConfig {
                    storage_class: base_sc.map(Into::into),
                    ..Default::default()
                },
                stream_cipher: base_algorithm.map(Into::into),
                create_stream_on_append: base_on_append,
                create_stream_on_read: base_on_read,
            };

            let noop = types::config::BasinReconfiguration::default();
            let result = base.clone().reconfigure(noop);

            prop_assert_eq!(
                result.default_stream_config.storage_class,
                base.default_stream_config.storage_class
            );
            prop_assert_eq!(result.stream_cipher, base.stream_cipher);
            prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
            prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
        }

        /// Specified basin-level fields (including a nested stream field)
        /// override the base.
        #[test]
        fn reconfigure_basin_specified_updates(
            base_on_append in any::<bool>(),
            new_on_append in any::<bool>(),
            new_sc in gen_storage_class(),
            new_algorithm in gen_encryption_algorithm(),
        ) {
            let base = types::config::BasinConfig {
                create_stream_on_append: base_on_append,
                ..Default::default()
            };

            let reconfig = types::config::BasinReconfiguration {
                default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
                    storage_class: Maybe::Specified(Some(new_sc.into())),
                    ..Default::default()
                })),
                stream_cipher: Maybe::Specified(Some(new_algorithm.into())),
                create_stream_on_append: Maybe::Specified(new_on_append),
                ..Default::default()
            };
            let result = base.reconfigure(reconfig);

            prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
            prop_assert_eq!(result.stream_cipher, Some(new_algorithm.into()));
            prop_assert_eq!(result.create_stream_on_append, new_on_append);
        }

        /// A nested reconfiguration that specifies only `mode` updates the
        /// mode and keeps the base `uncapped` value.
        #[test]
        fn reconfigure_nested_partial_update(
            base_mode in gen_timestamping_mode(),
            base_uncapped in any::<bool>(),
            new_mode in gen_timestamping_mode(),
        ) {
            let base = types::config::OptionalStreamConfig {
                timestamping: types::config::OptionalTimestampingConfig {
                    mode: Some(base_mode.into()),
                    uncapped: Some(base_uncapped),
                },
                ..Default::default()
            };

            let expected_mode: types::config::TimestampingMode = new_mode.into();

            let reconfig = types::config::StreamReconfiguration {
                timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
                    mode: Maybe::Specified(Some(expected_mode)),
                    uncapped: Maybe::Unspecified,
                })),
                ..Default::default()
            };
            let result = base.reconfigure(reconfig);

            prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
            prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
        }
    }

    // ---------------------------------------------------------------------
    // Unit tests
    // ---------------------------------------------------------------------

    /// The `to_opt` helpers return `None` for default / empty inputs.
    #[test]
    fn to_opt_returns_none_for_defaults() {
        let empty_stream = types::config::OptionalStreamConfig::default();
        assert!(StreamConfig::to_opt(empty_stream).is_none());

        // Both an absent and a zero minimum age count as "not set".
        let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
        let doe_zero = types::config::OptionalDeleteOnEmptyConfig {
            min_age: Some(Duration::ZERO),
        };
        assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
        assert!(DeleteOnEmptyConfig::to_opt(doe_zero).is_none());

        let empty_timestamping = types::config::OptionalTimestampingConfig::default();
        assert!(TimestampingConfig::to_opt(empty_timestamping).is_none());
    }

    /// An empty JSON object deserializes to a stream config whose internal
    /// form has every field unset.
    #[test]
    fn empty_json_converts_to_all_none() {
        let json = serde_json::json!({});
        let parsed: StreamConfig = serde_json::from_value(json).unwrap();
        let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();

        assert!(
            internal.storage_class.is_none(),
            "storage_class should be None"
        );
        assert!(
            internal.retention_policy.is_none(),
            "retention_policy should be None"
        );
        assert!(
            internal.timestamping.mode.is_none(),
            "timestamping.mode should be None"
        );
        assert!(
            internal.timestamping.uncapped.is_none(),
            "timestamping.uncapped should be None"
        );
        assert!(
            internal.delete_on_empty.min_age.is_none(),
            "delete_on_empty.min_age should be None"
        );
    }
}