1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::encryption::EncryptionAlgorithm;
24pub use s2_common::encryption::EncryptionMode;
26pub use s2_common::encryption::EncryptionSpec;
28pub use s2_common::types::ValidationError;
30pub use s2_common::types::access::AccessTokenId;
34pub use s2_common::types::access::AccessTokenIdPrefix;
36pub use s2_common::types::access::AccessTokenIdStartAfter;
38pub use s2_common::types::basin::BasinName;
43pub use s2_common::types::basin::BasinNamePrefix;
45pub use s2_common::types::basin::BasinNameStartAfter;
47pub use s2_common::types::stream::StreamName;
51pub use s2_common::types::stream::StreamNamePrefix;
53pub use s2_common::types::stream::StreamNameStartAfter;
55
/// Number of bytes in one mebibyte (`1024 * 1024`).
pub(crate) const ONE_MIB: u32 = 1024 * 1024;
57
58use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
59use secrecy::SecretString;
60
61use crate::api::{ApiError, ApiErrorResponse};
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69pub struct S2DateTime(time::OffsetDateTime);
70
71impl TryFrom<time::OffsetDateTime> for S2DateTime {
72 type Error = ValidationError;
73
74 fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
75 dt.format(&time::format_description::well_known::Rfc3339)
76 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
77 Ok(Self(dt))
78 }
79}
80
81impl From<S2DateTime> for time::OffsetDateTime {
82 fn from(dt: S2DateTime) -> Self {
83 dt.0
84 }
85}
86
87impl FromStr for S2DateTime {
88 type Err = ValidationError;
89
90 fn from_str(s: &str) -> Result<Self, Self::Err> {
91 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
92 .map(Self)
93 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
94 }
95}
96
97impl fmt::Display for S2DateTime {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 write!(
100 f,
101 "{}",
102 self.0
103 .format(&time::format_description::well_known::Rfc3339)
104 .expect("RFC3339 formatting should not fail for S2DateTime")
105 )
106 }
107}
108
/// Authority used to address a basin, in one of two forms.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// Authority taken from an endpoint written with a leading `{basin}.` placeholder
    /// (`BasinEndpoint::from_str` strips the placeholder before parsing).
    ParentZone(Authority),
    /// Authority taken from an endpoint written without the `{basin}.` placeholder.
    Direct(Authority),
}
117
118#[derive(Debug, Clone)]
120pub struct AccountEndpoint {
121 scheme: Scheme,
122 authority: Authority,
123}
124
125impl AccountEndpoint {
126 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
128 endpoint.parse()
129 }
130}
131
132impl FromStr for AccountEndpoint {
133 type Err = ValidationError;
134
135 fn from_str(s: &str) -> Result<Self, Self::Err> {
136 let (scheme, authority) = match s.find("://") {
137 Some(idx) => {
138 let scheme: Scheme = s[..idx]
139 .parse()
140 .map_err(|_| "invalid account endpoint scheme".to_string())?;
141 (scheme, &s[idx + 3..])
142 }
143 None => (Scheme::HTTPS, s),
144 };
145 Ok(Self {
146 scheme,
147 authority: authority
148 .parse()
149 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
150 })
151 }
152}
153
154#[derive(Debug, Clone)]
156pub struct BasinEndpoint {
157 scheme: Scheme,
158 authority: BasinAuthority,
159}
160
161impl BasinEndpoint {
162 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
164 endpoint.parse()
165 }
166}
167
168impl FromStr for BasinEndpoint {
169 type Err = ValidationError;
170
171 fn from_str(s: &str) -> Result<Self, Self::Err> {
172 let (scheme, authority) = match s.find("://") {
173 Some(idx) => {
174 let scheme: Scheme = s[..idx]
175 .parse()
176 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
177 (scheme, &s[idx + 3..])
178 }
179 None => (Scheme::HTTPS, s),
180 };
181 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
182 BasinAuthority::ParentZone(
183 authority
184 .parse()
185 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
186 )
187 } else {
188 BasinAuthority::Direct(
189 authority
190 .parse()
191 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
192 )
193 };
194 Ok(Self { scheme, authority })
195 }
196}
197
198#[derive(Debug, Clone)]
199#[non_exhaustive]
200pub struct S2Endpoints {
202 pub(crate) scheme: Scheme,
203 pub(crate) account_authority: Authority,
204 pub(crate) basin_authority: BasinAuthority,
205}
206
207impl S2Endpoints {
208 pub fn new(
210 account_endpoint: AccountEndpoint,
211 basin_endpoint: BasinEndpoint,
212 ) -> Result<Self, ValidationError> {
213 if account_endpoint.scheme != basin_endpoint.scheme {
214 return Err("account and basin endpoints must have the same scheme".into());
215 }
216 Ok(Self {
217 scheme: account_endpoint.scheme,
218 account_authority: account_endpoint.authority,
219 basin_authority: basin_endpoint.authority,
220 })
221 }
222
223 pub fn from_env() -> Result<Self, ValidationError> {
229 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
230 Ok(endpoint) => endpoint.parse()?,
231 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
232 Err(VarError::NotUnicode(_)) => {
233 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
234 }
235 };
236
237 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
238 Ok(endpoint) => endpoint.parse()?,
239 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
240 Err(VarError::NotUnicode(_)) => {
241 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
242 }
243 };
244
245 if account_endpoint.scheme != basin_endpoint.scheme {
246 return Err(
247 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
248 );
249 }
250
251 Ok(Self {
252 scheme: account_endpoint.scheme,
253 account_authority: account_endpoint.authority,
254 basin_authority: basin_endpoint.authority,
255 })
256 }
257
258 pub(crate) fn for_aws() -> Self {
259 Self {
260 scheme: Scheme::HTTPS,
261 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
262 basin_authority: BasinAuthority::ParentZone(
263 "b.s2.dev".try_into().expect("valid authority"),
264 ),
265 }
266 }
267}
268
/// Compression algorithm selection.
///
/// Derives `PartialEq`/`Eq` so a configured value can be compared directly, like the other
/// configuration enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
279
280impl From<Compression> for CompressionAlgorithm {
281 fn from(value: Compression) -> Self {
282 match value {
283 Compression::None => CompressionAlgorithm::None,
284 Compression::Gzip => CompressionAlgorithm::Gzip,
285 Compression::Zstd => CompressionAlgorithm::Zstd,
286 }
287 }
288}
289
/// Policy selecting which append requests may be retried.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub enum AppendRetryPolicy {
    /// Default policy (see [`RetryConfig::default`]). Presumably every failed append is
    /// eligible for retry — TODO confirm against the retry logic.
    All,
    /// Presumably retries only when the failed attempt cannot have had side effects —
    /// TODO confirm against the retry logic.
    NoSideEffects,
}

/// Retry settings.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct RetryConfig {
    /// Total number of attempts; `max_retries()` is this value minus one. Defaults to 3.
    pub max_attempts: NonZeroU32,
    /// Minimum base delay. Defaults to 100 milliseconds.
    pub min_base_delay: Duration,
    /// Maximum base delay. Defaults to 1 second.
    pub max_base_delay: Duration,
    /// Retry policy for appends. Defaults to [`AppendRetryPolicy::All`].
    pub append_retry_policy: AppendRetryPolicy,
}

impl Default for RetryConfig {
    /// Three attempts, base delay between 100 ms and 1 s, and [`AppendRetryPolicy::All`].
    fn default() -> Self {
        let max_attempts = NonZeroU32::new(3).expect("valid non-zero u32");
        let min_base_delay = Duration::from_millis(100);
        let max_base_delay = Duration::from_secs(1);
        Self {
            max_attempts,
            min_base_delay,
            max_base_delay,
            append_retry_policy: AppendRetryPolicy::All,
        }
    }
}

impl RetryConfig {
    /// Creates a configuration holding the default values.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Number of retries after the first attempt (`max_attempts - 1`).
    pub(crate) fn max_retries(&self) -> u32 {
        // `max_attempts` is non-zero, so the subtraction cannot underflow.
        let attempts = self.max_attempts.get();
        attempts - 1
    }

    /// Replaces the maximum number of attempts.
    pub fn with_max_attempts(mut self, max_attempts: NonZeroU32) -> Self {
        self.max_attempts = max_attempts;
        self
    }

    /// Replaces the minimum base delay.
    pub fn with_min_base_delay(mut self, min_base_delay: Duration) -> Self {
        self.min_base_delay = min_base_delay;
        self
    }

    /// Replaces the maximum base delay.
    pub fn with_max_base_delay(mut self, max_base_delay: Duration) -> Self {
        self.max_base_delay = max_base_delay;
        self
    }

