1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::encryption::EncryptionAlgorithm;
24pub use s2_common::encryption::EncryptionKey;
26pub use s2_common::types::ValidationError;
28pub use s2_common::types::access::AccessTokenId;
32pub use s2_common::types::access::AccessTokenIdPrefix;
34pub use s2_common::types::access::AccessTokenIdStartAfter;
36pub use s2_common::types::basin::BasinName;
41pub use s2_common::types::basin::BasinNamePrefix;
43pub use s2_common::types::basin::BasinNameStartAfter;
45pub use s2_common::types::stream::StreamName;
49pub use s2_common::types::stream::StreamNamePrefix;
51pub use s2_common::types::stream::StreamNameStartAfter;
53
/// Number of bytes in one mebibyte (2^20).
pub(crate) const ONE_MIB: u32 = 1 << 20;
55
56use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
57use secrecy::SecretString;
58
59use crate::api::{ApiError, ApiErrorResponse};
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub struct S2DateTime(time::OffsetDateTime);
68
69impl TryFrom<time::OffsetDateTime> for S2DateTime {
70 type Error = ValidationError;
71
72 fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
73 dt.format(&time::format_description::well_known::Rfc3339)
74 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
75 Ok(Self(dt))
76 }
77}
78
79impl From<S2DateTime> for time::OffsetDateTime {
80 fn from(dt: S2DateTime) -> Self {
81 dt.0
82 }
83}
84
85impl FromStr for S2DateTime {
86 type Err = ValidationError;
87
88 fn from_str(s: &str) -> Result<Self, Self::Err> {
89 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
90 .map(Self)
91 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
92 }
93}
94
95impl fmt::Display for S2DateTime {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 write!(
98 f,
99 "{}",
100 self.0
101 .format(&time::format_description::well_known::Rfc3339)
102 .expect("RFC3339 formatting should not fail for S2DateTime")
103 )
104 }
105}
106
107#[derive(Debug, Clone, PartialEq)]
109pub(crate) enum BasinAuthority {
110 ParentZone(Authority),
112 Direct(Authority),
114}
115
116#[derive(Debug, Clone)]
118pub struct AccountEndpoint {
119 scheme: Scheme,
120 authority: Authority,
121}
122
123impl AccountEndpoint {
124 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
126 endpoint.parse()
127 }
128}
129
130impl FromStr for AccountEndpoint {
131 type Err = ValidationError;
132
133 fn from_str(s: &str) -> Result<Self, Self::Err> {
134 let (scheme, authority) = match s.find("://") {
135 Some(idx) => {
136 let scheme: Scheme = s[..idx]
137 .parse()
138 .map_err(|_| "invalid account endpoint scheme".to_string())?;
139 (scheme, &s[idx + 3..])
140 }
141 None => (Scheme::HTTPS, s),
142 };
143 Ok(Self {
144 scheme,
145 authority: authority
146 .parse()
147 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
148 })
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct BasinEndpoint {
155 scheme: Scheme,
156 authority: BasinAuthority,
157}
158
159impl BasinEndpoint {
160 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
162 endpoint.parse()
163 }
164}
165
166impl FromStr for BasinEndpoint {
167 type Err = ValidationError;
168
169 fn from_str(s: &str) -> Result<Self, Self::Err> {
170 let (scheme, authority) = match s.find("://") {
171 Some(idx) => {
172 let scheme: Scheme = s[..idx]
173 .parse()
174 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
175 (scheme, &s[idx + 3..])
176 }
177 None => (Scheme::HTTPS, s),
178 };
179 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
180 BasinAuthority::ParentZone(
181 authority
182 .parse()
183 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
184 )
185 } else {
186 BasinAuthority::Direct(
187 authority
188 .parse()
189 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
190 )
191 };
192 Ok(Self { scheme, authority })
193 }
194}
195
196#[derive(Debug, Clone)]
197#[non_exhaustive]
198pub struct S2Endpoints {
200 pub(crate) scheme: Scheme,
201 pub(crate) account_authority: Authority,
202 pub(crate) basin_authority: BasinAuthority,
203}
204
205impl S2Endpoints {
206 pub fn new(
208 account_endpoint: AccountEndpoint,
209 basin_endpoint: BasinEndpoint,
210 ) -> Result<Self, ValidationError> {
211 if account_endpoint.scheme != basin_endpoint.scheme {
212 return Err("account and basin endpoints must have the same scheme".into());
213 }
214 Ok(Self {
215 scheme: account_endpoint.scheme,
216 account_authority: account_endpoint.authority,
217 basin_authority: basin_endpoint.authority,
218 })
219 }
220
221 pub fn from_env() -> Result<Self, ValidationError> {
227 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
228 Ok(endpoint) => endpoint.parse()?,
229 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
230 Err(VarError::NotUnicode(_)) => {
231 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
232 }
233 };
234
235 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
236 Ok(endpoint) => endpoint.parse()?,
237 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
238 Err(VarError::NotUnicode(_)) => {
239 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
240 }
241 };
242
243 if account_endpoint.scheme != basin_endpoint.scheme {
244 return Err(
245 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
246 );
247 }
248
249 Ok(Self {
250 scheme: account_endpoint.scheme,
251 account_authority: account_endpoint.authority,
252 basin_authority: basin_endpoint.authority,
253 })
254 }
255
256 pub(crate) fn for_aws() -> Self {
257 Self {
258 scheme: Scheme::HTTPS,
259 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
260 basin_authority: BasinAuthority::ParentZone(
261 "b.s2.dev".try_into().expect("valid authority"),
262 ),
263 }
264 }
265}
266
/// Compression setting; each variant converts to the `CompressionAlgorithm`
/// variant of the same name.
///
/// Derives `PartialEq`/`Eq` so values of this fieldless enum can be compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
277
278impl From<Compression> for CompressionAlgorithm {
279 fn from(value: Compression) -> Self {
280 match value {
281 Compression::None => CompressionAlgorithm::None,
282 Compression::Gzip => CompressionAlgorithm::Gzip,
283 Compression::Zstd => CompressionAlgorithm::Zstd,
284 }
285 }
286}
287
/// Policy selecting which appends may be retried.
///
/// Derives `Eq` alongside `PartialEq`: the enum has no fields, so equality is
/// total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AppendRetryPolicy {
    /// NOTE(review): presumably retries every failed append — confirm against the
    /// retry implementation. This is the value used by `RetryConfig::default()`.
    All,
    /// NOTE(review): presumably retries only when the failed attempt could not
    /// have had side effects — confirm against the retry implementation.
    NoSideEffects,
}
306
307#[derive(Debug, Clone)]
308#[non_exhaustive]
309pub struct RetryConfig {
318 pub max_attempts: NonZeroU32,
322 pub min_base_delay: Duration,
326 pub max_base_delay: Duration,
330 pub append_retry_policy: AppendRetryPolicy,
335}
336
337impl Default for RetryConfig {
338 fn default() -> Self {
339 Self {
340 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
341 min_base_delay: Duration::from_millis(100),
342 max_base_delay: Duration::from_secs(1),
343 append_retry_policy: AppendRetryPolicy::All,
344 }
345 }
346}
347
348impl RetryConfig {
349 pub fn new() -> Self {
351 Self::default()
352 }
353
354 pub(crate) fn max_retries(&self) -> u32 {
355 self.max_attempts.get() - 1
356 }
357
358 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
360 Self {
361 max_attempts,
362 ..self
363 }
364 }
365
366 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
368 Self {
369 min_base_delay,
370 ..self
371 }
372 }
373
374 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
376 Self {
377 max_base_delay,
378 ..self
379 }
380 }
381
382 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
385 Self {
386 append_retry_policy,
387 ..self
388 }
389 }
390}
391
392#[derive(Debug, Clone)]
393#[non_exhaustive]
394pub struct S2Config {
396 pub(crate) access_token: SecretString,
397 pub(crate) endpoints: S2Endpoints,
398 pub(crate) connection_timeout: Duration,
399 pub(crate) request_timeout: Duration,
400 pub(crate) retry: RetryConfig,
401 pub(crate) compression: Compression,
402 pub(crate) user_agent: HeaderValue,
403 pub(crate) insecure_skip_cert_verification: bool,
404}
405
406impl S2Config {
407 pub fn new(access_token: impl Into<String>) -> Self {
409 Self {
410 access_token: access_token.into().into(),
411 endpoints: S2Endpoints::for_aws(),
412 connection_timeout: Duration::from_secs(3),
413 request_timeout: Duration::from_secs(5),
414 retry: RetryConfig::new(),
415 compression: Compression::None,
416 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
417 .parse()
418 .expect("valid user agent"),
419 insecure_skip_cert_verification: false,
420 }
421 }
422
423 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
425 Self { endpoints, ..self }
426 }
427
428 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
432 Self {
433 connection_timeout,
434 ..self
435 }
436 }
437
438 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
442 Self {
443 request_timeout,
444 ..self
445 }
446 }
447
448 pub fn with_retry(self, retry: RetryConfig) -> Self {
452 Self { retry, ..self }
453 }
454
455 pub fn with_compression(self, compression: Compression) -> Self {
459 Self {
460 compression,
461 ..self
462 }
463 }
464
465 pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
477 Self {
478 insecure_skip_cert_verification: skip,
479 ..self
480 }
481 }
482
483 #[doc(hidden)]
484 #[cfg(feature = "_hidden")]
485 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
486 let user_agent = user_agent
487 .into()
488 .parse()
489 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
490 Ok(Self { user_agent, ..self })
491 }
492}
493
/// One page of listed values.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values contained in this page.
    pub values: Vec<T>,
    /// Flag reported with the page.
    /// NOTE(review): presumably `true` when further pages exist — confirm with
    /// the listing code.
    pub has_more: bool,
}
503
504impl<T> Page<T> {
505 pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
506 Self {
507 values: values.into(),
508 has_more,
509 }
510 }
511}
512
/// Storage class; mirrors the API enum of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    /// The `Standard` storage class.
    Standard,
    /// The `Express` storage class.
    Express,
}
521
522impl From<api::config::StorageClass> for StorageClass {
523 fn from(value: api::config::StorageClass) -> Self {
524 match value {
525 api::config::StorageClass::Standard => StorageClass::Standard,
526 api::config::StorageClass::Express => StorageClass::Express,
527 }
528 }
529}
530
531impl From<StorageClass> for api::config::StorageClass {
532 fn from(value: StorageClass) -> Self {
533 match value {
534 StorageClass::Standard => api::config::StorageClass::Standard,
535 StorageClass::Express => api::config::StorageClass::Express,
536 }
537 }
538}
539
/// Retention policy; mirrors the API enum of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Age-based retention; the payload is passed through to the API unchanged.
    /// NOTE(review): the conversions name the value `secs`, so it is presumably
    /// in seconds — confirm.
    Age(u64),
    /// Infinite retention.
    Infinite,
}
548
549impl From<api::config::RetentionPolicy> for RetentionPolicy {
550 fn from(value: api::config::RetentionPolicy) -> Self {
551 match value {
552 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
553 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
554 }
555 }
556}
557
558impl From<RetentionPolicy> for api::config::RetentionPolicy {
559 fn from(value: RetentionPolicy) -> Self {
560 match value {
561 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
562 RetentionPolicy::Infinite => {
563 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
564 }
565 }
566 }
567}
568
/// Timestamping mode; mirrors the API enum of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    /// The `ClientPrefer` mode.
    ClientPrefer,
    /// The `ClientRequire` mode.
    ClientRequire,
    /// The `Arrival` mode.
    Arrival,
}
579
580impl From<api::config::TimestampingMode> for TimestampingMode {
581 fn from(value: api::config::TimestampingMode) -> Self {
582 match value {
583 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
584 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
585 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
586 }
587 }
588}
589
590impl From<TimestampingMode> for api::config::TimestampingMode {
591 fn from(value: TimestampingMode) -> Self {
592 match value {
593 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
594 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
595 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
596 }
597 }
598}
599
600#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
601#[non_exhaustive]
602pub struct TimestampingConfig {
604 pub mode: Option<TimestampingMode>,
608 pub uncapped: bool,
612}
613
614impl TimestampingConfig {
615 pub fn new() -> Self {
617 Self::default()
618 }
619
620 pub fn with_mode(self, mode: TimestampingMode) -> Self {
622 Self {
623 mode: Some(mode),
624 ..self
625 }
626 }
627
628 pub fn with_uncapped(self, uncapped: bool) -> Self {
630 Self { uncapped, ..self }
631 }
632}
633
634impl From<api::config::TimestampingConfig> for TimestampingConfig {
635 fn from(value: api::config::TimestampingConfig) -> Self {
636 Self {
637 mode: value.mode.map(Into::into),
638 uncapped: value.uncapped.unwrap_or_default(),
639 }
640 }
641}
642
643impl From<TimestampingConfig> for api::config::TimestampingConfig {
644 fn from(value: TimestampingConfig) -> Self {
645 Self {
646 mode: value.mode.map(Into::into),
647 uncapped: Some(value.uncapped),
648 }
649 }
650}
651
/// Delete-on-empty settings.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in whole seconds (`0` by default).
    pub min_age_secs: u64,
}
661
662impl DeleteOnEmptyConfig {
663 pub fn new() -> Self {
665 Self::default()
666 }
667
668 pub fn with_min_age(self, min_age: Duration) -> Self {
670 Self {
671 min_age_secs: min_age.as_secs(),
672 }
673 }
674}
675
676impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
677 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
678 Self {
679 min_age_secs: value.min_age_secs,
680 }
681 }
682}
683
684impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
685 fn from(value: DeleteOnEmptyConfig) -> Self {
686 Self {
687 min_age_secs: value.min_age_secs,
688 }
689 }
690}
691
692#[derive(Debug, Clone, Default, PartialEq, Eq)]
693#[non_exhaustive]
694pub struct StreamConfig {
696 pub storage_class: Option<StorageClass>,
700 pub retention_policy: Option<RetentionPolicy>,
704 pub timestamping: Option<TimestampingConfig>,
708 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
712}
713
714impl StreamConfig {
715 pub fn new() -> Self {
717 Self::default()
718 }
719
720 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
722 Self {
723 storage_class: Some(storage_class),
724 ..self
725 }
726 }
727
728 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
730 Self {
731 retention_policy: Some(retention_policy),
732 ..self
733 }
734 }
735
736 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
738 Self {
739 timestamping: Some(timestamping),
740 ..self
741 }
742 }
743
744 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
746 Self {
747 delete_on_empty: Some(delete_on_empty),
748 ..self
749 }
750 }
751}
752
753impl From<api::config::StreamConfig> for StreamConfig {
754 fn from(value: api::config::StreamConfig) -> Self {
755 Self {
756 storage_class: value.storage_class.map(Into::into),
757 retention_policy: value.retention_policy.map(Into::into),
758 timestamping: value.timestamping.map(Into::into),
759 delete_on_empty: value.delete_on_empty.map(Into::into),
760 }
761 }
762}
763
764impl From<StreamConfig> for api::config::StreamConfig {
765 fn from(value: StreamConfig) -> Self {
766 Self {
767 storage_class: value.storage_class.map(Into::into),
768 retention_policy: value.retention_policy.map(Into::into),
769 timestamping: value.timestamping.map(Into::into),
770 delete_on_empty: value.delete_on_empty.map(Into::into),
771 }
772 }
773}
774
775#[derive(Debug, Clone, Default)]
776#[non_exhaustive]
777pub struct BasinConfig {
779 pub default_stream_config: Option<StreamConfig>,
783 pub stream_cipher: Option<EncryptionAlgorithm>,
785 pub create_stream_on_append: bool,
789 pub create_stream_on_read: bool,
793}
794
795impl BasinConfig {
796 pub fn new() -> Self {
798 Self::default()
799 }
800
801 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
803 Self {
804 default_stream_config: Some(config),
805 ..self
806 }
807 }
808
809 pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
811 Self {
812 stream_cipher: Some(stream_cipher),
813 ..self
814 }
815 }
816
817 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
820 Self {
821 create_stream_on_append,
822 ..self
823 }
824 }
825
826 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
828 Self {
829 create_stream_on_read,
830 ..self
831 }
832 }
833}
834
835impl From<api::config::BasinConfig> for BasinConfig {
836 fn from(value: api::config::BasinConfig) -> Self {
837 Self {
838 default_stream_config: value.default_stream_config.map(Into::into),
839 stream_cipher: value.stream_cipher.map(Into::into),
840 create_stream_on_append: value.create_stream_on_append,
841 create_stream_on_read: value.create_stream_on_read,
842 }
843 }
844}
845
846impl From<BasinConfig> for api::config::BasinConfig {
847 fn from(value: BasinConfig) -> Self {
848 Self {
849 default_stream_config: value.default_stream_config.map(Into::into),
850 stream_cipher: value.stream_cipher.map(Into::into),
851 create_stream_on_append: value.create_stream_on_append,
852 create_stream_on_read: value.create_stream_on_read,
853 }
854 }
855}
856
/// Basin scope; mirrors the API enum of the same name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BasinScope {
    /// The `AwsUsEast1` scope.
    AwsUsEast1,
    /// The `AwsUsWest2` scope.
    AwsUsWest2,
    /// The `AwsEuNorth1` scope.
    AwsEuNorth1,
}
867
868impl From<api::basin::BasinScope> for BasinScope {
869 fn from(value: api::basin::BasinScope) -> Self {
870 match value {
871 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
872 api::basin::BasinScope::AwsUsWest2 => BasinScope::AwsUsWest2,
873 api::basin::BasinScope::AwsEuNorth1 => BasinScope::AwsEuNorth1,
874 }
875 }
876}
877
878impl From<BasinScope> for api::basin::BasinScope {
879 fn from(value: BasinScope) -> Self {
880 match value {
881 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
882 BasinScope::AwsUsWest2 => api::basin::BasinScope::AwsUsWest2,
883 BasinScope::AwsEuNorth1 => api::basin::BasinScope::AwsEuNorth1,
884 }
885 }
886}
887
/// Result wrapper distinguishing a created value from a reconfigured one.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CreateOrReconfigured<T> {
    /// The value was created.
    Created(T),
    /// The value was reconfigured.
    Reconfigured(T),
}
901
#[cfg(feature = "_hidden")]
impl<T> CreateOrReconfigured<T> {
    /// Returns `true` for the `Created` variant and `false` for `Reconfigured`.
    pub fn is_created(&self) -> bool {
        match self {
            Self::Created(_) => true,
            Self::Reconfigured(_) => false,
        }
    }

