1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
/// Storage class of a stream as exposed by the API.
///
/// Variants are (de)serialized in kebab-case, i.e. `"standard"` and `"express"`.
#[rustfmt::skip]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum StorageClass {
    /// Serialized as `"standard"`.
    Standard,
    /// Serialized as `"express"`.
    Express,
}
16
17impl From<StorageClass> for types::config::StorageClass {
18 fn from(value: StorageClass) -> Self {
19 match value {
20 StorageClass::Express => Self::Express,
21 StorageClass::Standard => Self::Standard,
22 }
23 }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27 fn from(value: types::config::StorageClass) -> Self {
28 match value {
29 types::config::StorageClass::Express => Self::Express,
30 types::config::StorageClass::Standard => Self::Standard,
31 }
32 }
33}
34
/// Retention policy of a stream as exposed by the API.
///
/// Uses serde's default (externally tagged) enum representation with kebab-case variant
/// names, e.g. `{"age": 3600}` or `{"infinite": {}}`.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum RetentionPolicy {
    /// Age in seconds. A value of `0` is rejected when converting to the internal type.
    Age(u64),
    /// Infinite retention; the payload is an empty object.
    Infinite(InfiniteRetention)
}
46
/// Field-less payload carried by [`RetentionPolicy::Infinite`].
///
/// Being a braced struct with no fields, it (de)serializes as an empty object (`{}`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53 type Error = types::ValidationError;
54
55 fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56 match value {
57 RetentionPolicy::Age(0) => Err(types::ValidationError(
58 "age must be greater than 0 seconds".to_string(),
59 )),
60 RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61 RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62 }
63 }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67 fn from(value: types::config::RetentionPolicy) -> Self {
68 match value {
69 types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70 types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71 }
72 }
73}
74
/// Timestamping mode as exposed by the API.
///
/// Variants are (de)serialized in kebab-case: `"client-prefer"`, `"client-require"` and
/// `"arrival"`. The `Default` is [`TimestampingMode::ClientPrefer`].
#[rustfmt::skip]
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "kebab-case")]
pub enum TimestampingMode {
    /// Default variant; serialized as `"client-prefer"`.
    #[default]
    ClientPrefer,
    /// Serialized as `"client-require"`.
    ClientRequire,
    /// Serialized as `"arrival"`.
    Arrival,
}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90 fn from(value: TimestampingMode) -> Self {
91 match value {
92 TimestampingMode::ClientPrefer => Self::ClientPrefer,
93 TimestampingMode::ClientRequire => Self::ClientRequire,
94 TimestampingMode::Arrival => Self::Arrival,
95 }
96 }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100 fn from(value: types::config::TimestampingMode) -> Self {
101 match value {
102 types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103 types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104 types::config::TimestampingMode::Arrival => Self::Arrival,
105 }
106 }
107}
108
/// Timestamping configuration as exposed by the API.
///
/// Both fields are optional; the `Default` value has both set to `None`.
#[rustfmt::skip]
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TimestampingConfig {
    /// Timestamping mode, if one is set.
    pub mode: Option<TimestampingMode>,
    /// The `uncapped` flag, if one is set.
    // NOTE(review): the exact meaning of "uncapped" is not visible in this file — confirm.
    pub uncapped: Option<bool>,
}
119
120impl TimestampingConfig {
121 pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122 let config = TimestampingConfig {
123 mode: config.mode.map(Into::into),
124 uncapped: config.uncapped,
125 };
126 if config == Self::default() {
127 None
128 } else {
129 Some(config)
130 }
131 }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135 fn from(value: types::config::TimestampingConfig) -> Self {
136 Self {
137 mode: Some(value.mode.into()),
138 uncapped: Some(value.uncapped),
139 }
140 }
141}
142
143impl From<types::config::OptionalTimestampingConfig> for TimestampingConfig {
144 fn from(value: types::config::OptionalTimestampingConfig) -> Self {
145 Self {
146 mode: value.mode.map(Into::into),
147 uncapped: value.uncapped,
148 }
149 }
150}
151
152impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
153 fn from(value: TimestampingConfig) -> Self {
154 Self {
155 mode: value.mode.map(Into::into),
156 uncapped: value.uncapped,
157 }
158 }
159}
160
/// Partial update of a timestamping configuration as exposed by the API.
///
/// Each field is a [`Maybe`]: an unspecified field is omitted when serializing
/// (`skip_serializing_if = "Maybe::is_unspecified"`), and a field missing from the input is
/// filled from `Default` (`serde(default)`).
// NOTE(review): presumably `Maybe::default()` is `Maybe::Unspecified` — confirm in `s2_common`.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TimestampingReconfiguration {
    /// New timestamping mode; documented in the OpenAPI schema as `Option<TimestampingMode>`.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
    pub mode: Maybe<Option<TimestampingMode>>,
    /// New `uncapped` flag; documented in the OpenAPI schema as `Option<bool>`.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub uncapped: Maybe<Option<bool>>,
}
174
175impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
176 fn from(value: TimestampingReconfiguration) -> Self {
177 Self {
178 mode: value.mode.map_opt(Into::into),
179 uncapped: value.uncapped,
180 }
181 }
182}
183
184impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
185 fn from(value: types::config::TimestampingReconfiguration) -> Self {
186 Self {
187 mode: value.mode.map_opt(Into::into),
188 uncapped: value.uncapped,
189 }
190 }
191}
192
/// Delete-on-empty configuration as exposed by the API.
///
/// The `Default` value has `min_age_secs == 0`.
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds; falls back to `0` when missing from the input.
    /// `DeleteOnEmptyConfig::to_opt` treats a zero age the same as an unset one.
    #[serde(default)]
    pub min_age_secs: u64,
}
202
203impl DeleteOnEmptyConfig {
204 pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
205 let min_age = config.min_age.unwrap_or_default();
206 if min_age > Duration::ZERO {
207 Some(DeleteOnEmptyConfig {
208 min_age_secs: min_age.as_secs(),
209 })
210 } else {
211 None
212 }
213 }
214}
215
216impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
217 fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
218 Self {
219 min_age_secs: value.min_age.as_secs(),
220 }
221 }
222}
223
224impl From<types::config::OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
225 fn from(value: types::config::OptionalDeleteOnEmptyConfig) -> Self {
226 Self {
227 min_age_secs: value.min_age.unwrap_or_default().as_secs(),
228 }
229 }
230}
231
232impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
233 fn from(value: DeleteOnEmptyConfig) -> Self {
234 Self {
235 min_age: Duration::from_secs(value.min_age_secs),
236 }
237 }
238}
239
240impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
241 fn from(value: DeleteOnEmptyConfig) -> Self {
242 Self {
243 min_age: Some(Duration::from_secs(value.min_age_secs)),
244 }
245 }
246}
247
/// Partial update of a delete-on-empty configuration as exposed by the API.
///
/// An unspecified field is omitted when serializing, and a field missing from the input is
/// filled from `Default` (`serde(default)`).
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct DeleteOnEmptyReconfiguration {
    /// New minimum age in seconds; documented in the OpenAPI schema as `Option<u64>`.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
    pub min_age_secs: Maybe<Option<u64>>,
}
258
259impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
260 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
261 Self {
262 min_age: value.min_age_secs.map_opt(Duration::from_secs),
263 }
264 }
265}
266
267impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
268 fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
269 Self {
270 min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
271 }
272 }
273}
274
/// Encryption algorithm identifier as exposed by the API.
///
/// Each variant carries an explicit serde name, since the wire names contain digits and
/// dashes that a plain `rename_all` rule would not produce.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub enum EncryptionAlgorithm {
    /// Serialized as `"aegis-256"`.
    #[serde(rename = "aegis-256")]
    Aegis256,
    /// Serialized as `"aes-256-gcm"`.
    #[serde(rename = "aes-256-gcm")]
    Aes256Gcm,
}
285
286impl From<EncryptionAlgorithm> for encryption::EncryptionAlgorithm {
287 fn from(value: EncryptionAlgorithm) -> Self {
288 match value {
289 EncryptionAlgorithm::Aegis256 => Self::Aegis256,
290 EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
291 }
292 }
293}
294
295impl From<encryption::EncryptionAlgorithm> for EncryptionAlgorithm {
296 fn from(value: encryption::EncryptionAlgorithm) -> Self {
297 match value {
298 encryption::EncryptionAlgorithm::Aegis256 => Self::Aegis256,
299 encryption::EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
300 }
301 }
302}
303
/// Stream configuration as exposed by the API.
///
/// Every field is optional; the `Default` value has all of them set to `None`.
#[rustfmt::skip]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamConfig {
    /// Storage class, if one is set.
    pub storage_class: Option<StorageClass>,
    /// Retention policy, if one is set. `Age(0)` is rejected on conversion to the internal type.
    pub retention_policy: Option<RetentionPolicy>,
    /// Timestamping configuration, if one is set.
    pub timestamping: Option<TimestampingConfig>,
    /// Delete-on-empty configuration, if one is set.
    #[serde(default)]
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
}
319
320impl StreamConfig {
321 pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
322 let types::config::OptionalStreamConfig {
323 storage_class,
324 retention_policy,
325 timestamping,
326 delete_on_empty,
327 } = config;
328
329 let config = StreamConfig {
330 storage_class: storage_class.map(Into::into),
331 retention_policy: retention_policy.map(Into::into),
332 timestamping: TimestampingConfig::to_opt(timestamping),
333 delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
334 };
335 if config == Self::default() {
336 None
337 } else {
338 Some(config)
339 }
340 }
341}
342
343impl From<types::config::StreamConfig> for StreamConfig {
344 fn from(value: types::config::StreamConfig) -> Self {
345 let types::config::StreamConfig {
346 storage_class,
347 retention_policy,
348 timestamping,
349 delete_on_empty,
350 } = value;
351
352 Self {
353 storage_class: Some(storage_class.into()),
354 retention_policy: Some(retention_policy.into()),
355 timestamping: Some(timestamping.into()),
356 delete_on_empty: Some(delete_on_empty.into()),
357 }
358 }
359}
360
361impl From<types::config::OptionalStreamConfig> for StreamConfig {
362 fn from(value: types::config::OptionalStreamConfig) -> Self {
363 let types::config::OptionalStreamConfig {
364 storage_class,
365 retention_policy,
366 timestamping,
367 delete_on_empty,
368 } = value;
369
370 let timestamping = (timestamping.mode.is_some() || timestamping.uncapped.is_some())
371 .then(|| timestamping.into());
372 let delete_on_empty = delete_on_empty.min_age.map(|_| delete_on_empty.into());
373
374 Self {
375 storage_class: storage_class.map(Into::into),
376 retention_policy: retention_policy.map(Into::into),
377 timestamping,
378 delete_on_empty,
379 }
380 }
381}
382
383impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
384 type Error = types::ValidationError;
385
386 fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
387 let StreamConfig {
388 storage_class,
389 retention_policy,
390 timestamping,
391 delete_on_empty,
392 } = value;
393
394 let retention_policy = match retention_policy {
395 None => None,
396 Some(policy) => Some(policy.try_into()?),
397 };
398
399 Ok(Self {
400 storage_class: storage_class.map(Into::into),
401 retention_policy,
402 timestamping: timestamping.map(Into::into).unwrap_or_default(),
403 delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
404 })
405 }
406}
407
/// Partial update of a stream configuration as exposed by the API.
///
/// Each field is a [`Maybe`]: an unspecified field is omitted when serializing
/// (`skip_serializing_if = "Maybe::is_unspecified"`), and a field missing from the input is
/// filled from `Default` (`serde(default)`).
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamReconfiguration {
    /// New storage class; documented in the OpenAPI schema as `Option<StorageClass>`.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
    pub storage_class: Maybe<Option<StorageClass>>,
    /// New retention policy; `Age(0)` is rejected on conversion to the internal type.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Nested partial update of the timestamping configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Nested partial update of the delete-on-empty configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
}
430
431impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
432 type Error = types::ValidationError;
433
434 fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
435 let StreamReconfiguration {
436 storage_class,
437 retention_policy,
438 timestamping,
439 delete_on_empty,
440 } = value;
441
442 Ok(Self {
443 storage_class: storage_class.map_opt(Into::into),
444 retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
445 timestamping: timestamping.map_opt(Into::into),
446 delete_on_empty: delete_on_empty.map_opt(Into::into),
447 })
448 }
449}
450
451impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
452 fn from(value: types::config::StreamReconfiguration) -> Self {
453 let types::config::StreamReconfiguration {
454 storage_class,
455 retention_policy,
456 timestamping,
457 delete_on_empty,
458 } = value;
459
460 Self {
461 storage_class: storage_class.map_opt(Into::into),
462 retention_policy: retention_policy.map_opt(Into::into),
463 timestamping: timestamping.map_opt(Into::into),
464 delete_on_empty: delete_on_empty.map_opt(Into::into),
465 }
466 }
467}
468
/// Basin configuration as exposed by the API.
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct BasinConfig {
    /// Default stream configuration, if one is set.
    pub default_stream_config: Option<StreamConfig>,
    /// Stream cipher algorithm, if one is set.
    pub stream_cipher: Option<EncryptionAlgorithm>,
    /// Flag named for creating a stream on append; `false` when missing from the input.
    #[serde(default)]
    #[cfg_attr(feature = "utoipa", schema(default = false))]
    pub create_stream_on_append: bool,
    /// Flag named for creating a stream on read; `false` when missing from the input.
    #[serde(default)]
    #[cfg_attr(feature = "utoipa", schema(default = false))]
    pub create_stream_on_read: bool,
}
486
487impl TryFrom<BasinConfig> for types::config::BasinConfig {
488 type Error = types::ValidationError;
489
490 fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
491 let BasinConfig {
492 default_stream_config,
493 stream_cipher,
494 create_stream_on_append,
495 create_stream_on_read,
496 } = value;
497
498 Ok(Self {
499 default_stream_config: match default_stream_config {
500 Some(config) => config.try_into()?,
501 None => Default::default(),
502 },
503 stream_cipher: stream_cipher.map(Into::into),
504 create_stream_on_append,
505 create_stream_on_read,
506 })
507 }
508}
509
510impl From<types::config::BasinConfig> for BasinConfig {
511 fn from(value: types::config::BasinConfig) -> Self {
512 let types::config::BasinConfig {
513 default_stream_config,
514 stream_cipher,
515 create_stream_on_append,
516 create_stream_on_read,
517 } = value;
518
519 Self {
520 default_stream_config: StreamConfig::to_opt(default_stream_config),
521 stream_cipher: stream_cipher.map(Into::into),
522 create_stream_on_append,
523 create_stream_on_read,
524 }
525 }
526}
527
/// Partial update of a basin configuration as exposed by the API.
///
/// Each field is a [`Maybe`]: an unspecified field is omitted when serializing
/// (`skip_serializing_if = "Maybe::is_unspecified"`), and a field missing from the input is
/// filled from `Default` (`serde(default)`).
#[rustfmt::skip]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct BasinReconfiguration {
    /// Nested partial update of the default stream configuration.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// New stream cipher algorithm; documented in the OpenAPI schema as
    /// `Option<EncryptionAlgorithm>`.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionAlgorithm>))]
    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
    /// New value for the create-stream-on-append flag. The Rust type is `Maybe<bool>`
    /// (no inner `Option`), while the OpenAPI schema documents it as `Option<bool>`.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub create_stream_on_append: Maybe<bool>,
    /// New value for the create-stream-on-read flag. The Rust type is `Maybe<bool>`
    /// (no inner `Option`), while the OpenAPI schema documents it as `Option<bool>`.
    #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
    #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
    pub create_stream_on_read: Maybe<bool>,
}
549
550impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
551 type Error = types::ValidationError;
552
553 fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
554 let BasinReconfiguration {
555 default_stream_config,
556 stream_cipher,
557 create_stream_on_append,
558 create_stream_on_read,
559 } = value;
560
561 Ok(Self {
562 default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
563 stream_cipher: stream_cipher.map_opt(Into::into),
564 create_stream_on_append: create_stream_on_append.map(Into::into),
565 create_stream_on_read: create_stream_on_read.map(Into::into),
566 })
567 }
568}
569
570impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
571 fn from(value: types::config::BasinReconfiguration) -> Self {
572 let types::config::BasinReconfiguration {
573 default_stream_config,
574 stream_cipher,
575 create_stream_on_append,
576 create_stream_on_read,
577 } = value;
578
579 Self {
580 default_stream_config: default_stream_config.map_opt(Into::into),
581 stream_cipher: stream_cipher.map_opt(Into::into),
582 create_stream_on_append: create_stream_on_append.map(Into::into),
583 create_stream_on_read: create_stream_on_read.map(Into::into),
584 }
585 }
586}
587
588#[cfg(test)]
589mod tests {
590 use proptest::prelude::*;
591
592 use super::*;
593
/// Strategy yielding either API storage class variant.
fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
    prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
}
597
/// Strategy yielding any of the three API timestamping modes.
fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
    prop_oneof![
        Just(TimestampingMode::ClientPrefer),
        Just(TimestampingMode::ClientRequire),
        Just(TimestampingMode::Arrival),
    ]
}
605
/// Strategy yielding an age-based or infinite API retention policy.
///
/// The age covers the full `u64` range, so it can be `0` — the value the validating
/// conversion to the internal type rejects.
fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
    prop_oneof![
        any::<u64>().prop_map(RetentionPolicy::Age),
        Just(RetentionPolicy::Infinite(InfiniteRetention {})),
    ]
}
612
613 fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
614 (
615 proptest::option::of(gen_timestamping_mode()),
616 proptest::option::of(any::<bool>()),
617 )
618 .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
619 }
620
621 fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
622 any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
623 }
624
/// Strategy yielding either API encryption algorithm variant.
fn gen_encryption_algorithm() -> impl Strategy<Value = EncryptionAlgorithm> {
    prop_oneof![
        Just(EncryptionAlgorithm::Aegis256),
        Just(EncryptionAlgorithm::Aes256Gcm),
    ]
}
631
632 fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
633 (
634 proptest::option::of(gen_storage_class()),
635 proptest::option::of(gen_retention_policy()),
636 proptest::option::of(gen_timestamping_config()),
637 proptest::option::of(gen_delete_on_empty_config()),
638 )
639 .prop_map(
640 |(storage_class, retention_policy, timestamping, delete_on_empty)| StreamConfig {
641 storage_class,
642 retention_policy,
643 timestamping,
644 delete_on_empty,
645 },
646 )
647 }
648
649 fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
650 (
651 proptest::option::of(gen_stream_config()),
652 proptest::option::of(gen_encryption_algorithm()),
653 any::<bool>(),
654 any::<bool>(),
655 )
656 .prop_map(
657 |(
658 default_stream_config,
659 stream_cipher,
660 create_stream_on_append,
661 create_stream_on_read,
662 )| {
663 BasinConfig {
664 default_stream_config,
665 stream_cipher,
666 create_stream_on_append,
667 create_stream_on_read,
668 }
669 },
670 )
671 }
672
/// Lifts a strategy for `T` into a strategy for `Maybe<Option<T>>`.
///
/// Yields one of three shapes: `Unspecified`, `Specified(None)`, or `Specified(Some(v))`
/// with `v` drawn from `inner`.
fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
    inner: impl Strategy<Value = T>,
) -> impl Strategy<Value = Maybe<Option<T>>> {
    prop_oneof![
        Just(Maybe::Unspecified),
        Just(Maybe::Specified(None)),
        inner.prop_map(|v| Maybe::Specified(Some(v))),
    ]
}
682
683 fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
684 (
685 gen_maybe(gen_storage_class()),
686 gen_maybe(gen_retention_policy()),
687 gen_maybe(gen_timestamping_reconfiguration()),
688 gen_maybe(gen_delete_on_empty_reconfiguration()),
689 )
690 .prop_map(
691 |(storage_class, retention_policy, timestamping, delete_on_empty)| {
692 StreamReconfiguration {
693 storage_class,
694 retention_policy,
695 timestamping,
696 delete_on_empty,
697 }
698 },
699 )
700 }
701
702 fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
703 (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
704 .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
705 }
706
707 fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
708 {
709 gen_maybe(any::<u64>())
710 .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
711 }
712
/// Strategy yielding API basin reconfigurations.
///
/// The nested stream reconfiguration and the cipher go through `gen_maybe`. The two boolean
/// flags are `Maybe<bool>` (no inner `Option`), so they are generated inline as either
/// `Unspecified` or `Specified(<bool>)`.
fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
    (
        gen_maybe(gen_stream_reconfiguration()),
        gen_maybe(gen_encryption_algorithm()),
        // create_stream_on_append
        prop_oneof![
            Just(Maybe::Unspecified),
            any::<bool>().prop_map(Maybe::Specified),
        ],
        // create_stream_on_read
        prop_oneof![
            Just(Maybe::Unspecified),
            any::<bool>().prop_map(Maybe::Specified),
        ],
    )
        .prop_map(
            |(
                default_stream_config,
                stream_cipher,
                create_stream_on_append,
                create_stream_on_read,
            )| BasinReconfiguration {
                default_stream_config,
                stream_cipher,
                create_stream_on_append,
                create_stream_on_read,
            },
        )
}
740
741 fn gen_internal_optional_stream_config()
742 -> impl Strategy<Value = types::config::OptionalStreamConfig> {
743 (
744 proptest::option::of(gen_storage_class()),
745 proptest::option::of(gen_retention_policy()),
746 proptest::option::of(gen_timestamping_mode()),
747 proptest::option::of(any::<bool>()),
748 proptest::option::of(any::<u64>()),
749 )
750 .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe)| {
751 types::config::OptionalStreamConfig {
752 storage_class: sc.map(Into::into),
753 retention_policy: rp.map(|rp| match rp {
754 RetentionPolicy::Age(secs) => {
755 types::config::RetentionPolicy::Age(Duration::from_secs(secs))
756 }
757 RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
758 }),
759 timestamping: types::config::OptionalTimestampingConfig {
760 mode: ts_mode.map(Into::into),
761 uncapped: ts_uncapped,
762 },
763 delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
764 min_age: doe.map(Duration::from_secs),
765 },
766 }
767 })
768 }
769
770 proptest! {
771 #[test]
772 fn stream_config_conversion_validates(config in gen_stream_config()) {
773 let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
774 let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
775
776 if has_zero_age {
777 prop_assert!(result.is_err());
778 } else {
779 prop_assert!(result.is_ok());
780 }
781 }
782
783 #[test]
784 fn basin_config_conversion_validates(config in gen_basin_config()) {
785 let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
786 matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
787 });
788
789 let result: Result<types::config::BasinConfig, _> = config.try_into();
790
791 if has_invalid_config {
792 prop_assert!(result.is_err());
793 } else {
794 prop_assert!(result.is_ok());
795 }
796 }
797
798 #[test]
799 fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
800 let has_zero_age = matches!(
801 reconfig.retention_policy,
802 Maybe::Specified(Some(RetentionPolicy::Age(0)))
803 );
804 let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
805
806 if has_zero_age {
807 prop_assert!(result.is_err());
808 } else {
809 prop_assert!(result.is_ok());
810 }
811 }
812
813 #[test]
814 fn merge_stream_or_basin_or_default(
815 stream in gen_internal_optional_stream_config(),
816 basin in gen_internal_optional_stream_config(),
817 ) {
818 let merged = stream.clone().merge(basin.clone());
819
820 prop_assert_eq!(
821 merged.storage_class,
822 stream.storage_class.or(basin.storage_class).unwrap_or_default()
823 );
824 prop_assert_eq!(
825 merged.retention_policy,
826 stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
827 );
828 prop_assert_eq!(
829 merged.timestamping.mode,
830 stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
831 );
832 prop_assert_eq!(
833 merged.timestamping.uncapped,
834 stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
835 );
836 prop_assert_eq!(
837 merged.delete_on_empty.min_age,
838 stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
839 );
840 }
841
842 #[test]
843 fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
844 let reconfig = types::config::StreamReconfiguration::default();
845 let result = base.clone().reconfigure(reconfig);
846
847 prop_assert_eq!(result.storage_class, base.storage_class);
848 prop_assert_eq!(result.retention_policy, base.retention_policy);
849 prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
850 prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
851 prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
852 }
853
854 #[test]
855 fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
856 let reconfig = types::config::StreamReconfiguration {
857 storage_class: Maybe::Specified(None),
858 retention_policy: Maybe::Specified(None),
859 timestamping: Maybe::Specified(None),
860 delete_on_empty: Maybe::Specified(None),
861 };
862 let result = base.reconfigure(reconfig);
863
864 prop_assert!(result.storage_class.is_none());
865 prop_assert!(result.retention_policy.is_none());
866 prop_assert!(result.timestamping.mode.is_none());
867 prop_assert!(result.timestamping.uncapped.is_none());
868 prop_assert!(result.delete_on_empty.min_age.is_none());
869 }
870
871 #[test]
872 fn reconfigure_specified_some_sets_value(
873 base in gen_internal_optional_stream_config(),
874 new_sc in gen_storage_class(),
875 new_rp_secs in 1u64..u64::MAX,
876 ) {
877 let reconfig = types::config::StreamReconfiguration {
878 storage_class: Maybe::Specified(Some(new_sc.into())),
879 retention_policy: Maybe::Specified(Some(
880 types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
881 )),
882 ..Default::default()
883 };
884 let result = base.reconfigure(reconfig);
885
886 prop_assert_eq!(result.storage_class, Some(new_sc.into()));
887 prop_assert_eq!(
888 result.retention_policy,
889 Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
890 );
891 }
892
893 #[test]
894 fn to_opt_returns_some_for_non_defaults(
895 sc in gen_storage_class(),
896 doe_secs in 1u64..u64::MAX,
897 ts_mode in gen_timestamping_mode(),
898 ) {
899 let internal = types::config::OptionalStreamConfig {
901 storage_class: Some(sc.into()),
902 ..Default::default()
903 };
904 prop_assert!(StreamConfig::to_opt(internal).is_some());
905
906 let internal = types::config::OptionalDeleteOnEmptyConfig {
908 min_age: Some(Duration::from_secs(doe_secs)),
909 };
910 let api = DeleteOnEmptyConfig::to_opt(internal);
911 prop_assert!(api.is_some());
912 prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
913
914 let internal = types::config::OptionalTimestampingConfig {
916 mode: Some(ts_mode.into()),
917 uncapped: None,
918 };
919 prop_assert!(TimestampingConfig::to_opt(internal).is_some());
920 }
921
922 #[test]
923 fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
924 let has_zero_age = matches!(
925 &reconfig.default_stream_config,
926 Maybe::Specified(Some(sr)) if matches!(
927 sr.retention_policy,
928 Maybe::Specified(Some(RetentionPolicy::Age(0)))
929 )
930 );
931 let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
932
933 if has_zero_age {
934 prop_assert!(result.is_err());
935 } else {
936 prop_assert!(result.is_ok());
937 }
938 }
939
940 #[test]
941 fn reconfigure_basin_unspecified_preserves(
942 base_sc in proptest::option::of(gen_storage_class()),
943 base_algorithm in proptest::option::of(gen_encryption_algorithm()),
944 base_on_append in any::<bool>(),
945 base_on_read in any::<bool>(),
946 ) {
947 let base = types::config::BasinConfig {
948 default_stream_config: types::config::OptionalStreamConfig {
949 storage_class: base_sc.map(Into::into),
950 ..Default::default()
951 },
952 stream_cipher: base_algorithm.map(Into::into),
953 create_stream_on_append: base_on_append,
954 create_stream_on_read: base_on_read,
955 };
956
957 let reconfig = types::config::BasinReconfiguration::default();
958 let result = base.clone().reconfigure(reconfig);
959
960 prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
961 prop_assert_eq!(result.stream_cipher, base.stream_cipher);
962 prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
963 prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
964 }
965
966 #[test]
967 fn reconfigure_basin_specified_updates(
968 base_on_append in any::<bool>(),
969 new_on_append in any::<bool>(),
970 new_sc in gen_storage_class(),
971 new_algorithm in gen_encryption_algorithm(),
972 ) {
973 let base = types::config::BasinConfig {
974 create_stream_on_append: base_on_append,
975 ..Default::default()
976 };
977
978 let reconfig = types::config::BasinReconfiguration {
979 default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
980 storage_class: Maybe::Specified(Some(new_sc.into())),
981 ..Default::default()
982 })),
983 stream_cipher: Maybe::Specified(Some(new_algorithm.into())),
984 create_stream_on_append: Maybe::Specified(new_on_append),
985 ..Default::default()
986 };
987 let result = base.reconfigure(reconfig);
988
989 prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
990 prop_assert_eq!(result.stream_cipher, Some(new_algorithm.into()));
991 prop_assert_eq!(result.create_stream_on_append, new_on_append);
992 }
993
994 #[test]
995 fn reconfigure_nested_partial_update(
996 base_mode in gen_timestamping_mode(),
997 base_uncapped in any::<bool>(),
998 new_mode in gen_timestamping_mode(),
999 ) {
1000 let base = types::config::OptionalStreamConfig {
1001 timestamping: types::config::OptionalTimestampingConfig {
1002 mode: Some(base_mode.into()),
1003 uncapped: Some(base_uncapped),
1004 },
1005 ..Default::default()
1006 };
1007
1008 let expected_mode: types::config::TimestampingMode = new_mode.into();
1009
1010 let reconfig = types::config::StreamReconfiguration {
1011 timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
1012 mode: Maybe::Specified(Some(expected_mode)),
1013 uncapped: Maybe::Unspecified,
1014 })),
1015 ..Default::default()
1016 };
1017 let result = base.reconfigure(reconfig);
1018
1019 prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
1020 prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
1021 }
1022 }
1023
/// Each `to_opt` helper must return `None` for default / empty input.
#[test]
fn to_opt_returns_none_for_defaults() {
    let empty_stream = types::config::OptionalStreamConfig::default();
    assert!(StreamConfig::to_opt(empty_stream).is_none());

    // Both an unset and an explicit zero minimum age count as "no delete-on-empty config".
    for min_age in [None, Some(Duration::ZERO)] {
        let internal = types::config::OptionalDeleteOnEmptyConfig { min_age };
        assert!(DeleteOnEmptyConfig::to_opt(internal).is_none());
    }

    let empty_timestamping = types::config::OptionalTimestampingConfig::default();
    assert!(TimestampingConfig::to_opt(empty_timestamping).is_none());
}
1043
/// The `From` conversion (unlike `to_opt`) must keep an explicitly set zero minimum age.
#[test]
fn optional_stream_config_into_api_preserves_explicit_zero_delete_on_empty() {
    let internal = types::config::OptionalStreamConfig {
        delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
            min_age: Some(Duration::ZERO),
        },
        ..Default::default()
    };

    let api = StreamConfig::from(internal);

    let expected = Some(DeleteOnEmptyConfig { min_age_secs: 0 });
    assert_eq!(api.delete_on_empty, expected);
}
1059
/// The `From` conversion must keep an unset nested field (`uncapped`) as `None` while
/// still emitting the timestamping object because `mode` is set.
#[test]
fn optional_stream_config_into_api_preserves_nested_timestamping_omission() {
    let internal = types::config::OptionalStreamConfig {
        timestamping: types::config::OptionalTimestampingConfig {
            mode: Some(types::config::TimestampingMode::Arrival),
            uncapped: None,
        },
        ..Default::default()
    };

    let api = StreamConfig::from(internal);

    let expected = Some(TimestampingConfig {
        mode: Some(TimestampingMode::Arrival),
        uncapped: None,
    });
    assert_eq!(api.timestamping, expected);
}
1079
/// An empty JSON object must parse into a `StreamConfig` whose internal form has every
/// leaf unset.
#[test]
fn empty_json_converts_to_all_none() {
    let parsed: StreamConfig = serde_json::from_value(serde_json::json!({})).unwrap();
    let internal = types::config::OptionalStreamConfig::try_from(parsed).unwrap();

    let types::config::OptionalStreamConfig {
        storage_class,
        retention_policy,
        timestamping,
        delete_on_empty,
    } = internal;

    assert!(storage_class.is_none(), "storage_class should be None");
    assert!(retention_policy.is_none(), "retention_policy should be None");
    assert!(
        timestamping.mode.is_none(),
        "timestamping.mode should be None"
    );
    assert!(
        timestamping.uncapped.is_none(),
        "timestamping.uncapped should be None"
    );
    assert!(
        delete_on_empty.min_age.is_none(),
        "delete_on_empty.min_age should be None"
    );
}
1107}