    /// Replaces the append retry policy.
    pub fn with_append_retry_policy(mut self, append_retry_policy: AppendRetryPolicy) -> Self {
        self.append_retry_policy = append_retry_policy;
        self
    }
}
393
394#[derive(Debug, Clone)]
395#[non_exhaustive]
396pub struct S2Config {
398 pub(crate) access_token: SecretString,
399 pub(crate) endpoints: S2Endpoints,
400 pub(crate) connection_timeout: Duration,
401 pub(crate) request_timeout: Duration,
402 pub(crate) retry: RetryConfig,
403 pub(crate) compression: Compression,
404 pub(crate) user_agent: HeaderValue,
405 pub(crate) insecure_skip_cert_verification: bool,
406}
407
408impl S2Config {
409 pub fn new(access_token: impl Into<String>) -> Self {
411 Self {
412 access_token: access_token.into().into(),
413 endpoints: S2Endpoints::for_aws(),
414 connection_timeout: Duration::from_secs(3),
415 request_timeout: Duration::from_secs(5),
416 retry: RetryConfig::new(),
417 compression: Compression::None,
418 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
419 .parse()
420 .expect("valid user agent"),
421 insecure_skip_cert_verification: false,
422 }
423 }
424
425 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
427 Self { endpoints, ..self }
428 }
429
430 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
434 Self {
435 connection_timeout,
436 ..self
437 }
438 }
439
440 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
444 Self {
445 request_timeout,
446 ..self
447 }
448 }
449
450 pub fn with_retry(self, retry: RetryConfig) -> Self {
454 Self { retry, ..self }
455 }
456
457 pub fn with_compression(self, compression: Compression) -> Self {
461 Self {
462 compression,
463 ..self
464 }
465 }
466
467 pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
479 Self {
480 insecure_skip_cert_verification: skip,
481 ..self
482 }
483 }
484
485 #[doc(hidden)]
486 #[cfg(feature = "_hidden")]
487 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
488 let user_agent = user_agent
489 .into()
490 .parse()
491 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
492 Ok(Self { user_agent, ..self })
493 }
494}
495
/// One page of listed values.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values contained in this page.
    pub values: Vec<T>,
    /// Flag reported alongside the values; presumably `true` when further pages exist.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Builds a page from anything convertible into a `Vec<T>` plus the `has_more` flag.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values: Vec<T> = values.into();
        Self { values, has_more }
    }
}
514
515#[derive(Debug, Clone, Copy, PartialEq, Eq)]
516pub enum StorageClass {
518 Standard,
520 Express,
522}
523
524impl From<api::config::StorageClass> for StorageClass {
525 fn from(value: api::config::StorageClass) -> Self {
526 match value {
527 api::config::StorageClass::Standard => StorageClass::Standard,
528 api::config::StorageClass::Express => StorageClass::Express,
529 }
530 }
531}
532
533impl From<StorageClass> for api::config::StorageClass {
534 fn from(value: StorageClass) -> Self {
535 match value {
536 StorageClass::Standard => api::config::StorageClass::Standard,
537 StorageClass::Express => api::config::StorageClass::Express,
538 }
539 }
540}
541
542#[derive(Debug, Clone, Copy, PartialEq, Eq)]
543pub enum RetentionPolicy {
545 Age(u64),
547 Infinite,
549}
550
551impl From<api::config::RetentionPolicy> for RetentionPolicy {
552 fn from(value: api::config::RetentionPolicy) -> Self {
553 match value {
554 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
555 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
556 }
557 }
558}
559
560impl From<RetentionPolicy> for api::config::RetentionPolicy {
561 fn from(value: RetentionPolicy) -> Self {
562 match value {
563 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
564 RetentionPolicy::Infinite => {
565 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
566 }
567 }
568 }
569}
570
571#[derive(Debug, Clone, Copy, PartialEq, Eq)]
572pub enum TimestampingMode {
574 ClientPrefer,
576 ClientRequire,
578 Arrival,
580}
581
582impl From<api::config::TimestampingMode> for TimestampingMode {
583 fn from(value: api::config::TimestampingMode) -> Self {
584 match value {
585 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
586 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
587 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
588 }
589 }
590}
591
592impl From<TimestampingMode> for api::config::TimestampingMode {
593 fn from(value: TimestampingMode) -> Self {
594 match value {
595 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
596 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
597 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
598 }
599 }
600}
601
602#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
603#[non_exhaustive]
604pub struct TimestampingConfig {
606 pub mode: Option<TimestampingMode>,
610 pub uncapped: bool,
614}
615
616impl TimestampingConfig {
617 pub fn new() -> Self {
619 Self::default()
620 }
621
622 pub fn with_mode(self, mode: TimestampingMode) -> Self {
624 Self {
625 mode: Some(mode),
626 ..self
627 }
628 }
629
630 pub fn with_uncapped(self, uncapped: bool) -> Self {
632 Self { uncapped, ..self }
633 }
634}
635
636impl From<api::config::TimestampingConfig> for TimestampingConfig {
637 fn from(value: api::config::TimestampingConfig) -> Self {
638 Self {
639 mode: value.mode.map(Into::into),
640 uncapped: value.uncapped.unwrap_or_default(),
641 }
642 }
643}
644
645impl From<TimestampingConfig> for api::config::TimestampingConfig {
646 fn from(value: TimestampingConfig) -> Self {
647 Self {
648 mode: value.mode.map(Into::into),
649 uncapped: Some(value.uncapped),
650 }
651 }
652}
653
/// Delete-on-empty configuration.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in whole seconds. Defaults to 0.
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Creates a configuration with `min_age_secs` set to 0.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Sets the minimum age from `min_age`, keeping whole seconds only (sub-second
    /// precision is dropped).
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
677
678impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
679 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
680 Self {
681 min_age_secs: value.min_age_secs,
682 }
683 }
684}
685
686impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
687 fn from(value: DeleteOnEmptyConfig) -> Self {
688 Self {
689 min_age_secs: value.min_age_secs,
690 }
691 }
692}
693
694#[derive(Debug, Clone, Default, PartialEq, Eq)]
696#[non_exhaustive]
697pub struct EncryptionConfig {
698 pub allowed_modes: Vec<EncryptionMode>,
702}
703
704impl EncryptionConfig {
705 pub fn new() -> Self {
707 Self::default()
708 }
709
710 pub fn with_allowed_modes(self, allowed_modes: Vec<EncryptionMode>) -> Self {
712 Self { allowed_modes }
713 }
714}
715
716impl From<api::config::EncryptionConfig> for EncryptionConfig {
717 fn from(value: api::config::EncryptionConfig) -> Self {
718 Self {
719 allowed_modes: value.allowed_modes.into_iter().map(Into::into).collect(),
720 }
721 }
722}
723
724impl From<EncryptionConfig> for api::config::EncryptionConfig {
725 fn from(value: EncryptionConfig) -> Self {
726 Self {
727 allowed_modes: value.allowed_modes.into_iter().map(Into::into).collect(),
728 }
729 }
730}
731
732#[derive(Debug, Clone, Default, PartialEq, Eq)]
733#[non_exhaustive]
734pub struct StreamConfig {
736 pub storage_class: Option<StorageClass>,
740 pub retention_policy: Option<RetentionPolicy>,
744 pub timestamping: Option<TimestampingConfig>,
748 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
752 pub encryption: Option<EncryptionConfig>,
756}
757
758impl StreamConfig {
759 pub fn new() -> Self {
761 Self::default()
762 }
763
764 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
766 Self {
767 storage_class: Some(storage_class),
768 ..self
769 }
770 }
771
772 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
774 Self {
775 retention_policy: Some(retention_policy),
776 ..self
777 }
778 }
779
780 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
782 Self {
783 timestamping: Some(timestamping),
784 ..self
785 }
786 }
787
788 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
790 Self {
791 delete_on_empty: Some(delete_on_empty),
792 ..self
793 }
794 }
795
796 pub fn with_encryption(self, encryption: EncryptionConfig) -> Self {
798 Self {
799 encryption: Some(encryption),
800 ..self
801 }
802 }
803}
804
805impl From<api::config::StreamConfig> for StreamConfig {
806 fn from(value: api::config::StreamConfig) -> Self {
807 Self {
808 storage_class: value.storage_class.map(Into::into),
809 retention_policy: value.retention_policy.map(Into::into),
810 timestamping: value.timestamping.map(Into::into),
811 delete_on_empty: value.delete_on_empty.map(Into::into),
812 encryption: value.encryption.map(Into::into),
813 }
814 }
815}
816
817impl From<StreamConfig> for api::config::StreamConfig {
818 fn from(value: StreamConfig) -> Self {
819 Self {
820 storage_class: value.storage_class.map(Into::into),
821 retention_policy: value.retention_policy.map(Into::into),
822 timestamping: value.timestamping.map(Into::into),
823 delete_on_empty: value.delete_on_empty.map(Into::into),
824 encryption: value.encryption.map(Into::into),
825 }
826 }
827}
828
829#[derive(Debug, Clone, Default)]
830#[non_exhaustive]
831pub struct BasinConfig {
833 pub default_stream_config: Option<StreamConfig>,
837 pub create_stream_on_append: bool,
841 pub create_stream_on_read: bool,
845}
846
847impl BasinConfig {
848 pub fn new() -> Self {
850 Self::default()
851 }
852
853 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
855 Self {
856 default_stream_config: Some(config),
857 ..self
858 }
859 }
860
861 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
864 Self {
865 create_stream_on_append,
866 ..self
867 }
868 }
869
870 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
872 Self {
873 create_stream_on_read,
874 ..self
875 }
876 }
877}
878
879impl From<api::config::BasinConfig> for BasinConfig {
880 fn from(value: api::config::BasinConfig) -> Self {
881 Self {
882 default_stream_config: value.default_stream_config.map(Into::into),
883 create_stream_on_append: value.create_stream_on_append,
884 create_stream_on_read: value.create_stream_on_read,
885 }
886 }
887}
888
889impl From<BasinConfig> for api::config::BasinConfig {
890 fn from(value: BasinConfig) -> Self {
891 Self {
892 default_stream_config: value.default_stream_config.map(Into::into),
893 create_stream_on_append: value.create_stream_on_append,
894 create_stream_on_read: value.create_stream_on_read,
895 }
896 }
897}
898
899#[derive(Debug, Clone, PartialEq, Eq)]
900pub enum BasinScope {
902 AwsUsEast1,
904}
905
906impl From<api::basin::BasinScope> for BasinScope {
907 fn from(value: api::basin::BasinScope) -> Self {
908 match value {
909 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
910 }
911 }
912}
913
914impl From<BasinScope> for api::basin::BasinScope {
915 fn from(value: BasinScope) -> Self {
916 match value {
917 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
918 }
919 }
920}
921
/// Outcome of a create-or-reconfigure operation, wrapping the resulting value.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CreateOrReconfigured<T> {
    /// The resource was created.
    Created(T),
    /// The resource was reconfigured.
    Reconfigured(T),
}

#[cfg(feature = "_hidden")]
impl<T> CreateOrReconfigured<T> {
    /// Returns `true` for the `Created` variant and `false` for `Reconfigured`.
    pub fn is_created(&self) -> bool {
        match self {
            Self::Created(_) => true,
            Self::Reconfigured(_) => false,
        }
    }

