1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::Rng;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::types::ValidationError;
24pub use s2_common::types::access::AccessTokenId;
28pub use s2_common::types::access::AccessTokenIdPrefix;
30pub use s2_common::types::access::AccessTokenIdStartAfter;
32pub use s2_common::types::basin::BasinName;
37pub use s2_common::types::basin::BasinNamePrefix;
39pub use s2_common::types::basin::BasinNameStartAfter;
41pub use s2_common::types::stream::StreamName;
45pub use s2_common::types::stream::StreamNamePrefix;
47pub use s2_common::types::stream::StreamNameStartAfter;
49
/// Number of bytes in one mebibyte (2^20 = 1_048_576).
pub(crate) const ONE_MIB: u32 = 1 << 20;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl From<time::OffsetDateTime> for S2DateTime {
66 fn from(dt: time::OffsetDateTime) -> Self {
67 Self(dt)
68 }
69}
70
71impl From<S2DateTime> for time::OffsetDateTime {
72 fn from(dt: S2DateTime) -> Self {
73 dt.0
74 }
75}
76
77impl FromStr for S2DateTime {
78 type Err = ValidationError;
79
80 fn from_str(s: &str) -> Result<Self, Self::Err> {
81 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
82 .map(Self)
83 .map_err(|e| ValidationError(format!("invalid datetime: {e}")))
84 }
85}
86
87impl fmt::Display for S2DateTime {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 write!(
90 f,
91 "{}",
92 self.0
93 .format(&time::format_description::well_known::Rfc3339)
94 .expect("RFC3339 formatting should not fail for S2DateTime")
95 )
96 }
97}
98
99#[derive(Debug, Clone, PartialEq)]
101pub enum BasinAuthority {
102 ParentZone(Authority),
104 Direct(Authority),
106}
107
108#[derive(Debug, Clone)]
109#[non_exhaustive]
110pub struct S2Endpoints {
112 pub scheme: Scheme,
116 pub account_authority: Authority,
118 pub basin_authority: BasinAuthority,
120}
121
122impl S2Endpoints {
123 pub fn new(account_authority: Authority, basin_authority: BasinAuthority) -> Self {
125 Self {
126 scheme: Scheme::HTTPS,
127 account_authority,
128 basin_authority,
129 }
130 }
131
132 pub fn from_env() -> Result<Self, ValidationError> {
138 fn parse_account_endpoint(s: &str) -> Result<(Scheme, Authority), ValidationError> {
139 let (scheme, authority) = match s.find("://") {
140 Some(idx) => {
141 let scheme: Scheme = s[..idx]
142 .parse()
143 .map_err(|_| "invalid S2_ACCOUNT_ENDPOINT scheme".to_string())?;
144 (scheme, &s[idx + 3..])
145 }
146 None => (Scheme::HTTPS, s),
147 };
148 Ok((
149 scheme,
150 authority
151 .parse()
152 .map_err(|e| format!("invalid S2_ACCOUNT_ENDPOINT authority: {e}"))?,
153 ))
154 }
155
156 fn parse_basin_endpoint(s: &str) -> Result<(Scheme, BasinAuthority), ValidationError> {
157 let (scheme, authority) = match s.find("://") {
158 Some(idx) => {
159 let scheme: Scheme = s[..idx]
160 .parse()
161 .map_err(|_| "invalid S2_BASIN_ENDPOINT scheme".to_string())?;
162 (scheme, &s[idx + 3..])
163 }
164 None => (Scheme::HTTPS, s),
165 };
166 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
167 BasinAuthority::ParentZone(
168 authority
169 .parse()
170 .map_err(|e| format!("invalid S2_BASIN_ENDPOINT authority: {e}"))?,
171 )
172 } else {
173 BasinAuthority::Direct(
174 authority
175 .parse()
176 .map_err(|e| format!("invalid S2_BASIN_ENDPOINT authority: {e}"))?,
177 )
178 };
179 Ok((scheme, authority))
180 }
181
182 let (account_scheme, account_authority) = match std::env::var("S2_ACCOUNT_ENDPOINT") {
183 Ok(s) => parse_account_endpoint(&s)?,
184 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
185 Err(VarError::NotUnicode(_)) => {
186 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
187 }
188 };
189
190 let (basin_scheme, basin_authority) = match std::env::var("S2_BASIN_ENDPOINT") {
191 Ok(s) => parse_basin_endpoint(&s)?,
192 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
193 Err(VarError::NotUnicode(_)) => {
194 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
195 }
196 };
197
198 if account_scheme != basin_scheme {
199 return Err(
200 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
201 );
202 }
203
204 Ok(S2Endpoints::new(account_authority, basin_authority).with_scheme(account_scheme))
205 }
206
207 pub fn with_scheme(self, scheme: Scheme) -> Self {
209 Self { scheme, ..self }
210 }
211
212 pub(crate) fn for_aws() -> Self {
213 Self {
214 scheme: Scheme::HTTPS,
215 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
216 basin_authority: BasinAuthority::ParentZone(
217 "b.aws.s2.dev".try_into().expect("valid authority"),
218 ),
219 }
220 }
221}
222
/// Compression choice; each variant maps one-to-one onto the
/// `CompressionAlgorithm` variant of the same name.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly, consistent
/// with the other fieldless enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
233
234impl From<Compression> for CompressionAlgorithm {
235 fn from(value: Compression) -> Self {
236 match value {
237 Compression::None => CompressionAlgorithm::None,
238 Compression::Gzip => CompressionAlgorithm::Gzip,
239 Compression::Zstd => CompressionAlgorithm::Zstd,
240 }
241 }
242}
243
/// Policy deciding which append inputs may be retried.
///
/// See `AppendRetryPolicy::is_compliant` for the exact rule applied to each
/// variant. `Eq` is derived alongside `PartialEq` because equality on this
/// fieldless enum is total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AppendRetryPolicy {
    /// Every append input is compliant.
    All,
    /// Only append inputs that set `match_seq_num` are compliant.
    NoSideEffects,
}
254
255impl AppendRetryPolicy {
256 pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
257 match self {
258 Self::All => true,
259 Self::NoSideEffects => input.match_seq_num.is_some(),
260 }
261 }
262}
263
264#[derive(Debug, Clone)]
265#[non_exhaustive]
266pub struct RetryConfig {
275 pub max_attempts: NonZeroU32,
279 pub min_base_delay: Duration,
283 pub max_base_delay: Duration,
287 pub append_retry_policy: AppendRetryPolicy,
292}
293
294impl Default for RetryConfig {
295 fn default() -> Self {
296 Self {
297 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
298 min_base_delay: Duration::from_millis(100),
299 max_base_delay: Duration::from_secs(1),
300 append_retry_policy: AppendRetryPolicy::All,
301 }
302 }
303}
304
305impl RetryConfig {
306 pub fn new() -> Self {
308 Self::default()
309 }
310
311 pub(crate) fn max_retries(&self) -> u32 {
312 self.max_attempts.get() - 1
313 }
314
315 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
317 Self {
318 max_attempts,
319 ..self
320 }
321 }
322
323 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
325 Self {
326 min_base_delay,
327 ..self
328 }
329 }
330
331 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
333 Self {
334 max_base_delay,
335 ..self
336 }
337 }
338
339 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
342 Self {
343 append_retry_policy,
344 ..self
345 }
346 }
347}
348
349#[derive(Debug, Clone)]
350#[non_exhaustive]
351pub struct S2Config {
353 pub(crate) access_token: SecretString,
354 pub(crate) endpoints: S2Endpoints,
355 pub(crate) connection_timeout: Duration,
356 pub(crate) request_timeout: Duration,
357 pub(crate) retry: RetryConfig,
358 pub(crate) compression: Compression,
359 pub(crate) user_agent: HeaderValue,
360}
361
362impl S2Config {
363 pub fn new(access_token: impl Into<String>) -> Self {
365 Self {
366 access_token: access_token.into().into(),
367 endpoints: S2Endpoints::for_aws(),
368 connection_timeout: Duration::from_secs(3),
369 request_timeout: Duration::from_secs(5),
370 retry: RetryConfig::new(),
371 compression: Compression::None,
372 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
373 .parse()
374 .expect("valid user agent"),
375 }
376 }
377
378 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
380 Self { endpoints, ..self }
381 }
382
383 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
387 Self {
388 connection_timeout,
389 ..self
390 }
391 }
392
393 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
397 Self {
398 request_timeout,
399 ..self
400 }
401 }
402
403 pub fn with_retry(self, retry: RetryConfig) -> Self {
407 Self { retry, ..self }
408 }
409
410 pub fn with_compression(self, compression: Compression) -> Self {
414 Self {
415 compression,
416 ..self
417 }
418 }
419
420 #[doc(hidden)]
421 #[cfg(feature = "_hidden")]
422 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
423 let user_agent = user_agent
424 .into()
425 .parse()
426 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
427 Ok(Self { user_agent, ..self })
428 }
429}
430
/// One page of listed values.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values contained in this page.
    pub values: Vec<T>,
    /// Flag supplied alongside the values.
    // NOTE(review): presumably signals that further pages exist; confirm
    // against the listing code.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Builds a page from anything convertible into a `Vec<T>` plus the
    /// `has_more` flag.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values: Vec<T> = values.into();
        Page { values, has_more }
    }
}
449
450#[derive(Debug, Clone, Copy, PartialEq, Eq)]
451pub enum StorageClass {
453 Standard,
455 Express,
457}
458
459impl From<api::config::StorageClass> for StorageClass {
460 fn from(value: api::config::StorageClass) -> Self {
461 match value {
462 api::config::StorageClass::Standard => StorageClass::Standard,
463 api::config::StorageClass::Express => StorageClass::Express,
464 }
465 }
466}
467
468impl From<StorageClass> for api::config::StorageClass {
469 fn from(value: StorageClass) -> Self {
470 match value {
471 StorageClass::Standard => api::config::StorageClass::Standard,
472 StorageClass::Express => api::config::StorageClass::Express,
473 }
474 }
475}
476
477#[derive(Debug, Clone, Copy, PartialEq, Eq)]
478pub enum RetentionPolicy {
480 Age(u64),
482 Infinite,
484}
485
486impl From<api::config::RetentionPolicy> for RetentionPolicy {
487 fn from(value: api::config::RetentionPolicy) -> Self {
488 match value {
489 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
490 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
491 }
492 }
493}
494
495impl From<RetentionPolicy> for api::config::RetentionPolicy {
496 fn from(value: RetentionPolicy) -> Self {
497 match value {
498 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
499 RetentionPolicy::Infinite => {
500 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
501 }
502 }
503 }
504}
505
506#[derive(Debug, Clone, Copy, PartialEq, Eq)]
507pub enum TimestampingMode {
509 ClientPrefer,
511 ClientRequire,
513 Arrival,
515}
516
517impl From<api::config::TimestampingMode> for TimestampingMode {
518 fn from(value: api::config::TimestampingMode) -> Self {
519 match value {
520 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
521 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
522 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
523 }
524 }
525}
526
527impl From<TimestampingMode> for api::config::TimestampingMode {
528 fn from(value: TimestampingMode) -> Self {
529 match value {
530 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
531 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
532 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
533 }
534 }
535}
536
537#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
538#[non_exhaustive]
539pub struct TimestampingConfig {
541 pub mode: Option<TimestampingMode>,
545 pub uncapped: bool,
549}
550
551impl TimestampingConfig {
552 pub fn new() -> Self {
554 Self::default()
555 }
556
557 pub fn with_mode(self, mode: TimestampingMode) -> Self {
559 Self {
560 mode: Some(mode),
561 ..self
562 }
563 }
564
565 pub fn with_uncapped(self, uncapped: bool) -> Self {
567 Self { uncapped, ..self }
568 }
569}
570
571impl From<api::config::TimestampingConfig> for TimestampingConfig {
572 fn from(value: api::config::TimestampingConfig) -> Self {
573 Self {
574 mode: value.mode.map(Into::into),
575 uncapped: value.uncapped.unwrap_or_default(),
576 }
577 }
578}
579
580impl From<TimestampingConfig> for api::config::TimestampingConfig {
581 fn from(value: TimestampingConfig) -> Self {
582 Self {
583 mode: value.mode.map(Into::into),
584 uncapped: Some(value.uncapped),
585 }
586 }
587}
588
/// Delete-on-empty settings.
///
/// The default value has `min_age_secs == 0`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age, in whole seconds.
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Creates a configuration holding the default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns `self` with `min_age_secs` set to the whole seconds of `min_age`.
    ///
    /// Any sub-second part of `min_age` is dropped by `Duration::as_secs`.
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
612
613impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
614 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
615 Self {
616 min_age_secs: value.min_age_secs,
617 }
618 }
619}
620
621impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
622 fn from(value: DeleteOnEmptyConfig) -> Self {
623 Self {
624 min_age_secs: value.min_age_secs,
625 }
626 }
627}
628
629#[derive(Debug, Clone, Default, PartialEq, Eq)]
630#[non_exhaustive]
631pub struct StreamConfig {
633 pub storage_class: Option<StorageClass>,
637 pub retention_policy: Option<RetentionPolicy>,
641 pub timestamping: Option<TimestampingConfig>,
645 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
649}
650
651impl StreamConfig {
652 pub fn new() -> Self {
654 Self::default()
655 }
656
657 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
659 Self {
660 storage_class: Some(storage_class),
661 ..self
662 }
663 }
664
665 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
667 Self {
668 retention_policy: Some(retention_policy),
669 ..self
670 }
671 }
672
673 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
675 Self {
676 timestamping: Some(timestamping),
677 ..self
678 }
679 }
680
681 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
683 Self {
684 delete_on_empty: Some(delete_on_empty),
685 ..self
686 }
687 }
688}
689
690impl From<api::config::StreamConfig> for StreamConfig {
691 fn from(value: api::config::StreamConfig) -> Self {
692 Self {
693 storage_class: value.storage_class.map(Into::into),
694 retention_policy: value.retention_policy.map(Into::into),
695 timestamping: value.timestamping.map(Into::into),
696 delete_on_empty: value.delete_on_empty.map(Into::into),
697 }
698 }
699}
700
701impl From<StreamConfig> for api::config::StreamConfig {
702 fn from(value: StreamConfig) -> Self {
703 Self {
704 storage_class: value.storage_class.map(Into::into),
705 retention_policy: value.retention_policy.map(Into::into),
706 timestamping: value.timestamping.map(Into::into),
707 delete_on_empty: value.delete_on_empty.map(Into::into),
708 }
709 }
710}
711
712#[derive(Debug, Clone, Default)]
713#[non_exhaustive]
714pub struct BasinConfig {
716 pub default_stream_config: Option<StreamConfig>,
720 pub create_stream_on_append: bool,
724 pub create_stream_on_read: bool,
728}
729
730impl BasinConfig {
731 pub fn new() -> Self {
733 Self::default()
734 }
735
736 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
738 Self {
739 default_stream_config: Some(config),
740 ..self
741 }
742 }
743
744 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
747 Self {
748 create_stream_on_append,
749 ..self
750 }
751 }
752
753 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
755 Self {
756 create_stream_on_read,
757 ..self
758 }
759 }
760}
761
762impl From<api::config::BasinConfig> for BasinConfig {
763 fn from(value: api::config::BasinConfig) -> Self {
764 Self {
765 default_stream_config: value.default_stream_config.map(Into::into),
766 create_stream_on_append: value.create_stream_on_append,
767 create_stream_on_read: value.create_stream_on_read,
768 }
769 }
770}
771
772impl From<BasinConfig> for api::config::BasinConfig {
773 fn from(value: BasinConfig) -> Self {
774 Self {
775 default_stream_config: value.default_stream_config.map(Into::into),
776 create_stream_on_append: value.create_stream_on_append,
777 create_stream_on_read: value.create_stream_on_read,
778 }
779 }
780}
781
782#[derive(Debug, Clone, PartialEq, Eq)]
783pub enum BasinScope {
785 AwsUsEast1,
787}
788
789impl From<api::basin::BasinScope> for BasinScope {
790 fn from(value: api::basin::BasinScope) -> Self {
791 match value {
792 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
793 }
794 }
795}
796
797impl From<BasinScope> for api::basin::BasinScope {
798 fn from(value: BasinScope) -> Self {
799 match value {
800 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
801 }
802 }
803}
804
805#[derive(Debug, Clone)]
806#[non_exhaustive]
807pub struct CreateBasinInput {
809 pub name: BasinName,
811 pub config: Option<BasinConfig>,
815 pub scope: Option<BasinScope>,
819 pub idempotency_token: String,
827}
828
829impl CreateBasinInput {
830 pub fn new(name: BasinName) -> Self {
832 Self {
833 name,
834 config: None,
835 scope: None,
836 idempotency_token: idempotency_token(),
837 }
838 }
839
840 pub fn with_config(self, config: BasinConfig) -> Self {
842 Self {
843 config: Some(config),
844 ..self
845 }
846 }
847
848 pub fn with_scope(self, scope: BasinScope) -> Self {
850 Self {
851 scope: Some(scope),
852 ..self
853 }
854 }
855
856 pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
858 Self {
859 idempotency_token: idempotency_token.into(),
860 ..self
861 }
862 }
863}
864
865impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
866 fn from(value: CreateBasinInput) -> Self {
867 (
868 api::basin::CreateBasinRequest {
869 basin: value.name,
870 config: value.config.map(Into::into),
871 scope: value.scope.map(Into::into),
872 },
873 value.idempotency_token,
874 )
875 }
876}
877
878#[derive(Debug, Clone, Default)]
879#[non_exhaustive]
880pub struct ListBasinsInput {
882 pub prefix: BasinNamePrefix,
886 pub start_after: BasinNameStartAfter,
892 pub limit: Option<usize>,
896}
897
898impl ListBasinsInput {
899 pub fn new() -> Self {
901 Self::default()
902 }
903
904 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
906 Self { prefix, ..self }
907 }
908
909 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
912 Self {
913 start_after,
914 ..self
915 }
916 }
917
918 pub fn with_limit(self, limit: usize) -> Self {
920 Self {
921 limit: Some(limit),
922 ..self
923 }
924 }
925}
926
927impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
928 fn from(value: ListBasinsInput) -> Self {
929 Self {
930 prefix: Some(value.prefix),
931 start_after: Some(value.start_after),
932 limit: value.limit,
933 }
934 }
935}
936
937#[derive(Debug, Clone, Default)]
938pub struct ListAllBasinsInput {
940 pub prefix: BasinNamePrefix,
944 pub start_after: BasinNameStartAfter,
950 pub ignore_pending_deletions: bool,
954}
955
956impl ListAllBasinsInput {
957 pub fn new() -> Self {
959 Self::default()
960 }
961
962 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
964 Self { prefix, ..self }
965 }
966
967 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
970 Self {
971 start_after,
972 ..self
973 }
974 }
975
976 pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
978 Self {
979 ignore_pending_deletions,
980 ..self
981 }
982 }
983}
984
985#[derive(Debug, Clone, PartialEq, Eq)]
986pub enum BasinState {
988 Active,
990 Creating,
992 Deleting,
994}
995
996impl From<api::basin::BasinState> for BasinState {
997 fn from(value: api::basin::BasinState) -> Self {
998 match value {
999 api::basin::BasinState::Active => BasinState::Active,
1000 api::basin::BasinState::Creating => BasinState::Creating,
1001 api::basin::BasinState::Deleting => BasinState::Deleting,
1002 }
1003 }
1004}
1005
1006#[derive(Debug, Clone, PartialEq, Eq)]
1007#[non_exhaustive]
1008pub struct BasinInfo {
1010 pub name: BasinName,
1012 pub scope: Option<BasinScope>,
1014 pub state: BasinState,
1016}
1017
1018impl From<api::basin::BasinInfo> for BasinInfo {
1019 fn from(value: api::basin::BasinInfo) -> Self {
1020 Self {
1021 name: value.name,
1022 scope: value.scope.map(Into::into),
1023 state: value.state.into(),
1024 }
1025 }
1026}
1027
1028#[derive(Debug, Clone)]
1029#[non_exhaustive]
1030pub struct DeleteBasinInput {
1032 pub name: BasinName,
1034 pub ignore_not_found: bool,
1036}
1037
1038impl DeleteBasinInput {
1039 pub fn new(name: BasinName) -> Self {
1041 Self {
1042 name,
1043 ignore_not_found: false,
1044 }
1045 }
1046
1047 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1049 Self {
1050 ignore_not_found,
1051 ..self
1052 }
1053 }
1054}
1055
1056#[derive(Debug, Clone, Default)]
1057#[non_exhaustive]
1058pub struct TimestampingReconfiguration {
1060 pub mode: Maybe<Option<TimestampingMode>>,
1062 pub uncapped: Maybe<Option<bool>>,
1064}
1065
1066impl TimestampingReconfiguration {
1067 pub fn new() -> Self {
1069 Self::default()
1070 }
1071
1072 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1074 Self {
1075 mode: Maybe::Specified(Some(mode)),
1076 ..self
1077 }
1078 }
1079
1080 pub fn with_uncapped(self, uncapped: bool) -> Self {
1082 Self {
1083 uncapped: Maybe::Specified(Some(uncapped)),
1084 ..self
1085 }
1086 }
1087}
1088
1089impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1090 fn from(value: TimestampingReconfiguration) -> Self {
1091 Self {
1092 mode: value.mode.map(|m| m.map(Into::into)),
1093 uncapped: value.uncapped,
1094 }
1095 }
1096}
1097
1098#[derive(Debug, Clone, Default)]
1099#[non_exhaustive]
1100pub struct DeleteOnEmptyReconfiguration {
1102 pub min_age_secs: Maybe<Option<u64>>,
1104}
1105
1106impl DeleteOnEmptyReconfiguration {
1107 pub fn new() -> Self {
1109 Self::default()
1110 }
1111
1112 pub fn with_min_age(self, min_age: Duration) -> Self {
1114 Self {
1115 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1116 }
1117 }
1118}
1119
1120impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1121 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1122 Self {
1123 min_age_secs: value.min_age_secs,
1124 }
1125 }
1126}
1127
1128#[derive(Debug, Clone, Default)]
1129#[non_exhaustive]
1130pub struct StreamReconfiguration {
1132 pub storage_class: Maybe<Option<StorageClass>>,
1134 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1136 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1138 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1140}
1141
1142impl StreamReconfiguration {
1143 pub fn new() -> Self {
1145 Self::default()
1146 }
1147
1148 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1150 Self {
1151 storage_class: Maybe::Specified(Some(storage_class)),
1152 ..self
1153 }
1154 }
1155
1156 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1158 Self {
1159 retention_policy: Maybe::Specified(Some(retention_policy)),
1160 ..self
1161 }
1162 }
1163
1164 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1166 Self {
1167 timestamping: Maybe::Specified(Some(timestamping)),
1168 ..self
1169 }
1170 }
1171
1172 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1174 Self {
1175 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1176 ..self
1177 }
1178 }
1179}
1180
1181impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1182 fn from(value: StreamReconfiguration) -> Self {
1183 Self {
1184 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1185 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1186 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1187 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1188 }
1189 }
1190}
1191
1192#[derive(Debug, Clone, Default)]
1193#[non_exhaustive]
1194pub struct BasinReconfiguration {
1196 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1198 pub create_stream_on_append: Maybe<bool>,
1201 pub create_stream_on_read: Maybe<bool>,
1203}
1204
1205impl BasinReconfiguration {
1206 pub fn new() -> Self {
1208 Self::default()
1209 }
1210
1211 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1214 Self {
1215 default_stream_config: Maybe::Specified(Some(config)),
1216 ..self
1217 }
1218 }
1219
1220 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1223 Self {
1224 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1225 ..self
1226 }
1227 }
1228
1229 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1232 Self {
1233 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1234 ..self
1235 }
1236 }
1237}
1238
1239impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1240 fn from(value: BasinReconfiguration) -> Self {
1241 Self {
1242 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1243 create_stream_on_append: value.create_stream_on_append,
1244 create_stream_on_read: value.create_stream_on_read,
1245 }
1246 }
1247}
1248
1249#[derive(Debug, Clone)]
1250#[non_exhaustive]
1251pub struct ReconfigureBasinInput {
1253 pub name: BasinName,
1255 pub config: BasinReconfiguration,
1257}
1258
1259impl ReconfigureBasinInput {
1260 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1262 Self { name, config }
1263 }
1264}
1265
1266#[derive(Debug, Clone, Default)]
1267#[non_exhaustive]
1268pub struct ListAccessTokensInput {
1270 pub prefix: AccessTokenIdPrefix,
1274 pub start_after: AccessTokenIdStartAfter,
1280 pub limit: Option<usize>,
1284}
1285
1286impl ListAccessTokensInput {
1287 pub fn new() -> Self {
1289 Self::default()
1290 }
1291
1292 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1294 Self { prefix, ..self }
1295 }
1296
1297 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1300 Self {
1301 start_after,
1302 ..self
1303 }
1304 }
1305
1306 pub fn with_limit(self, limit: usize) -> Self {
1308 Self {
1309 limit: Some(limit),
1310 ..self
1311 }
1312 }
1313}
1314
1315impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1316 fn from(value: ListAccessTokensInput) -> Self {
1317 Self {
1318 prefix: Some(value.prefix),
1319 start_after: Some(value.start_after),
1320 limit: value.limit,
1321 }
1322 }
1323}
1324
1325#[derive(Debug, Clone, Default)]
1326pub struct ListAllAccessTokensInput {
1328 pub prefix: AccessTokenIdPrefix,
1332 pub start_after: AccessTokenIdStartAfter,
1338}
1339
1340impl ListAllAccessTokensInput {
1341 pub fn new() -> Self {
1343 Self::default()
1344 }
1345
1346 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1348 Self { prefix, ..self }
1349 }
1350
1351 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1354 Self {
1355 start_after,
1356 ..self
1357 }
1358 }
1359}
1360
1361#[derive(Debug, Clone)]
1362#[non_exhaustive]
1363pub struct AccessTokenInfo {
1365 pub id: AccessTokenId,
1367 pub expires_at: S2DateTime,
1369 pub auto_prefix_streams: bool,
1372 pub scope: AccessTokenScope,
1374}
1375
1376impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1377 type Error = ValidationError;
1378
1379 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1380 let expires_at = value
1381 .expires_at
1382 .map(Into::into)
1383 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1384 Ok(Self {
1385 id: value.id,
1386 expires_at,
1387 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1388 scope: value.scope.into(),
1389 })
1390 }
1391}
1392
1393#[derive(Debug, Clone)]
1394pub enum BasinMatcher {
1398 None,
1400 Exact(BasinName),
1402 Prefix(BasinNamePrefix),
1404}
1405
1406#[derive(Debug, Clone)]
1407pub enum StreamMatcher {
1411 None,
1413 Exact(StreamName),
1415 Prefix(StreamNamePrefix),
1417}
1418
1419#[derive(Debug, Clone)]
1420pub enum AccessTokenMatcher {
1424 None,
1426 Exact(AccessTokenId),
1428 Prefix(AccessTokenIdPrefix),
1430}
1431
/// A pair of read and write permission flags.
///
/// The default value has both flags `false`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Read permission flag.
    pub read: bool,
    /// Write permission flag.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Creates permissions with both flags `false`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Permissions with `read` set and `write` cleared.
    pub fn read_only() -> Self {
        let (read, write) = (true, false);
        Self { read, write }
    }

    /// Permissions with `write` set and `read` cleared.
    pub fn write_only() -> Self {
        let (read, write) = (false, true);
        Self { read, write }
    }

    /// Permissions with both flags set.
    pub fn read_write() -> Self {
        let (read, write) = (true, true);
        Self { read, write }
    }
}
1476
1477impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1478 fn from(value: ReadWritePermissions) -> Self {
1479 Self {
1480 read: Some(value.read),
1481 write: Some(value.write),
1482 }
1483 }
1484}
1485
1486impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1487 fn from(value: api::access::ReadWritePermissions) -> Self {
1488 Self {
1489 read: value.read.unwrap_or_default(),
1490 write: value.write.unwrap_or_default(),
1491 }
1492 }
1493}
1494
1495#[derive(Debug, Clone, Default)]
1496#[non_exhaustive]
1497pub struct OperationGroupPermissions {
1501 pub account: Option<ReadWritePermissions>,
1505 pub basin: Option<ReadWritePermissions>,
1509 pub stream: Option<ReadWritePermissions>,
1513}
1514
1515impl OperationGroupPermissions {
1516 pub fn new() -> Self {
1518 Self::default()
1519 }
1520
1521 pub fn read_only_all() -> Self {
1523 Self {
1524 account: Some(ReadWritePermissions::read_only()),
1525 basin: Some(ReadWritePermissions::read_only()),
1526 stream: Some(ReadWritePermissions::read_only()),
1527 }
1528 }
1529
1530 pub fn write_only_all() -> Self {
1532 Self {
1533 account: Some(ReadWritePermissions::write_only()),
1534 basin: Some(ReadWritePermissions::write_only()),
1535 stream: Some(ReadWritePermissions::write_only()),
1536 }
1537 }
1538
1539 pub fn read_write_all() -> Self {
1541 Self {
1542 account: Some(ReadWritePermissions::read_write()),
1543 basin: Some(ReadWritePermissions::read_write()),
1544 stream: Some(ReadWritePermissions::read_write()),
1545 }
1546 }
1547
1548 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1550 Self {
1551 account: Some(account),
1552 ..self
1553 }
1554 }
1555
1556 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1558 Self {
1559 basin: Some(basin),
1560 ..self
1561 }
1562 }
1563
1564 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1566 Self {
1567 stream: Some(stream),
1568 ..self
1569 }
1570 }
1571}
1572
1573impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1574 fn from(value: OperationGroupPermissions) -> Self {
1575 Self {
1576 account: value.account.map(Into::into),
1577 basin: value.basin.map(Into::into),
1578 stream: value.stream.map(Into::into),
1579 }
1580 }
1581}
1582
1583impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1584 fn from(value: api::access::PermittedOperationGroups) -> Self {
1585 Self {
1586 account: value.account.map(Into::into),
1587 basin: value.basin.map(Into::into),
1588 stream: value.stream.map(Into::into),
1589 }
1590 }
1591}
1592
/// Individual operations, each convertible to an `api::access::Operation`.
///
/// Variants map to the API variant of the same name, except the three
/// `Get*Metrics` variants, which map to `AccountMetrics`, `BasinMetrics` and
/// `StreamMetrics` respectively.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get a basin's configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics.
    GetAccountMetrics,
    /// Get basin metrics.
    GetBasinMetrics,
    /// Get stream metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get a stream's configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append to a stream.
    Append,
    /// Read from a stream.
    Read,
    /// Trim a stream.
    Trim,
    /// Fence a stream.
    Fence,
}
1641
1642impl From<Operation> for api::access::Operation {
1643 fn from(value: Operation) -> Self {
1644 match value {
1645 Operation::ListBasins => api::access::Operation::ListBasins,
1646 Operation::CreateBasin => api::access::Operation::CreateBasin,
1647 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1648 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1649 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1650 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1651 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1652 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1653 Operation::ListStreams => api::access::Operation::ListStreams,
1654 Operation::CreateStream => api::access::Operation::CreateStream,
1655 Operation::DeleteStream => api::access::Operation::DeleteStream,
1656 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1657 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1658 Operation::CheckTail => api::access::Operation::CheckTail,
1659 Operation::Append => api::access::Operation::Append,
1660 Operation::Read => api::access::Operation::Read,
1661 Operation::Trim => api::access::Operation::Trim,
1662 Operation::Fence => api::access::Operation::Fence,
1663 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1664 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1665 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1666 }
1667 }
1668}
1669
1670impl From<api::access::Operation> for Operation {
1671 fn from(value: api::access::Operation) -> Self {
1672 match value {
1673 api::access::Operation::ListBasins => Operation::ListBasins,
1674 api::access::Operation::CreateBasin => Operation::CreateBasin,
1675 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1676 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1677 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1678 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1679 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1680 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1681 api::access::Operation::ListStreams => Operation::ListStreams,
1682 api::access::Operation::CreateStream => Operation::CreateStream,
1683 api::access::Operation::DeleteStream => Operation::DeleteStream,
1684 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1685 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1686 api::access::Operation::CheckTail => Operation::CheckTail,
1687 api::access::Operation::Append => Operation::Append,
1688 api::access::Operation::Read => Operation::Read,
1689 api::access::Operation::Trim => Operation::Trim,
1690 api::access::Operation::Fence => Operation::Fence,
1691 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1692 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1693 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1694 }
1695 }
1696}
1697
/// Scope requested when issuing an access token.
///
/// Built with [`AccessTokenScopeInput::from_ops`] or
/// [`AccessTokenScopeInput::from_op_group_perms`] and refined with the `with_*` methods.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScopeInput {
    /// Matcher for basins; omitted from the API request when `None`.
    basins: Option<BasinMatcher>,
    /// Matcher for streams; omitted from the API request when `None`.
    streams: Option<StreamMatcher>,
    /// Matcher for access tokens; omitted from the API request when `None`.
    access_tokens: Option<AccessTokenMatcher>,
    /// Permissions expressed per operation group.
    op_group_perms: Option<OperationGroupPermissions>,
    /// Individual operations; an empty set is sent to the API as "not specified".
    ops: HashSet<Operation>,
}
1714
1715impl AccessTokenScopeInput {
1716 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1718 Self {
1719 basins: None,
1720 streams: None,
1721 access_tokens: None,
1722 op_group_perms: None,
1723 ops: ops.into_iter().collect(),
1724 }
1725 }
1726
1727 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1729 Self {
1730 basins: None,
1731 streams: None,
1732 access_tokens: None,
1733 op_group_perms: Some(op_group_perms),
1734 ops: HashSet::default(),
1735 }
1736 }
1737
1738 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1740 Self {
1741 ops: ops.into_iter().collect(),
1742 ..self
1743 }
1744 }
1745
1746 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1748 Self {
1749 op_group_perms: Some(op_group_perms),
1750 ..self
1751 }
1752 }
1753
1754 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1758 Self {
1759 basins: Some(basins),
1760 ..self
1761 }
1762 }
1763
1764 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1768 Self {
1769 streams: Some(streams),
1770 ..self
1771 }
1772 }
1773
1774 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1778 Self {
1779 access_tokens: Some(access_tokens),
1780 ..self
1781 }
1782 }
1783}
1784
/// Scope of an access token as returned by the API.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScope {
    /// Basin matcher, if the API returned one.
    pub basins: Option<BasinMatcher>,
    /// Stream matcher, if the API returned one.
    pub streams: Option<StreamMatcher>,
    /// Access token matcher, if the API returned one.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions per operation group, if the API returned them.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Individual operations; empty when the API returned none.
    pub ops: HashSet<Operation>,
}
1800
1801impl From<api::access::AccessTokenScope> for AccessTokenScope {
1802 fn from(value: api::access::AccessTokenScope) -> Self {
1803 Self {
1804 basins: value.basins.map(|rs| match rs {
1805 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1806 BasinMatcher::Exact(e)
1807 }
1808 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1809 BasinMatcher::None
1810 }
1811 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1812 }),
1813 streams: value.streams.map(|rs| match rs {
1814 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1815 StreamMatcher::Exact(e)
1816 }
1817 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1818 StreamMatcher::None
1819 }
1820 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1821 }),
1822 access_tokens: value.access_tokens.map(|rs| match rs {
1823 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1824 AccessTokenMatcher::Exact(e)
1825 }
1826 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1827 AccessTokenMatcher::None
1828 }
1829 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1830 }),
1831 op_group_perms: value.op_groups.map(Into::into),
1832 ops: value
1833 .ops
1834 .map(|ops| ops.into_iter().map(Into::into).collect())
1835 .unwrap_or_default(),
1836 }
1837 }
1838}
1839
1840impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1841 fn from(value: AccessTokenScopeInput) -> Self {
1842 Self {
1843 basins: value.basins.map(|rs| match rs {
1844 BasinMatcher::None => {
1845 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1846 }
1847 BasinMatcher::Exact(e) => {
1848 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1849 }
1850 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1851 }),
1852 streams: value.streams.map(|rs| match rs {
1853 StreamMatcher::None => {
1854 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1855 }
1856 StreamMatcher::Exact(e) => {
1857 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1858 }
1859 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1860 }),
1861 access_tokens: value.access_tokens.map(|rs| match rs {
1862 AccessTokenMatcher::None => {
1863 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1864 }
1865 AccessTokenMatcher::Exact(e) => {
1866 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1867 }
1868 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1869 }),
1870 op_groups: value.op_group_perms.map(Into::into),
1871 ops: if value.ops.is_empty() {
1872 None
1873 } else {
1874 Some(value.ops.into_iter().map(Into::into).collect())
1875 },
1876 }
1877 }
1878}
1879
/// Input for issuing an access token.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct IssueAccessTokenInput {
    /// Identifier of the access token.
    pub id: AccessTokenId,
    /// Optional expiry time; `None` leaves it unset in the request.
    pub expires_at: Option<S2DateTime>,
    /// Flag forwarded to the API as `auto_prefix_streams`; it is only sent when `true`.
    pub auto_prefix_streams: bool,
    /// Scope requested for the token.
    pub scope: AccessTokenScopeInput,
}
1902
1903impl IssueAccessTokenInput {
1904 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1906 Self {
1907 id,
1908 expires_at: None,
1909 auto_prefix_streams: false,
1910 scope,
1911 }
1912 }
1913
1914 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1916 Self {
1917 expires_at: Some(expires_at),
1918 ..self
1919 }
1920 }
1921
1922 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1925 Self {
1926 auto_prefix_streams,
1927 ..self
1928 }
1929 }
1930}
1931
1932impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1933 fn from(value: IssueAccessTokenInput) -> Self {
1934 Self {
1935 id: value.id,
1936 expires_at: value.expires_at.map(Into::into),
1937 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1938 scope: value.scope.into(),
1939 }
1940 }
1941}
1942
/// Interval selector for timeseries metrics.
///
/// Converts to and from `api::metrics::TimeseriesInterval` variant by variant.
#[derive(Debug, Clone, Copy)]
pub enum TimeseriesInterval {
    /// One-minute interval.
    Minute,
    /// One-hour interval.
    Hour,
    /// One-day interval.
    Day,
}
1953
1954impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
1955 fn from(value: TimeseriesInterval) -> Self {
1956 match value {
1957 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
1958 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
1959 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
1960 }
1961 }
1962}
1963
1964impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
1965 fn from(value: api::metrics::TimeseriesInterval) -> Self {
1966 match value {
1967 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
1968 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
1969 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
1970 }
1971 }
1972}
1973
/// A time range for metrics queries.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start of the range, sent to the API as `start`.
    /// NOTE(review): presumably a Unix timestamp in seconds — confirm.
    pub start: u32,
    /// End of the range, sent to the API as `end`.
    pub end: u32,
}
1983
1984impl TimeRange {
1985 pub fn new(start: u32, end: u32) -> Self {
1987 Self { start, end }
1988 }
1989}
1990
/// A time range plus an optional aggregation interval for metrics queries.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRangeAndInterval {
    /// Start of the range, sent to the API as `start`.
    /// NOTE(review): presumably a Unix timestamp in seconds — confirm.
    pub start: u32,
    /// End of the range, sent to the API as `end`.
    pub end: u32,
    /// Optional interval; `None` leaves it unset in the request.
    pub interval: Option<TimeseriesInterval>,
}
2004
2005impl TimeRangeAndInterval {
2006 pub fn new(start: u32, end: u32) -> Self {
2008 Self {
2009 start,
2010 end,
2011 interval: None,
2012 }
2013 }
2014
2015 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2017 Self {
2018 interval: Some(interval),
2019 ..self
2020 }
2021 }
2022}
2023
/// Account-level metric set to query, together with its time arguments.
#[derive(Debug, Clone, Copy)]
pub enum AccountMetricSet {
    /// The `ActiveBasins` metric set over a time range (no interval).
    ActiveBasins(TimeRange),
    /// The `AccountOps` metric set over a time range with an optional interval.
    AccountOps(TimeRangeAndInterval),
}
2038
/// Input for fetching account-level metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetAccountMetricsInput {
    /// Metric set to fetch, including its time arguments.
    pub set: AccountMetricSet,
}
2046
2047impl GetAccountMetricsInput {
2048 pub fn new(set: AccountMetricSet) -> Self {
2050 Self { set }
2051 }
2052}
2053
2054impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2055 fn from(value: GetAccountMetricsInput) -> Self {
2056 let (set, start, end, interval) = match value.set {
2057 AccountMetricSet::ActiveBasins(args) => (
2058 api::metrics::AccountMetricSet::ActiveBasins,
2059 args.start,
2060 args.end,
2061 None,
2062 ),
2063 AccountMetricSet::AccountOps(args) => (
2064 api::metrics::AccountMetricSet::AccountOps,
2065 args.start,
2066 args.end,
2067 args.interval,
2068 ),
2069 };
2070 Self {
2071 set,
2072 start: Some(start),
2073 end: Some(end),
2074 interval: interval.map(Into::into),
2075 }
2076 }
2077}
2078
/// Basin-level metric set to query, together with its time arguments.
///
/// Only `Storage` takes a plain [`TimeRange`]; every other set also accepts an optional interval.
#[derive(Debug, Clone, Copy)]
pub enum BasinMetricSet {
    /// The `Storage` metric set.
    Storage(TimeRange),
    /// The `AppendOps` metric set.
    AppendOps(TimeRangeAndInterval),
    /// The `ReadOps` metric set.
    ReadOps(TimeRangeAndInterval),
    /// The `ReadThroughput` metric set.
    ReadThroughput(TimeRangeAndInterval),
    /// The `AppendThroughput` metric set.
    AppendThroughput(TimeRangeAndInterval),
    /// The `BasinOps` metric set.
    BasinOps(TimeRangeAndInterval),
}
2123
/// Input for fetching basin-level metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetBasinMetricsInput {
    /// Name of the basin.
    pub name: BasinName,
    /// Metric set to fetch, including its time arguments.
    pub set: BasinMetricSet,
}
2133
2134impl GetBasinMetricsInput {
2135 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2137 Self { name, set }
2138 }
2139}
2140
2141impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2142 fn from(value: GetBasinMetricsInput) -> Self {
2143 let (set, start, end, interval) = match value.set {
2144 BasinMetricSet::Storage(args) => (
2145 api::metrics::BasinMetricSet::Storage,
2146 args.start,
2147 args.end,
2148 None,
2149 ),
2150 BasinMetricSet::AppendOps(args) => (
2151 api::metrics::BasinMetricSet::AppendOps,
2152 args.start,
2153 args.end,
2154 args.interval,
2155 ),
2156 BasinMetricSet::ReadOps(args) => (
2157 api::metrics::BasinMetricSet::ReadOps,
2158 args.start,
2159 args.end,
2160 args.interval,
2161 ),
2162 BasinMetricSet::ReadThroughput(args) => (
2163 api::metrics::BasinMetricSet::ReadThroughput,
2164 args.start,
2165 args.end,
2166 args.interval,
2167 ),
2168 BasinMetricSet::AppendThroughput(args) => (
2169 api::metrics::BasinMetricSet::AppendThroughput,
2170 args.start,
2171 args.end,
2172 args.interval,
2173 ),
2174 BasinMetricSet::BasinOps(args) => (
2175 api::metrics::BasinMetricSet::BasinOps,
2176 args.start,
2177 args.end,
2178 args.interval,
2179 ),
2180 };
2181 (
2182 value.name,
2183 api::metrics::BasinMetricSetRequest {
2184 set,
2185 start: Some(start),
2186 end: Some(end),
2187 interval: interval.map(Into::into),
2188 },
2189 )
2190 }
2191}
2192
/// Stream-level metric set to query, together with its time arguments.
#[derive(Debug, Clone, Copy)]
pub enum StreamMetricSet {
    /// The `Storage` metric set over a time range (no interval).
    Storage(TimeRange),
}
2200
/// Input for fetching stream-level metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetStreamMetricsInput {
    /// Name of the basin.
    pub basin_name: BasinName,
    /// Name of the stream.
    pub stream_name: StreamName,
    /// Metric set to fetch, including its time arguments.
    pub set: StreamMetricSet,
}
2212
2213impl GetStreamMetricsInput {
2214 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2217 Self {
2218 basin_name,
2219 stream_name,
2220 set,
2221 }
2222 }
2223}
2224
2225impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2226 fn from(value: GetStreamMetricsInput) -> Self {
2227 let (set, start, end, interval) = match value.set {
2228 StreamMetricSet::Storage(args) => (
2229 api::metrics::StreamMetricSet::Storage,
2230 args.start,
2231 args.end,
2232 None,
2233 ),
2234 };
2235 (
2236 value.basin_name,
2237 value.stream_name,
2238 api::metrics::StreamMetricSetRequest {
2239 set,
2240 start: Some(start),
2241 end: Some(end),
2242 interval,
2243 },
2244 )
2245 }
2246}
2247
/// Unit in which a metric value is expressed.
#[derive(Debug, Clone, Copy)]
pub enum MetricUnit {
    /// The value counts bytes.
    Bytes,
    /// The value counts operations.
    Operations,
}
2256
2257impl From<api::metrics::MetricUnit> for MetricUnit {
2258 fn from(value: api::metrics::MetricUnit) -> Self {
2259 match value {
2260 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2261 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2262 }
2263 }
2264}
2265
/// A metric consisting of a single value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ScalarMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of `value`.
    pub unit: MetricUnit,
    /// The metric value.
    pub value: f64,
}
2277
/// A timeseries metric reported together with its interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccumulationMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Interval associated with the values.
    pub interval: TimeseriesInterval,
    /// Data points as `(u32, f64)` pairs, passed through from the API unchanged.
    /// NOTE(review): the first element looks like a timestamp — confirm.
    pub values: Vec<(u32, f64)>,
}
2294
/// A timeseries metric reported without an interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GaugeMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Data points as `(u32, f64)` pairs, passed through from the API unchanged.
    /// NOTE(review): the first element looks like a timestamp — confirm.
    pub values: Vec<(u32, f64)>,
}
2307
/// A metric whose values are strings.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Name of the metric.
    pub name: String,
    /// String values, passed through from the API unchanged.
    pub values: Vec<String>,
}
2317
/// A metric returned by the API, in one of four shapes.
#[derive(Debug, Clone)]
pub enum Metric {
    /// A single value.
    Scalar(ScalarMetric),
    /// A timeseries with an interval.
    Accumulation(AccumulationMetric),
    /// A timeseries without an interval.
    Gauge(GaugeMetric),
    /// A list of string values.
    Label(LabelMetric),
}
2331
2332impl From<api::metrics::Metric> for Metric {
2333 fn from(value: api::metrics::Metric) -> Self {
2334 match value {
2335 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2336 name: sm.name.into(),
2337 unit: sm.unit.into(),
2338 value: sm.value,
2339 }),
2340 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2341 name: am.name.into(),
2342 unit: am.unit.into(),
2343 interval: am.interval.into(),
2344 values: am.values,
2345 }),
2346 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2347 name: gm.name.into(),
2348 unit: gm.unit.into(),
2349 values: gm.values,
2350 }),
2351 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2352 name: lm.name.into(),
2353 values: lm.values,
2354 }),
2355 }
2356 }
2357}
2358
/// Input for listing one page of streams.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ListStreamsInput {
    /// Stream name prefix, sent to the API as `prefix`.
    pub prefix: StreamNamePrefix,
    /// Value sent to the API as `start_after`.
    pub start_after: StreamNameStartAfter,
    /// Optional limit, sent to the API as `limit`.
    pub limit: Option<usize>,
}
2378
2379impl ListStreamsInput {
2380 pub fn new() -> Self {
2382 Self::default()
2383 }
2384
2385 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2387 Self { prefix, ..self }
2388 }
2389
2390 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2393 Self {
2394 start_after,
2395 ..self
2396 }
2397 }
2398
2399 pub fn with_limit(self, limit: usize) -> Self {
2401 Self {
2402 limit: Some(limit),
2403 ..self
2404 }
2405 }
2406}
2407
2408impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2409 fn from(value: ListStreamsInput) -> Self {
2410 Self {
2411 prefix: Some(value.prefix),
2412 start_after: Some(value.start_after),
2413 limit: value.limit,
2414 }
2415 }
2416}
2417
/// Input for listing all streams.
#[derive(Debug, Clone, Default)]
pub struct ListAllStreamsInput {
    /// Stream name prefix.
    pub prefix: StreamNamePrefix,
    /// The `start_after` value for the listing.
    pub start_after: StreamNameStartAfter,
    /// Flag named `ignore_pending_deletions`; defaults to `false`.
    /// NOTE(review): presumably skips streams that have a deletion pending — confirm at the
    /// call site.
    pub ignore_pending_deletions: bool,
}
2436
2437impl ListAllStreamsInput {
2438 pub fn new() -> Self {
2440 Self::default()
2441 }
2442
2443 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2445 Self { prefix, ..self }
2446 }
2447
2448 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2451 Self {
2452 start_after,
2453 ..self
2454 }
2455 }
2456
2457 pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
2459 Self {
2460 ignore_pending_deletions,
2461 ..self
2462 }
2463 }
2464}
2465
/// Information about a stream as returned by the API.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct StreamInfo {
    /// Name of the stream.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time, if the API reported one.
    pub deleted_at: Option<S2DateTime>,
}
2477
2478impl From<api::stream::StreamInfo> for StreamInfo {
2479 fn from(value: api::stream::StreamInfo) -> Self {
2480 Self {
2481 name: value.name,
2482 created_at: value.created_at.into(),
2483 deleted_at: value.deleted_at.map(Into::into),
2484 }
2485 }
2486}
2487
/// Input for creating a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CreateStreamInput {
    /// Name of the stream to create.
    pub name: StreamName,
    /// Optional stream configuration; `None` leaves it unset in the request.
    pub config: Option<StreamConfig>,
    /// Idempotency token passed alongside the request. [`CreateStreamInput::new`] fills it
    /// from the `idempotency_token()` helper.
    pub idempotency_token: String,
}
2507
2508impl CreateStreamInput {
2509 pub fn new(name: StreamName) -> Self {
2511 Self {
2512 name,
2513 config: None,
2514 idempotency_token: idempotency_token(),
2515 }
2516 }
2517
2518 pub fn with_config(self, config: StreamConfig) -> Self {
2520 Self {
2521 config: Some(config),
2522 ..self
2523 }
2524 }
2525
2526 pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
2528 Self {
2529 idempotency_token: idempotency_token.into(),
2530 ..self
2531 }
2532 }
2533}
2534
2535impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2536 fn from(value: CreateStreamInput) -> Self {
2537 (
2538 api::stream::CreateStreamRequest {
2539 stream: value.name,
2540 config: value.config.map(Into::into),
2541 },
2542 value.idempotency_token,
2543 )
2544 }
2545}
2546
/// Input for deleting a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct DeleteStreamInput {
    /// Name of the stream to delete.
    pub name: StreamName,
    /// Flag named `ignore_not_found`; `false` unless set.
    /// NOTE(review): presumably suppresses the error when the stream does not exist — confirm
    /// at the call site.
    pub ignore_not_found: bool,
}
2556
2557impl DeleteStreamInput {
2558 pub fn new(name: StreamName) -> Self {
2560 Self {
2561 name,
2562 ignore_not_found: false,
2563 }
2564 }
2565
2566 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2568 Self {
2569 ignore_not_found,
2570 ..self
2571 }
2572 }
2573}
2574
/// Input for reconfiguring a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReconfigureStreamInput {
    /// Name of the stream to reconfigure.
    pub name: StreamName,
    /// Reconfiguration to apply.
    pub config: StreamReconfiguration,
}
2584
2585impl ReconfigureStreamInput {
2586 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2588 Self { name, config }
2589 }
2590}
2591
/// A fencing token: a string of at most `MAX_FENCING_TOKEN_LENGTH` bytes.
///
/// Values are created through [`FromStr`] (which enforces the length limit) or
/// [`FencingToken::generate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FencingToken(String);
2599
2600impl FencingToken {
2601 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2603 rand::rng()
2604 .sample_iter(&rand::distr::Alphanumeric)
2605 .take(n)
2606 .map(char::from)
2607 .collect::<String>()
2608 .parse()
2609 }
2610}
2611
2612impl FromStr for FencingToken {
2613 type Err = ValidationError;
2614
2615 fn from_str(s: &str) -> Result<Self, Self::Err> {
2616 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2617 return Err(ValidationError(format!(
2618 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2619 )));
2620 }
2621 Ok(FencingToken(s.to_string()))
2622 }
2623}
2624
2625impl std::fmt::Display for FencingToken {
2626 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2627 write!(f, "{}", self.0)
2628 }
2629}
2630
2631impl Deref for FencingToken {
2632 type Target = str;
2633
2634 fn deref(&self) -> &Self::Target {
2635 &self.0
2636 }
2637}
2638
/// A position in a stream, identified by a sequence number and a timestamp.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: u64,
    /// Timestamp.
    /// NOTE(review): unit is not visible here (presumably milliseconds since the Unix epoch)
    /// — confirm.
    pub timestamp: u64,
}
2649
2650impl std::fmt::Display for StreamPosition {
2651 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2652 write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2653 }
2654}
2655
2656impl From<api::stream::proto::StreamPosition> for StreamPosition {
2657 fn from(value: api::stream::proto::StreamPosition) -> Self {
2658 Self {
2659 seq_num: value.seq_num,
2660 timestamp: value.timestamp,
2661 }
2662 }
2663}
2664
2665impl From<api::stream::StreamPosition> for StreamPosition {
2666 fn from(value: api::stream::StreamPosition) -> Self {
2667 Self {
2668 seq_num: value.seq_num,
2669 timestamp: value.timestamp,
2670 }
2671 }
2672}
2673
/// A record header: a name/value pair of raw bytes.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct Header {
    /// Header name.
    pub name: Bytes,
    /// Header value.
    pub value: Bytes,
}
2683
2684impl Header {
2685 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2687 Self {
2688 name: name.into(),
2689 value: value.into(),
2690 }
2691 }
2692}
2693
2694impl From<Header> for api::stream::proto::Header {
2695 fn from(value: Header) -> Self {
2696 Self {
2697 name: value.name,
2698 value: value.value,
2699 }
2700 }
2701}
2702
2703impl From<api::stream::proto::Header> for Header {
2704 fn from(value: api::stream::proto::Header) -> Self {
2705 Self {
2706 name: value.name,
2707 value: value.value,
2708 }
2709 }
2710}
2711
/// A record to be appended to a stream.
///
/// Constructed through [`AppendRecord::new`], which rejects records whose metered size exceeds
/// `RECORD_BATCH_MAX.bytes`.
#[derive(Debug, Clone, PartialEq)]
pub struct AppendRecord {
    /// Record body.
    body: Bytes,
    /// Record headers.
    headers: Vec<Header>,
    /// Optional timestamp; `None` leaves it unset in the request.
    timestamp: Option<u64>,
}
2719
2720impl AppendRecord {
2721 fn validate(self) -> Result<Self, ValidationError> {
2722 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2723 Err(ValidationError(format!(
2724 "metered_bytes: {} exceeds {}",
2725 self.metered_bytes(),
2726 RECORD_BATCH_MAX.bytes
2727 )))
2728 } else {
2729 Ok(self)
2730 }
2731 }
2732
2733 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2735 let record = Self {
2736 body: body.into(),
2737 headers: Vec::default(),
2738 timestamp: None,
2739 };
2740 record.validate()
2741 }
2742
2743 pub fn with_headers(
2745 self,
2746 headers: impl IntoIterator<Item = Header>,
2747 ) -> Result<Self, ValidationError> {
2748 let record = Self {
2749 headers: headers.into_iter().collect(),
2750 ..self
2751 };
2752 record.validate()
2753 }
2754
2755 pub fn with_timestamp(self, timestamp: u64) -> Self {
2759 Self {
2760 timestamp: Some(timestamp),
2761 ..self
2762 }
2763 }
2764}
2765
2766impl From<AppendRecord> for api::stream::proto::AppendRecord {
2767 fn from(value: AppendRecord) -> Self {
2768 Self {
2769 timestamp: value.timestamp,
2770 headers: value.headers.into_iter().map(Into::into).collect(),
2771 body: value.body,
2772 }
2773 }
2774}
2775
/// Types that have a size in metered bytes.
///
/// Record types get their implementation from the `metered_bytes_impl!` macro in this file;
/// [`AppendRecordBatch`] reports a running total of its records.
pub trait MeteredBytes {
    /// Returns the size in metered bytes.
    fn metered_bytes(&self) -> usize;
}
2786
/// Implements [`MeteredBytes`] for a record type that has `headers` and `body` fields.
///
/// The metered size is 8 bytes per record, plus 2 bytes per header, plus the lengths of every
/// header name and value, plus the body length.
macro_rules! metered_bytes_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> usize {
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| 2 + header.name.len() + header.value.len())
                    .sum();
                8 + header_bytes + self.body.len()
            }
        }
    };
}
2802
2803metered_bytes_impl!(AppendRecord);
2804
/// A batch of [`AppendRecord`]s together with their total metered size.
///
/// [`AppendRecordBatch::try_from_iter`] enforces that the batch is non-empty and stays within
/// `RECORD_BATCH_MAX.count` records and `RECORD_BATCH_MAX.bytes` metered bytes.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    /// Records in the batch, in insertion order.
    records: Vec<AppendRecord>,
    /// Running sum of the records' metered bytes.
    metered_bytes: usize,
}
2818
2819impl AppendRecordBatch {
2820 pub(crate) fn with_capacity(capacity: usize) -> Self {
2821 Self {
2822 records: Vec::with_capacity(capacity),
2823 metered_bytes: 0,
2824 }
2825 }
2826
2827 pub(crate) fn push(&mut self, record: AppendRecord) {
2828 self.metered_bytes += record.metered_bytes();
2829 self.records.push(record);
2830 }
2831
2832 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2834 where
2835 I: IntoIterator<Item = AppendRecord>,
2836 {
2837 let mut records = Vec::new();
2838 let mut metered_bytes = 0;
2839
2840 for record in iter {
2841 metered_bytes += record.metered_bytes();
2842 records.push(record);
2843
2844 if metered_bytes > RECORD_BATCH_MAX.bytes {
2845 return Err(ValidationError(format!(
2846 "batch size in metered bytes ({metered_bytes}) exceeds {}",
2847 RECORD_BATCH_MAX.bytes
2848 )));
2849 }
2850
2851 if records.len() > RECORD_BATCH_MAX.count {
2852 return Err(ValidationError(format!(
2853 "number of records in the batch exceeds {}",
2854 RECORD_BATCH_MAX.count
2855 )));
2856 }
2857 }
2858
2859 if records.is_empty() {
2860 return Err(ValidationError("batch is empty".into()));
2861 }
2862
2863 Ok(Self {
2864 records,
2865 metered_bytes,
2866 })
2867 }
2868}
2869
2870impl Deref for AppendRecordBatch {
2871 type Target = [AppendRecord];
2872
2873 fn deref(&self) -> &Self::Target {
2874 &self.records
2875 }
2876}
2877
/// Reports the batch's metered size from the stored running total instead of re-summing the
/// records.
impl MeteredBytes for AppendRecordBatch {
    fn metered_bytes(&self) -> usize {
        self.metered_bytes
    }
}
2883
/// A command that can be carried by a command record.
#[derive(Debug, Clone)]
pub enum Command {
    /// The fence command.
    Fence {
        /// Fencing token carried as the record body.
        fencing_token: FencingToken,
    },
    /// The trim command.
    Trim {
        /// Trim point, encoded as 8 big-endian bytes in the record body.
        /// NOTE(review): presumably a sequence number — confirm.
        trim_point: u64,
    },
}
2898
/// A command together with an optional timestamp.
///
/// Converts into an [`AppendRecord`] via the `From` impl in this file.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CommandRecord {
    /// The command to carry.
    pub command: Command,
    /// Optional timestamp, copied onto the resulting [`AppendRecord`].
    pub timestamp: Option<u64>,
}
2910
2911impl CommandRecord {
2912 const FENCE: &[u8] = b"fence";
2913 const TRIM: &[u8] = b"trim";
2914
2915 pub fn fence(fencing_token: FencingToken) -> Self {
2920 Self {
2921 command: Command::Fence { fencing_token },
2922 timestamp: None,
2923 }
2924 }
2925
2926 pub fn trim(trim_point: u64) -> Self {
2933 Self {
2934 command: Command::Trim { trim_point },
2935 timestamp: None,
2936 }
2937 }
2938
2939 pub fn with_timestamp(self, timestamp: u64) -> Self {
2941 Self {
2942 timestamp: Some(timestamp),
2943 ..self
2944 }
2945 }
2946}
2947
2948impl From<CommandRecord> for AppendRecord {
2949 fn from(value: CommandRecord) -> Self {
2950 let (header_value, body) = match value.command {
2951 Command::Fence { fencing_token } => (
2952 CommandRecord::FENCE,
2953 Bytes::copy_from_slice(fencing_token.as_bytes()),
2954 ),
2955 Command::Trim { trim_point } => (
2956 CommandRecord::TRIM,
2957 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
2958 ),
2959 };
2960 Self {
2961 body,
2962 headers: vec![Header::new("", header_value)],
2963 timestamp: value.timestamp,
2964 }
2965 }
2966}
2967
/// Input for an append request.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AppendInput {
    /// Batch of records to append.
    pub records: AppendRecordBatch,
    /// Optional value sent to the API as `match_seq_num`.
    pub match_seq_num: Option<u64>,
    /// Optional fencing token sent with the request.
    pub fencing_token: Option<FencingToken>,
}
2985
2986impl AppendInput {
2987 pub fn new(records: AppendRecordBatch) -> Self {
2989 Self {
2990 records,
2991 match_seq_num: None,
2992 fencing_token: None,
2993 }
2994 }
2995
2996 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
2998 Self {
2999 match_seq_num: Some(match_seq_num),
3000 ..self
3001 }
3002 }
3003
3004 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3006 Self {
3007 fencing_token: Some(fencing_token),
3008 ..self
3009 }
3010 }
3011}
3012
3013impl From<AppendInput> for api::stream::proto::AppendInput {
3014 fn from(value: AppendInput) -> Self {
3015 Self {
3016 records: value.records.iter().cloned().map(Into::into).collect(),
3017 match_seq_num: value.match_seq_num,
3018 fencing_token: value.fencing_token.map(|t| t.to_string()),
3019 }
3020 }
3021}
3022
/// Acknowledgement returned for an append.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct AppendAck {
    /// The `start` position reported by the API.
    pub start: StreamPosition,
    /// The `end` position reported by the API.
    pub end: StreamPosition,
    /// The `tail` position reported by the API.
    pub tail: StreamPosition,
}
3041
3042impl From<api::stream::proto::AppendAck> for AppendAck {
3043 fn from(value: api::stream::proto::AppendAck) -> Self {
3044 Self {
3045 start: value.start.unwrap_or_default().into(),
3046 end: value.end.unwrap_or_default().into(),
3047 tail: value.tail.unwrap_or_default().into(),
3048 }
3049 }
3050}
3051
/// Where a read starts. Exactly one of the three selectors is sent to the API.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start from a sequence number (sent as `seq_num`).
    SeqNum(u64),
    /// Start from a timestamp (sent as `timestamp`).
    Timestamp(u64),
    /// Start from an offset relative to the tail (sent as `tail_offset`).
    TailOffset(u64),
}
3062
3063impl Default for ReadFrom {
3064 fn default() -> Self {
3065 Self::SeqNum(0)
3066 }
3067}
3068
/// Where a read starts, plus the clamp flag.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct ReadStart {
    /// Starting point; defaults to sequence number 0.
    pub from: ReadFrom,
    /// Sent to the API as `clamp` when `true`; omitted when `false`.
    /// NOTE(review): presumably clamps a start position beyond the tail to the tail — confirm.
    pub clamp_to_tail: bool,
}
3082
3083impl ReadStart {
3084 pub fn new() -> Self {
3086 Self::default()
3087 }
3088
3089 pub fn with_from(self, from: ReadFrom) -> Self {
3091 Self { from, ..self }
3092 }
3093
3094 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3096 Self {
3097 clamp_to_tail,
3098 ..self
3099 }
3100 }
3101}
3102
3103impl From<ReadStart> for api::stream::ReadStart {
3104 fn from(value: ReadStart) -> Self {
3105 let (seq_num, timestamp, tail_offset) = match value.from {
3106 ReadFrom::SeqNum(n) => (Some(n), None, None),
3107 ReadFrom::Timestamp(t) => (None, Some(t), None),
3108 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3109 };
3110 Self {
3111 seq_num,
3112 timestamp,
3113 tail_offset,
3114 clamp: if value.clamp_to_tail {
3115 Some(true)
3116 } else {
3117 None
3118 },
3119 }
3120 }
3121}
3122
/// Optional limits on how much a read returns.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Optional limit sent to the API as `count`.
    pub count: Option<usize>,
    /// Optional limit sent to the API as `bytes`.
    pub bytes: Option<usize>,
}
3136
3137impl ReadLimits {
3138 pub fn new() -> Self {
3140 Self::default()
3141 }
3142
3143 pub fn with_count(self, count: usize) -> Self {
3145 Self {
3146 count: Some(count),
3147 ..self
3148 }
3149 }
3150
3151 pub fn with_bytes(self, bytes: usize) -> Self {
3153 Self {
3154 bytes: Some(bytes),
3155 ..self
3156 }
3157 }
3158}
3159
/// Conditions under which a read stops.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadStop {
    /// Limits on the amount of data returned.
    pub limits: ReadLimits,
    /// Optional exclusive upper bound; its `end` is sent to the API as `until`.
    /// NOTE(review): presumably a timestamp bound — confirm.
    pub until: Option<RangeTo<u64>>,
    /// Optional value sent to the API as `wait`.
    /// NOTE(review): presumably a duration in seconds — confirm.
    pub wait: Option<u32>,
}
3179
3180impl ReadStop {
3181 pub fn new() -> Self {
3183 Self::default()
3184 }
3185
3186 pub fn with_limits(self, limits: ReadLimits) -> Self {
3188 Self { limits, ..self }
3189 }
3190
3191 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3193 Self {
3194 until: Some(until),
3195 ..self
3196 }
3197 }
3198
3199 pub fn with_wait(self, wait: u32) -> Self {
3201 Self {
3202 wait: Some(wait),
3203 ..self
3204 }
3205 }
3206}
3207
3208impl From<ReadStop> for api::stream::ReadEnd {
3209 fn from(value: ReadStop) -> Self {
3210 Self {
3211 count: value.limits.count,
3212 bytes: value.limits.bytes,
3213 until: value.until.map(|r| r.end),
3214 wait: value.wait,
3215 }
3216 }
3217}
3218
3219#[derive(Debug, Clone, Default)]
3220#[non_exhaustive]
3221pub struct ReadInput {
3224 pub start: ReadStart,
3226 pub stop: ReadStop,
3228 pub ignore_command_records: bool,
3232}
3233
3234impl ReadInput {
3235 pub fn new() -> Self {
3237 Self::default()
3238 }
3239
3240 pub fn with_start(self, start: ReadStart) -> Self {
3242 Self { start, ..self }
3243 }
3244
3245 pub fn with_stop(self, stop: ReadStop) -> Self {
3247 Self { stop, ..self }
3248 }
3249
3250 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3252 Self {
3253 ignore_command_records,
3254 ..self
3255 }
3256 }
3257}
3258
3259#[derive(Debug, Clone)]
3260#[non_exhaustive]
3261pub struct SequencedRecord {
3263 pub seq_num: u64,
3265 pub body: Bytes,
3267 pub headers: Vec<Header>,
3269 pub timestamp: u64,
3271}
3272
3273impl SequencedRecord {
3274 pub fn is_command_record(&self) -> bool {
3276 self.headers.len() == 1 && *self.headers[0].name == *b""
3277 }
3278}
3279
3280impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3281 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3282 Self {
3283 seq_num: value.seq_num,
3284 body: value.body,
3285 headers: value.headers.into_iter().map(Into::into).collect(),
3286 timestamp: value.timestamp,
3287 }
3288 }
3289}
3290
// Invokes the `metered_bytes_impl!` macro (defined elsewhere) for `SequencedRecord`.
// NOTE(review): presumably generates the metered-size implementation for this type — confirm
// against the macro definition.
metered_bytes_impl!(SequencedRecord);
3292
3293#[derive(Debug, Clone)]
3294#[non_exhaustive]
3295pub struct ReadBatch {
3298 pub records: Vec<SequencedRecord>,
3305 pub tail: Option<StreamPosition>,
3310}
3311
3312impl ReadBatch {
3313 pub(crate) fn from_api(
3314 batch: api::stream::proto::ReadBatch,
3315 ignore_command_records: bool,
3316 ) -> Self {
3317 Self {
3318 records: batch
3319 .records
3320 .into_iter()
3321 .map(Into::into)
3322 .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3323 .collect(),
3324 tail: batch.tail.map(Into::into),
3325 }
3326 }
3327}
3328
/// A pinned, boxed, `Send` stream whose items are `Result<T, S2Error>`.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3331
#[derive(Debug, Clone, thiserror::Error)]
/// Reason an append was rejected because one of its conditions did not hold.
pub enum AppendConditionFailed {
    /// The fencing token did not match; per the error message, the payload is the expected token.
    #[error("fencing token mismatch, expected: {0}")]
    FencingTokenMismatch(FencingToken),
    /// The sequence number did not match; per the error message, the payload is the expected
    /// sequence number.
    #[error("sequence number mismatch, expected: {0}")]
    SeqNumMismatch(u64),
}
3342
3343impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3344 fn from(value: api::stream::AppendConditionFailed) -> Self {
3345 match value {
3346 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3347 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3348 }
3349 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3350 AppendConditionFailed::SeqNumMismatch(seq)
3351 }
3352 }
3353 }
3354}
3355
#[derive(Debug, Clone, thiserror::Error)]
/// Error type surfaced by the SDK.
pub enum S2Error {
    /// An error described only by its message; the `ApiError` conversion uses this variant
    /// for every API error that has no dedicated variant.
    #[error("{0}")]
    Client(String),
    /// A validation error; convertible from [`ValidationError`] via `From`.
    #[error(transparent)]
    Validation(#[from] ValidationError),
    /// An append was rejected because one of its conditions failed.
    #[error("{0}")]
    AppendConditionFailed(AppendConditionFailed),
    /// A read targeted an unwritten position; per the error message, the payload is the
    /// current tail.
    #[error("read from an unwritten position. current tail: {0}")]
    ReadUnwritten(StreamPosition),
    /// An error response returned by the server.
    #[error("{0}")]
    Server(ErrorResponse),
}
3375
3376impl From<ApiError> for S2Error {
3377 fn from(err: ApiError) -> Self {
3378 match err {
3379 ApiError::ReadUnwritten(tail_response) => {
3380 Self::ReadUnwritten(tail_response.tail.into())
3381 }
3382 ApiError::AppendConditionFailed(condition_failed) => {
3383 Self::AppendConditionFailed(condition_failed.into())
3384 }
3385 ApiError::Server(_, response) => Self::Server(response.into()),
3386 other => Self::Client(other.to_string()),
3387 }
3388 }
3389}
3390
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
/// Error response with a code and a message; displayed as `{code}: {message}`.
pub struct ErrorResponse {
    /// Error code.
    pub code: String,
    /// Error message.
    pub message: String,
}
3401
3402impl From<ApiErrorResponse> for ErrorResponse {
3403 fn from(response: ApiErrorResponse) -> Self {
3404 Self {
3405 code: response.code,
3406 message: response.message,
3407 }
3408 }
3409}
3410
3411fn idempotency_token() -> String {
3412 uuid::Uuid::new_v4().simple().to_string()
3413}