    /// Returns the wrapped value regardless of variant.
    pub fn into_inner(self) -> T {
        match self {
            Self::Created(inner) => inner,
            Self::Reconfigured(inner) => inner,
        }
    }
}
916
917#[derive(Debug, Clone)]
918#[non_exhaustive]
919pub struct CreateBasinInput {
921 pub name: BasinName,
923 pub config: Option<BasinConfig>,
927 pub scope: Option<BasinScope>,
931 idempotency_token: String,
932}
933
934impl CreateBasinInput {
935 pub fn new(name: BasinName) -> Self {
937 Self {
938 name,
939 config: None,
940 scope: None,
941 idempotency_token: idempotency_token(),
942 }
943 }
944
945 pub fn with_config(self, config: BasinConfig) -> Self {
947 Self {
948 config: Some(config),
949 ..self
950 }
951 }
952
953 pub fn with_scope(self, scope: BasinScope) -> Self {
955 Self {
956 scope: Some(scope),
957 ..self
958 }
959 }
960}
961
962impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
963 fn from(value: CreateBasinInput) -> Self {
964 (
965 api::basin::CreateBasinRequest {
966 basin: value.name,
967 config: value.config.map(Into::into),
968 scope: value.scope.map(Into::into),
969 },
970 value.idempotency_token,
971 )
972 }
973}
974
/// Input for creating or reconfiguring a basin.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CreateOrReconfigureBasinInput {
    /// Name of the basin.
    pub name: BasinName,
    /// Basin reconfiguration, if set.
    pub config: Option<BasinReconfiguration>,
    /// Basin scope, if set.
    pub scope: Option<BasinScope>,
}
992
#[cfg(feature = "_hidden")]
impl CreateOrReconfigureBasinInput {
    /// Creates an input for `name` with no config or scope.
    pub fn new(name: BasinName) -> Self {
        let (config, scope) = (None, None);
        Self {
            name,
            config,
            scope,
        }
    }