    /// Consumes the wrapper and returns the inner value of either variant.
    pub fn into_inner(self) -> T {
        match self {
            Self::Created(inner) => inner,
            Self::Reconfigured(inner) => inner,
        }
    }
}
950
951#[derive(Debug, Clone)]
952#[non_exhaustive]
953pub struct CreateBasinInput {
955 pub name: BasinName,
957 pub config: Option<BasinConfig>,
961 pub scope: Option<BasinScope>,
965 idempotency_token: String,
966}
967
968impl CreateBasinInput {
969 pub fn new(name: BasinName) -> Self {
971 Self {
972 name,
973 config: None,
974 scope: None,
975 idempotency_token: idempotency_token(),
976 }
977 }
978
979 pub fn with_config(self, config: BasinConfig) -> Self {
981 Self {
982 config: Some(config),
983 ..self
984 }
985 }
986
987 pub fn with_scope(self, scope: BasinScope) -> Self {
989 Self {
990 scope: Some(scope),
991 ..self
992 }
993 }
994}
995
996impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
997 fn from(value: CreateBasinInput) -> Self {
998 (
999 api::basin::CreateBasinRequest {
1000 basin: value.name,
1001 config: value.config.map(Into::into),
1002 scope: value.scope.map(Into::into),
1003 },
1004 value.idempotency_token,
1005 )
1006 }
1007}
1008
/// Input for creating a basin or reconfiguring it.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureBasinInput {
    /// Name of the basin.
    pub name: BasinName,
    /// Reconfiguration to apply, if set.
    pub config: Option<BasinReconfiguration>,
    /// Basin scope, if set.
    pub scope: Option<BasinScope>,
}

#[cfg(feature = "_hidden")]
impl CreateOrReconfigureBasinInput {
    /// Creates an input for `name` with no configuration or scope.
    pub fn new(name: BasinName) -> Self {
        Self {
            name,
            config: None,
            scope: None,
        }
    }

    /// Sets the reconfiguration.
    pub fn with_config(mut self, config: BasinReconfiguration) -> Self {
        self.config = Some(config);
        self
    }

    /// Sets the basin scope.
    pub fn with_scope(mut self, scope: BasinScope) -> Self {
        self.scope = Some(scope);
        self
    }
}

#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureBasinInput>
    for (
        BasinName,
        Option<api::basin::CreateOrReconfigureBasinRequest>,
    )
{
    /// Splits the input into the basin name and an optional request body.
    ///
    /// The body is `None` only when neither `config` nor `scope` is set.
    fn from(value: CreateOrReconfigureBasinInput) -> Self {
        let CreateOrReconfigureBasinInput {
            name,
            config,
            scope,
        } = value;
        let request = match (config, scope) {
            (None, None) => None,
            (config, scope) => Some(api::basin::CreateOrReconfigureBasinRequest {
                config: config.map(Into::into),
                scope: scope.map(Into::into),
            }),
        };
        (name, request)
    }
}
1074
1075#[derive(Debug, Clone, Default)]
1076#[non_exhaustive]
1077pub struct ListBasinsInput {
1079 pub prefix: BasinNamePrefix,
1083 pub start_after: BasinNameStartAfter,
1089 pub limit: Option<usize>,
1093}
1094
1095impl ListBasinsInput {
1096 pub fn new() -> Self {
1098 Self::default()
1099 }
1100
1101 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1103 Self { prefix, ..self }
1104 }
1105
1106 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1109 Self {
1110 start_after,
1111 ..self
1112 }
1113 }
1114
1115 pub fn with_limit(self, limit: usize) -> Self {
1117 Self {
1118 limit: Some(limit),
1119 ..self
1120 }
1121 }
1122}
1123
1124impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1125 fn from(value: ListBasinsInput) -> Self {
1126 Self {
1127 prefix: Some(value.prefix),
1128 start_after: Some(value.start_after),
1129 limit: value.limit,
1130 }
1131 }
1132}
1133
1134#[derive(Debug, Clone, Default)]
1135pub struct ListAllBasinsInput {
1137 pub prefix: BasinNamePrefix,
1141 pub start_after: BasinNameStartAfter,
1147 pub include_deleted: bool,
1151}
1152
1153impl ListAllBasinsInput {
1154 pub fn new() -> Self {
1156 Self::default()
1157 }
1158
1159 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1161 Self { prefix, ..self }
1162 }
1163
1164 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1167 Self {
1168 start_after,
1169 ..self
1170 }
1171 }
1172
1173 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1175 Self {
1176 include_deleted,
1177 ..self
1178 }
1179 }
1180}
1181
1182#[derive(Debug, Clone, PartialEq, Eq)]
1183#[non_exhaustive]
1184pub struct BasinInfo {
1186 pub name: BasinName,
1188 pub scope: Option<BasinScope>,
1190 pub created_at: S2DateTime,
1192 pub deleted_at: Option<S2DateTime>,
1194}
1195
1196impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1197 type Error = ValidationError;
1198
1199 fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1200 Ok(Self {
1201 name: value.name,
1202 scope: value.scope.map(Into::into),
1203 created_at: value.created_at.try_into()?,
1204 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1205 })
1206 }
1207}
1208
1209#[derive(Debug, Clone)]
1210#[non_exhaustive]
1211pub struct DeleteBasinInput {
1213 pub name: BasinName,
1215 pub ignore_not_found: bool,
1217}
1218
1219impl DeleteBasinInput {
1220 pub fn new(name: BasinName) -> Self {
1222 Self {
1223 name,
1224 ignore_not_found: false,
1225 }
1226 }
1227
1228 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1230 Self {
1231 ignore_not_found,
1232 ..self
1233 }
1234 }
1235}
1236
1237#[derive(Debug, Clone, Default)]
1238#[non_exhaustive]
1239pub struct TimestampingReconfiguration {
1241 pub mode: Maybe<Option<TimestampingMode>>,
1243 pub uncapped: Maybe<Option<bool>>,
1245}
1246
1247impl TimestampingReconfiguration {
1248 pub fn new() -> Self {
1250 Self::default()
1251 }
1252
1253 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1255 Self {
1256 mode: Maybe::Specified(Some(mode)),
1257 ..self
1258 }
1259 }
1260
1261 pub fn with_uncapped(self, uncapped: bool) -> Self {
1263 Self {
1264 uncapped: Maybe::Specified(Some(uncapped)),
1265 ..self
1266 }
1267 }
1268}
1269
1270impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1271 fn from(value: TimestampingReconfiguration) -> Self {
1272 Self {
1273 mode: value.mode.map(|m| m.map(Into::into)),
1274 uncapped: value.uncapped,
1275 }
1276 }
1277}
1278
1279#[derive(Debug, Clone, Default)]
1280#[non_exhaustive]
1281pub struct DeleteOnEmptyReconfiguration {
1283 pub min_age_secs: Maybe<Option<u64>>,
1285}
1286
1287impl DeleteOnEmptyReconfiguration {
1288 pub fn new() -> Self {
1290 Self::default()
1291 }
1292
1293 pub fn with_min_age(self, min_age: Duration) -> Self {
1295 Self {
1296 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1297 }
1298 }
1299}
1300
1301impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1302 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1303 Self {
1304 min_age_secs: value.min_age_secs,
1305 }
1306 }
1307}
1308
1309#[derive(Debug, Clone, Default)]
1311#[non_exhaustive]
1312pub struct EncryptionReconfiguration {
1313 pub allowed_modes: Maybe<Vec<EncryptionMode>>,
1317}
1318
1319impl EncryptionReconfiguration {
1320 pub fn new() -> Self {
1322 Self::default()
1323 }
1324
1325 pub fn with_allowed_modes(self, allowed_modes: Vec<EncryptionMode>) -> Self {
1327 Self {
1328 allowed_modes: Maybe::Specified(allowed_modes),
1329 }
1330 }
1331}
1332
1333impl From<EncryptionReconfiguration> for api::config::EncryptionReconfiguration {
1334 fn from(value: EncryptionReconfiguration) -> Self {
1335 Self {
1336 allowed_modes: value
1337 .allowed_modes
1338 .map(|modes| modes.into_iter().map(Into::into).collect()),
1339 }
1340 }
1341}
1342
1343#[derive(Debug, Clone, Default)]
1344#[non_exhaustive]
1345pub struct StreamReconfiguration {
1347 pub storage_class: Maybe<Option<StorageClass>>,
1349 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1351 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1353 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1355 pub encryption: Maybe<Option<EncryptionReconfiguration>>,
1357}
1358
1359impl StreamReconfiguration {
1360 pub fn new() -> Self {
1362 Self::default()
1363 }
1364
1365 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1367 Self {
1368 storage_class: Maybe::Specified(Some(storage_class)),
1369 ..self
1370 }
1371 }
1372
1373 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1375 Self {
1376 retention_policy: Maybe::Specified(Some(retention_policy)),
1377 ..self
1378 }
1379 }
1380
1381 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1383 Self {
1384 timestamping: Maybe::Specified(Some(timestamping)),
1385 ..self
1386 }
1387 }
1388
1389 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1391 Self {
1392 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1393 ..self
1394 }
1395 }
1396
1397 pub fn with_encryption(self, encryption: EncryptionReconfiguration) -> Self {
1399 Self {
1400 encryption: Maybe::Specified(Some(encryption)),
1401 ..self
1402 }
1403 }
1404}
1405
1406impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1407 fn from(value: StreamReconfiguration) -> Self {
1408 Self {
1409 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1410 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1411 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1412 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1413 encryption: value.encryption.map(|m| m.map(Into::into)),
1414 }
1415 }
1416}
1417
1418#[derive(Debug, Clone, Default)]
1419#[non_exhaustive]
1420pub struct BasinReconfiguration {
1422 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1424 pub create_stream_on_append: Maybe<bool>,
1427 pub create_stream_on_read: Maybe<bool>,
1429}
1430
1431impl BasinReconfiguration {
1432 pub fn new() -> Self {
1434 Self::default()
1435 }
1436
1437 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1440 Self {
1441 default_stream_config: Maybe::Specified(Some(config)),
1442 ..self
1443 }
1444 }
1445
1446 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1449 Self {
1450 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1451 ..self
1452 }
1453 }
1454
1455 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1458 Self {
1459 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1460 ..self
1461 }
1462 }
1463}
1464
1465impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1466 fn from(value: BasinReconfiguration) -> Self {
1467 Self {
1468 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1469 create_stream_on_append: value.create_stream_on_append,
1470 create_stream_on_read: value.create_stream_on_read,
1471 }
1472 }
1473}
1474
1475#[derive(Debug, Clone)]
1476#[non_exhaustive]
1477pub struct ReconfigureBasinInput {
1479 pub name: BasinName,
1481 pub config: BasinReconfiguration,
1483}
1484
1485impl ReconfigureBasinInput {
1486 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1488 Self { name, config }
1489 }
1490}
1491
1492#[derive(Debug, Clone, Default)]
1493#[non_exhaustive]
1494pub struct ListAccessTokensInput {
1496 pub prefix: AccessTokenIdPrefix,
1500 pub start_after: AccessTokenIdStartAfter,
1506 pub limit: Option<usize>,
1510}
1511
1512impl ListAccessTokensInput {
1513 pub fn new() -> Self {
1515 Self::default()
1516 }
1517
1518 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1520 Self { prefix, ..self }
1521 }
1522
1523 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1526 Self {
1527 start_after,
1528 ..self
1529 }
1530 }
1531
1532 pub fn with_limit(self, limit: usize) -> Self {
1534 Self {
1535 limit: Some(limit),
1536 ..self
1537 }
1538 }
1539}
1540
1541impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1542 fn from(value: ListAccessTokensInput) -> Self {
1543 Self {
1544 prefix: Some(value.prefix),
1545 start_after: Some(value.start_after),
1546 limit: value.limit,
1547 }
1548 }
1549}
1550
1551#[derive(Debug, Clone, Default)]
1552pub struct ListAllAccessTokensInput {
1554 pub prefix: AccessTokenIdPrefix,
1558 pub start_after: AccessTokenIdStartAfter,
1564}
1565
1566impl ListAllAccessTokensInput {
1567 pub fn new() -> Self {
1569 Self::default()
1570 }
1571
1572 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1574 Self { prefix, ..self }
1575 }
1576
1577 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1580 Self {
1581 start_after,
1582 ..self
1583 }
1584 }
1585}
1586
/// Information about an access token, as returned by the API.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenInfo {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time. Always present here: conversion from the API type fails if it is missing.
    pub expires_at: S2DateTime,
    /// Auto-prefix-streams flag; treated as `false` when the API response omits it.
    pub auto_prefix_streams: bool,
    /// Scope of the access token.
    pub scope: AccessTokenScope,
}
1601
1602impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1603 type Error = ValidationError;
1604
1605 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1606 let expires_at = value
1607 .expires_at
1608 .map(S2DateTime::try_from)
1609 .transpose()?
1610 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1611 Ok(Self {
1612 id: value.id,
1613 expires_at,
1614 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1615 scope: value.scope.into(),
1616 })
1617 }
1618}
1619
/// Selects the set of basins an access token scope applies to.
#[derive(Debug, Clone)]
pub enum BasinMatcher {
    /// Sent to the API as an exact match on the empty value.
    // NOTE(review): presumably this matches no basins at all — confirm against the API docs.
    None,
    /// Match exactly the basin with this name.
    Exact(BasinName),
    /// Match by basin name prefix.
    Prefix(BasinNamePrefix),
}
1632
/// Selects the set of streams an access token scope applies to.
#[derive(Debug, Clone)]
pub enum StreamMatcher {
    /// Sent to the API as an exact match on the empty value.
    // NOTE(review): presumably this matches no streams at all — confirm against the API docs.
    None,
    /// Match exactly the stream with this name.
    Exact(StreamName),
    /// Match by stream name prefix.
    Prefix(StreamNamePrefix),
}
1645
/// Selects the set of access tokens an access token scope applies to.
#[derive(Debug, Clone)]
pub enum AccessTokenMatcher {
    /// Sent to the API as an exact match on the empty value.
    // NOTE(review): presumably this matches no access tokens at all — confirm against the API
    // docs.
    None,
    /// Match exactly the access token with this ID.
    Exact(AccessTokenId),
    /// Match by access token ID prefix.
    Prefix(AccessTokenIdPrefix),
}
1658
/// A pair of read/write permission flags.
///
/// The `Default` value grants neither permission.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Whether reading is permitted.
    pub read: bool,
    /// Whether writing is permitted.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Creates permissions with neither read nor write granted.
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates permissions granting read only.
    pub fn read_only() -> Self {
        Self {
            read: true,
            ..Self::default()
        }
    }

    /// Creates permissions granting write only.
    pub fn write_only() -> Self {
        Self {
            write: true,
            ..Self::default()
        }
    }

    /// Creates permissions granting both read and write.
    pub fn read_write() -> Self {
        Self {
            write: true,
            ..Self::read_only()
        }
    }
}
1703
1704impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1705 fn from(value: ReadWritePermissions) -> Self {
1706 Self {
1707 read: Some(value.read),
1708 write: Some(value.write),
1709 }
1710 }
1711}
1712
1713impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1714 fn from(value: api::access::ReadWritePermissions) -> Self {
1715 Self {
1716 read: value.read.unwrap_or_default(),
1717 write: value.write.unwrap_or_default(),
1718 }
1719 }
1720}
1721
1722#[derive(Debug, Clone, Default)]
1723#[non_exhaustive]
1724pub struct OperationGroupPermissions {
1728 pub account: Option<ReadWritePermissions>,
1732 pub basin: Option<ReadWritePermissions>,
1736 pub stream: Option<ReadWritePermissions>,
1740}
1741
1742impl OperationGroupPermissions {
1743 pub fn new() -> Self {
1745 Self::default()
1746 }
1747
1748 pub fn read_only_all() -> Self {
1750 Self {
1751 account: Some(ReadWritePermissions::read_only()),
1752 basin: Some(ReadWritePermissions::read_only()),
1753 stream: Some(ReadWritePermissions::read_only()),
1754 }
1755 }
1756
1757 pub fn write_only_all() -> Self {
1759 Self {
1760 account: Some(ReadWritePermissions::write_only()),
1761 basin: Some(ReadWritePermissions::write_only()),
1762 stream: Some(ReadWritePermissions::write_only()),
1763 }
1764 }
1765
1766 pub fn read_write_all() -> Self {
1768 Self {
1769 account: Some(ReadWritePermissions::read_write()),
1770 basin: Some(ReadWritePermissions::read_write()),
1771 stream: Some(ReadWritePermissions::read_write()),
1772 }
1773 }
1774
1775 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1777 Self {
1778 account: Some(account),
1779 ..self
1780 }
1781 }
1782
1783 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1785 Self {
1786 basin: Some(basin),
1787 ..self
1788 }
1789 }
1790
1791 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1793 Self {
1794 stream: Some(stream),
1795 ..self
1796 }
1797 }
1798}
1799
1800impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1801 fn from(value: OperationGroupPermissions) -> Self {
1802 Self {
1803 account: value.account.map(Into::into),
1804 basin: value.basin.map(Into::into),
1805 stream: value.stream.map(Into::into),
1806 }
1807 }
1808}
1809
1810impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1811 fn from(value: api::access::PermittedOperationGroups) -> Self {
1812 Self {
1813 account: value.account.map(Into::into),
1814 basin: value.basin.map(Into::into),
1815 stream: value.stream.map(Into::into),
1816 }
1817 }
1818}
1819
/// An individual operation that can be listed in an access token scope.
///
/// Each variant maps one-to-one onto `api::access::Operation`. The three `Get*Metrics` variants
/// correspond to the API's `AccountMetrics`, `BasinMetrics` and `StreamMetrics` variants; all
/// other variants share the API variant's name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    ListBasins,
    CreateBasin,
    GetBasinConfig,
    DeleteBasin,
    ReconfigureBasin,
    ListAccessTokens,
    IssueAccessToken,
    RevokeAccessToken,
    /// Maps to the API's `AccountMetrics` operation.
    GetAccountMetrics,
    /// Maps to the API's `BasinMetrics` operation.
    GetBasinMetrics,
    /// Maps to the API's `StreamMetrics` operation.
    GetStreamMetrics,
    ListStreams,
    CreateStream,
    GetStreamConfig,
    DeleteStream,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
