1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::Rng;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::types::ValidationError;
24pub use s2_common::types::access::AccessTokenId;
28pub use s2_common::types::access::AccessTokenIdPrefix;
30pub use s2_common::types::access::AccessTokenIdStartAfter;
32pub use s2_common::types::basin::BasinName;
37pub use s2_common::types::basin::BasinNamePrefix;
39pub use s2_common::types::basin::BasinNameStartAfter;
41pub use s2_common::types::stream::StreamName;
45pub use s2_common::types::stream::StreamNamePrefix;
47pub use s2_common::types::stream::StreamNameStartAfter;
49
50pub(crate) const ONE_MIB: u32 = 1024 * 1024;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl From<time::OffsetDateTime> for S2DateTime {
66 fn from(dt: time::OffsetDateTime) -> Self {
67 Self(dt)
68 }
69}
70
71impl From<S2DateTime> for time::OffsetDateTime {
72 fn from(dt: S2DateTime) -> Self {
73 dt.0
74 }
75}
76
77impl FromStr for S2DateTime {
78 type Err = ValidationError;
79
80 fn from_str(s: &str) -> Result<Self, Self::Err> {
81 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
82 .map(Self)
83 .map_err(|e| ValidationError(format!("invalid datetime: {e}")))
84 }
85}
86
87impl fmt::Display for S2DateTime {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 write!(
90 f,
91 "{}",
92 self.0
93 .format(&time::format_description::well_known::Rfc3339)
94 .expect("RFC3339 formatting should not fail for S2DateTime")
95 )
96 }
97}
98
99#[derive(Debug, Clone, PartialEq)]
101pub enum BasinAuthority {
102 ParentZone(Authority),
104 Direct(Authority),
106}
107
108#[derive(Debug, Clone)]
109#[non_exhaustive]
110pub struct S2Endpoints {
112 pub scheme: Scheme,
116 pub account_authority: Authority,
118 pub basin_authority: BasinAuthority,
120}
121
122impl S2Endpoints {
123 pub fn new(account_authority: Authority, basin_authority: BasinAuthority) -> Self {
125 Self {
126 scheme: Scheme::HTTPS,
127 account_authority,
128 basin_authority,
129 }
130 }
131
132 pub fn from_env() -> Result<Self, ValidationError> {
138 let account_endpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
139 Ok(s) => s,
140 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
141 Err(VarError::NotUnicode(_)) => {
142 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
143 }
144 };
145
146 let basin_endpoint = match std::env::var("S2_BASIN_ENDPOINT") {
147 Ok(s) => s,
148 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
149 Err(VarError::NotUnicode(_)) => {
150 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
151 }
152 };
153
154 let (account_scheme, account_authority) = parse_account_endpoint(&account_endpoint)?;
155
156 let (basin_scheme, basin_authority) = parse_basin_endpoint(&basin_endpoint)?;
157
158 if account_scheme != basin_scheme {
159 return Err(
160 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
161 );
162 }
163
164 Ok(S2Endpoints::new(account_authority, basin_authority).with_scheme(account_scheme))
165 }
166
167 #[doc(hidden)]
168 #[cfg(feature = "_hidden")]
169 pub fn parse_from(
170 account_endpoint: &str,
171 basin_endpoint: &str,
172 ) -> Result<Self, ValidationError> {
173 let (account_scheme, account_authority) = parse_account_endpoint(account_endpoint)?;
174 let (basin_scheme, basin_authority) = parse_basin_endpoint(basin_endpoint)?;
175
176 if account_scheme != basin_scheme {
177 return Err("account and basin endpoints must have the same scheme".into());
178 }
179 Ok(S2Endpoints::new(account_authority, basin_authority).with_scheme(account_scheme))
180 }
181
182 pub fn with_scheme(self, scheme: Scheme) -> Self {
184 Self { scheme, ..self }
185 }
186
187 pub(crate) fn for_aws() -> Self {
188 Self {
189 scheme: Scheme::HTTPS,
190 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
191 basin_authority: BasinAuthority::ParentZone(
192 "b.aws.s2.dev".try_into().expect("valid authority"),
193 ),
194 }
195 }
196}
197
198fn parse_account_endpoint(s: &str) -> Result<(Scheme, Authority), ValidationError> {
199 let (scheme, authority) = match s.find("://") {
200 Some(idx) => {
201 let scheme: Scheme = s[..idx]
202 .parse()
203 .map_err(|_| "invalid account endpoint scheme".to_string())?;
204 (scheme, &s[idx + 3..])
205 }
206 None => (Scheme::HTTPS, s),
207 };
208 Ok((
209 scheme,
210 authority
211 .parse()
212 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
213 ))
214}
215
216fn parse_basin_endpoint(s: &str) -> Result<(Scheme, BasinAuthority), ValidationError> {
217 let (scheme, authority) = match s.find("://") {
218 Some(idx) => {
219 let scheme: Scheme = s[..idx]
220 .parse()
221 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
222 (scheme, &s[idx + 3..])
223 }
224 None => (Scheme::HTTPS, s),
225 };
226 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
227 BasinAuthority::ParentZone(
228 authority
229 .parse()
230 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
231 )
232 } else {
233 BasinAuthority::Direct(
234 authority
235 .parse()
236 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
237 )
238 };
239 Ok((scheme, authority))
240}
241
242#[derive(Debug, Clone, Copy)]
243pub enum Compression {
245 None,
247 Gzip,
249 Zstd,
251}
252
253impl From<Compression> for CompressionAlgorithm {
254 fn from(value: Compression) -> Self {
255 match value {
256 Compression::None => CompressionAlgorithm::None,
257 Compression::Gzip => CompressionAlgorithm::Gzip,
258 Compression::Zstd => CompressionAlgorithm::Zstd,
259 }
260 }
261}
262
263#[derive(Debug, Clone, Copy, PartialEq)]
264#[non_exhaustive]
265pub enum AppendRetryPolicy {
268 All,
270 NoSideEffects,
272}
273
274impl AppendRetryPolicy {
275 pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
276 match self {
277 Self::All => true,
278 Self::NoSideEffects => input.match_seq_num.is_some(),
279 }
280 }
281}
282
283#[derive(Debug, Clone)]
284#[non_exhaustive]
285pub struct RetryConfig {
294 pub max_attempts: NonZeroU32,
298 pub min_base_delay: Duration,
302 pub max_base_delay: Duration,
306 pub append_retry_policy: AppendRetryPolicy,
311}
312
313impl Default for RetryConfig {
314 fn default() -> Self {
315 Self {
316 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
317 min_base_delay: Duration::from_millis(100),
318 max_base_delay: Duration::from_secs(1),
319 append_retry_policy: AppendRetryPolicy::All,
320 }
321 }
322}
323
324impl RetryConfig {
325 pub fn new() -> Self {
327 Self::default()
328 }
329
330 pub(crate) fn max_retries(&self) -> u32 {
331 self.max_attempts.get() - 1
332 }
333
334 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
336 Self {
337 max_attempts,
338 ..self
339 }
340 }
341
342 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
344 Self {
345 min_base_delay,
346 ..self
347 }
348 }
349
350 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
352 Self {
353 max_base_delay,
354 ..self
355 }
356 }
357
358 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
361 Self {
362 append_retry_policy,
363 ..self
364 }
365 }
366}
367
368#[derive(Debug, Clone)]
369#[non_exhaustive]
370pub struct S2Config {
372 pub(crate) access_token: SecretString,
373 pub(crate) endpoints: S2Endpoints,
374 pub(crate) connection_timeout: Duration,
375 pub(crate) request_timeout: Duration,
376 pub(crate) retry: RetryConfig,
377 pub(crate) compression: Compression,
378 pub(crate) user_agent: HeaderValue,
379}
380
381impl S2Config {
382 pub fn new(access_token: impl Into<String>) -> Self {
384 Self {
385 access_token: access_token.into().into(),
386 endpoints: S2Endpoints::for_aws(),
387 connection_timeout: Duration::from_secs(3),
388 request_timeout: Duration::from_secs(5),
389 retry: RetryConfig::new(),
390 compression: Compression::None,
391 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
392 .parse()
393 .expect("valid user agent"),
394 }
395 }
396
397 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
399 Self { endpoints, ..self }
400 }
401
402 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
406 Self {
407 connection_timeout,
408 ..self
409 }
410 }
411
412 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
416 Self {
417 request_timeout,
418 ..self
419 }
420 }
421
422 pub fn with_retry(self, retry: RetryConfig) -> Self {
426 Self { retry, ..self }
427 }
428
429 pub fn with_compression(self, compression: Compression) -> Self {
433 Self {
434 compression,
435 ..self
436 }
437 }
438
439 #[doc(hidden)]
440 #[cfg(feature = "_hidden")]
441 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
442 let user_agent = user_agent
443 .into()
444 .parse()
445 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
446 Ok(Self { user_agent, ..self })
447 }
448}
449
450#[derive(Debug, Default, Clone, PartialEq, Eq)]
451#[non_exhaustive]
452pub struct Page<T> {
454 pub values: Vec<T>,
456 pub has_more: bool,
458}
459
460impl<T> Page<T> {
461 pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
462 Self {
463 values: values.into(),
464 has_more,
465 }
466 }
467}
468
469#[derive(Debug, Clone, Copy, PartialEq, Eq)]
470pub enum StorageClass {
472 Standard,
474 Express,
476}
477
478impl From<api::config::StorageClass> for StorageClass {
479 fn from(value: api::config::StorageClass) -> Self {
480 match value {
481 api::config::StorageClass::Standard => StorageClass::Standard,
482 api::config::StorageClass::Express => StorageClass::Express,
483 }
484 }
485}
486
487impl From<StorageClass> for api::config::StorageClass {
488 fn from(value: StorageClass) -> Self {
489 match value {
490 StorageClass::Standard => api::config::StorageClass::Standard,
491 StorageClass::Express => api::config::StorageClass::Express,
492 }
493 }
494}
495
496#[derive(Debug, Clone, Copy, PartialEq, Eq)]
497pub enum RetentionPolicy {
499 Age(u64),
501 Infinite,
503}
504
505impl From<api::config::RetentionPolicy> for RetentionPolicy {
506 fn from(value: api::config::RetentionPolicy) -> Self {
507 match value {
508 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
509 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
510 }
511 }
512}
513
514impl From<RetentionPolicy> for api::config::RetentionPolicy {
515 fn from(value: RetentionPolicy) -> Self {
516 match value {
517 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
518 RetentionPolicy::Infinite => {
519 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
520 }
521 }
522 }
523}
524
525#[derive(Debug, Clone, Copy, PartialEq, Eq)]
526pub enum TimestampingMode {
528 ClientPrefer,
530 ClientRequire,
532 Arrival,
534}
535
536impl From<api::config::TimestampingMode> for TimestampingMode {
537 fn from(value: api::config::TimestampingMode) -> Self {
538 match value {
539 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
540 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
541 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
542 }
543 }
544}
545
546impl From<TimestampingMode> for api::config::TimestampingMode {
547 fn from(value: TimestampingMode) -> Self {
548 match value {
549 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
550 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
551 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
552 }
553 }
554}
555
556#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
557#[non_exhaustive]
558pub struct TimestampingConfig {
560 pub mode: Option<TimestampingMode>,
564 pub uncapped: bool,
568}
569
570impl TimestampingConfig {
571 pub fn new() -> Self {
573 Self::default()
574 }
575
576 pub fn with_mode(self, mode: TimestampingMode) -> Self {
578 Self {
579 mode: Some(mode),
580 ..self
581 }
582 }
583
584 pub fn with_uncapped(self, uncapped: bool) -> Self {
586 Self { uncapped, ..self }
587 }
588}
589
590impl From<api::config::TimestampingConfig> for TimestampingConfig {
591 fn from(value: api::config::TimestampingConfig) -> Self {
592 Self {
593 mode: value.mode.map(Into::into),
594 uncapped: value.uncapped.unwrap_or_default(),
595 }
596 }
597}
598
599impl From<TimestampingConfig> for api::config::TimestampingConfig {
600 fn from(value: TimestampingConfig) -> Self {
601 Self {
602 mode: value.mode.map(Into::into),
603 uncapped: Some(value.uncapped),
604 }
605 }
606}
607
608#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
609#[non_exhaustive]
610pub struct DeleteOnEmptyConfig {
612 pub min_age_secs: u64,
616}
617
618impl DeleteOnEmptyConfig {
619 pub fn new() -> Self {
621 Self::default()
622 }
623
624 pub fn with_min_age(self, min_age: Duration) -> Self {
626 Self {
627 min_age_secs: min_age.as_secs(),
628 }
629 }
630}
631
632impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
633 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
634 Self {
635 min_age_secs: value.min_age_secs,
636 }
637 }
638}
639
640impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
641 fn from(value: DeleteOnEmptyConfig) -> Self {
642 Self {
643 min_age_secs: value.min_age_secs,
644 }
645 }
646}
647
648#[derive(Debug, Clone, Default, PartialEq, Eq)]
649#[non_exhaustive]
650pub struct StreamConfig {
652 pub storage_class: Option<StorageClass>,
656 pub retention_policy: Option<RetentionPolicy>,
660 pub timestamping: Option<TimestampingConfig>,
664 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
668}
669
670impl StreamConfig {
671 pub fn new() -> Self {
673 Self::default()
674 }
675
676 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
678 Self {
679 storage_class: Some(storage_class),
680 ..self
681 }
682 }
683
684 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
686 Self {
687 retention_policy: Some(retention_policy),
688 ..self
689 }
690 }
691
692 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
694 Self {
695 timestamping: Some(timestamping),
696 ..self
697 }
698 }
699
700 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
702 Self {
703 delete_on_empty: Some(delete_on_empty),
704 ..self
705 }
706 }
707}
708
709impl From<api::config::StreamConfig> for StreamConfig {
710 fn from(value: api::config::StreamConfig) -> Self {
711 Self {
712 storage_class: value.storage_class.map(Into::into),
713 retention_policy: value.retention_policy.map(Into::into),
714 timestamping: value.timestamping.map(Into::into),
715 delete_on_empty: value.delete_on_empty.map(Into::into),
716 }
717 }
718}
719
720impl From<StreamConfig> for api::config::StreamConfig {
721 fn from(value: StreamConfig) -> Self {
722 Self {
723 storage_class: value.storage_class.map(Into::into),
724 retention_policy: value.retention_policy.map(Into::into),
725 timestamping: value.timestamping.map(Into::into),
726 delete_on_empty: value.delete_on_empty.map(Into::into),
727 }
728 }
729}
730
731#[derive(Debug, Clone, Default)]
732#[non_exhaustive]
733pub struct BasinConfig {
735 pub default_stream_config: Option<StreamConfig>,
739 pub create_stream_on_append: bool,
743 pub create_stream_on_read: bool,
747}
748
749impl BasinConfig {
750 pub fn new() -> Self {
752 Self::default()
753 }
754
755 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
757 Self {
758 default_stream_config: Some(config),
759 ..self
760 }
761 }
762
763 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
766 Self {
767 create_stream_on_append,
768 ..self
769 }
770 }
771
772 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
774 Self {
775 create_stream_on_read,
776 ..self
777 }
778 }
779}
780
781impl From<api::config::BasinConfig> for BasinConfig {
782 fn from(value: api::config::BasinConfig) -> Self {
783 Self {
784 default_stream_config: value.default_stream_config.map(Into::into),
785 create_stream_on_append: value.create_stream_on_append,
786 create_stream_on_read: value.create_stream_on_read,
787 }
788 }
789}
790
791impl From<BasinConfig> for api::config::BasinConfig {
792 fn from(value: BasinConfig) -> Self {
793 Self {
794 default_stream_config: value.default_stream_config.map(Into::into),
795 create_stream_on_append: value.create_stream_on_append,
796 create_stream_on_read: value.create_stream_on_read,
797 }
798 }
799}
800
801#[derive(Debug, Clone, PartialEq, Eq)]
802pub enum BasinScope {
804 AwsUsEast1,
806}
807
808impl From<api::basin::BasinScope> for BasinScope {
809 fn from(value: api::basin::BasinScope) -> Self {
810 match value {
811 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
812 }
813 }
814}
815
816impl From<BasinScope> for api::basin::BasinScope {
817 fn from(value: BasinScope) -> Self {
818 match value {
819 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
820 }
821 }
822}
823
824#[derive(Debug, Clone)]
825#[non_exhaustive]
826pub struct CreateBasinInput {
828 pub name: BasinName,
830 pub config: Option<BasinConfig>,
834 pub scope: Option<BasinScope>,
838 pub idempotency_token: String,
846}
847
848impl CreateBasinInput {
849 pub fn new(name: BasinName) -> Self {
851 Self {
852 name,
853 config: None,
854 scope: None,
855 idempotency_token: idempotency_token(),
856 }
857 }
858
859 pub fn with_config(self, config: BasinConfig) -> Self {
861 Self {
862 config: Some(config),
863 ..self
864 }
865 }
866
867 pub fn with_scope(self, scope: BasinScope) -> Self {
869 Self {
870 scope: Some(scope),
871 ..self
872 }
873 }
874
875 pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
877 Self {
878 idempotency_token: idempotency_token.into(),
879 ..self
880 }
881 }
882}
883
884impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
885 fn from(value: CreateBasinInput) -> Self {
886 (
887 api::basin::CreateBasinRequest {
888 basin: value.name,
889 config: value.config.map(Into::into),
890 scope: value.scope.map(Into::into),
891 },
892 value.idempotency_token,
893 )
894 }
895}
896
897#[derive(Debug, Clone, Default)]
898#[non_exhaustive]
899pub struct ListBasinsInput {
901 pub prefix: BasinNamePrefix,
905 pub start_after: BasinNameStartAfter,
911 pub limit: Option<usize>,
915}
916
917impl ListBasinsInput {
918 pub fn new() -> Self {
920 Self::default()
921 }
922
923 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
925 Self { prefix, ..self }
926 }
927
928 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
931 Self {
932 start_after,
933 ..self
934 }
935 }
936
937 pub fn with_limit(self, limit: usize) -> Self {
939 Self {
940 limit: Some(limit),
941 ..self
942 }
943 }
944}
945
946impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
947 fn from(value: ListBasinsInput) -> Self {
948 Self {
949 prefix: Some(value.prefix),
950 start_after: Some(value.start_after),
951 limit: value.limit,
952 }
953 }
954}
955
956#[derive(Debug, Clone)]
957pub struct ListAllBasinsInput {
959 pub prefix: BasinNamePrefix,
963 pub start_after: BasinNameStartAfter,
969 pub ignore_pending_deletions: bool,
973}
974
975impl Default for ListAllBasinsInput {
976 fn default() -> Self {
977 Self {
978 prefix: BasinNamePrefix::default(),
979 start_after: BasinNameStartAfter::default(),
980 ignore_pending_deletions: true,
981 }
982 }
983}
984
985impl ListAllBasinsInput {
986 pub fn new() -> Self {
988 Self::default()
989 }
990
991 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
993 Self { prefix, ..self }
994 }
995
996 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
999 Self {
1000 start_after,
1001 ..self
1002 }
1003 }
1004
1005 pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
1007 Self {
1008 ignore_pending_deletions,
1009 ..self
1010 }
1011 }
1012}
1013
1014#[derive(Debug, Clone, PartialEq, Eq)]
1015pub enum BasinState {
1017 Active,
1019 Creating,
1021 Deleting,
1023}
1024
1025impl From<api::basin::BasinState> for BasinState {
1026 fn from(value: api::basin::BasinState) -> Self {
1027 match value {
1028 api::basin::BasinState::Active => BasinState::Active,
1029 api::basin::BasinState::Creating => BasinState::Creating,
1030 api::basin::BasinState::Deleting => BasinState::Deleting,
1031 }
1032 }
1033}
1034
1035#[derive(Debug, Clone, PartialEq, Eq)]
1036#[non_exhaustive]
1037pub struct BasinInfo {
1039 pub name: BasinName,
1041 pub scope: Option<BasinScope>,
1043 pub state: BasinState,
1045}
1046
1047impl From<api::basin::BasinInfo> for BasinInfo {
1048 fn from(value: api::basin::BasinInfo) -> Self {
1049 Self {
1050 name: value.name,
1051 scope: value.scope.map(Into::into),
1052 state: value.state.into(),
1053 }
1054 }
1055}
1056
1057#[derive(Debug, Clone)]
1058#[non_exhaustive]
1059pub struct DeleteBasinInput {
1061 pub name: BasinName,
1063 pub ignore_not_found: bool,
1065}
1066
1067impl DeleteBasinInput {
1068 pub fn new(name: BasinName) -> Self {
1070 Self {
1071 name,
1072 ignore_not_found: false,
1073 }
1074 }
1075
1076 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1078 Self {
1079 ignore_not_found,
1080 ..self
1081 }
1082 }
1083}
1084
1085#[derive(Debug, Clone, Default)]
1086#[non_exhaustive]
1087pub struct TimestampingReconfiguration {
1089 pub mode: Maybe<Option<TimestampingMode>>,
1091 pub uncapped: Maybe<Option<bool>>,
1093}
1094
1095impl TimestampingReconfiguration {
1096 pub fn new() -> Self {
1098 Self::default()
1099 }
1100
1101 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1103 Self {
1104 mode: Maybe::Specified(Some(mode)),
1105 ..self
1106 }
1107 }
1108
1109 pub fn with_uncapped(self, uncapped: bool) -> Self {
1111 Self {
1112 uncapped: Maybe::Specified(Some(uncapped)),
1113 ..self
1114 }
1115 }
1116}
1117
1118impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1119 fn from(value: TimestampingReconfiguration) -> Self {
1120 Self {
1121 mode: value.mode.map(|m| m.map(Into::into)),
1122 uncapped: value.uncapped,
1123 }
1124 }
1125}
1126
1127#[derive(Debug, Clone, Default)]
1128#[non_exhaustive]
1129pub struct DeleteOnEmptyReconfiguration {
1131 pub min_age_secs: Maybe<Option<u64>>,
1133}
1134
1135impl DeleteOnEmptyReconfiguration {
1136 pub fn new() -> Self {
1138 Self::default()
1139 }
1140
1141 pub fn with_min_age(self, min_age: Duration) -> Self {
1143 Self {
1144 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1145 }
1146 }
1147}
1148
1149impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1150 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1151 Self {
1152 min_age_secs: value.min_age_secs,
1153 }
1154 }
1155}
1156
1157#[derive(Debug, Clone, Default)]
1158#[non_exhaustive]
1159pub struct StreamReconfiguration {
1161 pub storage_class: Maybe<Option<StorageClass>>,
1163 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1165 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1167 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1169}
1170
1171impl StreamReconfiguration {
1172 pub fn new() -> Self {
1174 Self::default()
1175 }
1176
1177 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1179 Self {
1180 storage_class: Maybe::Specified(Some(storage_class)),
1181 ..self
1182 }
1183 }
1184
1185 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1187 Self {
1188 retention_policy: Maybe::Specified(Some(retention_policy)),
1189 ..self
1190 }
1191 }
1192
1193 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1195 Self {
1196 timestamping: Maybe::Specified(Some(timestamping)),
1197 ..self
1198 }
1199 }
1200
1201 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1203 Self {
1204 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1205 ..self
1206 }
1207 }
1208}
1209
1210impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1211 fn from(value: StreamReconfiguration) -> Self {
1212 Self {
1213 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1214 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1215 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1216 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1217 }
1218 }
1219}
1220
1221#[derive(Debug, Clone, Default)]
1222#[non_exhaustive]
1223pub struct BasinReconfiguration {
1225 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1227 pub create_stream_on_append: Maybe<bool>,
1230 pub create_stream_on_read: Maybe<bool>,
1232}
1233
1234impl BasinReconfiguration {
1235 pub fn new() -> Self {
1237 Self::default()
1238 }
1239
1240 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1243 Self {
1244 default_stream_config: Maybe::Specified(Some(config)),
1245 ..self
1246 }
1247 }
1248
1249 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1252 Self {
1253 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1254 ..self
1255 }
1256 }
1257
1258 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1261 Self {
1262 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1263 ..self
1264 }
1265 }
1266}
1267
1268impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1269 fn from(value: BasinReconfiguration) -> Self {
1270 Self {
1271 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1272 create_stream_on_append: value.create_stream_on_append,
1273 create_stream_on_read: value.create_stream_on_read,
1274 }
1275 }
1276}
1277
1278#[derive(Debug, Clone)]
1279#[non_exhaustive]
1280pub struct ReconfigureBasinInput {
1282 pub name: BasinName,
1284 pub config: BasinReconfiguration,
1286}
1287
1288impl ReconfigureBasinInput {
1289 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1291 Self { name, config }
1292 }
1293}
1294
1295#[derive(Debug, Clone, Default)]
1296#[non_exhaustive]
1297pub struct ListAccessTokensInput {
1299 pub prefix: AccessTokenIdPrefix,
1303 pub start_after: AccessTokenIdStartAfter,
1309 pub limit: Option<usize>,
1313}
1314
1315impl ListAccessTokensInput {
1316 pub fn new() -> Self {
1318 Self::default()
1319 }
1320
1321 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1323 Self { prefix, ..self }
1324 }
1325
1326 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1329 Self {
1330 start_after,
1331 ..self
1332 }
1333 }
1334
1335 pub fn with_limit(self, limit: usize) -> Self {
1337 Self {
1338 limit: Some(limit),
1339 ..self
1340 }
1341 }
1342}
1343
1344impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1345 fn from(value: ListAccessTokensInput) -> Self {
1346 Self {
1347 prefix: Some(value.prefix),
1348 start_after: Some(value.start_after),
1349 limit: value.limit,
1350 }
1351 }
1352}
1353
1354#[derive(Debug, Clone, Default)]
1355pub struct ListAllAccessTokensInput {
1357 pub prefix: AccessTokenIdPrefix,
1361 pub start_after: AccessTokenIdStartAfter,
1367}
1368
1369impl ListAllAccessTokensInput {
1370 pub fn new() -> Self {
1372 Self::default()
1373 }
1374
1375 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1377 Self { prefix, ..self }
1378 }
1379
1380 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1383 Self {
1384 start_after,
1385 ..self
1386 }
1387 }
1388}
1389
1390#[derive(Debug, Clone)]
1391#[non_exhaustive]
1392pub struct AccessTokenInfo {
1394 pub id: AccessTokenId,
1396 pub expires_at: S2DateTime,
1398 pub auto_prefix_streams: bool,
1401 pub scope: AccessTokenScope,
1403}
1404
1405impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1406 type Error = ValidationError;
1407
1408 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1409 let expires_at = value
1410 .expires_at
1411 .map(Into::into)
1412 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1413 Ok(Self {
1414 id: value.id,
1415 expires_at,
1416 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1417 scope: value.scope.into(),
1418 })
1419 }
1420}
1421
1422#[derive(Debug, Clone)]
1423pub enum BasinMatcher {
1427 None,
1429 Exact(BasinName),
1431 Prefix(BasinNamePrefix),
1433}
1434
1435#[derive(Debug, Clone)]
1436pub enum StreamMatcher {
1440 None,
1442 Exact(StreamName),
1444 Prefix(StreamNamePrefix),
1446}
1447
1448#[derive(Debug, Clone)]
1449pub enum AccessTokenMatcher {
1453 None,
1455 Exact(AccessTokenId),
1457 Prefix(AccessTokenIdPrefix),
1459}
1460
1461#[derive(Debug, Clone, Default)]
1462#[non_exhaustive]
1463pub struct ReadWritePermissions {
1465 pub read: bool,
1469 pub write: bool,
1473}
1474
1475impl ReadWritePermissions {
1476 pub fn new() -> Self {
1478 Self::default()
1479 }
1480
1481 pub fn read_only() -> Self {
1483 Self {
1484 read: true,
1485 write: false,
1486 }
1487 }
1488
1489 pub fn write_only() -> Self {
1491 Self {
1492 read: false,
1493 write: true,
1494 }
1495 }
1496
1497 pub fn read_write() -> Self {
1499 Self {
1500 read: true,
1501 write: true,
1502 }
1503 }
1504}
1505
1506impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1507 fn from(value: ReadWritePermissions) -> Self {
1508 Self {
1509 read: Some(value.read),
1510 write: Some(value.write),
1511 }
1512 }
1513}
1514
1515impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1516 fn from(value: api::access::ReadWritePermissions) -> Self {
1517 Self {
1518 read: value.read.unwrap_or_default(),
1519 write: value.write.unwrap_or_default(),
1520 }
1521 }
1522}
1523
1524#[derive(Debug, Clone, Default)]
1525#[non_exhaustive]
1526pub struct OperationGroupPermissions {
1530 pub account: Option<ReadWritePermissions>,
1534 pub basin: Option<ReadWritePermissions>,
1538 pub stream: Option<ReadWritePermissions>,
1542}
1543
1544impl OperationGroupPermissions {
1545 pub fn new() -> Self {
1547 Self::default()
1548 }
1549
1550 pub fn read_only_all() -> Self {
1552 Self {
1553 account: Some(ReadWritePermissions::read_only()),
1554 basin: Some(ReadWritePermissions::read_only()),
1555 stream: Some(ReadWritePermissions::read_only()),
1556 }
1557 }
1558
1559 pub fn write_only_all() -> Self {
1561 Self {
1562 account: Some(ReadWritePermissions::write_only()),
1563 basin: Some(ReadWritePermissions::write_only()),
1564 stream: Some(ReadWritePermissions::write_only()),
1565 }
1566 }
1567
1568 pub fn read_write_all() -> Self {
1570 Self {
1571 account: Some(ReadWritePermissions::read_write()),
1572 basin: Some(ReadWritePermissions::read_write()),
1573 stream: Some(ReadWritePermissions::read_write()),
1574 }
1575 }
1576
1577 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1579 Self {
1580 account: Some(account),
1581 ..self
1582 }
1583 }
1584
1585 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1587 Self {
1588 basin: Some(basin),
1589 ..self
1590 }
1591 }
1592
1593 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1595 Self {
1596 stream: Some(stream),
1597 ..self
1598 }
1599 }
1600}
1601
1602impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1603 fn from(value: OperationGroupPermissions) -> Self {
1604 Self {
1605 account: value.account.map(Into::into),
1606 basin: value.basin.map(Into::into),
1607 stream: value.stream.map(Into::into),
1608 }
1609 }
1610}
1611
1612impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1613 fn from(value: api::access::PermittedOperationGroups) -> Self {
1614 Self {
1615 account: value.account.map(Into::into),
1616 basin: value.basin.map(Into::into),
1617 stream: value.stream.map(Into::into),
1618 }
1619 }
1620}
1621
1622#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1623pub enum Operation {
1627 ListBasins,
1629 CreateBasin,
1631 GetBasinConfig,
1633 DeleteBasin,
1635 ReconfigureBasin,
1637 ListAccessTokens,
1639 IssueAccessToken,
1641 RevokeAccessToken,
1643 GetAccountMetrics,
1645 GetBasinMetrics,
1647 GetStreamMetrics,
1649 ListStreams,
1651 CreateStream,
1653 GetStreamConfig,
1655 DeleteStream,
1657 ReconfigureStream,
1659 CheckTail,
1661 Append,
1663 Read,
1665 Trim,
1667 Fence,
1669}
1670
1671impl From<Operation> for api::access::Operation {
1672 fn from(value: Operation) -> Self {
1673 match value {
1674 Operation::ListBasins => api::access::Operation::ListBasins,
1675 Operation::CreateBasin => api::access::Operation::CreateBasin,
1676 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1677 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1678 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1679 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1680 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1681 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1682 Operation::ListStreams => api::access::Operation::ListStreams,
1683 Operation::CreateStream => api::access::Operation::CreateStream,
1684 Operation::DeleteStream => api::access::Operation::DeleteStream,
1685 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1686 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1687 Operation::CheckTail => api::access::Operation::CheckTail,
1688 Operation::Append => api::access::Operation::Append,
1689 Operation::Read => api::access::Operation::Read,
1690 Operation::Trim => api::access::Operation::Trim,
1691 Operation::Fence => api::access::Operation::Fence,
1692 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1693 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1694 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1695 }
1696 }
1697}
1698
1699impl From<api::access::Operation> for Operation {
1700 fn from(value: api::access::Operation) -> Self {
1701 match value {
1702 api::access::Operation::ListBasins => Operation::ListBasins,
1703 api::access::Operation::CreateBasin => Operation::CreateBasin,
1704 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1705 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1706 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1707 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1708 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1709 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1710 api::access::Operation::ListStreams => Operation::ListStreams,
1711 api::access::Operation::CreateStream => Operation::CreateStream,
1712 api::access::Operation::DeleteStream => Operation::DeleteStream,
1713 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1714 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1715 api::access::Operation::CheckTail => Operation::CheckTail,
1716 api::access::Operation::Append => Operation::Append,
1717 api::access::Operation::Read => Operation::Read,
1718 api::access::Operation::Trim => Operation::Trim,
1719 api::access::Operation::Fence => Operation::Fence,
1720 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1721 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1722 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1723 }
1724 }
1725}
1726
1727#[derive(Debug, Clone)]
1728#[non_exhaustive]
1729pub struct AccessTokenScopeInput {
1737 basins: Option<BasinMatcher>,
1738 streams: Option<StreamMatcher>,
1739 access_tokens: Option<AccessTokenMatcher>,
1740 op_group_perms: Option<OperationGroupPermissions>,
1741 ops: HashSet<Operation>,
1742}
1743
1744impl AccessTokenScopeInput {
1745 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1747 Self {
1748 basins: None,
1749 streams: None,
1750 access_tokens: None,
1751 op_group_perms: None,
1752 ops: ops.into_iter().collect(),
1753 }
1754 }
1755
1756 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1758 Self {
1759 basins: None,
1760 streams: None,
1761 access_tokens: None,
1762 op_group_perms: Some(op_group_perms),
1763 ops: HashSet::default(),
1764 }
1765 }
1766
1767 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1769 Self {
1770 ops: ops.into_iter().collect(),
1771 ..self
1772 }
1773 }
1774
1775 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1777 Self {
1778 op_group_perms: Some(op_group_perms),
1779 ..self
1780 }
1781 }
1782
1783 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1787 Self {
1788 basins: Some(basins),
1789 ..self
1790 }
1791 }
1792
1793 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1797 Self {
1798 streams: Some(streams),
1799 ..self
1800 }
1801 }
1802
1803 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1807 Self {
1808 access_tokens: Some(access_tokens),
1809 ..self
1810 }
1811 }
1812}
1813
1814#[derive(Debug, Clone)]
1815#[non_exhaustive]
1816pub struct AccessTokenScope {
1818 pub basins: Option<BasinMatcher>,
1820 pub streams: Option<StreamMatcher>,
1822 pub access_tokens: Option<AccessTokenMatcher>,
1824 pub op_group_perms: Option<OperationGroupPermissions>,
1826 pub ops: HashSet<Operation>,
1828}
1829
1830impl From<api::access::AccessTokenScope> for AccessTokenScope {
1831 fn from(value: api::access::AccessTokenScope) -> Self {
1832 Self {
1833 basins: value.basins.map(|rs| match rs {
1834 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1835 BasinMatcher::Exact(e)
1836 }
1837 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1838 BasinMatcher::None
1839 }
1840 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1841 }),
1842 streams: value.streams.map(|rs| match rs {
1843 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1844 StreamMatcher::Exact(e)
1845 }
1846 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1847 StreamMatcher::None
1848 }
1849 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1850 }),
1851 access_tokens: value.access_tokens.map(|rs| match rs {
1852 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1853 AccessTokenMatcher::Exact(e)
1854 }
1855 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1856 AccessTokenMatcher::None
1857 }
1858 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1859 }),
1860 op_group_perms: value.op_groups.map(Into::into),
1861 ops: value
1862 .ops
1863 .map(|ops| ops.into_iter().map(Into::into).collect())
1864 .unwrap_or_default(),
1865 }
1866 }
1867}
1868
1869impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1870 fn from(value: AccessTokenScopeInput) -> Self {
1871 Self {
1872 basins: value.basins.map(|rs| match rs {
1873 BasinMatcher::None => {
1874 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1875 }
1876 BasinMatcher::Exact(e) => {
1877 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1878 }
1879 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1880 }),
1881 streams: value.streams.map(|rs| match rs {
1882 StreamMatcher::None => {
1883 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1884 }
1885 StreamMatcher::Exact(e) => {
1886 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1887 }
1888 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1889 }),
1890 access_tokens: value.access_tokens.map(|rs| match rs {
1891 AccessTokenMatcher::None => {
1892 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1893 }
1894 AccessTokenMatcher::Exact(e) => {
1895 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1896 }
1897 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1898 }),
1899 op_groups: value.op_group_perms.map(Into::into),
1900 ops: if value.ops.is_empty() {
1901 None
1902 } else {
1903 Some(value.ops.into_iter().map(Into::into).collect())
1904 },
1905 }
1906 }
1907}
1908
1909#[derive(Debug, Clone)]
1910#[non_exhaustive]
1911pub struct IssueAccessTokenInput {
1913 pub id: AccessTokenId,
1915 pub expires_at: Option<S2DateTime>,
1920 pub auto_prefix_streams: bool,
1928 pub scope: AccessTokenScopeInput,
1930}
1931
1932impl IssueAccessTokenInput {
1933 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1935 Self {
1936 id,
1937 expires_at: None,
1938 auto_prefix_streams: false,
1939 scope,
1940 }
1941 }
1942
1943 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1945 Self {
1946 expires_at: Some(expires_at),
1947 ..self
1948 }
1949 }
1950
1951 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1954 Self {
1955 auto_prefix_streams,
1956 ..self
1957 }
1958 }
1959}
1960
1961impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1962 fn from(value: IssueAccessTokenInput) -> Self {
1963 Self {
1964 id: value.id,
1965 expires_at: value.expires_at.map(Into::into),
1966 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1967 scope: value.scope.into(),
1968 }
1969 }
1970}
1971
1972#[derive(Debug, Clone, Copy)]
1973pub enum TimeseriesInterval {
1975 Minute,
1977 Hour,
1979 Day,
1981}
1982
1983impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
1984 fn from(value: TimeseriesInterval) -> Self {
1985 match value {
1986 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
1987 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
1988 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
1989 }
1990 }
1991}
1992
1993impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
1994 fn from(value: api::metrics::TimeseriesInterval) -> Self {
1995 match value {
1996 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
1997 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
1998 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
1999 }
2000 }
2001}
2002
2003#[derive(Debug, Clone, Copy)]
2004#[non_exhaustive]
2005pub struct TimeRange {
2007 pub start: u32,
2009 pub end: u32,
2011}
2012
2013impl TimeRange {
2014 pub fn new(start: u32, end: u32) -> Self {
2016 Self { start, end }
2017 }
2018}
2019
2020#[derive(Debug, Clone, Copy)]
2021#[non_exhaustive]
2022pub struct TimeRangeAndInterval {
2024 pub start: u32,
2026 pub end: u32,
2028 pub interval: Option<TimeseriesInterval>,
2032}
2033
2034impl TimeRangeAndInterval {
2035 pub fn new(start: u32, end: u32) -> Self {
2037 Self {
2038 start,
2039 end,
2040 interval: None,
2041 }
2042 }
2043
2044 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2046 Self {
2047 interval: Some(interval),
2048 ..self
2049 }
2050 }
2051}
2052
2053#[derive(Debug, Clone, Copy)]
2054pub enum AccountMetricSet {
2056 ActiveBasins(TimeRange),
2059 AccountOps(TimeRangeAndInterval),
2066}
2067
2068#[derive(Debug, Clone)]
2069#[non_exhaustive]
2070pub struct GetAccountMetricsInput {
2072 pub set: AccountMetricSet,
2074}
2075
2076impl GetAccountMetricsInput {
2077 pub fn new(set: AccountMetricSet) -> Self {
2079 Self { set }
2080 }
2081}
2082
2083impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2084 fn from(value: GetAccountMetricsInput) -> Self {
2085 let (set, start, end, interval) = match value.set {
2086 AccountMetricSet::ActiveBasins(args) => (
2087 api::metrics::AccountMetricSet::ActiveBasins,
2088 args.start,
2089 args.end,
2090 None,
2091 ),
2092 AccountMetricSet::AccountOps(args) => (
2093 api::metrics::AccountMetricSet::AccountOps,
2094 args.start,
2095 args.end,
2096 args.interval,
2097 ),
2098 };
2099 Self {
2100 set,
2101 start: Some(start),
2102 end: Some(end),
2103 interval: interval.map(Into::into),
2104 }
2105 }
2106}
2107
2108#[derive(Debug, Clone, Copy)]
2109pub enum BasinMetricSet {
2111 Storage(TimeRange),
2114 AppendOps(TimeRangeAndInterval),
2122 ReadOps(TimeRangeAndInterval),
2130 ReadThroughput(TimeRangeAndInterval),
2137 AppendThroughput(TimeRangeAndInterval),
2144 BasinOps(TimeRangeAndInterval),
2151}
2152
2153#[derive(Debug, Clone)]
2154#[non_exhaustive]
2155pub struct GetBasinMetricsInput {
2157 pub name: BasinName,
2159 pub set: BasinMetricSet,
2161}
2162
2163impl GetBasinMetricsInput {
2164 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2166 Self { name, set }
2167 }
2168}
2169
2170impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2171 fn from(value: GetBasinMetricsInput) -> Self {
2172 let (set, start, end, interval) = match value.set {
2173 BasinMetricSet::Storage(args) => (
2174 api::metrics::BasinMetricSet::Storage,
2175 args.start,
2176 args.end,
2177 None,
2178 ),
2179 BasinMetricSet::AppendOps(args) => (
2180 api::metrics::BasinMetricSet::AppendOps,
2181 args.start,
2182 args.end,
2183 args.interval,
2184 ),
2185 BasinMetricSet::ReadOps(args) => (
2186 api::metrics::BasinMetricSet::ReadOps,
2187 args.start,
2188 args.end,
2189 args.interval,
2190 ),
2191 BasinMetricSet::ReadThroughput(args) => (
2192 api::metrics::BasinMetricSet::ReadThroughput,
2193 args.start,
2194 args.end,
2195 args.interval,
2196 ),
2197 BasinMetricSet::AppendThroughput(args) => (
2198 api::metrics::BasinMetricSet::AppendThroughput,
2199 args.start,
2200 args.end,
2201 args.interval,
2202 ),
2203 BasinMetricSet::BasinOps(args) => (
2204 api::metrics::BasinMetricSet::BasinOps,
2205 args.start,
2206 args.end,
2207 args.interval,
2208 ),
2209 };
2210 (
2211 value.name,
2212 api::metrics::BasinMetricSetRequest {
2213 set,
2214 start: Some(start),
2215 end: Some(end),
2216 interval: interval.map(Into::into),
2217 },
2218 )
2219 }
2220}
2221
2222#[derive(Debug, Clone, Copy)]
2223pub enum StreamMetricSet {
2225 Storage(TimeRange),
2228}
2229
2230#[derive(Debug, Clone)]
2231#[non_exhaustive]
2232pub struct GetStreamMetricsInput {
2234 pub basin_name: BasinName,
2236 pub stream_name: StreamName,
2238 pub set: StreamMetricSet,
2240}
2241
2242impl GetStreamMetricsInput {
2243 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2246 Self {
2247 basin_name,
2248 stream_name,
2249 set,
2250 }
2251 }
2252}
2253
2254impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2255 fn from(value: GetStreamMetricsInput) -> Self {
2256 let (set, start, end, interval) = match value.set {
2257 StreamMetricSet::Storage(args) => (
2258 api::metrics::StreamMetricSet::Storage,
2259 args.start,
2260 args.end,
2261 None,
2262 ),
2263 };
2264 (
2265 value.basin_name,
2266 value.stream_name,
2267 api::metrics::StreamMetricSetRequest {
2268 set,
2269 start: Some(start),
2270 end: Some(end),
2271 interval,
2272 },
2273 )
2274 }
2275}
2276
2277#[derive(Debug, Clone, Copy)]
2278pub enum MetricUnit {
2280 Bytes,
2282 Operations,
2284}
2285
2286impl From<api::metrics::MetricUnit> for MetricUnit {
2287 fn from(value: api::metrics::MetricUnit) -> Self {
2288 match value {
2289 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2290 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2291 }
2292 }
2293}
2294
2295#[derive(Debug, Clone)]
2296#[non_exhaustive]
2297pub struct ScalarMetric {
2299 pub name: String,
2301 pub unit: MetricUnit,
2303 pub value: f64,
2305}
2306
2307#[derive(Debug, Clone)]
2308#[non_exhaustive]
2309pub struct AccumulationMetric {
2312 pub name: String,
2314 pub unit: MetricUnit,
2316 pub interval: TimeseriesInterval,
2318 pub values: Vec<(u32, f64)>,
2322}
2323
2324#[derive(Debug, Clone)]
2325#[non_exhaustive]
2326pub struct GaugeMetric {
2328 pub name: String,
2330 pub unit: MetricUnit,
2332 pub values: Vec<(u32, f64)>,
2335}
2336
2337#[derive(Debug, Clone)]
2338#[non_exhaustive]
2339pub struct LabelMetric {
2341 pub name: String,
2343 pub values: Vec<String>,
2345}
2346
2347#[derive(Debug, Clone)]
2348pub enum Metric {
2350 Scalar(ScalarMetric),
2352 Accumulation(AccumulationMetric),
2355 Gauge(GaugeMetric),
2357 Label(LabelMetric),
2359}
2360
2361impl From<api::metrics::Metric> for Metric {
2362 fn from(value: api::metrics::Metric) -> Self {
2363 match value {
2364 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2365 name: sm.name.into(),
2366 unit: sm.unit.into(),
2367 value: sm.value,
2368 }),
2369 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2370 name: am.name.into(),
2371 unit: am.unit.into(),
2372 interval: am.interval.into(),
2373 values: am.values,
2374 }),
2375 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2376 name: gm.name.into(),
2377 unit: gm.unit.into(),
2378 values: gm.values,
2379 }),
2380 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2381 name: lm.name.into(),
2382 values: lm.values,
2383 }),
2384 }
2385 }
2386}
2387
2388#[derive(Debug, Clone, Default)]
2389#[non_exhaustive]
2390pub struct ListStreamsInput {
2392 pub prefix: StreamNamePrefix,
2396 pub start_after: StreamNameStartAfter,
2402 pub limit: Option<usize>,
2406}
2407
2408impl ListStreamsInput {
2409 pub fn new() -> Self {
2411 Self::default()
2412 }
2413
2414 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2416 Self { prefix, ..self }
2417 }
2418
2419 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2422 Self {
2423 start_after,
2424 ..self
2425 }
2426 }
2427
2428 pub fn with_limit(self, limit: usize) -> Self {
2430 Self {
2431 limit: Some(limit),
2432 ..self
2433 }
2434 }
2435}
2436
2437impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2438 fn from(value: ListStreamsInput) -> Self {
2439 Self {
2440 prefix: Some(value.prefix),
2441 start_after: Some(value.start_after),
2442 limit: value.limit,
2443 }
2444 }
2445}
2446
2447#[derive(Debug, Clone)]
2448pub struct ListAllStreamsInput {
2450 pub prefix: StreamNamePrefix,
2454 pub start_after: StreamNameStartAfter,
2460 pub ignore_pending_deletions: bool,
2464}
2465
2466impl Default for ListAllStreamsInput {
2467 fn default() -> Self {
2468 Self {
2469 prefix: StreamNamePrefix::default(),
2470 start_after: StreamNameStartAfter::default(),
2471 ignore_pending_deletions: true,
2472 }
2473 }
2474}
2475
2476impl ListAllStreamsInput {
2477 pub fn new() -> Self {
2479 Self::default()
2480 }
2481
2482 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2484 Self { prefix, ..self }
2485 }
2486
2487 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2490 Self {
2491 start_after,
2492 ..self
2493 }
2494 }
2495
2496 pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
2498 Self {
2499 ignore_pending_deletions,
2500 ..self
2501 }
2502 }
2503}
2504
2505#[derive(Debug, Clone, PartialEq)]
2506#[non_exhaustive]
2507pub struct StreamInfo {
2509 pub name: StreamName,
2511 pub created_at: S2DateTime,
2513 pub deleted_at: Option<S2DateTime>,
2515}
2516
2517impl From<api::stream::StreamInfo> for StreamInfo {
2518 fn from(value: api::stream::StreamInfo) -> Self {
2519 Self {
2520 name: value.name,
2521 created_at: value.created_at.into(),
2522 deleted_at: value.deleted_at.map(Into::into),
2523 }
2524 }
2525}
2526
2527#[derive(Debug, Clone)]
2528#[non_exhaustive]
2529pub struct CreateStreamInput {
2531 pub name: StreamName,
2533 pub config: Option<StreamConfig>,
2537 pub idempotency_token: String,
2545}
2546
2547impl CreateStreamInput {
2548 pub fn new(name: StreamName) -> Self {
2550 Self {
2551 name,
2552 config: None,
2553 idempotency_token: idempotency_token(),
2554 }
2555 }
2556
2557 pub fn with_config(self, config: StreamConfig) -> Self {
2559 Self {
2560 config: Some(config),
2561 ..self
2562 }
2563 }
2564
2565 pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
2567 Self {
2568 idempotency_token: idempotency_token.into(),
2569 ..self
2570 }
2571 }
2572}
2573
2574impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2575 fn from(value: CreateStreamInput) -> Self {
2576 (
2577 api::stream::CreateStreamRequest {
2578 stream: value.name,
2579 config: value.config.map(Into::into),
2580 },
2581 value.idempotency_token,
2582 )
2583 }
2584}
2585
2586#[derive(Debug, Clone)]
2587#[non_exhaustive]
2588pub struct DeleteStreamInput {
2590 pub name: StreamName,
2592 pub ignore_not_found: bool,
2594}
2595
2596impl DeleteStreamInput {
2597 pub fn new(name: StreamName) -> Self {
2599 Self {
2600 name,
2601 ignore_not_found: false,
2602 }
2603 }
2604
2605 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2607 Self {
2608 ignore_not_found,
2609 ..self
2610 }
2611 }
2612}
2613
2614#[derive(Debug, Clone)]
2615#[non_exhaustive]
2616pub struct ReconfigureStreamInput {
2618 pub name: StreamName,
2620 pub config: StreamReconfiguration,
2622}
2623
2624impl ReconfigureStreamInput {
2625 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2627 Self { name, config }
2628 }
2629}
2630
2631#[derive(Debug, Clone, PartialEq, Eq)]
2632pub struct FencingToken(String);
2638
2639impl FencingToken {
2640 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2642 rand::rng()
2643 .sample_iter(&rand::distr::Alphanumeric)
2644 .take(n)
2645 .map(char::from)
2646 .collect::<String>()
2647 .parse()
2648 }
2649}
2650
2651impl FromStr for FencingToken {
2652 type Err = ValidationError;
2653
2654 fn from_str(s: &str) -> Result<Self, Self::Err> {
2655 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2656 return Err(ValidationError(format!(
2657 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2658 )));
2659 }
2660 Ok(FencingToken(s.to_string()))
2661 }
2662}
2663
2664impl std::fmt::Display for FencingToken {
2665 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2666 write!(f, "{}", self.0)
2667 }
2668}
2669
2670impl Deref for FencingToken {
2671 type Target = str;
2672
2673 fn deref(&self) -> &Self::Target {
2674 &self.0
2675 }
2676}
2677
2678#[derive(Debug, Clone, Copy, PartialEq)]
2679#[non_exhaustive]
2680pub struct StreamPosition {
2682 pub seq_num: u64,
2684 pub timestamp: u64,
2687}
2688
2689impl std::fmt::Display for StreamPosition {
2690 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2691 write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2692 }
2693}
2694
2695impl From<api::stream::proto::StreamPosition> for StreamPosition {
2696 fn from(value: api::stream::proto::StreamPosition) -> Self {
2697 Self {
2698 seq_num: value.seq_num,
2699 timestamp: value.timestamp,
2700 }
2701 }
2702}
2703
2704impl From<api::stream::StreamPosition> for StreamPosition {
2705 fn from(value: api::stream::StreamPosition) -> Self {
2706 Self {
2707 seq_num: value.seq_num,
2708 timestamp: value.timestamp,
2709 }
2710 }
2711}
2712
2713#[derive(Debug, Clone, PartialEq)]
2714#[non_exhaustive]
2715pub struct Header {
2717 pub name: Bytes,
2719 pub value: Bytes,
2721}
2722
2723impl Header {
2724 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2726 Self {
2727 name: name.into(),
2728 value: value.into(),
2729 }
2730 }
2731}
2732
2733impl From<Header> for api::stream::proto::Header {
2734 fn from(value: Header) -> Self {
2735 Self {
2736 name: value.name,
2737 value: value.value,
2738 }
2739 }
2740}
2741
2742impl From<api::stream::proto::Header> for Header {
2743 fn from(value: api::stream::proto::Header) -> Self {
2744 Self {
2745 name: value.name,
2746 value: value.value,
2747 }
2748 }
2749}
2750
2751#[derive(Debug, Clone, PartialEq)]
2752pub struct AppendRecord {
2754 body: Bytes,
2755 headers: Vec<Header>,
2756 timestamp: Option<u64>,
2757}
2758
2759impl AppendRecord {
2760 fn validate(self) -> Result<Self, ValidationError> {
2761 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2762 Err(ValidationError(format!(
2763 "metered_bytes: {} exceeds {}",
2764 self.metered_bytes(),
2765 RECORD_BATCH_MAX.bytes
2766 )))
2767 } else {
2768 Ok(self)
2769 }
2770 }
2771
2772 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2774 let record = Self {
2775 body: body.into(),
2776 headers: Vec::default(),
2777 timestamp: None,
2778 };
2779 record.validate()
2780 }
2781
2782 pub fn with_headers(
2784 self,
2785 headers: impl IntoIterator<Item = Header>,
2786 ) -> Result<Self, ValidationError> {
2787 let record = Self {
2788 headers: headers.into_iter().collect(),
2789 ..self
2790 };
2791 record.validate()
2792 }
2793
2794 pub fn with_timestamp(self, timestamp: u64) -> Self {
2798 Self {
2799 timestamp: Some(timestamp),
2800 ..self
2801 }
2802 }
2803}
2804
2805impl From<AppendRecord> for api::stream::proto::AppendRecord {
2806 fn from(value: AppendRecord) -> Self {
2807 Self {
2808 timestamp: value.timestamp,
2809 headers: value.headers.into_iter().map(Into::into).collect(),
2810 body: value.body,
2811 }
2812 }
2813}
2814
2815pub trait MeteredBytes {
2822 fn metered_bytes(&self) -> usize;
2824}
2825
2826macro_rules! metered_bytes_impl {
2827 ($ty:ty) => {
2828 impl MeteredBytes for $ty {
2829 fn metered_bytes(&self) -> usize {
2830 8 + (2 * self.headers.len())
2831 + self
2832 .headers
2833 .iter()
2834 .map(|h| h.name.len() + h.value.len())
2835 .sum::<usize>()
2836 + self.body.len()
2837 }
2838 }
2839 };
2840}
2841
2842metered_bytes_impl!(AppendRecord);
2843
2844#[derive(Debug, Clone)]
2845pub struct AppendRecordBatch {
2854 records: Vec<AppendRecord>,
2855 metered_bytes: usize,
2856}
2857
2858impl AppendRecordBatch {
2859 pub(crate) fn with_capacity(capacity: usize) -> Self {
2860 Self {
2861 records: Vec::with_capacity(capacity),
2862 metered_bytes: 0,
2863 }
2864 }
2865
2866 pub(crate) fn push(&mut self, record: AppendRecord) {
2867 self.metered_bytes += record.metered_bytes();
2868 self.records.push(record);
2869 }
2870
2871 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2873 where
2874 I: IntoIterator<Item = AppendRecord>,
2875 {
2876 let mut records = Vec::new();
2877 let mut metered_bytes = 0;
2878
2879 for record in iter {
2880 metered_bytes += record.metered_bytes();
2881 records.push(record);
2882
2883 if metered_bytes > RECORD_BATCH_MAX.bytes {
2884 return Err(ValidationError(format!(
2885 "batch size in metered bytes ({metered_bytes}) exceeds {}",
2886 RECORD_BATCH_MAX.bytes
2887 )));
2888 }
2889
2890 if records.len() > RECORD_BATCH_MAX.count {
2891 return Err(ValidationError(format!(
2892 "number of records in the batch exceeds {}",
2893 RECORD_BATCH_MAX.count
2894 )));
2895 }
2896 }
2897
2898 if records.is_empty() {
2899 return Err(ValidationError("batch is empty".into()));
2900 }
2901
2902 Ok(Self {
2903 records,
2904 metered_bytes,
2905 })
2906 }
2907}
2908
2909impl Deref for AppendRecordBatch {
2910 type Target = [AppendRecord];
2911
2912 fn deref(&self) -> &Self::Target {
2913 &self.records
2914 }
2915}
2916
2917impl MeteredBytes for AppendRecordBatch {
2918 fn metered_bytes(&self) -> usize {
2919 self.metered_bytes
2920 }
2921}
2922
2923#[derive(Debug, Clone)]
2924pub enum Command {
2926 Fence {
2928 fencing_token: FencingToken,
2930 },
2931 Trim {
2933 trim_point: u64,
2935 },
2936}
2937
2938#[derive(Debug, Clone)]
2939#[non_exhaustive]
2940pub struct CommandRecord {
2944 pub command: Command,
2946 pub timestamp: Option<u64>,
2948}
2949
2950impl CommandRecord {
2951 const FENCE: &[u8] = b"fence";
2952 const TRIM: &[u8] = b"trim";
2953
2954 pub fn fence(fencing_token: FencingToken) -> Self {
2959 Self {
2960 command: Command::Fence { fencing_token },
2961 timestamp: None,
2962 }
2963 }
2964
2965 pub fn trim(trim_point: u64) -> Self {
2972 Self {
2973 command: Command::Trim { trim_point },
2974 timestamp: None,
2975 }
2976 }
2977
2978 pub fn with_timestamp(self, timestamp: u64) -> Self {
2980 Self {
2981 timestamp: Some(timestamp),
2982 ..self
2983 }
2984 }
2985}
2986
2987impl From<CommandRecord> for AppendRecord {
2988 fn from(value: CommandRecord) -> Self {
2989 let (header_value, body) = match value.command {
2990 Command::Fence { fencing_token } => (
2991 CommandRecord::FENCE,
2992 Bytes::copy_from_slice(fencing_token.as_bytes()),
2993 ),
2994 Command::Trim { trim_point } => (
2995 CommandRecord::TRIM,
2996 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
2997 ),
2998 };
2999 Self {
3000 body,
3001 headers: vec![Header::new("", header_value)],
3002 timestamp: value.timestamp,
3003 }
3004 }
3005}
3006
3007#[derive(Debug, Clone)]
3008#[non_exhaustive]
3009pub struct AppendInput {
3012 pub records: AppendRecordBatch,
3014 pub match_seq_num: Option<u64>,
3018 pub fencing_token: Option<FencingToken>,
3023}
3024
3025impl AppendInput {
3026 pub fn new(records: AppendRecordBatch) -> Self {
3028 Self {
3029 records,
3030 match_seq_num: None,
3031 fencing_token: None,
3032 }
3033 }
3034
3035 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3037 Self {
3038 match_seq_num: Some(match_seq_num),
3039 ..self
3040 }
3041 }
3042
3043 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3045 Self {
3046 fencing_token: Some(fencing_token),
3047 ..self
3048 }
3049 }
3050}
3051
3052impl From<AppendInput> for api::stream::proto::AppendInput {
3053 fn from(value: AppendInput) -> Self {
3054 Self {
3055 records: value.records.iter().cloned().map(Into::into).collect(),
3056 match_seq_num: value.match_seq_num,
3057 fencing_token: value.fencing_token.map(|t| t.to_string()),
3058 }
3059 }
3060}
3061
3062#[derive(Debug, Clone, PartialEq)]
3063#[non_exhaustive]
3064pub struct AppendAck {
3066 pub start: StreamPosition,
3068 pub end: StreamPosition,
3074 pub tail: StreamPosition,
3079}
3080
3081impl From<api::stream::proto::AppendAck> for AppendAck {
3082 fn from(value: api::stream::proto::AppendAck) -> Self {
3083 Self {
3084 start: value.start.unwrap_or_default().into(),
3085 end: value.end.unwrap_or_default().into(),
3086 tail: value.tail.unwrap_or_default().into(),
3087 }
3088 }
3089}
3090
3091#[derive(Debug, Clone, Copy)]
3092pub enum ReadFrom {
3094 SeqNum(u64),
3096 Timestamp(u64),
3098 TailOffset(u64),
3100}
3101
3102impl Default for ReadFrom {
3103 fn default() -> Self {
3104 Self::SeqNum(0)
3105 }
3106}
3107
3108#[derive(Debug, Default, Clone)]
3109#[non_exhaustive]
3110pub struct ReadStart {
3112 pub from: ReadFrom,
3116 pub clamp_to_tail: bool,
3120}
3121
3122impl ReadStart {
3123 pub fn new() -> Self {
3125 Self::default()
3126 }
3127
3128 pub fn with_from(self, from: ReadFrom) -> Self {
3130 Self { from, ..self }
3131 }
3132
3133 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3135 Self {
3136 clamp_to_tail,
3137 ..self
3138 }
3139 }
3140}
3141
3142impl From<ReadStart> for api::stream::ReadStart {
3143 fn from(value: ReadStart) -> Self {
3144 let (seq_num, timestamp, tail_offset) = match value.from {
3145 ReadFrom::SeqNum(n) => (Some(n), None, None),
3146 ReadFrom::Timestamp(t) => (None, Some(t), None),
3147 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3148 };
3149 Self {
3150 seq_num,
3151 timestamp,
3152 tail_offset,
3153 clamp: if value.clamp_to_tail {
3154 Some(true)
3155 } else {
3156 None
3157 },
3158 }
3159 }
3160}
3161
3162#[derive(Debug, Clone, Default)]
3163#[non_exhaustive]
3164pub struct ReadLimits {
3166 pub count: Option<usize>,
3170 pub bytes: Option<usize>,
3174}
3175
3176impl ReadLimits {
3177 pub fn new() -> Self {
3179 Self::default()
3180 }
3181
3182 pub fn with_count(self, count: usize) -> Self {
3184 Self {
3185 count: Some(count),
3186 ..self
3187 }
3188 }
3189
3190 pub fn with_bytes(self, bytes: usize) -> Self {
3192 Self {
3193 bytes: Some(bytes),
3194 ..self
3195 }
3196 }
3197}
3198
3199#[derive(Debug, Clone, Default)]
3200#[non_exhaustive]
3201pub struct ReadStop {
3203 pub limits: ReadLimits,
3205 pub until: Option<RangeTo<u64>>,
3207 pub wait: Option<u32>,
3217}
3218
3219impl ReadStop {
3220 pub fn new() -> Self {
3222 Self::default()
3223 }
3224
3225 pub fn with_limits(self, limits: ReadLimits) -> Self {
3227 Self { limits, ..self }
3228 }
3229
3230 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3232 Self {
3233 until: Some(until),
3234 ..self
3235 }
3236 }
3237
3238 pub fn with_wait(self, wait: u32) -> Self {
3240 Self {
3241 wait: Some(wait),
3242 ..self
3243 }
3244 }
3245}
3246
3247impl From<ReadStop> for api::stream::ReadEnd {
3248 fn from(value: ReadStop) -> Self {
3249 Self {
3250 count: value.limits.count,
3251 bytes: value.limits.bytes,
3252 until: value.until.map(|r| r.end),
3253 wait: value.wait,
3254 }
3255 }
3256}
3257
3258#[derive(Debug, Clone, Default)]
3259#[non_exhaustive]
3260pub struct ReadInput {
3263 pub start: ReadStart,
3265 pub stop: ReadStop,
3267 pub ignore_command_records: bool,
3271}
3272
3273impl ReadInput {
3274 pub fn new() -> Self {
3276 Self::default()
3277 }
3278
3279 pub fn with_start(self, start: ReadStart) -> Self {
3281 Self { start, ..self }
3282 }
3283
3284 pub fn with_stop(self, stop: ReadStop) -> Self {
3286 Self { stop, ..self }
3287 }
3288
3289 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3291 Self {
3292 ignore_command_records,
3293 ..self
3294 }
3295 }
3296}
3297
3298#[derive(Debug, Clone)]
3299#[non_exhaustive]
3300pub struct SequencedRecord {
3302 pub seq_num: u64,
3304 pub body: Bytes,
3306 pub headers: Vec<Header>,
3308 pub timestamp: u64,
3310}
3311
3312impl SequencedRecord {
3313 pub fn is_command_record(&self) -> bool {
3315 self.headers.len() == 1 && *self.headers[0].name == *b""
3316 }
3317}
3318
3319impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3320 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3321 Self {
3322 seq_num: value.seq_num,
3323 body: value.body,
3324 headers: value.headers.into_iter().map(Into::into).collect(),
3325 timestamp: value.timestamp,
3326 }
3327 }
3328}
3329
3330metered_bytes_impl!(SequencedRecord);
3331
3332#[derive(Debug, Clone)]
3333#[non_exhaustive]
3334pub struct ReadBatch {
3337 pub records: Vec<SequencedRecord>,
3344 pub tail: Option<StreamPosition>,
3349}
3350
3351impl ReadBatch {
3352 pub(crate) fn from_api(
3353 batch: api::stream::proto::ReadBatch,
3354 ignore_command_records: bool,
3355 ) -> Self {
3356 Self {
3357 records: batch
3358 .records
3359 .into_iter()
3360 .map(Into::into)
3361 .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3362 .collect(),
3363 tail: batch.tail.map(Into::into),
3364 }
3365 }
3366}
3367
3368pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3370
3371#[derive(Debug, Clone, thiserror::Error)]
3372pub enum AppendConditionFailed {
3374 #[error("fencing token mismatch, expected: {0}")]
3375 FencingTokenMismatch(FencingToken),
3377 #[error("sequence number mismatch, expected: {0}")]
3378 SeqNumMismatch(u64),
3380}
3381
3382impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3383 fn from(value: api::stream::AppendConditionFailed) -> Self {
3384 match value {
3385 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3386 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3387 }
3388 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3389 AppendConditionFailed::SeqNumMismatch(seq)
3390 }
3391 }
3392 }
3393}
3394
3395#[derive(Debug, Clone, thiserror::Error)]
3396pub enum S2Error {
3398 #[error("{0}")]
3399 Client(String),
3401 #[error(transparent)]
3402 Validation(#[from] ValidationError),
3404 #[error("{0}")]
3405 AppendConditionFailed(AppendConditionFailed),
3407 #[error("read from an unwritten position. current tail: {0}")]
3408 ReadUnwritten(StreamPosition),
3410 #[error("{0}")]
3411 Server(ErrorResponse),
3413}
3414
3415impl From<ApiError> for S2Error {
3416 fn from(err: ApiError) -> Self {
3417 match err {
3418 ApiError::ReadUnwritten(tail_response) => {
3419 Self::ReadUnwritten(tail_response.tail.into())
3420 }
3421 ApiError::AppendConditionFailed(condition_failed) => {
3422 Self::AppendConditionFailed(condition_failed.into())
3423 }
3424 ApiError::Server(_, response) => Self::Server(response.into()),
3425 other => Self::Client(other.to_string()),
3426 }
3427 }
3428}
3429
3430#[derive(Debug, Clone, thiserror::Error)]
3431#[error("{code}: {message}")]
3432#[non_exhaustive]
3433pub struct ErrorResponse {
3435 pub code: String,
3437 pub message: String,
3439}
3440
3441impl From<ApiErrorResponse> for ErrorResponse {
3442 fn from(response: ApiErrorResponse) -> Self {
3443 Self {
3444 code: response.code,
3445 message: response.message,
3446 }
3447 }
3448}
3449
3450fn idempotency_token() -> String {
3451 uuid::Uuid::new_v4().simple().to_string()
3452}