    /// Sets the basin reconfiguration.
    pub fn with_config(mut self, config: BasinReconfiguration) -> Self {
        self.config = Some(config);
        self
    }

    /// Sets the basin scope.
    pub fn with_scope(mut self, scope: BasinScope) -> Self {
        self.scope = Some(scope);
        self
    }
}
1020
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureBasinInput>
    for (
        BasinName,
        Option<api::basin::CreateOrReconfigureBasinRequest>,
    )
{
    /// Splits the input into the basin name and an optional request body.
    ///
    /// The body is `None` only when neither `config` nor `scope` was set.
    fn from(value: CreateOrReconfigureBasinInput) -> Self {
        let CreateOrReconfigureBasinInput {
            name,
            config,
            scope,
        } = value;
        let request = match (config, scope) {
            (None, None) => None,
            (config, scope) => Some(api::basin::CreateOrReconfigureBasinRequest {
                config: config.map(Into::into),
                scope: scope.map(Into::into),
            }),
        };
        (name, request)
    }
}
1040
1041#[derive(Debug, Clone, Default)]
1042#[non_exhaustive]
1043pub struct ListBasinsInput {
1045 pub prefix: BasinNamePrefix,
1049 pub start_after: BasinNameStartAfter,
1055 pub limit: Option<usize>,
1059}
1060
1061impl ListBasinsInput {
1062 pub fn new() -> Self {
1064 Self::default()
1065 }
1066
1067 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1069 Self { prefix, ..self }
1070 }
1071
1072 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1075 Self {
1076 start_after,
1077 ..self
1078 }
1079 }
1080
1081 pub fn with_limit(self, limit: usize) -> Self {
1083 Self {
1084 limit: Some(limit),
1085 ..self
1086 }
1087 }
1088}
1089
1090impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1091 fn from(value: ListBasinsInput) -> Self {
1092 Self {
1093 prefix: Some(value.prefix),
1094 start_after: Some(value.start_after),
1095 limit: value.limit,
1096 }
1097 }
1098}
1099
1100#[derive(Debug, Clone, Default)]
1101pub struct ListAllBasinsInput {
1103 pub prefix: BasinNamePrefix,
1107 pub start_after: BasinNameStartAfter,
1113 pub include_deleted: bool,
1117}
1118
1119impl ListAllBasinsInput {
1120 pub fn new() -> Self {
1122 Self::default()
1123 }
1124
1125 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1127 Self { prefix, ..self }
1128 }
1129
1130 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1133 Self {
1134 start_after,
1135 ..self
1136 }
1137 }
1138
1139 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1141 Self {
1142 include_deleted,
1143 ..self
1144 }
1145 }
1146}
1147
1148#[derive(Debug, Clone, PartialEq, Eq)]
1149#[non_exhaustive]
1150pub struct BasinInfo {
1152 pub name: BasinName,
1154 pub scope: Option<BasinScope>,
1156 pub created_at: S2DateTime,
1158 pub deleted_at: Option<S2DateTime>,
1160}
1161
1162impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1163 type Error = ValidationError;
1164
1165 fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1166 Ok(Self {
1167 name: value.name,
1168 scope: value.scope.map(Into::into),
1169 created_at: value.created_at.try_into()?,
1170 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1171 })
1172 }
1173}
1174
1175#[derive(Debug, Clone)]
1176#[non_exhaustive]
1177pub struct DeleteBasinInput {
1179 pub name: BasinName,
1181 pub ignore_not_found: bool,
1183}
1184
1185impl DeleteBasinInput {
1186 pub fn new(name: BasinName) -> Self {
1188 Self {
1189 name,
1190 ignore_not_found: false,
1191 }
1192 }
1193
1194 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1196 Self {
1197 ignore_not_found,
1198 ..self
1199 }
1200 }
1201}
1202
1203#[derive(Debug, Clone, Default)]
1204#[non_exhaustive]
1205pub struct TimestampingReconfiguration {
1207 pub mode: Maybe<Option<TimestampingMode>>,
1209 pub uncapped: Maybe<Option<bool>>,
1211}
1212
1213impl TimestampingReconfiguration {
1214 pub fn new() -> Self {
1216 Self::default()
1217 }
1218
1219 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1221 Self {
1222 mode: Maybe::Specified(Some(mode)),
1223 ..self
1224 }
1225 }
1226
1227 pub fn with_uncapped(self, uncapped: bool) -> Self {
1229 Self {
1230 uncapped: Maybe::Specified(Some(uncapped)),
1231 ..self
1232 }
1233 }
1234}
1235
1236impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1237 fn from(value: TimestampingReconfiguration) -> Self {
1238 Self {
1239 mode: value.mode.map(|m| m.map(Into::into)),
1240 uncapped: value.uncapped,
1241 }
1242 }
1243}
1244
1245#[derive(Debug, Clone, Default)]
1246#[non_exhaustive]
1247pub struct DeleteOnEmptyReconfiguration {
1249 pub min_age_secs: Maybe<Option<u64>>,
1251}
1252
1253impl DeleteOnEmptyReconfiguration {
1254 pub fn new() -> Self {
1256 Self::default()
1257 }
1258
1259 pub fn with_min_age(self, min_age: Duration) -> Self {
1261 Self {
1262 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1263 }
1264 }
1265}
1266
1267impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1268 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1269 Self {
1270 min_age_secs: value.min_age_secs,
1271 }
1272 }
1273}
1274
1275#[derive(Debug, Clone, Default)]
1276#[non_exhaustive]
1277pub struct StreamReconfiguration {
1279 pub storage_class: Maybe<Option<StorageClass>>,
1281 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1283 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1285 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1287}
1288
1289impl StreamReconfiguration {
1290 pub fn new() -> Self {
1292 Self::default()
1293 }
1294
1295 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1297 Self {
1298 storage_class: Maybe::Specified(Some(storage_class)),
1299 ..self
1300 }
1301 }
1302
1303 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1305 Self {
1306 retention_policy: Maybe::Specified(Some(retention_policy)),
1307 ..self
1308 }
1309 }
1310
1311 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1313 Self {
1314 timestamping: Maybe::Specified(Some(timestamping)),
1315 ..self
1316 }
1317 }
1318
1319 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1321 Self {
1322 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1323 ..self
1324 }
1325 }
1326}
1327
1328impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1329 fn from(value: StreamReconfiguration) -> Self {
1330 Self {
1331 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1332 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1333 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1334 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1335 }
1336 }
1337}
1338
1339#[derive(Debug, Clone, Default)]
1340#[non_exhaustive]
1341pub struct BasinReconfiguration {
1343 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1345 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
1347 pub create_stream_on_append: Maybe<bool>,
1350 pub create_stream_on_read: Maybe<bool>,
1352}
1353
1354impl BasinReconfiguration {
1355 pub fn new() -> Self {
1357 Self::default()
1358 }
1359
1360 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1363 Self {
1364 default_stream_config: Maybe::Specified(Some(config)),
1365 ..self
1366 }
1367 }
1368
1369 pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
1371 Self {
1372 stream_cipher: Maybe::Specified(Some(stream_cipher)),
1373 ..self
1374 }
1375 }
1376
1377 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1380 Self {
1381 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1382 ..self
1383 }
1384 }
1385
1386 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1389 Self {
1390 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1391 ..self
1392 }
1393 }
1394}
1395
1396impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1397 fn from(value: BasinReconfiguration) -> Self {
1398 Self {
1399 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1400 stream_cipher: value.stream_cipher.map(|m| m.map(Into::into)),
1401 create_stream_on_append: value.create_stream_on_append,
1402 create_stream_on_read: value.create_stream_on_read,
1403 }
1404 }
1405}
1406
1407#[derive(Debug, Clone)]
1408#[non_exhaustive]
1409pub struct ReconfigureBasinInput {
1411 pub name: BasinName,
1413 pub config: BasinReconfiguration,
1415}
1416
1417impl ReconfigureBasinInput {
1418 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1420 Self { name, config }
1421 }
1422}
1423
1424#[derive(Debug, Clone, Default)]
1425#[non_exhaustive]
1426pub struct ListAccessTokensInput {
1428 pub prefix: AccessTokenIdPrefix,
1432 pub start_after: AccessTokenIdStartAfter,
1438 pub limit: Option<usize>,
1442}
1443
1444impl ListAccessTokensInput {
1445 pub fn new() -> Self {
1447 Self::default()
1448 }
1449
1450 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1452 Self { prefix, ..self }
1453 }
1454
1455 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1458 Self {
1459 start_after,
1460 ..self
1461 }
1462 }
1463
1464 pub fn with_limit(self, limit: usize) -> Self {
1466 Self {
1467 limit: Some(limit),
1468 ..self
1469 }
1470 }
1471}
1472
1473impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1474 fn from(value: ListAccessTokensInput) -> Self {
1475 Self {
1476 prefix: Some(value.prefix),
1477 start_after: Some(value.start_after),
1478 limit: value.limit,
1479 }
1480 }
1481}
1482
1483#[derive(Debug, Clone, Default)]
1484pub struct ListAllAccessTokensInput {
1486 pub prefix: AccessTokenIdPrefix,
1490 pub start_after: AccessTokenIdStartAfter,
1496}
1497
1498impl ListAllAccessTokensInput {
1499 pub fn new() -> Self {
1501 Self::default()
1502 }
1503
1504 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1506 Self { prefix, ..self }
1507 }
1508
1509 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1512 Self {
1513 start_after,
1514 ..self
1515 }
1516 }
1517}
1518
1519#[derive(Debug, Clone)]
1520#[non_exhaustive]
1521pub struct AccessTokenInfo {
1523 pub id: AccessTokenId,
1525 pub expires_at: S2DateTime,
1527 pub auto_prefix_streams: bool,
1530 pub scope: AccessTokenScope,
1532}
1533
1534impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1535 type Error = ValidationError;
1536
1537 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1538 let expires_at = value
1539 .expires_at
1540 .map(S2DateTime::try_from)
1541 .transpose()?
1542 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1543 Ok(Self {
1544 id: value.id,
1545 expires_at,
1546 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1547 scope: value.scope.into(),
1548 })
1549 }
1550}
1551
1552#[derive(Debug, Clone)]
1553pub enum BasinMatcher {
1557 None,
1559 Exact(BasinName),
1561 Prefix(BasinNamePrefix),
1563}
1564
1565#[derive(Debug, Clone)]
1566pub enum StreamMatcher {
1570 None,
1572 Exact(StreamName),
1574 Prefix(StreamNamePrefix),
1576}
1577
1578#[derive(Debug, Clone)]
1579pub enum AccessTokenMatcher {
1583 None,
1585 Exact(AccessTokenId),
1587 Prefix(AccessTokenIdPrefix),
1589}
1590
/// A pair of read/write permission flags. Both default to `false`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Whether reading is permitted.
    pub read: bool,
    /// Whether writing is permitted.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Builds a value from explicit flags; shared by the public constructors.
    const fn from_flags(read: bool, write: bool) -> Self {
        Self { read, write }
    }

    /// Creates permissions with neither read nor write granted.
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates permissions granting read only.
    pub fn read_only() -> Self {
        Self::from_flags(true, false)
    }

    /// Creates permissions granting write only.
    pub fn write_only() -> Self {
        Self::from_flags(false, true)
    }

    /// Creates permissions granting both read and write.
    pub fn read_write() -> Self {
        Self::from_flags(true, true)
    }
}
1635
1636impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1637 fn from(value: ReadWritePermissions) -> Self {
1638 Self {
1639 read: Some(value.read),
1640 write: Some(value.write),
1641 }
1642 }
1643}
1644
1645impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1646 fn from(value: api::access::ReadWritePermissions) -> Self {
1647 Self {
1648 read: value.read.unwrap_or_default(),
1649 write: value.write.unwrap_or_default(),
1650 }
1651 }
1652}
1653
1654#[derive(Debug, Clone, Default)]
1655#[non_exhaustive]
1656pub struct OperationGroupPermissions {
1660 pub account: Option<ReadWritePermissions>,
1664 pub basin: Option<ReadWritePermissions>,
1668 pub stream: Option<ReadWritePermissions>,
1672}
1673
1674impl OperationGroupPermissions {
1675 pub fn new() -> Self {
1677 Self::default()
1678 }
1679
1680 pub fn read_only_all() -> Self {
1682 Self {
1683 account: Some(ReadWritePermissions::read_only()),
1684 basin: Some(ReadWritePermissions::read_only()),
1685 stream: Some(ReadWritePermissions::read_only()),
1686 }
1687 }
1688
1689 pub fn write_only_all() -> Self {
1691 Self {
1692 account: Some(ReadWritePermissions::write_only()),
1693 basin: Some(ReadWritePermissions::write_only()),
1694 stream: Some(ReadWritePermissions::write_only()),
1695 }
1696 }
1697
1698 pub fn read_write_all() -> Self {
1700 Self {
1701 account: Some(ReadWritePermissions::read_write()),
1702 basin: Some(ReadWritePermissions::read_write()),
1703 stream: Some(ReadWritePermissions::read_write()),
1704 }
1705 }
1706
1707 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1709 Self {
1710 account: Some(account),
1711 ..self
1712 }
1713 }
1714
1715 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1717 Self {
1718 basin: Some(basin),
1719 ..self
1720 }
1721 }
1722
1723 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1725 Self {
1726 stream: Some(stream),
1727 ..self
1728 }
1729 }
1730}
1731
1732impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1733 fn from(value: OperationGroupPermissions) -> Self {
1734 Self {
1735 account: value.account.map(Into::into),
1736 basin: value.basin.map(Into::into),
1737 stream: value.stream.map(Into::into),
1738 }
1739 }
1740}
1741
1742impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1743 fn from(value: api::access::PermittedOperationGroups) -> Self {
1744 Self {
1745 account: value.account.map(Into::into),
1746 basin: value.basin.map(Into::into),
1747 stream: value.stream.map(Into::into),
1748 }
1749 }
1750}
1751
/// An individual operation that an access token scope can list.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get a basin's configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account-level metrics.
    GetAccountMetrics,
    /// Get basin-level metrics.
    GetBasinMetrics,
    /// Get stream-level metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get a stream's configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim a stream.
    Trim,
    /// Set a fencing token on a stream.
    Fence,
}
1800
1801impl From<Operation> for api::access::Operation {
1802 fn from(value: Operation) -> Self {
1803 match value {
1804 Operation::ListBasins => api::access::Operation::ListBasins,
1805 Operation::CreateBasin => api::access::Operation::CreateBasin,
1806 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1807 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1808 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1809 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1810 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1811 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1812 Operation::ListStreams => api::access::Operation::ListStreams,
1813 Operation::CreateStream => api::access::Operation::CreateStream,
1814 Operation::DeleteStream => api::access::Operation::DeleteStream,
1815 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1816 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1817 Operation::CheckTail => api::access::Operation::CheckTail,
1818 Operation::Append => api::access::Operation::Append,
1819 Operation::Read => api::access::Operation::Read,
1820 Operation::Trim => api::access::Operation::Trim,
1821 Operation::Fence => api::access::Operation::Fence,
1822 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1823 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1824 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1825 }
1826 }
1827}
1828
1829impl From<api::access::Operation> for Operation {
1830 fn from(value: api::access::Operation) -> Self {
1831 match value {
1832 api::access::Operation::ListBasins => Operation::ListBasins,
1833 api::access::Operation::CreateBasin => Operation::CreateBasin,
1834 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1835 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1836 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1837 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1838 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1839 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1840 api::access::Operation::ListStreams => Operation::ListStreams,
1841 api::access::Operation::CreateStream => Operation::CreateStream,
1842 api::access::Operation::DeleteStream => Operation::DeleteStream,
1843 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1844 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1845 api::access::Operation::CheckTail => Operation::CheckTail,
1846 api::access::Operation::Append => Operation::Append,
1847 api::access::Operation::Read => Operation::Read,
1848 api::access::Operation::Trim => Operation::Trim,
1849 api::access::Operation::Fence => Operation::Fence,
1850 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1851 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1852 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1853 }
1854 }
1855}
1856
1857#[derive(Debug, Clone)]
1858#[non_exhaustive]
1859pub struct AccessTokenScopeInput {
1867 basins: Option<BasinMatcher>,
1868 streams: Option<StreamMatcher>,
1869 access_tokens: Option<AccessTokenMatcher>,
1870 op_group_perms: Option<OperationGroupPermissions>,
1871 ops: HashSet<Operation>,
1872}
1873
1874impl AccessTokenScopeInput {
1875 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1877 Self {
1878 basins: None,
1879 streams: None,
1880 access_tokens: None,
1881 op_group_perms: None,
1882 ops: ops.into_iter().collect(),
1883 }
1884 }
1885
1886 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1888 Self {
1889 basins: None,
1890 streams: None,
1891 access_tokens: None,
1892 op_group_perms: Some(op_group_perms),
1893 ops: HashSet::default(),
1894 }
1895 }
1896
1897 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1899 Self {
1900 ops: ops.into_iter().collect(),
1901 ..self
1902 }
1903 }
1904
1905 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1907 Self {
1908 op_group_perms: Some(op_group_perms),
1909 ..self
1910 }
1911 }
1912
1913 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1917 Self {
1918 basins: Some(basins),
1919 ..self
1920 }
1921 }
1922
1923 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1927 Self {
1928 streams: Some(streams),
1929 ..self
1930 }
1931 }
1932
1933 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1937 Self {
1938 access_tokens: Some(access_tokens),
1939 ..self
1940 }
1941 }
1942}
1943
1944#[derive(Debug, Clone)]
1945#[non_exhaustive]
1946pub struct AccessTokenScope {
1948 pub basins: Option<BasinMatcher>,
1950 pub streams: Option<StreamMatcher>,
1952 pub access_tokens: Option<AccessTokenMatcher>,
1954 pub op_group_perms: Option<OperationGroupPermissions>,
1956 pub ops: HashSet<Operation>,
1958}
1959
1960impl From<api::access::AccessTokenScope> for AccessTokenScope {
1961 fn from(value: api::access::AccessTokenScope) -> Self {
1962 Self {
1963 basins: value.basins.map(|rs| match rs {
1964 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1965 BasinMatcher::Exact(e)
1966 }
1967 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1968 BasinMatcher::None
1969 }
1970 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1971 }),
1972 streams: value.streams.map(|rs| match rs {
1973 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1974 StreamMatcher::Exact(e)
1975 }
1976 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1977 StreamMatcher::None
1978 }
1979 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1980 }),
1981 access_tokens: value.access_tokens.map(|rs| match rs {
1982 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1983 AccessTokenMatcher::Exact(e)
1984 }
1985 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1986 AccessTokenMatcher::None
1987 }
1988 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1989 }),
1990 op_group_perms: value.op_groups.map(Into::into),
1991 ops: value
1992 .ops
1993 .map(|ops| ops.into_iter().map(Into::into).collect())
1994 .unwrap_or_default(),
1995 }
1996 }
1997}
1998
1999impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
2000 fn from(value: AccessTokenScopeInput) -> Self {
2001 Self {
2002 basins: value.basins.map(|rs| match rs {
2003 BasinMatcher::None => {
2004 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2005 }
2006 BasinMatcher::Exact(e) => {
2007 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2008 }
2009 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2010 }),
2011 streams: value.streams.map(|rs| match rs {
2012 StreamMatcher::None => {
2013 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2014 }
2015 StreamMatcher::Exact(e) => {
2016 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2017 }
2018 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2019 }),
2020 access_tokens: value.access_tokens.map(|rs| match rs {
2021 AccessTokenMatcher::None => {
2022 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2023 }
2024 AccessTokenMatcher::Exact(e) => {
2025 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2026 }
2027 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2028 }),
2029 op_groups: value.op_group_perms.map(Into::into),
2030 ops: if value.ops.is_empty() {
2031 None
2032 } else {
2033 Some(value.ops.into_iter().map(Into::into).collect())
2034 },
2035 }
2036 }
2037}
2038
2039#[derive(Debug, Clone)]
2040#[non_exhaustive]
2041pub struct IssueAccessTokenInput {
2043 pub id: AccessTokenId,
2045 pub expires_at: Option<S2DateTime>,
2050 pub auto_prefix_streams: bool,
2058 pub scope: AccessTokenScopeInput,
2060}
2061
2062impl IssueAccessTokenInput {
2063 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2065 Self {
2066 id,
2067 expires_at: None,
2068 auto_prefix_streams: false,
2069 scope,
2070 }
2071 }
2072
2073 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2075 Self {
2076 expires_at: Some(expires_at),
2077 ..self
2078 }
2079 }
2080
2081 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2084 Self {
2085 auto_prefix_streams,
2086 ..self
2087 }
2088 }
2089}
2090
2091impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2092 fn from(value: IssueAccessTokenInput) -> Self {
2093 Self {
2094 id: value.id,
2095 expires_at: value.expires_at.map(Into::into),
2096 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2097 scope: value.scope.into(),
2098 }
2099 }
2100}
2101
/// Interval at which timeseries data points are aggregated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeseriesInterval {
    /// One-minute interval.
    Minute,
    /// One-hour interval.
    Hour,
    /// One-day interval.
    Day,
}
2112
2113impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2114 fn from(value: TimeseriesInterval) -> Self {
2115 match value {
2116 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2117 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2118 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2119 }
2120 }
2121}
2122
2123impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2124 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2125 match value {
2126 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2127 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2128 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2129 }
2130 }
2131}
2132
/// A time range delimited by `start` and `end`.
// NOTE(review): presumably Unix epoch seconds; inclusivity of the bounds is
// not visible here — confirm against the API documentation.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start of the range.
    pub start: u32,
    /// End of the range.
    pub end: u32,
}