1868
1869impl From<Operation> for api::access::Operation {
1870 fn from(value: Operation) -> Self {
1871 match value {
1872 Operation::ListBasins => api::access::Operation::ListBasins,
1873 Operation::CreateBasin => api::access::Operation::CreateBasin,
1874 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1875 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1876 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1877 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1878 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1879 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1880 Operation::ListStreams => api::access::Operation::ListStreams,
1881 Operation::CreateStream => api::access::Operation::CreateStream,
1882 Operation::DeleteStream => api::access::Operation::DeleteStream,
1883 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1884 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1885 Operation::CheckTail => api::access::Operation::CheckTail,
1886 Operation::Append => api::access::Operation::Append,
1887 Operation::Read => api::access::Operation::Read,
1888 Operation::Trim => api::access::Operation::Trim,
1889 Operation::Fence => api::access::Operation::Fence,
1890 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1891 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1892 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1893 }
1894 }
1895}
1896
1897impl From<api::access::Operation> for Operation {
1898 fn from(value: api::access::Operation) -> Self {
1899 match value {
1900 api::access::Operation::ListBasins => Operation::ListBasins,
1901 api::access::Operation::CreateBasin => Operation::CreateBasin,
1902 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1903 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1904 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1905 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1906 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1907 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1908 api::access::Operation::ListStreams => Operation::ListStreams,
1909 api::access::Operation::CreateStream => Operation::CreateStream,
1910 api::access::Operation::DeleteStream => Operation::DeleteStream,
1911 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1912 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1913 api::access::Operation::CheckTail => Operation::CheckTail,
1914 api::access::Operation::Append => Operation::Append,
1915 api::access::Operation::Read => Operation::Read,
1916 api::access::Operation::Trim => Operation::Trim,
1917 api::access::Operation::Fence => Operation::Fence,
1918 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1919 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1920 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1921 }
1922 }
1923}
1924
1925#[derive(Debug, Clone)]
1926#[non_exhaustive]
1927pub struct AccessTokenScopeInput {
1935 basins: Option<BasinMatcher>,
1936 streams: Option<StreamMatcher>,
1937 access_tokens: Option<AccessTokenMatcher>,
1938 op_group_perms: Option<OperationGroupPermissions>,
1939 ops: HashSet<Operation>,
1940}
1941
1942impl AccessTokenScopeInput {
1943 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1945 Self {
1946 basins: None,
1947 streams: None,
1948 access_tokens: None,
1949 op_group_perms: None,
1950 ops: ops.into_iter().collect(),
1951 }
1952 }
1953
1954 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1956 Self {
1957 basins: None,
1958 streams: None,
1959 access_tokens: None,
1960 op_group_perms: Some(op_group_perms),
1961 ops: HashSet::default(),
1962 }
1963 }
1964
1965 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1967 Self {
1968 ops: ops.into_iter().collect(),
1969 ..self
1970 }
1971 }
1972
1973 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1975 Self {
1976 op_group_perms: Some(op_group_perms),
1977 ..self
1978 }
1979 }
1980
1981 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1985 Self {
1986 basins: Some(basins),
1987 ..self
1988 }
1989 }
1990
1991 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1995 Self {
1996 streams: Some(streams),
1997 ..self
1998 }
1999 }
2000
2001 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
2005 Self {
2006 access_tokens: Some(access_tokens),
2007 ..self
2008 }
2009 }
2010}
2011
/// Scope of an access token, as returned by the API.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScope {
    /// Basin matcher; `None` if the API response carried no basin resource set.
    pub basins: Option<BasinMatcher>,
    /// Stream matcher; `None` if the API response carried no stream resource set.
    pub streams: Option<StreamMatcher>,
    /// Access token matcher; `None` if the API response carried no access token resource set.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Operation group permissions; `None` if the API response carried none.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Individual operations; empty if the API response carried no operation list.
    pub ops: HashSet<Operation>,
}
2027
2028impl From<api::access::AccessTokenScope> for AccessTokenScope {
2029 fn from(value: api::access::AccessTokenScope) -> Self {
2030 Self {
2031 basins: value.basins.map(|rs| match rs {
2032 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2033 BasinMatcher::Exact(e)
2034 }
2035 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2036 BasinMatcher::None
2037 }
2038 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
2039 }),
2040 streams: value.streams.map(|rs| match rs {
2041 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2042 StreamMatcher::Exact(e)
2043 }
2044 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2045 StreamMatcher::None
2046 }
2047 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
2048 }),
2049 access_tokens: value.access_tokens.map(|rs| match rs {
2050 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2051 AccessTokenMatcher::Exact(e)
2052 }
2053 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2054 AccessTokenMatcher::None
2055 }
2056 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
2057 }),
2058 op_group_perms: value.op_groups.map(Into::into),
2059 ops: value
2060 .ops
2061 .map(|ops| ops.into_iter().map(Into::into).collect())
2062 .unwrap_or_default(),
2063 }
2064 }
2065}
2066
2067impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
2068 fn from(value: AccessTokenScopeInput) -> Self {
2069 Self {
2070 basins: value.basins.map(|rs| match rs {
2071 BasinMatcher::None => {
2072 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2073 }
2074 BasinMatcher::Exact(e) => {
2075 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2076 }
2077 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2078 }),
2079 streams: value.streams.map(|rs| match rs {
2080 StreamMatcher::None => {
2081 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2082 }
2083 StreamMatcher::Exact(e) => {
2084 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2085 }
2086 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2087 }),
2088 access_tokens: value.access_tokens.map(|rs| match rs {
2089 AccessTokenMatcher::None => {
2090 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2091 }
2092 AccessTokenMatcher::Exact(e) => {
2093 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2094 }
2095 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2096 }),
2097 op_groups: value.op_group_perms.map(Into::into),
2098 ops: if value.ops.is_empty() {
2099 None
2100 } else {
2101 Some(value.ops.into_iter().map(Into::into).collect())
2102 },
2103 }
2104 }
2105}
2106
2107#[derive(Debug, Clone)]
2108#[non_exhaustive]
2109pub struct IssueAccessTokenInput {
2111 pub id: AccessTokenId,
2113 pub expires_at: Option<S2DateTime>,
2118 pub auto_prefix_streams: bool,
2126 pub scope: AccessTokenScopeInput,
2128}
2129
2130impl IssueAccessTokenInput {
2131 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2133 Self {
2134 id,
2135 expires_at: None,
2136 auto_prefix_streams: false,
2137 scope,
2138 }
2139 }
2140
2141 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2143 Self {
2144 expires_at: Some(expires_at),
2145 ..self
2146 }
2147 }
2148
2149 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2152 Self {
2153 auto_prefix_streams,
2154 ..self
2155 }
2156 }
2157}
2158
2159impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2160 fn from(value: IssueAccessTokenInput) -> Self {
2161 Self {
2162 id: value.id,
2163 expires_at: value.expires_at.map(Into::into),
2164 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2165 scope: value.scope.into(),
2166 }
2167 }
2168}
2169
/// Interval selector for timeseries metrics.
///
/// Maps one-to-one onto `api::metrics::TimeseriesInterval`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeseriesInterval {
    /// Per-minute interval.
    Minute,
    /// Per-hour interval.
    Hour,
    /// Per-day interval.
    Day,
}
2180
2181impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2182 fn from(value: TimeseriesInterval) -> Self {
2183 match value {
2184 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2185 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2186 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2187 }
2188 }
2189}
2190
2191impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2192 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2193 match value {
2194 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2195 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2196 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2197 }
2198 }
2199}
2200
/// A time range for metric queries.
// NOTE(review): units and inclusivity of the bounds are not visible in this part of the file —
// presumably Unix epoch seconds; confirm against the API docs.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start of the range, sent as the `start` field of metric requests.
    pub start: u32,
    /// End of the range, sent as the `end` field of metric requests.
    pub end: u32,
}

