1use std::time::Duration;
2
3use s2_common::{encryption, maybe::Maybe, types};
4use serde::{Deserialize, Serialize};
5
6#[rustfmt::skip]
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
9#[serde(rename_all = "kebab-case")]
10pub enum StorageClass {
11 Standard,
13 Express,
15}
16
17impl From<StorageClass> for types::config::StorageClass {
18 fn from(value: StorageClass) -> Self {
19 match value {
20 StorageClass::Express => Self::Express,
21 StorageClass::Standard => Self::Standard,
22 }
23 }
24}
25
26impl From<types::config::StorageClass> for StorageClass {
27 fn from(value: types::config::StorageClass) -> Self {
28 match value {
29 types::config::StorageClass::Express => Self::Express,
30 types::config::StorageClass::Standard => Self::Standard,
31 }
32 }
33}
34
35#[rustfmt::skip]
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
38#[serde(rename_all = "kebab-case")]
39pub enum RetentionPolicy {
40 Age(u64),
43 Infinite(InfiniteRetention)
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
49#[serde(rename_all = "kebab-case")]
50pub struct InfiniteRetention {}
51
52impl TryFrom<RetentionPolicy> for types::config::RetentionPolicy {
53 type Error = types::ValidationError;
54
55 fn try_from(value: RetentionPolicy) -> Result<Self, Self::Error> {
56 match value {
57 RetentionPolicy::Age(0) => Err(types::ValidationError(
58 "age must be greater than 0 seconds".to_string(),
59 )),
60 RetentionPolicy::Age(age) => Ok(Self::Age(Duration::from_secs(age))),
61 RetentionPolicy::Infinite(_) => Ok(Self::Infinite()),
62 }
63 }
64}
65
66impl From<types::config::RetentionPolicy> for RetentionPolicy {
67 fn from(value: types::config::RetentionPolicy) -> Self {
68 match value {
69 types::config::RetentionPolicy::Age(age) => Self::Age(age.as_secs()),
70 types::config::RetentionPolicy::Infinite() => Self::Infinite(InfiniteRetention {}),
71 }
72 }
73}
74
75#[rustfmt::skip]
76#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
78#[serde(rename_all = "kebab-case")]
79pub enum TimestampingMode {
80 #[default]
82 ClientPrefer,
83 ClientRequire,
85 Arrival,
87}
88
89impl From<TimestampingMode> for types::config::TimestampingMode {
90 fn from(value: TimestampingMode) -> Self {
91 match value {
92 TimestampingMode::ClientPrefer => Self::ClientPrefer,
93 TimestampingMode::ClientRequire => Self::ClientRequire,
94 TimestampingMode::Arrival => Self::Arrival,
95 }
96 }
97}
98
99impl From<types::config::TimestampingMode> for TimestampingMode {
100 fn from(value: types::config::TimestampingMode) -> Self {
101 match value {
102 types::config::TimestampingMode::ClientPrefer => Self::ClientPrefer,
103 types::config::TimestampingMode::ClientRequire => Self::ClientRequire,
104 types::config::TimestampingMode::Arrival => Self::Arrival,
105 }
106 }
107}
108
109#[rustfmt::skip]
110#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
112pub struct TimestampingConfig {
113 pub mode: Option<TimestampingMode>,
115 pub uncapped: Option<bool>,
118}
119
120impl TimestampingConfig {
121 pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
122 let config = TimestampingConfig {
123 mode: config.mode.map(Into::into),
124 uncapped: config.uncapped,
125 };
126 if config == Self::default() {
127 None
128 } else {
129 Some(config)
130 }
131 }
132}
133
134impl From<types::config::TimestampingConfig> for TimestampingConfig {
135 fn from(value: types::config::TimestampingConfig) -> Self {
136 Self {
137 mode: Some(value.mode.into()),
138 uncapped: Some(value.uncapped),
139 }
140 }
141}
142
143impl From<types::config::OptionalTimestampingConfig> for TimestampingConfig {
144 fn from(value: types::config::OptionalTimestampingConfig) -> Self {
145 Self {
146 mode: value.mode.map(Into::into),
147 uncapped: value.uncapped,
148 }
149 }
150}
151
152impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
153 fn from(value: TimestampingConfig) -> Self {
154 Self {
155 mode: value.mode.map(Into::into),
156 uncapped: value.uncapped,
157 }
158 }
159}
160
161#[rustfmt::skip]
162#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
163#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
164pub struct TimestampingReconfiguration {
165 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
167 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingMode>))]
168 pub mode: Maybe<Option<TimestampingMode>>,
169 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
171 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
172 pub uncapped: Maybe<Option<bool>>,
173}
174
175impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
176 fn from(value: TimestampingReconfiguration) -> Self {
177 Self {
178 mode: value.mode.map_opt(Into::into),
179 uncapped: value.uncapped,
180 }
181 }
182}
183
184impl From<types::config::TimestampingReconfiguration> for TimestampingReconfiguration {
185 fn from(value: types::config::TimestampingReconfiguration) -> Self {
186 Self {
187 mode: value.mode.map_opt(Into::into),
188 uncapped: value.uncapped,
189 }
190 }
191}
192
193#[rustfmt::skip]
194#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
195#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
196pub struct DeleteOnEmptyConfig {
197 #[serde(default)]
200 pub min_age_secs: u64,
201}
202
203impl DeleteOnEmptyConfig {
204 pub fn to_opt(config: types::config::OptionalDeleteOnEmptyConfig) -> Option<Self> {
205 config.min_age.map(|min_age| DeleteOnEmptyConfig {
206 min_age_secs: min_age.as_secs(),
207 })
208 }
209}
210
211impl From<types::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
212 fn from(value: types::config::DeleteOnEmptyConfig) -> Self {
213 Self {
214 min_age_secs: value.min_age.as_secs(),
215 }
216 }
217}
218
219impl From<types::config::OptionalDeleteOnEmptyConfig> for DeleteOnEmptyConfig {
220 fn from(value: types::config::OptionalDeleteOnEmptyConfig) -> Self {
221 Self {
222 min_age_secs: value.min_age.unwrap_or_default().as_secs(),
223 }
224 }
225}
226
227impl From<DeleteOnEmptyConfig> for types::config::DeleteOnEmptyConfig {
228 fn from(value: DeleteOnEmptyConfig) -> Self {
229 Self {
230 min_age: Duration::from_secs(value.min_age_secs),
231 }
232 }
233}
234
235impl From<DeleteOnEmptyConfig> for types::config::OptionalDeleteOnEmptyConfig {
236 fn from(value: DeleteOnEmptyConfig) -> Self {
237 Self {
238 min_age: Some(Duration::from_secs(value.min_age_secs)),
239 }
240 }
241}
242
243#[rustfmt::skip]
244#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
245#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
246pub struct DeleteOnEmptyReconfiguration {
247 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
250 #[cfg_attr(feature = "utoipa", schema(value_type = Option<u64>))]
251 pub min_age_secs: Maybe<Option<u64>>,
252}
253
254impl From<DeleteOnEmptyReconfiguration> for types::config::DeleteOnEmptyReconfiguration {
255 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
256 Self {
257 min_age: value.min_age_secs.map_opt(Duration::from_secs),
258 }
259 }
260}
261
262impl From<types::config::DeleteOnEmptyReconfiguration> for DeleteOnEmptyReconfiguration {
263 fn from(value: types::config::DeleteOnEmptyReconfiguration) -> Self {
264 Self {
265 min_age_secs: value.min_age.map_opt(|d| d.as_secs()),
266 }
267 }
268}
269
270#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
271#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
272pub enum EncryptionAlgorithm {
273 #[serde(rename = "aegis-256")]
275 Aegis256,
276 #[serde(rename = "aes-256-gcm")]
278 Aes256Gcm,
279}
280
281impl From<EncryptionAlgorithm> for encryption::EncryptionAlgorithm {
282 fn from(value: EncryptionAlgorithm) -> Self {
283 match value {
284 EncryptionAlgorithm::Aegis256 => Self::Aegis256,
285 EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
286 }
287 }
288}
289
290impl From<encryption::EncryptionAlgorithm> for EncryptionAlgorithm {
291 fn from(value: encryption::EncryptionAlgorithm) -> Self {
292 match value {
293 encryption::EncryptionAlgorithm::Aegis256 => Self::Aegis256,
294 encryption::EncryptionAlgorithm::Aes256Gcm => Self::Aes256Gcm,
295 }
296 }
297}
298
299#[rustfmt::skip]
300#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
301#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
302pub struct StreamConfig {
303 pub storage_class: Option<StorageClass>,
305 pub retention_policy: Option<RetentionPolicy>,
308 pub timestamping: Option<TimestampingConfig>,
310 #[serde(default)]
312 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
313}
314
315impl StreamConfig {
316 pub fn to_opt(config: types::config::OptionalStreamConfig) -> Option<Self> {
317 let types::config::OptionalStreamConfig {
318 storage_class,
319 retention_policy,
320 timestamping,
321 delete_on_empty,
322 } = config;
323
324 let config = StreamConfig {
325 storage_class: storage_class.map(Into::into),
326 retention_policy: retention_policy.map(Into::into),
327 timestamping: TimestampingConfig::to_opt(timestamping),
328 delete_on_empty: DeleteOnEmptyConfig::to_opt(delete_on_empty),
329 };
330 if config == Self::default() {
331 None
332 } else {
333 Some(config)
334 }
335 }
336}
337
338impl From<types::config::StreamConfig> for StreamConfig {
339 fn from(value: types::config::StreamConfig) -> Self {
340 let types::config::StreamConfig {
341 storage_class,
342 retention_policy,
343 timestamping,
344 delete_on_empty,
345 } = value;
346
347 Self {
348 storage_class: Some(storage_class.into()),
349 retention_policy: Some(retention_policy.into()),
350 timestamping: Some(timestamping.into()),
351 delete_on_empty: Some(delete_on_empty.into()),
352 }
353 }
354}
355
356impl From<types::config::OptionalStreamConfig> for StreamConfig {
357 fn from(value: types::config::OptionalStreamConfig) -> Self {
358 let types::config::OptionalStreamConfig {
359 storage_class,
360 retention_policy,
361 timestamping,
362 delete_on_empty,
363 } = value;
364
365 let timestamping = (timestamping.mode.is_some() || timestamping.uncapped.is_some())
366 .then(|| timestamping.into());
367 let delete_on_empty = delete_on_empty.min_age.map(|_| delete_on_empty.into());
368
369 Self {
370 storage_class: storage_class.map(Into::into),
371 retention_policy: retention_policy.map(Into::into),
372 timestamping,
373 delete_on_empty,
374 }
375 }
376}
377
378impl TryFrom<StreamConfig> for types::config::OptionalStreamConfig {
379 type Error = types::ValidationError;
380
381 fn try_from(value: StreamConfig) -> Result<Self, Self::Error> {
382 let StreamConfig {
383 storage_class,
384 retention_policy,
385 timestamping,
386 delete_on_empty,
387 } = value;
388
389 let retention_policy = match retention_policy {
390 None => None,
391 Some(policy) => Some(policy.try_into()?),
392 };
393
394 Ok(Self {
395 storage_class: storage_class.map(Into::into),
396 retention_policy,
397 timestamping: timestamping.map(Into::into).unwrap_or_default(),
398 delete_on_empty: delete_on_empty.map(Into::into).unwrap_or_default(),
399 })
400 }
401}
402
403#[rustfmt::skip]
404#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
405#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
406pub struct StreamReconfiguration {
407 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
409 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StorageClass>))]
410 pub storage_class: Maybe<Option<StorageClass>>,
411 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
414 #[cfg_attr(feature = "utoipa", schema(value_type = Option<RetentionPolicy>))]
415 pub retention_policy: Maybe<Option<RetentionPolicy>>,
416 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
418 #[cfg_attr(feature = "utoipa", schema(value_type = Option<TimestampingReconfiguration>))]
419 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
420 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
422 #[cfg_attr(feature = "utoipa", schema(value_type = Option<DeleteOnEmptyReconfiguration>))]
423 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
424}
425
426impl TryFrom<StreamReconfiguration> for types::config::StreamReconfiguration {
427 type Error = types::ValidationError;
428
429 fn try_from(value: StreamReconfiguration) -> Result<Self, Self::Error> {
430 let StreamReconfiguration {
431 storage_class,
432 retention_policy,
433 timestamping,
434 delete_on_empty,
435 } = value;
436
437 Ok(Self {
438 storage_class: storage_class.map_opt(Into::into),
439 retention_policy: retention_policy.try_map_opt(TryInto::try_into)?,
440 timestamping: timestamping.map_opt(Into::into),
441 delete_on_empty: delete_on_empty.map_opt(Into::into),
442 })
443 }
444}
445
446impl From<types::config::StreamReconfiguration> for StreamReconfiguration {
447 fn from(value: types::config::StreamReconfiguration) -> Self {
448 let types::config::StreamReconfiguration {
449 storage_class,
450 retention_policy,
451 timestamping,
452 delete_on_empty,
453 } = value;
454
455 Self {
456 storage_class: storage_class.map_opt(Into::into),
457 retention_policy: retention_policy.map_opt(Into::into),
458 timestamping: timestamping.map_opt(Into::into),
459 delete_on_empty: delete_on_empty.map_opt(Into::into),
460 }
461 }
462}
463
464#[rustfmt::skip]
465#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
466#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
467pub struct BasinConfig {
468 pub default_stream_config: Option<StreamConfig>,
470 pub stream_cipher: Option<EncryptionAlgorithm>,
472 #[serde(default)]
474 #[cfg_attr(feature = "utoipa", schema(default = false))]
475 pub create_stream_on_append: bool,
476 #[serde(default)]
478 #[cfg_attr(feature = "utoipa", schema(default = false))]
479 pub create_stream_on_read: bool,
480}
481
482impl TryFrom<BasinConfig> for types::config::BasinConfig {
483 type Error = types::ValidationError;
484
485 fn try_from(value: BasinConfig) -> Result<Self, Self::Error> {
486 let BasinConfig {
487 default_stream_config,
488 stream_cipher,
489 create_stream_on_append,
490 create_stream_on_read,
491 } = value;
492
493 Ok(Self {
494 default_stream_config: match default_stream_config {
495 Some(config) => config.try_into()?,
496 None => Default::default(),
497 },
498 stream_cipher: stream_cipher.map(Into::into),
499 create_stream_on_append,
500 create_stream_on_read,
501 })
502 }
503}
504
505impl From<types::config::BasinConfig> for BasinConfig {
506 fn from(value: types::config::BasinConfig) -> Self {
507 let types::config::BasinConfig {
508 default_stream_config,
509 stream_cipher,
510 create_stream_on_append,
511 create_stream_on_read,
512 } = value;
513
514 Self {
515 default_stream_config: StreamConfig::to_opt(default_stream_config),
516 stream_cipher: stream_cipher.map(Into::into),
517 create_stream_on_append,
518 create_stream_on_read,
519 }
520 }
521}
522
523#[rustfmt::skip]
524#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
525#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
526pub struct BasinReconfiguration {
527 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
529 #[cfg_attr(feature = "utoipa", schema(value_type = Option<StreamReconfiguration>))]
530 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
531 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
533 #[cfg_attr(feature = "utoipa", schema(value_type = Option<EncryptionAlgorithm>))]
534 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
535 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
537 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
538 pub create_stream_on_append: Maybe<bool>,
539 #[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
541 #[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
542 pub create_stream_on_read: Maybe<bool>,
543}
544
545impl TryFrom<BasinReconfiguration> for types::config::BasinReconfiguration {
546 type Error = types::ValidationError;
547
548 fn try_from(value: BasinReconfiguration) -> Result<Self, Self::Error> {
549 let BasinReconfiguration {
550 default_stream_config,
551 stream_cipher,
552 create_stream_on_append,
553 create_stream_on_read,
554 } = value;
555
556 Ok(Self {
557 default_stream_config: default_stream_config.try_map_opt(TryInto::try_into)?,
558 stream_cipher: stream_cipher.map_opt(Into::into),
559 create_stream_on_append: create_stream_on_append.map(Into::into),
560 create_stream_on_read: create_stream_on_read.map(Into::into),
561 })
562 }
563}
564
565impl From<types::config::BasinReconfiguration> for BasinReconfiguration {
566 fn from(value: types::config::BasinReconfiguration) -> Self {
567 let types::config::BasinReconfiguration {
568 default_stream_config,
569 stream_cipher,
570 create_stream_on_append,
571 create_stream_on_read,
572 } = value;
573
574 Self {
575 default_stream_config: default_stream_config.map_opt(Into::into),
576 stream_cipher: stream_cipher.map_opt(Into::into),
577 create_stream_on_append: create_stream_on_append.map(Into::into),
578 create_stream_on_read: create_stream_on_read.map(Into::into),
579 }
580 }
581}
582
583#[cfg(test)]
584mod tests {
585 use proptest::prelude::*;
586
587 use super::*;
588
589 fn gen_storage_class() -> impl Strategy<Value = StorageClass> {
590 prop_oneof![Just(StorageClass::Standard), Just(StorageClass::Express)]
591 }
592
593 fn gen_timestamping_mode() -> impl Strategy<Value = TimestampingMode> {
594 prop_oneof![
595 Just(TimestampingMode::ClientPrefer),
596 Just(TimestampingMode::ClientRequire),
597 Just(TimestampingMode::Arrival),
598 ]
599 }
600
601 fn gen_retention_policy() -> impl Strategy<Value = RetentionPolicy> {
602 prop_oneof![
603 any::<u64>().prop_map(RetentionPolicy::Age),
604 Just(RetentionPolicy::Infinite(InfiniteRetention {})),
605 ]
606 }
607
608 fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
609 (
610 proptest::option::of(gen_timestamping_mode()),
611 proptest::option::of(any::<bool>()),
612 )
613 .prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
614 }
615
616 fn gen_delete_on_empty_config() -> impl Strategy<Value = DeleteOnEmptyConfig> {
617 any::<u64>().prop_map(|min_age_secs| DeleteOnEmptyConfig { min_age_secs })
618 }
619
620 fn gen_encryption_algorithm() -> impl Strategy<Value = EncryptionAlgorithm> {
621 prop_oneof![
622 Just(EncryptionAlgorithm::Aegis256),
623 Just(EncryptionAlgorithm::Aes256Gcm),
624 ]
625 }
626
627 fn gen_stream_config() -> impl Strategy<Value = StreamConfig> {
628 (
629 proptest::option::of(gen_storage_class()),
630 proptest::option::of(gen_retention_policy()),
631 proptest::option::of(gen_timestamping_config()),
632 proptest::option::of(gen_delete_on_empty_config()),
633 )
634 .prop_map(
635 |(storage_class, retention_policy, timestamping, delete_on_empty)| StreamConfig {
636 storage_class,
637 retention_policy,
638 timestamping,
639 delete_on_empty,
640 },
641 )
642 }
643
644 fn gen_basin_config() -> impl Strategy<Value = BasinConfig> {
645 (
646 proptest::option::of(gen_stream_config()),
647 proptest::option::of(gen_encryption_algorithm()),
648 any::<bool>(),
649 any::<bool>(),
650 )
651 .prop_map(
652 |(
653 default_stream_config,
654 stream_cipher,
655 create_stream_on_append,
656 create_stream_on_read,
657 )| {
658 BasinConfig {
659 default_stream_config,
660 stream_cipher,
661 create_stream_on_append,
662 create_stream_on_read,
663 }
664 },
665 )
666 }
667
668 fn gen_maybe<T: std::fmt::Debug + Clone + 'static>(
669 inner: impl Strategy<Value = T>,
670 ) -> impl Strategy<Value = Maybe<Option<T>>> {
671 prop_oneof![
672 Just(Maybe::Unspecified),
673 Just(Maybe::Specified(None)),
674 inner.prop_map(|v| Maybe::Specified(Some(v))),
675 ]
676 }
677
678 fn gen_stream_reconfiguration() -> impl Strategy<Value = StreamReconfiguration> {
679 (
680 gen_maybe(gen_storage_class()),
681 gen_maybe(gen_retention_policy()),
682 gen_maybe(gen_timestamping_reconfiguration()),
683 gen_maybe(gen_delete_on_empty_reconfiguration()),
684 )
685 .prop_map(
686 |(storage_class, retention_policy, timestamping, delete_on_empty)| {
687 StreamReconfiguration {
688 storage_class,
689 retention_policy,
690 timestamping,
691 delete_on_empty,
692 }
693 },
694 )
695 }
696
697 fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
698 (gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
699 .prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
700 }
701
702 fn gen_delete_on_empty_reconfiguration() -> impl Strategy<Value = DeleteOnEmptyReconfiguration>
703 {
704 gen_maybe(any::<u64>())
705 .prop_map(|min_age_secs| DeleteOnEmptyReconfiguration { min_age_secs })
706 }
707
708 fn gen_basin_reconfiguration() -> impl Strategy<Value = BasinReconfiguration> {
709 (
710 gen_maybe(gen_stream_reconfiguration()),
711 gen_maybe(gen_encryption_algorithm()),
712 prop_oneof![
713 Just(Maybe::Unspecified),
714 any::<bool>().prop_map(Maybe::Specified),
715 ],
716 prop_oneof![
717 Just(Maybe::Unspecified),
718 any::<bool>().prop_map(Maybe::Specified),
719 ],
720 )
721 .prop_map(
722 |(
723 default_stream_config,
724 stream_cipher,
725 create_stream_on_append,
726 create_stream_on_read,
727 )| BasinReconfiguration {
728 default_stream_config,
729 stream_cipher,
730 create_stream_on_append,
731 create_stream_on_read,
732 },
733 )
734 }
735
736 fn gen_internal_optional_stream_config()
737 -> impl Strategy<Value = types::config::OptionalStreamConfig> {
738 (
739 proptest::option::of(gen_storage_class()),
740 proptest::option::of(gen_retention_policy()),
741 proptest::option::of(gen_timestamping_mode()),
742 proptest::option::of(any::<bool>()),
743 proptest::option::of(any::<u64>()),
744 )
745 .prop_map(|(sc, rp, ts_mode, ts_uncapped, doe)| {
746 types::config::OptionalStreamConfig {
747 storage_class: sc.map(Into::into),
748 retention_policy: rp.map(|rp| match rp {
749 RetentionPolicy::Age(secs) => {
750 types::config::RetentionPolicy::Age(Duration::from_secs(secs))
751 }
752 RetentionPolicy::Infinite(_) => types::config::RetentionPolicy::Infinite(),
753 }),
754 timestamping: types::config::OptionalTimestampingConfig {
755 mode: ts_mode.map(Into::into),
756 uncapped: ts_uncapped,
757 },
758 delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
759 min_age: doe.map(Duration::from_secs),
760 },
761 }
762 })
763 }
764
765 proptest! {
766 #[test]
767 fn stream_config_conversion_validates(config in gen_stream_config()) {
768 let has_zero_age = matches!(config.retention_policy, Some(RetentionPolicy::Age(0)));
769 let result: Result<types::config::OptionalStreamConfig, _> = config.try_into();
770
771 if has_zero_age {
772 prop_assert!(result.is_err());
773 } else {
774 prop_assert!(result.is_ok());
775 }
776 }
777
778 #[test]
779 fn basin_config_conversion_validates(config in gen_basin_config()) {
780 let has_invalid_config = config.default_stream_config.as_ref().is_some_and(|sc| {
781 matches!(sc.retention_policy, Some(RetentionPolicy::Age(0)))
782 });
783
784 let result: Result<types::config::BasinConfig, _> = config.try_into();
785
786 if has_invalid_config {
787 prop_assert!(result.is_err());
788 } else {
789 prop_assert!(result.is_ok());
790 }
791 }
792
793 #[test]
794 fn stream_reconfiguration_conversion_validates(reconfig in gen_stream_reconfiguration()) {
795 let has_zero_age = matches!(
796 reconfig.retention_policy,
797 Maybe::Specified(Some(RetentionPolicy::Age(0)))
798 );
799 let result: Result<types::config::StreamReconfiguration, _> = reconfig.try_into();
800
801 if has_zero_age {
802 prop_assert!(result.is_err());
803 } else {
804 prop_assert!(result.is_ok());
805 }
806 }
807
808 #[test]
809 fn merge_stream_or_basin_or_default(
810 stream in gen_internal_optional_stream_config(),
811 basin in gen_internal_optional_stream_config(),
812 ) {
813 let merged = stream.clone().merge(basin.clone());
814
815 prop_assert_eq!(
816 merged.storage_class,
817 stream.storage_class.or(basin.storage_class).unwrap_or_default()
818 );
819 prop_assert_eq!(
820 merged.retention_policy,
821 stream.retention_policy.or(basin.retention_policy).unwrap_or_default()
822 );
823 prop_assert_eq!(
824 merged.timestamping.mode,
825 stream.timestamping.mode.or(basin.timestamping.mode).unwrap_or_default()
826 );
827 prop_assert_eq!(
828 merged.timestamping.uncapped,
829 stream.timestamping.uncapped.or(basin.timestamping.uncapped).unwrap_or_default()
830 );
831 prop_assert_eq!(
832 merged.delete_on_empty.min_age,
833 stream.delete_on_empty.min_age.or(basin.delete_on_empty.min_age).unwrap_or_default()
834 );
835 }
836
837 #[test]
838 fn reconfigure_unspecified_preserves_base(base in gen_internal_optional_stream_config()) {
839 let reconfig = types::config::StreamReconfiguration::default();
840 let result = base.clone().reconfigure(reconfig);
841
842 prop_assert_eq!(result.storage_class, base.storage_class);
843 prop_assert_eq!(result.retention_policy, base.retention_policy);
844 prop_assert_eq!(result.timestamping.mode, base.timestamping.mode);
845 prop_assert_eq!(result.timestamping.uncapped, base.timestamping.uncapped);
846 prop_assert_eq!(result.delete_on_empty.min_age, base.delete_on_empty.min_age);
847 }
848
849 #[test]
850 fn reconfigure_specified_none_clears(base in gen_internal_optional_stream_config()) {
851 let reconfig = types::config::StreamReconfiguration {
852 storage_class: Maybe::Specified(None),
853 retention_policy: Maybe::Specified(None),
854 timestamping: Maybe::Specified(None),
855 delete_on_empty: Maybe::Specified(None),
856 };
857 let result = base.reconfigure(reconfig);
858
859 prop_assert!(result.storage_class.is_none());
860 prop_assert!(result.retention_policy.is_none());
861 prop_assert!(result.timestamping.mode.is_none());
862 prop_assert!(result.timestamping.uncapped.is_none());
863 prop_assert!(result.delete_on_empty.min_age.is_none());
864 }
865
866 #[test]
867 fn reconfigure_specified_some_sets_value(
868 base in gen_internal_optional_stream_config(),
869 new_sc in gen_storage_class(),
870 new_rp_secs in 1u64..u64::MAX,
871 ) {
872 let reconfig = types::config::StreamReconfiguration {
873 storage_class: Maybe::Specified(Some(new_sc.into())),
874 retention_policy: Maybe::Specified(Some(
875 types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs))
876 )),
877 ..Default::default()
878 };
879 let result = base.reconfigure(reconfig);
880
881 prop_assert_eq!(result.storage_class, Some(new_sc.into()));
882 prop_assert_eq!(
883 result.retention_policy,
884 Some(types::config::RetentionPolicy::Age(Duration::from_secs(new_rp_secs)))
885 );
886 }
887
888 #[test]
889 fn to_opt_returns_some_for_non_defaults(
890 sc in gen_storage_class(),
891 doe_secs in 1u64..u64::MAX,
892 ts_mode in gen_timestamping_mode(),
893 ) {
894 let internal = types::config::OptionalStreamConfig {
896 storage_class: Some(sc.into()),
897 ..Default::default()
898 };
899 prop_assert!(StreamConfig::to_opt(internal).is_some());
900
901 let internal = types::config::OptionalDeleteOnEmptyConfig {
903 min_age: Some(Duration::from_secs(doe_secs)),
904 };
905 let api = DeleteOnEmptyConfig::to_opt(internal);
906 prop_assert!(api.is_some());
907 prop_assert_eq!(api.unwrap().min_age_secs, doe_secs);
908
909 let internal = types::config::OptionalTimestampingConfig {
911 mode: Some(ts_mode.into()),
912 uncapped: None,
913 };
914 prop_assert!(TimestampingConfig::to_opt(internal).is_some());
915 }
916
917 #[test]
918 fn basin_reconfiguration_conversion_validates(reconfig in gen_basin_reconfiguration()) {
919 let has_zero_age = matches!(
920 &reconfig.default_stream_config,
921 Maybe::Specified(Some(sr)) if matches!(
922 sr.retention_policy,
923 Maybe::Specified(Some(RetentionPolicy::Age(0)))
924 )
925 );
926 let result: Result<types::config::BasinReconfiguration, _> = reconfig.try_into();
927
928 if has_zero_age {
929 prop_assert!(result.is_err());
930 } else {
931 prop_assert!(result.is_ok());
932 }
933 }
934
935 #[test]
936 fn reconfigure_basin_unspecified_preserves(
937 base_sc in proptest::option::of(gen_storage_class()),
938 base_algorithm in proptest::option::of(gen_encryption_algorithm()),
939 base_on_append in any::<bool>(),
940 base_on_read in any::<bool>(),
941 ) {
942 let base = types::config::BasinConfig {
943 default_stream_config: types::config::OptionalStreamConfig {
944 storage_class: base_sc.map(Into::into),
945 ..Default::default()
946 },
947 stream_cipher: base_algorithm.map(Into::into),
948 create_stream_on_append: base_on_append,
949 create_stream_on_read: base_on_read,
950 };
951
952 let reconfig = types::config::BasinReconfiguration::default();
953 let result = base.clone().reconfigure(reconfig);
954
955 prop_assert_eq!(result.default_stream_config.storage_class, base.default_stream_config.storage_class);
956 prop_assert_eq!(result.stream_cipher, base.stream_cipher);
957 prop_assert_eq!(result.create_stream_on_append, base.create_stream_on_append);
958 prop_assert_eq!(result.create_stream_on_read, base.create_stream_on_read);
959 }
960
961 #[test]
962 fn reconfigure_basin_specified_updates(
963 base_on_append in any::<bool>(),
964 new_on_append in any::<bool>(),
965 new_sc in gen_storage_class(),
966 new_algorithm in gen_encryption_algorithm(),
967 ) {
968 let base = types::config::BasinConfig {
969 create_stream_on_append: base_on_append,
970 ..Default::default()
971 };
972
973 let reconfig = types::config::BasinReconfiguration {
974 default_stream_config: Maybe::Specified(Some(types::config::StreamReconfiguration {
975 storage_class: Maybe::Specified(Some(new_sc.into())),
976 ..Default::default()
977 })),
978 stream_cipher: Maybe::Specified(Some(new_algorithm.into())),
979 create_stream_on_append: Maybe::Specified(new_on_append),
980 ..Default::default()
981 };
982 let result = base.reconfigure(reconfig);
983
984 prop_assert_eq!(result.default_stream_config.storage_class, Some(new_sc.into()));
985 prop_assert_eq!(result.stream_cipher, Some(new_algorithm.into()));
986 prop_assert_eq!(result.create_stream_on_append, new_on_append);
987 }
988
989 #[test]
990 fn reconfigure_nested_partial_update(
991 base_mode in gen_timestamping_mode(),
992 base_uncapped in any::<bool>(),
993 new_mode in gen_timestamping_mode(),
994 ) {
995 let base = types::config::OptionalStreamConfig {
996 timestamping: types::config::OptionalTimestampingConfig {
997 mode: Some(base_mode.into()),
998 uncapped: Some(base_uncapped),
999 },
1000 ..Default::default()
1001 };
1002
1003 let expected_mode: types::config::TimestampingMode = new_mode.into();
1004
1005 let reconfig = types::config::StreamReconfiguration {
1006 timestamping: Maybe::Specified(Some(types::config::TimestampingReconfiguration {
1007 mode: Maybe::Specified(Some(expected_mode)),
1008 uncapped: Maybe::Unspecified,
1009 })),
1010 ..Default::default()
1011 };
1012 let result = base.reconfigure(reconfig);
1013
1014 prop_assert_eq!(result.timestamping.mode, Some(expected_mode));
1015 prop_assert_eq!(result.timestamping.uncapped, Some(base_uncapped));
1016 }
1017 }
1018
1019 #[test]
1020 fn to_opt_returns_none_for_defaults() {
1021 assert!(StreamConfig::to_opt(types::config::OptionalStreamConfig::default()).is_none());
1023
1024 let doe_none = types::config::OptionalDeleteOnEmptyConfig { min_age: None };
1026 assert!(DeleteOnEmptyConfig::to_opt(doe_none).is_none());
1027
1028 assert!(
1030 TimestampingConfig::to_opt(types::config::OptionalTimestampingConfig::default())
1031 .is_none()
1032 );
1033 }
1034
1035 #[test]
1036 fn optional_stream_config_into_api_preserves_explicit_zero_delete_on_empty() {
1037 let api: StreamConfig = types::config::OptionalStreamConfig {
1038 delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
1039 min_age: Some(Duration::ZERO),
1040 },
1041 ..Default::default()
1042 }
1043 .into();
1044
1045 assert_eq!(
1046 api.delete_on_empty,
1047 Some(DeleteOnEmptyConfig { min_age_secs: 0 })
1048 );
1049 }
1050
1051 #[test]
1052 fn optional_stream_config_to_opt_preserves_explicit_zero_delete_on_empty() {
1053 let api = StreamConfig::to_opt(types::config::OptionalStreamConfig {
1054 delete_on_empty: types::config::OptionalDeleteOnEmptyConfig {
1055 min_age: Some(Duration::ZERO),
1056 },
1057 ..Default::default()
1058 })
1059 .unwrap();
1060
1061 assert_eq!(
1062 api.delete_on_empty,
1063 Some(DeleteOnEmptyConfig { min_age_secs: 0 })
1064 );
1065 }
1066
1067 #[test]
1068 fn optional_stream_config_into_api_preserves_nested_timestamping_omission() {
1069 let api: StreamConfig = types::config::OptionalStreamConfig {
1070 timestamping: types::config::OptionalTimestampingConfig {
1071 mode: Some(types::config::TimestampingMode::Arrival),
1072 uncapped: None,
1073 },
1074 ..Default::default()
1075 }
1076 .into();
1077
1078 assert_eq!(
1079 api.timestamping,
1080 Some(TimestampingConfig {
1081 mode: Some(TimestampingMode::Arrival),
1082 uncapped: None
1083 })
1084 );
1085 }
1086
1087 #[test]
1088 fn empty_json_converts_to_all_none() {
1089 let json = serde_json::json!({});
1090 let parsed: StreamConfig = serde_json::from_value(json).unwrap();
1091 let internal: types::config::OptionalStreamConfig = parsed.try_into().unwrap();
1092
1093 assert!(
1094 internal.storage_class.is_none(),
1095 "storage_class should be None"
1096 );
1097 assert!(
1098 internal.retention_policy.is_none(),
1099 "retention_policy should be None"
1100 );
1101 assert!(
1102 internal.timestamping.mode.is_none(),
1103 "timestamping.mode should be None"
1104 );
1105 assert!(
1106 internal.timestamping.uncapped.is_none(),
1107 "timestamping.uncapped should be None"
1108 );
1109 assert!(
1110 internal.delete_on_empty.min_age.is_none(),
1111 "delete_on_empty.min_age should be None"
1112 );
1113 }
1114}