impl TimeRange {
    /// Creates a range from `start` to `end`. No ordering check is performed.
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
2149
2150#[derive(Debug, Clone, Copy)]
2151#[non_exhaustive]
2152pub struct TimeRangeAndInterval {
2154 pub start: u32,
2156 pub end: u32,
2158 pub interval: Option<TimeseriesInterval>,
2162}
2163
2164impl TimeRangeAndInterval {
2165 pub fn new(start: u32, end: u32) -> Self {
2167 Self {
2168 start,
2169 end,
2170 interval: None,
2171 }
2172 }
2173
2174 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2176 Self {
2177 interval: Some(interval),
2178 ..self
2179 }
2180 }
2181}
2182
2183#[derive(Debug, Clone, Copy)]
2184pub enum AccountMetricSet {
2186 ActiveBasins(TimeRange),
2189 AccountOps(TimeRangeAndInterval),
2196}
2197
2198#[derive(Debug, Clone)]
2199#[non_exhaustive]
2200pub struct GetAccountMetricsInput {
2202 pub set: AccountMetricSet,
2204}
2205
2206impl GetAccountMetricsInput {
2207 pub fn new(set: AccountMetricSet) -> Self {
2209 Self { set }
2210 }
2211}
2212
2213impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2214 fn from(value: GetAccountMetricsInput) -> Self {
2215 let (set, start, end, interval) = match value.set {
2216 AccountMetricSet::ActiveBasins(args) => (
2217 api::metrics::AccountMetricSet::ActiveBasins,
2218 args.start,
2219 args.end,
2220 None,
2221 ),
2222 AccountMetricSet::AccountOps(args) => (
2223 api::metrics::AccountMetricSet::AccountOps,
2224 args.start,
2225 args.end,
2226 args.interval,
2227 ),
2228 };
2229 Self {
2230 set,
2231 start: Some(start),
2232 end: Some(end),
2233 interval: interval.map(Into::into),
2234 }
2235 }
2236}
2237
2238#[derive(Debug, Clone, Copy)]
2239pub enum BasinMetricSet {
2241 Storage(TimeRange),
2244 AppendOps(TimeRangeAndInterval),
2252 ReadOps(TimeRangeAndInterval),
2260 ReadThroughput(TimeRangeAndInterval),
2267 AppendThroughput(TimeRangeAndInterval),
2274 BasinOps(TimeRangeAndInterval),
2281}
2282
2283#[derive(Debug, Clone)]
2284#[non_exhaustive]
2285pub struct GetBasinMetricsInput {
2287 pub name: BasinName,
2289 pub set: BasinMetricSet,
2291}
2292
2293impl GetBasinMetricsInput {
2294 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2296 Self { name, set }
2297 }
2298}
2299
2300impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2301 fn from(value: GetBasinMetricsInput) -> Self {
2302 let (set, start, end, interval) = match value.set {
2303 BasinMetricSet::Storage(args) => (
2304 api::metrics::BasinMetricSet::Storage,
2305 args.start,
2306 args.end,
2307 None,
2308 ),
2309 BasinMetricSet::AppendOps(args) => (
2310 api::metrics::BasinMetricSet::AppendOps,
2311 args.start,
2312 args.end,
2313 args.interval,
2314 ),
2315 BasinMetricSet::ReadOps(args) => (
2316 api::metrics::BasinMetricSet::ReadOps,
2317 args.start,
2318 args.end,
2319 args.interval,
2320 ),
2321 BasinMetricSet::ReadThroughput(args) => (
2322 api::metrics::BasinMetricSet::ReadThroughput,
2323 args.start,
2324 args.end,
2325 args.interval,
2326 ),
2327 BasinMetricSet::AppendThroughput(args) => (
2328 api::metrics::BasinMetricSet::AppendThroughput,
2329 args.start,
2330 args.end,
2331 args.interval,
2332 ),
2333 BasinMetricSet::BasinOps(args) => (
2334 api::metrics::BasinMetricSet::BasinOps,
2335 args.start,
2336 args.end,
2337 args.interval,
2338 ),
2339 };
2340 (
2341 value.name,
2342 api::metrics::BasinMetricSetRequest {
2343 set,
2344 start: Some(start),
2345 end: Some(end),
2346 interval: interval.map(Into::into),
2347 },
2348 )
2349 }
2350}
2351
2352#[derive(Debug, Clone, Copy)]
2353pub enum StreamMetricSet {
2355 Storage(TimeRange),
2358}
2359
2360#[derive(Debug, Clone)]
2361#[non_exhaustive]
2362pub struct GetStreamMetricsInput {
2364 pub basin_name: BasinName,
2366 pub stream_name: StreamName,
2368 pub set: StreamMetricSet,
2370}
2371
2372impl GetStreamMetricsInput {
2373 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2376 Self {
2377 basin_name,
2378 stream_name,
2379 set,
2380 }
2381 }
2382}
2383
2384impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2385 fn from(value: GetStreamMetricsInput) -> Self {
2386 let (set, start, end, interval) = match value.set {
2387 StreamMetricSet::Storage(args) => (
2388 api::metrics::StreamMetricSet::Storage,
2389 args.start,
2390 args.end,
2391 None,
2392 ),
2393 };
2394 (
2395 value.basin_name,
2396 value.stream_name,
2397 api::metrics::StreamMetricSetRequest {
2398 set,
2399 start: Some(start),
2400 end: Some(end),
2401 interval,
2402 },
2403 )
2404 }
2405}
2406
/// Unit in which a metric's values are expressed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricUnit {
    /// Values are byte counts.
    Bytes,
    /// Values are operation counts.
    Operations,
}
2415
2416impl From<api::metrics::MetricUnit> for MetricUnit {
2417 fn from(value: api::metrics::MetricUnit) -> Self {
2418 match value {
2419 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2420 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2421 }
2422 }
2423}
2424
2425#[derive(Debug, Clone)]
2426#[non_exhaustive]
2427pub struct ScalarMetric {
2429 pub name: String,
2431 pub unit: MetricUnit,
2433 pub value: f64,
2435}
2436
2437#[derive(Debug, Clone)]
2438#[non_exhaustive]
2439pub struct AccumulationMetric {
2442 pub name: String,
2444 pub unit: MetricUnit,
2446 pub interval: TimeseriesInterval,
2448 pub values: Vec<(u32, f64)>,
2452}
2453
2454#[derive(Debug, Clone)]
2455#[non_exhaustive]
2456pub struct GaugeMetric {
2458 pub name: String,
2460 pub unit: MetricUnit,
2462 pub values: Vec<(u32, f64)>,
2465}
2466
/// A metric whose values are a list of string labels.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Name of the metric.
    pub name: String,
    /// The label values.
    pub values: Vec<String>,
}
2476
2477#[derive(Debug, Clone)]
2478pub enum Metric {
2480 Scalar(ScalarMetric),
2482 Accumulation(AccumulationMetric),
2485 Gauge(GaugeMetric),
2487 Label(LabelMetric),
2489}
2490
2491impl From<api::metrics::Metric> for Metric {
2492 fn from(value: api::metrics::Metric) -> Self {
2493 match value {
2494 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2495 name: sm.name.into(),
2496 unit: sm.unit.into(),
2497 value: sm.value,
2498 }),
2499 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2500 name: am.name.into(),
2501 unit: am.unit.into(),
2502 interval: am.interval.into(),
2503 values: am.values,
2504 }),
2505 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2506 name: gm.name.into(),
2507 unit: gm.unit.into(),
2508 values: gm.values,
2509 }),
2510 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2511 name: lm.name.into(),
2512 values: lm.values,
2513 }),
2514 }
2515 }
2516}
2517
2518#[derive(Debug, Clone, Default)]
2519#[non_exhaustive]
2520pub struct ListStreamsInput {
2522 pub prefix: StreamNamePrefix,
2526 pub start_after: StreamNameStartAfter,
2532 pub limit: Option<usize>,
2536}
2537
2538impl ListStreamsInput {
2539 pub fn new() -> Self {
2541 Self::default()
2542 }
2543
2544 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2546 Self { prefix, ..self }
2547 }
2548
2549 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2552 Self {
2553 start_after,
2554 ..self
2555 }
2556 }
2557
2558 pub fn with_limit(self, limit: usize) -> Self {
2560 Self {
2561 limit: Some(limit),
2562 ..self
2563 }
2564 }
2565}
2566
2567impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2568 fn from(value: ListStreamsInput) -> Self {
2569 Self {
2570 prefix: Some(value.prefix),
2571 start_after: Some(value.start_after),
2572 limit: value.limit,
2573 }
2574 }
2575}
2576
2577#[derive(Debug, Clone, Default)]
2578pub struct ListAllStreamsInput {
2580 pub prefix: StreamNamePrefix,
2584 pub start_after: StreamNameStartAfter,
2590 pub include_deleted: bool,
2594}
2595
2596impl ListAllStreamsInput {
2597 pub fn new() -> Self {
2599 Self::default()
2600 }
2601
2602 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2604 Self { prefix, ..self }
2605 }
2606
2607 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2610 Self {
2611 start_after,
2612 ..self
2613 }
2614 }
2615
2616 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2618 Self {
2619 include_deleted,
2620 ..self
2621 }
2622 }
2623}
2624
2625#[derive(Debug, Clone, PartialEq, Eq)]
2626#[non_exhaustive]
2627pub struct StreamInfo {
2629 pub name: StreamName,
2631 pub created_at: S2DateTime,
2633 pub deleted_at: Option<S2DateTime>,
2635 pub cipher: Option<EncryptionAlgorithm>,
2637}
2638
2639impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2640 type Error = ValidationError;
2641
2642 fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2643 Ok(Self {
2644 name: value.name,
2645 created_at: value.created_at.try_into()?,
2646 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2647 cipher: value.cipher.map(Into::into),
2648 })
2649 }
2650}
2651
2652#[derive(Debug, Clone)]
2653#[non_exhaustive]
2654pub struct CreateStreamInput {
2656 pub name: StreamName,
2658 pub config: Option<StreamConfig>,
2662 idempotency_token: String,
2663}
2664
2665impl CreateStreamInput {
2666 pub fn new(name: StreamName) -> Self {
2668 Self {
2669 name,
2670 config: None,
2671 idempotency_token: idempotency_token(),
2672 }
2673 }
2674
2675 pub fn with_config(self, config: StreamConfig) -> Self {
2677 Self {
2678 config: Some(config),
2679 ..self
2680 }
2681 }
2682}
2683
2684impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2685 fn from(value: CreateStreamInput) -> Self {
2686 (
2687 api::stream::CreateStreamRequest {
2688 stream: value.name,
2689 config: value.config.map(Into::into),
2690 },
2691 value.idempotency_token,
2692 )
2693 }
2694}
2695
/// Input for creating a stream or reconfiguring it.
///
/// Hidden from the docs and only compiled with the `_hidden` feature.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureStreamInput {
    /// Name of the stream.
    pub name: StreamName,
    /// Reconfiguration to apply; `None` leaves it unspecified in the request.
    pub config: Option<StreamReconfiguration>,
}