impl TimeRange {
    /// Creates a new [`TimeRange`] from its start and end. No ordering check is performed.
    pub fn new(start: u32, end: u32) -> Self {
        Self { start, end }
    }
}
2217
2218#[derive(Debug, Clone, Copy)]
2219#[non_exhaustive]
2220pub struct TimeRangeAndInterval {
2222 pub start: u32,
2224 pub end: u32,
2226 pub interval: Option<TimeseriesInterval>,
2230}
2231
2232impl TimeRangeAndInterval {
2233 pub fn new(start: u32, end: u32) -> Self {
2235 Self {
2236 start,
2237 end,
2238 interval: None,
2239 }
2240 }
2241
2242 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2244 Self {
2245 interval: Some(interval),
2246 ..self
2247 }
2248 }
2249}
2250
/// Account-level metric set to query, together with its time arguments.
#[derive(Debug, Clone, Copy)]
pub enum AccountMetricSet {
    /// Requests the API's `ActiveBasins` set over a time range (no interval is sent).
    ActiveBasins(TimeRange),
    /// Requests the API's `AccountOps` set over a time range with an optional interval.
    AccountOps(TimeRangeAndInterval),
}
2265
/// Input for an account metrics query.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetAccountMetricsInput {
    /// Metric set to return.
    pub set: AccountMetricSet,
}

impl GetAccountMetricsInput {
    /// Creates a new [`GetAccountMetricsInput`] for the given metric set.
    pub fn new(set: AccountMetricSet) -> Self {
        Self { set }
    }
}
2280
2281impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2282 fn from(value: GetAccountMetricsInput) -> Self {
2283 let (set, start, end, interval) = match value.set {
2284 AccountMetricSet::ActiveBasins(args) => (
2285 api::metrics::AccountMetricSet::ActiveBasins,
2286 args.start,
2287 args.end,
2288 None,
2289 ),
2290 AccountMetricSet::AccountOps(args) => (
2291 api::metrics::AccountMetricSet::AccountOps,
2292 args.start,
2293 args.end,
2294 args.interval,
2295 ),
2296 };
2297 Self {
2298 set,
2299 start: Some(start),
2300 end: Some(end),
2301 interval: interval.map(Into::into),
2302 }
2303 }
2304}
2305
/// Basin-level metric set to query, together with its time arguments.
///
/// Each variant requests the identically named `api::metrics::BasinMetricSet` variant.
#[derive(Debug, Clone, Copy)]
pub enum BasinMetricSet {
    /// Storage metrics over a time range (no interval is sent).
    Storage(TimeRange),
    /// Append operations over a time range with an optional interval.
    AppendOps(TimeRangeAndInterval),
    /// Read operations over a time range with an optional interval.
    ReadOps(TimeRangeAndInterval),
    /// Read throughput over a time range with an optional interval.
    ReadThroughput(TimeRangeAndInterval),
    /// Append throughput over a time range with an optional interval.
    AppendThroughput(TimeRangeAndInterval),
    /// Basin operations over a time range with an optional interval.
    BasinOps(TimeRangeAndInterval),
}
2350
/// Input for a basin metrics query.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetBasinMetricsInput {
    /// Name of the basin to query.
    pub name: BasinName,
    /// Metric set to return.
    pub set: BasinMetricSet,
}

impl GetBasinMetricsInput {
    /// Creates a new [`GetBasinMetricsInput`] for the given basin and metric set.
    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
        Self { name, set }
    }
}
2367
2368impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2369 fn from(value: GetBasinMetricsInput) -> Self {
2370 let (set, start, end, interval) = match value.set {
2371 BasinMetricSet::Storage(args) => (
2372 api::metrics::BasinMetricSet::Storage,
2373 args.start,
2374 args.end,
2375 None,
2376 ),
2377 BasinMetricSet::AppendOps(args) => (
2378 api::metrics::BasinMetricSet::AppendOps,
2379 args.start,
2380 args.end,
2381 args.interval,
2382 ),
2383 BasinMetricSet::ReadOps(args) => (
2384 api::metrics::BasinMetricSet::ReadOps,
2385 args.start,
2386 args.end,
2387 args.interval,
2388 ),
2389 BasinMetricSet::ReadThroughput(args) => (
2390 api::metrics::BasinMetricSet::ReadThroughput,
2391 args.start,
2392 args.end,
2393 args.interval,
2394 ),
2395 BasinMetricSet::AppendThroughput(args) => (
2396 api::metrics::BasinMetricSet::AppendThroughput,
2397 args.start,
2398 args.end,
2399 args.interval,
2400 ),
2401 BasinMetricSet::BasinOps(args) => (
2402 api::metrics::BasinMetricSet::BasinOps,
2403 args.start,
2404 args.end,
2405 args.interval,
2406 ),
2407 };
2408 (
2409 value.name,
2410 api::metrics::BasinMetricSetRequest {
2411 set,
2412 start: Some(start),
2413 end: Some(end),
2414 interval: interval.map(Into::into),
2415 },
2416 )
2417 }
2418}
2419
/// Stream-level metric set to query, together with its time arguments.
#[derive(Debug, Clone, Copy)]
pub enum StreamMetricSet {
    /// Requests the API's `Storage` set over a time range (no interval is sent).
    Storage(TimeRange),
}
2427
/// Input for a stream metrics query.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetStreamMetricsInput {
    /// Name of the basin containing the stream.
    pub basin_name: BasinName,
    /// Name of the stream to query.
    pub stream_name: StreamName,
    /// Metric set to return.
    pub set: StreamMetricSet,
}

impl GetStreamMetricsInput {
    /// Creates a new [`GetStreamMetricsInput`] for the given basin, stream and metric set.
    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
        Self {
            basin_name,
            stream_name,
            set,
        }
    }
}
2451
2452impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2453 fn from(value: GetStreamMetricsInput) -> Self {
2454 let (set, start, end, interval) = match value.set {
2455 StreamMetricSet::Storage(args) => (
2456 api::metrics::StreamMetricSet::Storage,
2457 args.start,
2458 args.end,
2459 None,
2460 ),
2461 };
2462 (
2463 value.basin_name,
2464 value.stream_name,
2465 api::metrics::StreamMetricSetRequest {
2466 set,
2467 start: Some(start),
2468 end: Some(end),
2469 interval,
2470 },
2471 )
2472 }
2473}
2474
/// Unit in which a metric value is expressed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricUnit {
    /// The value counts bytes.
    Bytes,
    /// The value counts operations.
    Operations,
}
2483
2484impl From<api::metrics::MetricUnit> for MetricUnit {
2485 fn from(value: api::metrics::MetricUnit) -> Self {
2486 match value {
2487 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2488 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2489 }
2490 }
2491}
2492
/// A metric consisting of a single named value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ScalarMetric {
    /// Metric name.
    pub name: String,
    /// Unit of `value`.
    pub unit: MetricUnit,
    /// Metric value.
    pub value: f64,
}
2504
/// A named timeseries metric that carries an interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccumulationMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Interval associated with the values.
    pub interval: TimeseriesInterval,
    /// Data points, passed through unchanged from the API response.
    // NOTE(review): presumably `(timestamp, value)` pairs — confirm against the API docs.
    pub values: Vec<(u32, f64)>,
}
2521
/// A named timeseries metric without an interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GaugeMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Data points, passed through unchanged from the API response.
    // NOTE(review): presumably `(timestamp, value)` pairs — confirm against the API docs.
    pub values: Vec<(u32, f64)>,
}
2534
/// A named metric whose values are strings rather than numbers.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Label name.
    pub name: String,
    /// Label values, passed through unchanged from the API response.
    pub values: Vec<String>,
}
2544
/// A single metric returned by a metrics query, in one of four shapes.
#[derive(Debug, Clone)]
pub enum Metric {
    /// A single named value.
    Scalar(ScalarMetric),
    /// A named timeseries that carries an interval.
    Accumulation(AccumulationMetric),
    /// A named timeseries without an interval.
    Gauge(GaugeMetric),
    /// A named set of string values.
    Label(LabelMetric),
}
2558
2559impl From<api::metrics::Metric> for Metric {
2560 fn from(value: api::metrics::Metric) -> Self {
2561 match value {
2562 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2563 name: sm.name.into(),
2564 unit: sm.unit.into(),
2565 value: sm.value,
2566 }),
2567 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2568 name: am.name.into(),
2569 unit: am.unit.into(),
2570 interval: am.interval.into(),
2571 values: am.values,
2572 }),
2573 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2574 name: gm.name.into(),
2575 unit: gm.unit.into(),
2576 values: gm.values,
2577 }),
2578 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2579 name: lm.name.into(),
2580 values: lm.values,
2581 }),
2582 }
2583 }
2584}
2585
2586#[derive(Debug, Clone, Default)]
2587#[non_exhaustive]
2588pub struct ListStreamsInput {
2590 pub prefix: StreamNamePrefix,
2594 pub start_after: StreamNameStartAfter,
2600 pub limit: Option<usize>,
2604}
2605
2606impl ListStreamsInput {
2607 pub fn new() -> Self {
2609 Self::default()
2610 }
2611
2612 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2614 Self { prefix, ..self }
2615 }
2616
2617 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2620 Self {
2621 start_after,
2622 ..self
2623 }
2624 }
2625
2626 pub fn with_limit(self, limit: usize) -> Self {
2628 Self {
2629 limit: Some(limit),
2630 ..self
2631 }
2632 }
2633}
2634
2635impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2636 fn from(value: ListStreamsInput) -> Self {
2637 Self {
2638 prefix: Some(value.prefix),
2639 start_after: Some(value.start_after),
2640 limit: value.limit,
2641 }
2642 }
2643}
2644
2645#[derive(Debug, Clone, Default)]
2646pub struct ListAllStreamsInput {
2648 pub prefix: StreamNamePrefix,
2652 pub start_after: StreamNameStartAfter,
2658 pub include_deleted: bool,
2662}
2663
2664impl ListAllStreamsInput {
2665 pub fn new() -> Self {
2667 Self::default()
2668 }
2669
2670 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2672 Self { prefix, ..self }
2673 }
2674
2675 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2678 Self {
2679 start_after,
2680 ..self
2681 }
2682 }
2683
2684 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2686 Self {
2687 include_deleted,
2688 ..self
2689 }
2690 }
2691}
2692
/// Information about a stream, as returned by the API.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time; `None` if the API response carried none.
    pub deleted_at: Option<S2DateTime>,
}
2704
2705impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2706 type Error = ValidationError;
2707
2708 fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2709 Ok(Self {
2710 name: value.name,
2711 created_at: value.created_at.try_into()?,
2712 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2713 })
2714 }
2715}
2716
2717#[derive(Debug, Clone)]
2718#[non_exhaustive]
2719pub struct CreateStreamInput {
2721 pub name: StreamName,
2723 pub config: Option<StreamConfig>,
2727 idempotency_token: String,
2728}
2729
2730impl CreateStreamInput {
2731 pub fn new(name: StreamName) -> Self {
2733 Self {
2734 name,
2735 config: None,
2736 idempotency_token: idempotency_token(),
2737 }
2738 }
2739
2740 pub fn with_config(self, config: StreamConfig) -> Self {
2742 Self {
2743 config: Some(config),
2744 ..self
2745 }
2746 }
2747}
2748
2749impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2750 fn from(value: CreateStreamInput) -> Self {
2751 (
2752 api::stream::CreateStreamRequest {
2753 stream: value.name,
2754 config: value.config.map(Into::into),
2755 },
2756 value.idempotency_token,
2757 )
2758 }
2759}
2760
/// Input for creating a stream or reconfiguring it.
///
/// Only compiled with the `_hidden` feature and excluded from the rendered docs.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureStreamInput {
    /// Name of the stream.
    pub name: StreamName,
    /// Optional reconfiguration; `None` sends no configuration.
    pub config: Option<StreamReconfiguration>,
}

#[cfg(feature = "_hidden")]
impl CreateOrReconfigureStreamInput {
    /// Creates a new [`CreateOrReconfigureStreamInput`] with no configuration.
    pub fn new(name: StreamName) -> Self {
        let config = None;
        Self { name, config }
    }

    /// Sets the reconfiguration to apply.
    pub fn with_config(mut self, config: StreamReconfiguration) -> Self {
        self.config = Some(config);
        self
    }
}
2791
/// Splits the input into the stream name and the optional API reconfiguration.
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureStreamInput>
    for (StreamName, Option<api::config::StreamReconfiguration>)
{
    fn from(value: CreateOrReconfigureStreamInput) -> Self {
        let CreateOrReconfigureStreamInput { name, config } = value;
        let reconfiguration = config.map(Into::into);
        (name, reconfiguration)
    }
}
2800
2801#[derive(Debug, Clone)]
2802#[non_exhaustive]
2803pub struct DeleteStreamInput {
2805 pub name: StreamName,
2807 pub ignore_not_found: bool,
2809}
2810
2811impl DeleteStreamInput {
2812 pub fn new(name: StreamName) -> Self {
2814 Self {
2815 name,
2816 ignore_not_found: false,
2817 }
2818 }
2819
2820 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2822 Self {
2823 ignore_not_found,
2824 ..self
2825 }
2826 }
2827}
2828
/// Input for a stream reconfiguration request.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReconfigureStreamInput {
    /// Name of the stream the reconfiguration targets.
    pub name: StreamName,
    /// Reconfiguration to apply to the stream.
    pub config: StreamReconfiguration,
}

impl ReconfigureStreamInput {
    /// Creates a new [`ReconfigureStreamInput`] from a stream name and the reconfiguration for it.
    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
        Self { name, config }
    }
}
2845
/// A fencing token.
///
/// Wraps a string of at most `MAX_FENCING_TOKEN_LENGTH` bytes. The limit is enforced when the
/// token is parsed from a string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FencingToken(String);
2853
2854impl FencingToken {
2855 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2857 rand::rng()
2858 .sample_iter(&rand::distr::Alphanumeric)
2859 .take(n)
2860 .map(char::from)
2861 .collect::<String>()
2862 .parse()
2863 }
2864}
2865
2866impl FromStr for FencingToken {
2867 type Err = ValidationError;
2868
2869 fn from_str(s: &str) -> Result<Self, Self::Err> {
2870 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2871 return Err(ValidationError(format!(
2872 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2873 )));
2874 }
2875 Ok(FencingToken(s.to_string()))
2876 }
2877}
2878
2879impl std::fmt::Display for FencingToken {
2880 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2881 write!(f, "{}", self.0)
2882 }
2883}
2884
2885impl Deref for FencingToken {
2886 type Target = str;
2887
2888 fn deref(&self) -> &Self::Target {
2889 &self.0
2890 }
2891}
2892
/// A position in a stream, given by a sequence number and a timestamp.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number of the position.
    pub seq_num: u64,
    /// Timestamp of the position.
    // NOTE(review): units are not visible here — presumably milliseconds since the Unix epoch;
    // confirm against the API docs.
    pub timestamp: u64,
}