#[cfg(feature = "_hidden")]
impl CreateOrReconfigureStreamInput {
    /// Creates an input for `name` with no reconfiguration.
    pub fn new(name: StreamName) -> Self {
        let config = None;
        Self { name, config }
    }

    /// Sets the reconfiguration to apply.
    pub fn with_config(mut self, config: StreamReconfiguration) -> Self {
        self.config = Some(config);
        self
    }
}
2726
/// Splits the input into the stream name and the optional wire reconfiguration.
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureStreamInput>
    for (StreamName, Option<api::config::StreamReconfiguration>)
{
    fn from(value: CreateOrReconfigureStreamInput) -> Self {
        let CreateOrReconfigureStreamInput { name, config } = value;
        (name, config.map(Into::into))
    }
}
2735
2736#[derive(Debug, Clone)]
2737#[non_exhaustive]
2738pub struct DeleteStreamInput {
2740 pub name: StreamName,
2742 pub ignore_not_found: bool,
2744}
2745
2746impl DeleteStreamInput {
2747 pub fn new(name: StreamName) -> Self {
2749 Self {
2750 name,
2751 ignore_not_found: false,
2752 }
2753 }
2754
2755 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2757 Self {
2758 ignore_not_found,
2759 ..self
2760 }
2761 }
2762}
2763
2764#[derive(Debug, Clone)]
2765#[non_exhaustive]
2766pub struct ReconfigureStreamInput {
2768 pub name: StreamName,
2770 pub config: StreamReconfiguration,
2772}
2773
2774impl ReconfigureStreamInput {
2775 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2777 Self { name, config }
2778 }
2779}
2780
2781#[derive(Debug, Clone, PartialEq, Eq)]
2782pub struct FencingToken(String);
2788
2789impl FencingToken {
2790 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2792 rand::rng()
2793 .sample_iter(&rand::distr::Alphanumeric)
2794 .take(n)
2795 .map(char::from)
2796 .collect::<String>()
2797 .parse()
2798 }
2799}
2800
2801impl FromStr for FencingToken {
2802 type Err = ValidationError;
2803
2804 fn from_str(s: &str) -> Result<Self, Self::Err> {
2805 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2806 return Err(ValidationError(format!(
2807 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2808 )));
2809 }
2810 Ok(FencingToken(s.to_string()))
2811 }
2812}
2813
2814impl std::fmt::Display for FencingToken {
2815 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2816 write!(f, "{}", self.0)
2817 }
2818}
2819
2820impl Deref for FencingToken {
2821 type Target = str;
2822
2823 fn deref(&self) -> &Self::Target {
2824 &self.0
2825 }
2826}
2827
/// A position in a stream: a sequence number paired with a timestamp.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: u64,
    /// Timestamp.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm.
    pub timestamp: u64,
}

/// Formats the position as `seq_num=<n>, timestamp=<t>`.
impl std::fmt::Display for StreamPosition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = self;
        write!(f, "seq_num={seq_num}, timestamp={timestamp}")
    }
}
2844
2845impl From<api::stream::proto::StreamPosition> for StreamPosition {
2846 fn from(value: api::stream::proto::StreamPosition) -> Self {
2847 Self {
2848 seq_num: value.seq_num,
2849 timestamp: value.timestamp,
2850 }
2851 }
2852}
2853
2854impl From<api::stream::StreamPosition> for StreamPosition {
2855 fn from(value: api::stream::StreamPosition) -> Self {
2856 Self {
2857 seq_num: value.seq_num,
2858 timestamp: value.timestamp,
2859 }
2860 }
2861}
2862
2863#[derive(Debug, Clone, PartialEq)]
2864#[non_exhaustive]
2865pub struct Header {
2867 pub name: Bytes,
2869 pub value: Bytes,
2871}
2872
2873impl Header {
2874 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2876 Self {
2877 name: name.into(),
2878 value: value.into(),
2879 }
2880 }
2881}
2882
2883impl From<Header> for api::stream::proto::Header {
2884 fn from(value: Header) -> Self {
2885 Self {
2886 name: value.name,
2887 value: value.value,
2888 }
2889 }
2890}
2891
2892impl From<api::stream::proto::Header> for Header {
2893 fn from(value: api::stream::proto::Header) -> Self {
2894 Self {
2895 name: value.name,
2896 value: value.value,
2897 }
2898 }
2899}
2900
2901#[derive(Debug, Clone, PartialEq)]
2902pub struct AppendRecord {
2904 body: Bytes,
2905 headers: Vec<Header>,
2906 timestamp: Option<u64>,
2907}
2908
2909impl AppendRecord {
2910 fn validate(self) -> Result<Self, ValidationError> {
2911 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2912 Err(ValidationError(format!(
2913 "metered_bytes: {} exceeds {}",
2914 self.metered_bytes(),
2915 RECORD_BATCH_MAX.bytes
2916 )))
2917 } else {
2918 Ok(self)
2919 }
2920 }
2921
2922 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2924 let record = Self {
2925 body: body.into(),
2926 headers: Vec::default(),
2927 timestamp: None,
2928 };
2929 record.validate()
2930 }
2931
2932 pub fn with_headers(
2934 self,
2935 headers: impl IntoIterator<Item = Header>,
2936 ) -> Result<Self, ValidationError> {
2937 let record = Self {
2938 headers: headers.into_iter().collect(),
2939 ..self
2940 };
2941 record.validate()
2942 }
2943
2944 pub fn with_timestamp(self, timestamp: u64) -> Self {
2948 Self {
2949 timestamp: Some(timestamp),
2950 ..self
2951 }
2952 }
2953
2954 pub fn body(&self) -> &[u8] {
2956 &self.body
2957 }
2958
2959 pub fn headers(&self) -> &[Header] {
2961 &self.headers
2962 }
2963
2964 pub fn timestamp(&self) -> Option<u64> {
2966 self.timestamp
2967 }
2968}
2969
2970impl From<AppendRecord> for api::stream::proto::AppendRecord {
2971 fn from(value: AppendRecord) -> Self {
2972 Self {
2973 timestamp: value.timestamp,
2974 headers: value.headers.into_iter().map(Into::into).collect(),
2975 body: value.body,
2976 }
2977 }
2978}
2979
/// Types that can report their metered size in bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> usize;
}
2990
2991macro_rules! metered_bytes_impl {
2992 ($ty:ty) => {
2993 impl MeteredBytes for $ty {
2994 fn metered_bytes(&self) -> usize {
2995 8 + (2 * self.headers.len())
2996 + self
2997 .headers
2998 .iter()
2999 .map(|h| h.name.len() + h.value.len())
3000 .sum::<usize>()
3001 + self.body.len()
3002 }
3003 }
3004 };
3005}
3006
3007metered_bytes_impl!(AppendRecord);
3008
3009#[derive(Debug, Clone)]
3010pub struct AppendRecordBatch {
3019 records: Vec<AppendRecord>,
3020 metered_bytes: usize,
3021}
3022
3023impl AppendRecordBatch {
3024 pub(crate) fn with_capacity(capacity: usize) -> Self {
3025 Self {
3026 records: Vec::with_capacity(capacity),
3027 metered_bytes: 0,
3028 }
3029 }
3030
3031 pub(crate) fn push(&mut self, record: AppendRecord) {
3032 self.metered_bytes += record.metered_bytes();
3033 self.records.push(record);
3034 }
3035
3036 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3038 where
3039 I: IntoIterator<Item = AppendRecord>,
3040 {
3041 let mut records = Vec::new();
3042 let mut metered_bytes = 0;
3043
3044 for record in iter {
3045 metered_bytes += record.metered_bytes();
3046 records.push(record);
3047
3048 if metered_bytes > RECORD_BATCH_MAX.bytes {
3049 return Err(ValidationError(format!(
3050 "batch size in metered bytes ({metered_bytes}) exceeds {}",
3051 RECORD_BATCH_MAX.bytes
3052 )));
3053 }
3054
3055 if records.len() > RECORD_BATCH_MAX.count {
3056 return Err(ValidationError(format!(
3057 "number of records in the batch exceeds {}",
3058 RECORD_BATCH_MAX.count
3059 )));
3060 }
3061 }
3062
3063 if records.is_empty() {
3064 return Err(ValidationError("batch is empty".into()));
3065 }
3066
3067 Ok(Self {
3068 records,
3069 metered_bytes,
3070 })
3071 }
3072}
3073
3074impl Deref for AppendRecordBatch {
3075 type Target = [AppendRecord];
3076
3077 fn deref(&self) -> &Self::Target {
3078 &self.records
3079 }
3080}
3081
3082impl MeteredBytes for AppendRecordBatch {
3083 fn metered_bytes(&self) -> usize {
3084 self.metered_bytes
3085 }
3086}
3087
3088#[derive(Debug, Clone)]
3089pub enum Command {
3091 Fence {
3093 fencing_token: FencingToken,
3095 },
3096 Trim {
3098 trim_point: u64,
3100 },
3101}
3102
3103#[derive(Debug, Clone)]
3104#[non_exhaustive]
3105pub struct CommandRecord {
3109 pub command: Command,
3111 pub timestamp: Option<u64>,
3113}
3114
3115impl CommandRecord {
3116 const FENCE: &[u8] = b"fence";
3117 const TRIM: &[u8] = b"trim";
3118
3119 pub fn fence(fencing_token: FencingToken) -> Self {
3124 Self {
3125 command: Command::Fence { fencing_token },
3126 timestamp: None,
3127 }
3128 }
3129
3130 pub fn trim(trim_point: u64) -> Self {
3137 Self {
3138 command: Command::Trim { trim_point },
3139 timestamp: None,
3140 }
3141 }
3142
3143 pub fn with_timestamp(self, timestamp: u64) -> Self {
3145 Self {
3146 timestamp: Some(timestamp),
3147 ..self
3148 }
3149 }
3150}
3151
3152impl From<CommandRecord> for AppendRecord {
3153 fn from(value: CommandRecord) -> Self {
3154 let (header_value, body) = match value.command {
3155 Command::Fence { fencing_token } => (
3156 CommandRecord::FENCE,
3157 Bytes::copy_from_slice(fencing_token.as_bytes()),
3158 ),
3159 Command::Trim { trim_point } => (
3160 CommandRecord::TRIM,
3161 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3162 ),
3163 };
3164 Self {
3165 body,
3166 headers: vec![Header::new("", header_value)],
3167 timestamp: value.timestamp,
3168 }
3169 }
3170}
3171
3172#[derive(Debug, Clone)]
3173#[non_exhaustive]
3174pub struct AppendInput {
3177 pub records: AppendRecordBatch,
3179 pub match_seq_num: Option<u64>,
3183 pub fencing_token: Option<FencingToken>,
3188}
3189
3190impl AppendInput {
3191 pub fn new(records: AppendRecordBatch) -> Self {
3193 Self {
3194 records,
3195 match_seq_num: None,
3196 fencing_token: None,
3197 }
3198 }
3199
3200 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3202 Self {
3203 match_seq_num: Some(match_seq_num),
3204 ..self
3205 }
3206 }
3207
3208 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3210 Self {
3211 fencing_token: Some(fencing_token),
3212 ..self
3213 }
3214 }
3215}
3216
3217impl From<AppendInput> for api::stream::proto::AppendInput {
3218 fn from(value: AppendInput) -> Self {
3219 Self {
3220 records: value.records.iter().cloned().map(Into::into).collect(),
3221 match_seq_num: value.match_seq_num,
3222 fencing_token: value.fencing_token.map(|t| t.to_string()),
3223 }
3224 }
3225}
3226
3227#[derive(Debug, Clone, PartialEq)]
3228#[non_exhaustive]
3229pub struct AppendAck {
3231 pub start: StreamPosition,
3233 pub end: StreamPosition,
3239 pub tail: StreamPosition,
3244}
3245
3246impl From<api::stream::proto::AppendAck> for AppendAck {
3247 fn from(value: api::stream::proto::AppendAck) -> Self {
3248 Self {
3249 start: value.start.unwrap_or_default().into(),
3250 end: value.end.unwrap_or_default().into(),
3251 tail: value.tail.unwrap_or_default().into(),
3252 }
3253 }
3254}
3255
/// Where a read should start.
///
/// Exactly one of the three alternatives is chosen; the default is sequence number `0`.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start from a sequence number.
    SeqNum(u64),
    /// Start from a timestamp.
    Timestamp(u64),
    /// Start from a tail offset.
    TailOffset(u64),
}