/// Formats the position as `seq_num=<n>, timestamp=<t>`.
impl std::fmt::Display for StreamPosition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = self;
        f.write_fmt(format_args!(
            "seq_num={}, timestamp={}",
            seq_num, timestamp
        ))
    }
}
2909
2910impl From<api::stream::proto::StreamPosition> for StreamPosition {
2911 fn from(value: api::stream::proto::StreamPosition) -> Self {
2912 Self {
2913 seq_num: value.seq_num,
2914 timestamp: value.timestamp,
2915 }
2916 }
2917}
2918
2919impl From<api::stream::StreamPosition> for StreamPosition {
2920 fn from(value: api::stream::StreamPosition) -> Self {
2921 Self {
2922 seq_num: value.seq_num,
2923 timestamp: value.timestamp,
2924 }
2925 }
2926}
2927
/// A record header: a name/value pair of raw bytes.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct Header {
    /// Header name.
    pub name: Bytes,
    /// Header value.
    pub value: Bytes,
}

impl Header {
    /// Creates a new [`Header`] from anything convertible into [`Bytes`].
    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }
}
2947
2948impl From<Header> for api::stream::proto::Header {
2949 fn from(value: Header) -> Self {
2950 Self {
2951 name: value.name,
2952 value: value.value,
2953 }
2954 }
2955}
2956
2957impl From<api::stream::proto::Header> for Header {
2958 fn from(value: api::stream::proto::Header) -> Self {
2959 Self {
2960 name: value.name,
2961 value: value.value,
2962 }
2963 }
2964}
2965
/// A record to append to a stream.
///
/// Fields are private so the size limit checked by the public constructors cannot be bypassed
/// from outside the crate; read access goes through the accessor methods.
#[derive(Debug, Clone, PartialEq)]
pub struct AppendRecord {
    // Record payload.
    body: Bytes,
    // Headers attached to the record.
    headers: Vec<Header>,
    // Optional timestamp supplied by the caller.
    timestamp: Option<u64>,
}
2973
2974impl AppendRecord {
2975 fn validate(self) -> Result<Self, ValidationError> {
2976 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2977 Err(ValidationError(format!(
2978 "metered_bytes: {} exceeds {}",
2979 self.metered_bytes(),
2980 RECORD_BATCH_MAX.bytes
2981 )))
2982 } else {
2983 Ok(self)
2984 }
2985 }
2986
2987 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2989 let record = Self {
2990 body: body.into(),
2991 headers: Vec::default(),
2992 timestamp: None,
2993 };
2994 record.validate()
2995 }
2996
2997 pub fn with_headers(
2999 self,
3000 headers: impl IntoIterator<Item = Header>,
3001 ) -> Result<Self, ValidationError> {
3002 let record = Self {
3003 headers: headers.into_iter().collect(),
3004 ..self
3005 };
3006 record.validate()
3007 }
3008
3009 pub fn with_timestamp(self, timestamp: u64) -> Self {
3013 Self {
3014 timestamp: Some(timestamp),
3015 ..self
3016 }
3017 }
3018
3019 pub fn body(&self) -> &[u8] {
3021 &self.body
3022 }
3023
3024 pub fn headers(&self) -> &[Header] {
3026 &self.headers
3027 }
3028
3029 pub fn timestamp(&self) -> Option<u64> {
3031 self.timestamp
3032 }
3033}
3034
3035impl From<AppendRecord> for api::stream::proto::AppendRecord {
3036 fn from(value: AppendRecord) -> Self {
3037 Self {
3038 timestamp: value.timestamp,
3039 headers: value.headers.into_iter().map(Into::into).collect(),
3040 body: value.body,
3041 }
3042 }
3043}
3044
/// Types that can report their metered size in bytes.
///
/// The metered size is what the record and batch size limits (`RECORD_BATCH_MAX.bytes`) are
/// checked against.
pub trait MeteredBytes {
    /// Returns the metered size in bytes.
    fn metered_bytes(&self) -> usize;
}
3055
3056macro_rules! metered_bytes_impl {
3057 ($ty:ty) => {
3058 impl MeteredBytes for $ty {
3059 fn metered_bytes(&self) -> usize {
3060 8 + (2 * self.headers.len())
3061 + self
3062 .headers
3063 .iter()
3064 .map(|h| h.name.len() + h.value.len())
3065 .sum::<usize>()
3066 + self.body.len()
3067 }
3068 }
3069 };
3070}
3071
3072metered_bytes_impl!(AppendRecord);
3073
/// A batch of [`AppendRecord`]s together with its cached total metered size.
///
/// Batches built through [`AppendRecordBatch::try_from_iter`] are non-empty and within both
/// `RECORD_BATCH_MAX.count` records and `RECORD_BATCH_MAX.bytes` metered bytes. The
/// crate-internal `with_capacity`/`push` path performs no such checks.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    // Records in insertion order.
    records: Vec<AppendRecord>,
    // Sum of the metered sizes of `records`, kept in sync on every push.
    metered_bytes: usize,
}
3087
3088impl AppendRecordBatch {
3089 pub(crate) fn with_capacity(capacity: usize) -> Self {
3090 Self {
3091 records: Vec::with_capacity(capacity),
3092 metered_bytes: 0,
3093 }
3094 }
3095
3096 pub(crate) fn push(&mut self, record: AppendRecord) {
3097 self.metered_bytes += record.metered_bytes();
3098 self.records.push(record);
3099 }
3100
3101 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3103 where
3104 I: IntoIterator<Item = AppendRecord>,
3105 {
3106 let mut records = Vec::new();
3107 let mut metered_bytes = 0;
3108
3109 for record in iter {
3110 metered_bytes += record.metered_bytes();
3111 records.push(record);
3112
3113 if metered_bytes > RECORD_BATCH_MAX.bytes {
3114 return Err(ValidationError(format!(
3115 "batch size in metered bytes ({metered_bytes}) exceeds {}",
3116 RECORD_BATCH_MAX.bytes
3117 )));
3118 }
3119
3120 if records.len() > RECORD_BATCH_MAX.count {
3121 return Err(ValidationError(format!(
3122 "number of records in the batch exceeds {}",
3123 RECORD_BATCH_MAX.count
3124 )));
3125 }
3126 }
3127
3128 if records.is_empty() {
3129 return Err(ValidationError("batch is empty".into()));
3130 }
3131
3132 Ok(Self {
3133 records,
3134 metered_bytes,
3135 })
3136 }
3137}
3138
3139impl Deref for AppendRecordBatch {
3140 type Target = [AppendRecord];
3141
3142 fn deref(&self) -> &Self::Target {
3143 &self.records
3144 }
3145}
3146
3147impl MeteredBytes for AppendRecordBatch {
3148 fn metered_bytes(&self) -> usize {
3149 self.metered_bytes
3150 }
3151}
3152
/// The operation carried by a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command.
    Fence {
        /// Fencing token; its bytes become the record body when the command
        /// is converted into an `AppendRecord`.
        fencing_token: FencingToken,
    },
    /// Trim command.
    Trim {
        /// Trim point; encoded as 8 big-endian bytes in the record body when
        /// the command is converted into an `AppendRecord`.
        // NOTE(review): presumably a sequence number — confirm against the API docs.
        trim_point: u64,
    },
}
3167
/// A [`Command`] plus an optional timestamp, convertible into an `AppendRecord`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CommandRecord {
    /// The command this record carries.
    pub command: Command,
    /// Optional timestamp copied onto the resulting `AppendRecord`.
    // NOTE(review): timestamp unit is not visible in this chunk — confirm.
    pub timestamp: Option<u64>,
}
3179
3180impl CommandRecord {
3181 const FENCE: &[u8] = b"fence";
3182 const TRIM: &[u8] = b"trim";
3183
3184 pub fn fence(fencing_token: FencingToken) -> Self {
3189 Self {
3190 command: Command::Fence { fencing_token },
3191 timestamp: None,
3192 }
3193 }
3194
3195 pub fn trim(trim_point: u64) -> Self {
3202 Self {
3203 command: Command::Trim { trim_point },
3204 timestamp: None,
3205 }
3206 }
3207
3208 pub fn with_timestamp(self, timestamp: u64) -> Self {
3210 Self {
3211 timestamp: Some(timestamp),
3212 ..self
3213 }
3214 }
3215}
3216
3217impl From<CommandRecord> for AppendRecord {
3218 fn from(value: CommandRecord) -> Self {
3219 let (header_value, body) = match value.command {
3220 Command::Fence { fencing_token } => (
3221 CommandRecord::FENCE,
3222 Bytes::copy_from_slice(fencing_token.as_bytes()),
3223 ),
3224 Command::Trim { trim_point } => (
3225 CommandRecord::TRIM,
3226 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3227 ),
3228 };
3229 Self {
3230 body,
3231 headers: vec![Header::new("", header_value)],
3232 timestamp: value.timestamp,
3233 }
3234 }
3235}
3236
/// Input for an append: the records to write plus optional conditions.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AppendInput {
    /// Batch of records to append.
    pub records: AppendRecordBatch,
    /// Optional sequence number condition, forwarded unchanged in the request.
    // NOTE(review): presumably the sequence number the first record is expected
    // to be assigned — confirm against the API docs.
    pub match_seq_num: Option<u64>,
    /// Optional fencing token, sent in the request in its string form.
    pub fencing_token: Option<FencingToken>,
}
3254
3255impl AppendInput {
3256 pub fn new(records: AppendRecordBatch) -> Self {
3258 Self {
3259 records,
3260 match_seq_num: None,
3261 fencing_token: None,
3262 }
3263 }
3264
3265 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3267 Self {
3268 match_seq_num: Some(match_seq_num),
3269 ..self
3270 }
3271 }
3272
3273 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3275 Self {
3276 fencing_token: Some(fencing_token),
3277 ..self
3278 }
3279 }
3280}
3281
3282impl From<AppendInput> for api::stream::proto::AppendInput {
3283 fn from(value: AppendInput) -> Self {
3284 Self {
3285 records: value.records.iter().cloned().map(Into::into).collect(),
3286 match_seq_num: value.match_seq_num,
3287 fencing_token: value.fencing_token.map(|t| t.to_string()),
3288 }
3289 }
3290}
3291
/// Acknowledgement returned for an append.
///
/// Built from the protobuf acknowledgement; any position missing from the
/// response is replaced by that position type's default value.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct AppendAck {
    /// `start` position reported by the service.
    // NOTE(review): presumably the position of the first appended record — confirm.
    pub start: StreamPosition,
    /// `end` position reported by the service.
    // NOTE(review): presumably one past the last appended record — confirm.
    pub end: StreamPosition,
    /// `tail` position reported by the service.
    pub tail: StreamPosition,
}
3310
3311impl From<api::stream::proto::AppendAck> for AppendAck {
3312 fn from(value: api::stream::proto::AppendAck) -> Self {
3313 Self {
3314 start: value.start.unwrap_or_default().into(),
3315 end: value.end.unwrap_or_default().into(),
3316 tail: value.tail.unwrap_or_default().into(),
3317 }
3318 }
3319}
3320
/// Where a read should start. Exactly one of the three starting points is
/// sent in the request, depending on the variant.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start from this sequence number.
    SeqNum(u64),
    /// Start from this timestamp.
    // NOTE(review): timestamp unit is not visible in this chunk — confirm.
    Timestamp(u64),
    /// Start from this offset relative to the tail.
    // NOTE(review): presumably a number of records before the tail — confirm.
    TailOffset(u64),
}