impl Default for ReadFrom {
    /// Returns [`ReadFrom::SeqNum`] with sequence number `0`.
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3272
3273#[derive(Debug, Default, Clone)]
3274#[non_exhaustive]
3275pub struct ReadStart {
3277 pub from: ReadFrom,
3281 pub clamp_to_tail: bool,
3285}
3286
3287impl ReadStart {
3288 pub fn new() -> Self {
3290 Self::default()
3291 }
3292
3293 pub fn with_from(self, from: ReadFrom) -> Self {
3295 Self { from, ..self }
3296 }
3297
3298 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3300 Self {
3301 clamp_to_tail,
3302 ..self
3303 }
3304 }
3305}
3306
3307impl From<ReadStart> for api::stream::ReadStart {
3308 fn from(value: ReadStart) -> Self {
3309 let (seq_num, timestamp, tail_offset) = match value.from {
3310 ReadFrom::SeqNum(n) => (Some(n), None, None),
3311 ReadFrom::Timestamp(t) => (None, Some(t), None),
3312 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3313 };
3314 Self {
3315 seq_num,
3316 timestamp,
3317 tail_offset,
3318 clamp: if value.clamp_to_tail {
3319 Some(true)
3320 } else {
3321 None
3322 },
3323 }
3324 }
3325}
3326
/// Optional limits applied to a read. Both limits are unset by default.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Limit expressed as a count; `None` leaves it unset.
    pub count: Option<usize>,
    /// Limit expressed in bytes; `None` leaves it unset.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Creates limits with neither `count` nor `bytes` set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the limits with `count` set, leaving `bytes` untouched.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Returns the limits with `bytes` set, leaving `count` untouched.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3363
3364#[derive(Debug, Clone, Default)]
3365#[non_exhaustive]
3366pub struct ReadStop {
3368 pub limits: ReadLimits,
3372 pub until: Option<RangeTo<u64>>,
3376 pub wait: Option<u32>,
3386}
3387
3388impl ReadStop {
3389 pub fn new() -> Self {
3391 Self::default()
3392 }
3393
3394 pub fn with_limits(self, limits: ReadLimits) -> Self {
3396 Self { limits, ..self }
3397 }
3398
3399 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3401 Self {
3402 until: Some(until),
3403 ..self
3404 }
3405 }
3406
3407 pub fn with_wait(self, wait: u32) -> Self {
3409 Self {
3410 wait: Some(wait),
3411 ..self
3412 }
3413 }
3414}
3415
3416impl From<ReadStop> for api::stream::ReadEnd {
3417 fn from(value: ReadStop) -> Self {
3418 Self {
3419 count: value.limits.count,
3420 bytes: value.limits.bytes,
3421 until: value.until.map(|r| r.end),
3422 wait: value.wait,
3423 }
3424 }
3425}
3426
3427#[derive(Debug, Clone, Default)]
3428#[non_exhaustive]
3429pub struct ReadInput {
3432 pub start: ReadStart,
3436 pub stop: ReadStop,
3440 pub ignore_command_records: bool,
3444}
3445
3446impl ReadInput {
3447 pub fn new() -> Self {
3449 Self::default()
3450 }
3451
3452 pub fn with_start(self, start: ReadStart) -> Self {
3454 Self { start, ..self }
3455 }
3456
3457 pub fn with_stop(self, stop: ReadStop) -> Self {
3459 Self { stop, ..self }
3460 }
3461
3462 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3464 Self {
3465 ignore_command_records,
3466 ..self
3467 }
3468 }
3469}
3470
3471#[derive(Debug, Clone)]
3472#[non_exhaustive]
3473pub struct SequencedRecord {
3475 pub seq_num: u64,
3477 pub body: Bytes,
3479 pub headers: Vec<Header>,
3481 pub timestamp: u64,
3483}
3484
3485impl SequencedRecord {
3486 pub fn is_command_record(&self) -> bool {
3488 self.headers.len() == 1 && *self.headers[0].name == *b""
3489 }
3490}
3491
3492impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3493 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3494 Self {
3495 seq_num: value.seq_num,
3496 body: value.body,
3497 headers: value.headers.into_iter().map(Into::into).collect(),
3498 timestamp: value.timestamp,
3499 }
3500 }
3501}
3502
3503metered_bytes_impl!(SequencedRecord);
3504
3505#[derive(Debug, Clone)]
3506#[non_exhaustive]
3507pub struct ReadBatch {
3510 pub records: Vec<SequencedRecord>,
3517 pub tail: Option<StreamPosition>,
3522}
3523
3524impl ReadBatch {
3525 pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3526 Self {
3527 records: batch.records.into_iter().map(Into::into).collect(),
3528 tail: batch.tail.map(Into::into),
3529 }
3530 }
3531}
3532
3533pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3535
3536#[derive(Debug, Clone, thiserror::Error)]
3537pub enum AppendConditionFailed {
3539 #[error("fencing token mismatch, expected: {0}")]
3540 FencingTokenMismatch(FencingToken),
3542 #[error("sequence number mismatch, expected: {0}")]
3543 SeqNumMismatch(u64),
3545}
3546
3547impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3548 fn from(value: api::stream::AppendConditionFailed) -> Self {
3549 match value {
3550 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3551 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3552 }
3553 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3554 AppendConditionFailed::SeqNumMismatch(seq)
3555 }
3556 }
3557 }
3558}
3559
3560#[derive(Debug, Clone, thiserror::Error)]
3561pub enum S2Error {
3563 #[error("{0}")]
3564 Client(String),
3566 #[error(transparent)]
3567 Validation(#[from] ValidationError),
3569 #[error("{0}")]
3570 AppendConditionFailed(AppendConditionFailed),
3572 #[error("read from an unwritten position. current tail: {0}")]
3573 ReadUnwritten(StreamPosition),
3575 #[error("{0}")]
3576 Server(ErrorResponse),
3578}
3579
3580impl From<ApiError> for S2Error {
3581 fn from(err: ApiError) -> Self {
3582 match err {
3583 ApiError::ReadUnwritten(tail_response) => {
3584 Self::ReadUnwritten(tail_response.tail.into())
3585 }
3586 ApiError::AppendConditionFailed(condition_failed) => {
3587 Self::AppendConditionFailed(condition_failed.into())
3588 }
3589 ApiError::Server(_, response) => Self::Server(response.into()),
3590 other => Self::Client(other.to_string()),
3591 }
3592 }
3593}
3594
3595#[derive(Debug, Clone, thiserror::Error)]
3596#[error("{code}: {message}")]
3597#[non_exhaustive]
3598pub struct ErrorResponse {
3600 pub code: String,
3602 pub message: String,
3604}
3605
3606impl From<ApiErrorResponse> for ErrorResponse {
3607 fn from(response: ApiErrorResponse) -> Self {
3608 Self {
3609 code: response.code,
3610 message: response.message,
3611 }
3612 }
3613}
3614
3615fn idempotency_token() -> String {
3616 uuid::Uuid::new_v4().simple().to_string()
3617}