impl Default for ReadFrom {
    /// Defaults to reading from sequence number `0`.
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3337
/// Where a read begins and how the start position is adjusted.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct ReadStart {
    /// Starting point of the read; defaults to `ReadFrom::SeqNum(0)`.
    pub from: ReadFrom,
    /// When `true`, `clamp: Some(true)` is sent in the request; when `false`
    /// (the default) the `clamp` field is omitted.
    // NOTE(review): presumably makes the service clamp a start beyond the tail
    // to the tail instead of failing — confirm against the API docs.
    pub clamp_to_tail: bool,
}
3351
3352impl ReadStart {
3353 pub fn new() -> Self {
3355 Self::default()
3356 }
3357
3358 pub fn with_from(self, from: ReadFrom) -> Self {
3360 Self { from, ..self }
3361 }
3362
3363 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3365 Self {
3366 clamp_to_tail,
3367 ..self
3368 }
3369 }
3370}
3371
3372impl From<ReadStart> for api::stream::ReadStart {
3373 fn from(value: ReadStart) -> Self {
3374 let (seq_num, timestamp, tail_offset) = match value.from {
3375 ReadFrom::SeqNum(n) => (Some(n), None, None),
3376 ReadFrom::Timestamp(t) => (None, Some(t), None),
3377 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3378 };
3379 Self {
3380 seq_num,
3381 timestamp,
3382 tail_offset,
3383 clamp: if value.clamp_to_tail {
3384 Some(true)
3385 } else {
3386 None
3387 },
3388 }
3389 }
3390}
3391
/// Optional limits on how much a read returns. Both default to `None`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Optional limit expressed as a record count.
    pub count: Option<usize>,
    /// Optional limit expressed in bytes.
    // NOTE(review): presumably metered bytes — confirm against the API docs.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Creates limits with neither `count` nor `bytes` set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the limits with `count` set to the given value.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Returns the limits with `bytes` set to the given value.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3428
/// Conditions under which a read stops. Every field defaults to unset.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadStop {
    /// Count and byte limits; sent as the request's `count` and `bytes`.
    pub limits: ReadLimits,
    /// Optional exclusive upper bound (`..end`); only the range's `end` is
    /// sent, as the request's `until`.
    // NOTE(review): presumably a timestamp bound — confirm against the API docs.
    pub until: Option<RangeTo<u64>>,
    /// Optional wait value, forwarded unchanged as the request's `wait`.
    // NOTE(review): unit is not visible in this chunk (presumably seconds) — confirm.
    pub wait: Option<u32>,
}
3452
3453impl ReadStop {
3454 pub fn new() -> Self {
3456 Self::default()
3457 }
3458
3459 pub fn with_limits(self, limits: ReadLimits) -> Self {
3461 Self { limits, ..self }
3462 }
3463
3464 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3466 Self {
3467 until: Some(until),
3468 ..self
3469 }
3470 }
3471
3472 pub fn with_wait(self, wait: u32) -> Self {
3474 Self {
3475 wait: Some(wait),
3476 ..self
3477 }
3478 }
3479}
3480
3481impl From<ReadStop> for api::stream::ReadEnd {
3482 fn from(value: ReadStop) -> Self {
3483 Self {
3484 count: value.limits.count,
3485 bytes: value.limits.bytes,
3486 until: value.until.map(|r| r.end),
3487 wait: value.wait,
3488 }
3489 }
3490}
3491
/// Input for a read: where to start, when to stop, and a command-record flag.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadInput {
    /// Where the read starts.
    pub start: ReadStart,
    /// When the read stops.
    pub stop: ReadStop,
    /// Flag controlling command records; defaults to `false`.
    // NOTE(review): presumably, when `true`, command records are filtered out
    // of the results — the consuming code is outside this chunk; confirm.
    pub ignore_command_records: bool,
}
3510
3511impl ReadInput {
3512 pub fn new() -> Self {
3514 Self::default()
3515 }
3516
3517 pub fn with_start(self, start: ReadStart) -> Self {
3519 Self { start, ..self }
3520 }
3521
3522 pub fn with_stop(self, stop: ReadStop) -> Self {
3524 Self { stop, ..self }
3525 }
3526
3527 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3529 Self {
3530 ignore_command_records,
3531 ..self
3532 }
3533 }
3534}
3535
/// A record as returned by a read, carrying its sequence number and timestamp.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct SequencedRecord {
    /// Sequence number of the record.
    pub seq_num: u64,
    /// Record body.
    pub body: Bytes,
    /// Record headers.
    pub headers: Vec<Header>,
    /// Timestamp of the record.
    // NOTE(review): timestamp unit is not visible in this chunk — confirm.
    pub timestamp: u64,
}
3549
3550impl SequencedRecord {
3551 pub fn is_command_record(&self) -> bool {
3553 self.headers.len() == 1 && *self.headers[0].name == *b""
3554 }
3555}
3556
3557impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3558 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3559 Self {
3560 seq_num: value.seq_num,
3561 body: value.body,
3562 headers: value.headers.into_iter().map(Into::into).collect(),
3563 timestamp: value.timestamp,
3564 }
3565 }
3566}
3567
// Generate the `MeteredBytes` impl for `SequencedRecord` from its headers and body.
metered_bytes_impl!(SequencedRecord);
3569
/// A batch of records returned by a read.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReadBatch {
    /// Records in the order they appear in the API response.
    pub records: Vec<SequencedRecord>,
    /// Tail position, present only when the API response included one.
    pub tail: Option<StreamPosition>,
}
3588
3589impl ReadBatch {
3590 pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3591 Self {
3592 records: batch.records.into_iter().map(Into::into).collect(),
3593 tail: batch.tail.map(Into::into),
3594 }
3595 }
3596}
3597
/// A pinned, boxed, `Send` stream whose items are `Result<T, S2Error>`.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3600
/// Reason an append was rejected because one of its conditions did not hold.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendConditionFailed {
    /// Fencing token mismatch; carries the token reported as expected.
    #[error("fencing token mismatch, expected: {0}")]
    FencingTokenMismatch(FencingToken),
    /// Sequence number mismatch; carries the sequence number reported as expected.
    #[error("sequence number mismatch, expected: {0}")]
    SeqNumMismatch(u64),
}
3611
3612impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3613 fn from(value: api::stream::AppendConditionFailed) -> Self {
3614 match value {
3615 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3616 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3617 }
3618 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3619 AppendConditionFailed::SeqNumMismatch(seq)
3620 }
3621 }
3622 }
3623}
3624
/// Error type surfaced by the SDK.
#[derive(Debug, Clone, thiserror::Error)]
pub enum S2Error {
    /// Client-side error carrying a message. Any `ApiError` without a
    /// dedicated variant is converted into this using its string form.
    #[error("{0}")]
    Client(String),
    /// A value failed validation.
    #[error(transparent)]
    Validation(#[from] ValidationError),
    /// An append condition did not hold.
    #[error("{0}")]
    AppendConditionFailed(AppendConditionFailed),
    /// A read started from an unwritten position; carries the current tail.
    #[error("read from an unwritten position. current tail: {0}")]
    ReadUnwritten(StreamPosition),
    /// Error response returned by the server.
    #[error("{0}")]
    Server(ErrorResponse),
}
3644
3645impl From<ApiError> for S2Error {
3646 fn from(err: ApiError) -> Self {
3647 match err {
3648 ApiError::ReadUnwritten(tail_response) => {
3649 Self::ReadUnwritten(tail_response.tail.into())
3650 }
3651 ApiError::AppendConditionFailed(condition_failed) => {
3652 Self::AppendConditionFailed(condition_failed.into())
3653 }
3654 ApiError::Server(_, response) => Self::Server(response.into()),
3655 other => Self::Client(other.to_string()),
3656 }
3657 }
3658}
3659
/// Error payload returned by the server; displayed as `{code}: {message}`.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
pub struct ErrorResponse {
    /// Error code from the response.
    pub code: String,
    /// Error message from the response.
    pub message: String,
}
3670
3671impl From<ApiErrorResponse> for ErrorResponse {
3672 fn from(response: ApiErrorResponse) -> Self {
3673 Self {
3674 code: response.code,
3675 message: response.message,
3676 }
3677 }
3678}
3679
3680fn idempotency_token() -> String {
3681 uuid::Uuid::new_v4().simple().to_string()
3682}