1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::Rng;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::types::ValidationError;
24pub use s2_common::types::access::AccessTokenId;
28pub use s2_common::types::access::AccessTokenIdPrefix;
30pub use s2_common::types::access::AccessTokenIdStartAfter;
32pub use s2_common::types::basin::BasinName;
37pub use s2_common::types::basin::BasinNamePrefix;
39pub use s2_common::types::basin::BasinNameStartAfter;
41pub use s2_common::types::stream::StreamName;
45pub use s2_common::types::stream::StreamNamePrefix;
47pub use s2_common::types::stream::StreamNameStartAfter;
49
/// Number of bytes in one mebibyte (2^20 = 1,048,576).
pub(crate) const ONE_MIB: u32 = 1 << 20;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl From<time::OffsetDateTime> for S2DateTime {
66 fn from(dt: time::OffsetDateTime) -> Self {
67 Self(dt)
68 }
69}
70
71impl From<S2DateTime> for time::OffsetDateTime {
72 fn from(dt: S2DateTime) -> Self {
73 dt.0
74 }
75}
76
77impl FromStr for S2DateTime {
78 type Err = ValidationError;
79
80 fn from_str(s: &str) -> Result<Self, Self::Err> {
81 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
82 .map(Self)
83 .map_err(|e| ValidationError(format!("invalid datetime: {e}")))
84 }
85}
86
87impl fmt::Display for S2DateTime {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 write!(
90 f,
91 "{}",
92 self.0
93 .format(&time::format_description::well_known::Rfc3339)
94 .expect("RFC3339 formatting should not fail for S2DateTime")
95 )
96 }
97}
98
99#[derive(Debug, Clone, PartialEq)]
101pub enum BasinAuthority {
102 ParentZone(Authority),
104 Direct(Authority),
106}
107
108#[derive(Debug, Clone)]
109#[non_exhaustive]
110pub struct S2Endpoints {
112 pub scheme: Scheme,
116 pub account_authority: Authority,
118 pub basin_authority: BasinAuthority,
120}
121
122impl S2Endpoints {
123 pub fn new(account_authority: Authority, basin_authority: BasinAuthority) -> Self {
125 Self {
126 scheme: Scheme::HTTPS,
127 account_authority,
128 basin_authority,
129 }
130 }
131
132 pub fn from_env() -> Result<Self, ValidationError> {
138 let account_endpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
139 Ok(s) => s,
140 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
141 Err(VarError::NotUnicode(_)) => {
142 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
143 }
144 };
145
146 let basin_endpoint = match std::env::var("S2_BASIN_ENDPOINT") {
147 Ok(s) => s,
148 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
149 Err(VarError::NotUnicode(_)) => {
150 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
151 }
152 };
153
154 let (account_scheme, account_authority) = parse_account_endpoint(&account_endpoint)?;
155
156 let (basin_scheme, basin_authority) = parse_basin_endpoint(&basin_endpoint)?;
157
158 if account_scheme != basin_scheme {
159 return Err(
160 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
161 );
162 }
163
164 Ok(S2Endpoints::new(account_authority, basin_authority).with_scheme(account_scheme))
165 }
166
167 #[doc(hidden)]
168 #[cfg(feature = "_hidden")]
169 pub fn parse_from(
170 account_endpoint: &str,
171 basin_endpoint: &str,
172 ) -> Result<Self, ValidationError> {
173 let (account_scheme, account_authority) = parse_account_endpoint(account_endpoint)?;
174 let (basin_scheme, basin_authority) = parse_basin_endpoint(basin_endpoint)?;
175
176 if account_scheme != basin_scheme {
177 return Err("account and basin endpoints must have the same scheme".into());
178 }
179 Ok(S2Endpoints::new(account_authority, basin_authority).with_scheme(account_scheme))
180 }
181
182 pub fn with_scheme(self, scheme: Scheme) -> Self {
184 Self { scheme, ..self }
185 }
186
187 pub(crate) fn for_aws() -> Self {
188 Self {
189 scheme: Scheme::HTTPS,
190 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
191 basin_authority: BasinAuthority::ParentZone(
192 "b.aws.s2.dev".try_into().expect("valid authority"),
193 ),
194 }
195 }
196}
197
198fn parse_account_endpoint(s: &str) -> Result<(Scheme, Authority), ValidationError> {
199 let (scheme, authority) = match s.find("://") {
200 Some(idx) => {
201 let scheme: Scheme = s[..idx]
202 .parse()
203 .map_err(|_| "invalid account endpoint scheme".to_string())?;
204 (scheme, &s[idx + 3..])
205 }
206 None => (Scheme::HTTPS, s),
207 };
208 Ok((
209 scheme,
210 authority
211 .parse()
212 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
213 ))
214}
215
216fn parse_basin_endpoint(s: &str) -> Result<(Scheme, BasinAuthority), ValidationError> {
217 let (scheme, authority) = match s.find("://") {
218 Some(idx) => {
219 let scheme: Scheme = s[..idx]
220 .parse()
221 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
222 (scheme, &s[idx + 3..])
223 }
224 None => (Scheme::HTTPS, s),
225 };
226 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
227 BasinAuthority::ParentZone(
228 authority
229 .parse()
230 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
231 )
232 } else {
233 BasinAuthority::Direct(
234 authority
235 .parse()
236 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
237 )
238 };
239 Ok((scheme, authority))
240}
241
/// Compression algorithm selection.
///
/// Each variant maps one-to-one onto the wire-level `CompressionAlgorithm`.
/// Values can be compared with `==`, like the other fieldless enums in this
/// module.
// NOTE(review): which payloads are compressed (requests, responses, or both)
// is not visible here — confirm against the client code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
252
253impl From<Compression> for CompressionAlgorithm {
254 fn from(value: Compression) -> Self {
255 match value {
256 Compression::None => CompressionAlgorithm::None,
257 Compression::Gzip => CompressionAlgorithm::Gzip,
258 Compression::Zstd => CompressionAlgorithm::Zstd,
259 }
260 }
261}
262
263#[derive(Debug, Clone, Copy, PartialEq)]
264#[non_exhaustive]
265pub enum AppendRetryPolicy {
268 All,
270 NoSideEffects,
272}
273
274impl AppendRetryPolicy {
275 pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
276 match self {
277 Self::All => true,
278 Self::NoSideEffects => input.match_seq_num.is_some(),
279 }
280 }
281}
282
283#[derive(Debug, Clone)]
284#[non_exhaustive]
285pub struct RetryConfig {
294 pub max_attempts: NonZeroU32,
298 pub min_base_delay: Duration,
302 pub max_base_delay: Duration,
306 pub append_retry_policy: AppendRetryPolicy,
311}
312
313impl Default for RetryConfig {
314 fn default() -> Self {
315 Self {
316 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
317 min_base_delay: Duration::from_millis(100),
318 max_base_delay: Duration::from_secs(1),
319 append_retry_policy: AppendRetryPolicy::All,
320 }
321 }
322}
323
324impl RetryConfig {
325 pub fn new() -> Self {
327 Self::default()
328 }
329
330 pub(crate) fn max_retries(&self) -> u32 {
331 self.max_attempts.get() - 1
332 }
333
334 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
336 Self {
337 max_attempts,
338 ..self
339 }
340 }
341
342 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
344 Self {
345 min_base_delay,
346 ..self
347 }
348 }
349
350 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
352 Self {
353 max_base_delay,
354 ..self
355 }
356 }
357
358 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
361 Self {
362 append_retry_policy,
363 ..self
364 }
365 }
366}
367
368#[derive(Debug, Clone)]
369#[non_exhaustive]
370pub struct S2Config {
372 pub(crate) access_token: SecretString,
373 pub(crate) endpoints: S2Endpoints,
374 pub(crate) connection_timeout: Duration,
375 pub(crate) request_timeout: Duration,
376 pub(crate) retry: RetryConfig,
377 pub(crate) compression: Compression,
378 pub(crate) user_agent: HeaderValue,
379}
380
381impl S2Config {
382 pub fn new(access_token: impl Into<String>) -> Self {
384 Self {
385 access_token: access_token.into().into(),
386 endpoints: S2Endpoints::for_aws(),
387 connection_timeout: Duration::from_secs(3),
388 request_timeout: Duration::from_secs(5),
389 retry: RetryConfig::new(),
390 compression: Compression::None,
391 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
392 .parse()
393 .expect("valid user agent"),
394 }
395 }
396
397 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
399 Self { endpoints, ..self }
400 }
401
402 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
406 Self {
407 connection_timeout,
408 ..self
409 }
410 }
411
412 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
416 Self {
417 request_timeout,
418 ..self
419 }
420 }
421
422 pub fn with_retry(self, retry: RetryConfig) -> Self {
426 Self { retry, ..self }
427 }
428
429 pub fn with_compression(self, compression: Compression) -> Self {
433 Self {
434 compression,
435 ..self
436 }
437 }
438
439 #[doc(hidden)]
440 #[cfg(feature = "_hidden")]
441 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
442 let user_agent = user_agent
443 .into()
444 .parse()
445 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
446 Ok(Self { user_agent, ..self })
447 }
448}
449
/// One page of values together with a flag telling whether more exist.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether more values exist beyond this page.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Builds a page from anything convertible into a `Vec<T>`.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values: Vec<T> = values.into();
        Self { values, has_more }
    }
}
468
469#[derive(Debug, Clone, Copy, PartialEq, Eq)]
470pub enum StorageClass {
472 Standard,
474 Express,
476}
477
478impl From<api::config::StorageClass> for StorageClass {
479 fn from(value: api::config::StorageClass) -> Self {
480 match value {
481 api::config::StorageClass::Standard => StorageClass::Standard,
482 api::config::StorageClass::Express => StorageClass::Express,
483 }
484 }
485}
486
487impl From<StorageClass> for api::config::StorageClass {
488 fn from(value: StorageClass) -> Self {
489 match value {
490 StorageClass::Standard => api::config::StorageClass::Standard,
491 StorageClass::Express => api::config::StorageClass::Express,
492 }
493 }
494}
495
496#[derive(Debug, Clone, Copy, PartialEq, Eq)]
497pub enum RetentionPolicy {
499 Age(u64),
501 Infinite,
503}
504
505impl From<api::config::RetentionPolicy> for RetentionPolicy {
506 fn from(value: api::config::RetentionPolicy) -> Self {
507 match value {
508 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
509 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
510 }
511 }
512}
513
514impl From<RetentionPolicy> for api::config::RetentionPolicy {
515 fn from(value: RetentionPolicy) -> Self {
516 match value {
517 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
518 RetentionPolicy::Infinite => {
519 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
520 }
521 }
522 }
523}
524
525#[derive(Debug, Clone, Copy, PartialEq, Eq)]
526pub enum TimestampingMode {
528 ClientPrefer,
530 ClientRequire,
532 Arrival,
534}
535
536impl From<api::config::TimestampingMode> for TimestampingMode {
537 fn from(value: api::config::TimestampingMode) -> Self {
538 match value {
539 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
540 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
541 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
542 }
543 }
544}
545
546impl From<TimestampingMode> for api::config::TimestampingMode {
547 fn from(value: TimestampingMode) -> Self {
548 match value {
549 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
550 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
551 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
552 }
553 }
554}
555
556#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
557#[non_exhaustive]
558pub struct TimestampingConfig {
560 pub mode: Option<TimestampingMode>,
564 pub uncapped: bool,
568}
569
570impl TimestampingConfig {
571 pub fn new() -> Self {
573 Self::default()
574 }
575
576 pub fn with_mode(self, mode: TimestampingMode) -> Self {
578 Self {
579 mode: Some(mode),
580 ..self
581 }
582 }
583
584 pub fn with_uncapped(self, uncapped: bool) -> Self {
586 Self { uncapped, ..self }
587 }
588}
589
590impl From<api::config::TimestampingConfig> for TimestampingConfig {
591 fn from(value: api::config::TimestampingConfig) -> Self {
592 Self {
593 mode: value.mode.map(Into::into),
594 uncapped: value.uncapped.unwrap_or_default(),
595 }
596 }
597}
598
599impl From<TimestampingConfig> for api::config::TimestampingConfig {
600 fn from(value: TimestampingConfig) -> Self {
601 Self {
602 mode: value.mode.map(Into::into),
603 uncapped: Some(value.uncapped),
604 }
605 }
606}
607
/// Delete-on-empty settings.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in whole seconds; `0` by default.
    // NOTE(review): presumably the minimum age before an empty stream may be
    // deleted — confirm against the service documentation.
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Creates a configuration with a minimum age of zero seconds.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Sets the minimum age; any sub-second part of `min_age` is dropped.
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
631
632impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
633 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
634 Self {
635 min_age_secs: value.min_age_secs,
636 }
637 }
638}
639
640impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
641 fn from(value: DeleteOnEmptyConfig) -> Self {
642 Self {
643 min_age_secs: value.min_age_secs,
644 }
645 }
646}
647
648#[derive(Debug, Clone, Default, PartialEq, Eq)]
649#[non_exhaustive]
650pub struct StreamConfig {
652 pub storage_class: Option<StorageClass>,
656 pub retention_policy: Option<RetentionPolicy>,
660 pub timestamping: Option<TimestampingConfig>,
664 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
668}
669
670impl StreamConfig {
671 pub fn new() -> Self {
673 Self::default()
674 }
675
676 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
678 Self {
679 storage_class: Some(storage_class),
680 ..self
681 }
682 }
683
684 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
686 Self {
687 retention_policy: Some(retention_policy),
688 ..self
689 }
690 }
691
692 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
694 Self {
695 timestamping: Some(timestamping),
696 ..self
697 }
698 }
699
700 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
702 Self {
703 delete_on_empty: Some(delete_on_empty),
704 ..self
705 }
706 }
707}
708
709impl From<api::config::StreamConfig> for StreamConfig {
710 fn from(value: api::config::StreamConfig) -> Self {
711 Self {
712 storage_class: value.storage_class.map(Into::into),
713 retention_policy: value.retention_policy.map(Into::into),
714 timestamping: value.timestamping.map(Into::into),
715 delete_on_empty: value.delete_on_empty.map(Into::into),
716 }
717 }
718}
719
720impl From<StreamConfig> for api::config::StreamConfig {
721 fn from(value: StreamConfig) -> Self {
722 Self {
723 storage_class: value.storage_class.map(Into::into),
724 retention_policy: value.retention_policy.map(Into::into),
725 timestamping: value.timestamping.map(Into::into),
726 delete_on_empty: value.delete_on_empty.map(Into::into),
727 }
728 }
729}
730
731#[derive(Debug, Clone, Default)]
732#[non_exhaustive]
733pub struct BasinConfig {
735 pub default_stream_config: Option<StreamConfig>,
739 pub create_stream_on_append: bool,
743 pub create_stream_on_read: bool,
747}
748
749impl BasinConfig {
750 pub fn new() -> Self {
752 Self::default()
753 }
754
755 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
757 Self {
758 default_stream_config: Some(config),
759 ..self
760 }
761 }
762
763 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
766 Self {
767 create_stream_on_append,
768 ..self
769 }
770 }
771
772 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
774 Self {
775 create_stream_on_read,
776 ..self
777 }
778 }
779}
780
781impl From<api::config::BasinConfig> for BasinConfig {
782 fn from(value: api::config::BasinConfig) -> Self {
783 Self {
784 default_stream_config: value.default_stream_config.map(Into::into),
785 create_stream_on_append: value.create_stream_on_append,
786 create_stream_on_read: value.create_stream_on_read,
787 }
788 }
789}
790
791impl From<BasinConfig> for api::config::BasinConfig {
792 fn from(value: BasinConfig) -> Self {
793 Self {
794 default_stream_config: value.default_stream_config.map(Into::into),
795 create_stream_on_append: value.create_stream_on_append,
796 create_stream_on_read: value.create_stream_on_read,
797 }
798 }
799}
800
801#[derive(Debug, Clone, PartialEq, Eq)]
802pub enum BasinScope {
804 AwsUsEast1,
806}
807
808impl From<api::basin::BasinScope> for BasinScope {
809 fn from(value: api::basin::BasinScope) -> Self {
810 match value {
811 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
812 }
813 }
814}
815
816impl From<BasinScope> for api::basin::BasinScope {
817 fn from(value: BasinScope) -> Self {
818 match value {
819 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
820 }
821 }
822}
823
824#[derive(Debug, Clone)]
825#[non_exhaustive]
826pub struct CreateBasinInput {
828 pub name: BasinName,
830 pub config: Option<BasinConfig>,
834 pub scope: Option<BasinScope>,
838 pub idempotency_token: String,
846}
847
848impl CreateBasinInput {
849 pub fn new(name: BasinName) -> Self {
851 Self {
852 name,
853 config: None,
854 scope: None,
855 idempotency_token: idempotency_token(),
856 }
857 }
858
859 pub fn with_config(self, config: BasinConfig) -> Self {
861 Self {
862 config: Some(config),
863 ..self
864 }
865 }
866
867 pub fn with_scope(self, scope: BasinScope) -> Self {
869 Self {
870 scope: Some(scope),
871 ..self
872 }
873 }
874
875 pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
877 Self {
878 idempotency_token: idempotency_token.into(),
879 ..self
880 }
881 }
882}
883
884impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
885 fn from(value: CreateBasinInput) -> Self {
886 (
887 api::basin::CreateBasinRequest {
888 basin: value.name,
889 config: value.config.map(Into::into),
890 scope: value.scope.map(Into::into),
891 },
892 value.idempotency_token,
893 )
894 }
895}
896
897#[derive(Debug, Clone, Default)]
898#[non_exhaustive]
899pub struct ListBasinsInput {
901 pub prefix: BasinNamePrefix,
905 pub start_after: BasinNameStartAfter,
911 pub limit: Option<usize>,
915}
916
917impl ListBasinsInput {
918 pub fn new() -> Self {
920 Self::default()
921 }
922
923 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
925 Self { prefix, ..self }
926 }
927
928 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
931 Self {
932 start_after,
933 ..self
934 }
935 }
936
937 pub fn with_limit(self, limit: usize) -> Self {
939 Self {
940 limit: Some(limit),
941 ..self
942 }
943 }
944}
945
946impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
947 fn from(value: ListBasinsInput) -> Self {
948 Self {
949 prefix: Some(value.prefix),
950 start_after: Some(value.start_after),
951 limit: value.limit,
952 }
953 }
954}
955
956#[derive(Debug, Clone, Default)]
957pub struct ListAllBasinsInput {
959 pub prefix: BasinNamePrefix,
963 pub start_after: BasinNameStartAfter,
969 pub ignore_pending_deletions: bool,
973}
974
975impl ListAllBasinsInput {
976 pub fn new() -> Self {
978 Self::default()
979 }
980
981 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
983 Self { prefix, ..self }
984 }
985
986 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
989 Self {
990 start_after,
991 ..self
992 }
993 }
994
995 pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
997 Self {
998 ignore_pending_deletions,
999 ..self
1000 }
1001 }
1002}
1003
1004#[derive(Debug, Clone, PartialEq, Eq)]
1005pub enum BasinState {
1007 Active,
1009 Creating,
1011 Deleting,
1013}
1014
1015impl From<api::basin::BasinState> for BasinState {
1016 fn from(value: api::basin::BasinState) -> Self {
1017 match value {
1018 api::basin::BasinState::Active => BasinState::Active,
1019 api::basin::BasinState::Creating => BasinState::Creating,
1020 api::basin::BasinState::Deleting => BasinState::Deleting,
1021 }
1022 }
1023}
1024
1025#[derive(Debug, Clone, PartialEq, Eq)]
1026#[non_exhaustive]
1027pub struct BasinInfo {
1029 pub name: BasinName,
1031 pub scope: Option<BasinScope>,
1033 pub state: BasinState,
1035}
1036
1037impl From<api::basin::BasinInfo> for BasinInfo {
1038 fn from(value: api::basin::BasinInfo) -> Self {
1039 Self {
1040 name: value.name,
1041 scope: value.scope.map(Into::into),
1042 state: value.state.into(),
1043 }
1044 }
1045}
1046
1047#[derive(Debug, Clone)]
1048#[non_exhaustive]
1049pub struct DeleteBasinInput {
1051 pub name: BasinName,
1053 pub ignore_not_found: bool,
1055}
1056
1057impl DeleteBasinInput {
1058 pub fn new(name: BasinName) -> Self {
1060 Self {
1061 name,
1062 ignore_not_found: false,
1063 }
1064 }
1065
1066 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1068 Self {
1069 ignore_not_found,
1070 ..self
1071 }
1072 }
1073}
1074
1075#[derive(Debug, Clone, Default)]
1076#[non_exhaustive]
1077pub struct TimestampingReconfiguration {
1079 pub mode: Maybe<Option<TimestampingMode>>,
1081 pub uncapped: Maybe<Option<bool>>,
1083}
1084
1085impl TimestampingReconfiguration {
1086 pub fn new() -> Self {
1088 Self::default()
1089 }
1090
1091 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1093 Self {
1094 mode: Maybe::Specified(Some(mode)),
1095 ..self
1096 }
1097 }
1098
1099 pub fn with_uncapped(self, uncapped: bool) -> Self {
1101 Self {
1102 uncapped: Maybe::Specified(Some(uncapped)),
1103 ..self
1104 }
1105 }
1106}
1107
1108impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1109 fn from(value: TimestampingReconfiguration) -> Self {
1110 Self {
1111 mode: value.mode.map(|m| m.map(Into::into)),
1112 uncapped: value.uncapped,
1113 }
1114 }
1115}
1116
1117#[derive(Debug, Clone, Default)]
1118#[non_exhaustive]
1119pub struct DeleteOnEmptyReconfiguration {
1121 pub min_age_secs: Maybe<Option<u64>>,
1123}
1124
1125impl DeleteOnEmptyReconfiguration {
1126 pub fn new() -> Self {
1128 Self::default()
1129 }
1130
1131 pub fn with_min_age(self, min_age: Duration) -> Self {
1133 Self {
1134 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1135 }
1136 }
1137}
1138
1139impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1140 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1141 Self {
1142 min_age_secs: value.min_age_secs,
1143 }
1144 }
1145}
1146
1147#[derive(Debug, Clone, Default)]
1148#[non_exhaustive]
1149pub struct StreamReconfiguration {
1151 pub storage_class: Maybe<Option<StorageClass>>,
1153 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1155 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1157 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1159}
1160
1161impl StreamReconfiguration {
1162 pub fn new() -> Self {
1164 Self::default()
1165 }
1166
1167 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1169 Self {
1170 storage_class: Maybe::Specified(Some(storage_class)),
1171 ..self
1172 }
1173 }
1174
1175 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1177 Self {
1178 retention_policy: Maybe::Specified(Some(retention_policy)),
1179 ..self
1180 }
1181 }
1182
1183 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1185 Self {
1186 timestamping: Maybe::Specified(Some(timestamping)),
1187 ..self
1188 }
1189 }
1190
1191 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1193 Self {
1194 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1195 ..self
1196 }
1197 }
1198}
1199
1200impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1201 fn from(value: StreamReconfiguration) -> Self {
1202 Self {
1203 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1204 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1205 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1206 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1207 }
1208 }
1209}
1210
1211#[derive(Debug, Clone, Default)]
1212#[non_exhaustive]
1213pub struct BasinReconfiguration {
1215 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1217 pub create_stream_on_append: Maybe<bool>,
1220 pub create_stream_on_read: Maybe<bool>,
1222}
1223
1224impl BasinReconfiguration {
1225 pub fn new() -> Self {
1227 Self::default()
1228 }
1229
1230 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1233 Self {
1234 default_stream_config: Maybe::Specified(Some(config)),
1235 ..self
1236 }
1237 }
1238
1239 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1242 Self {
1243 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1244 ..self
1245 }
1246 }
1247
1248 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1251 Self {
1252 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1253 ..self
1254 }
1255 }
1256}
1257
1258impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1259 fn from(value: BasinReconfiguration) -> Self {
1260 Self {
1261 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1262 create_stream_on_append: value.create_stream_on_append,
1263 create_stream_on_read: value.create_stream_on_read,
1264 }
1265 }
1266}
1267
1268#[derive(Debug, Clone)]
1269#[non_exhaustive]
1270pub struct ReconfigureBasinInput {
1272 pub name: BasinName,
1274 pub config: BasinReconfiguration,
1276}
1277
1278impl ReconfigureBasinInput {
1279 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1281 Self { name, config }
1282 }
1283}
1284
1285#[derive(Debug, Clone, Default)]
1286#[non_exhaustive]
1287pub struct ListAccessTokensInput {
1289 pub prefix: AccessTokenIdPrefix,
1293 pub start_after: AccessTokenIdStartAfter,
1299 pub limit: Option<usize>,
1303}
1304
1305impl ListAccessTokensInput {
1306 pub fn new() -> Self {
1308 Self::default()
1309 }
1310
1311 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1313 Self { prefix, ..self }
1314 }
1315
1316 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1319 Self {
1320 start_after,
1321 ..self
1322 }
1323 }
1324
1325 pub fn with_limit(self, limit: usize) -> Self {
1327 Self {
1328 limit: Some(limit),
1329 ..self
1330 }
1331 }
1332}
1333
1334impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1335 fn from(value: ListAccessTokensInput) -> Self {
1336 Self {
1337 prefix: Some(value.prefix),
1338 start_after: Some(value.start_after),
1339 limit: value.limit,
1340 }
1341 }
1342}
1343
1344#[derive(Debug, Clone, Default)]
1345pub struct ListAllAccessTokensInput {
1347 pub prefix: AccessTokenIdPrefix,
1351 pub start_after: AccessTokenIdStartAfter,
1357}
1358
1359impl ListAllAccessTokensInput {
1360 pub fn new() -> Self {
1362 Self::default()
1363 }
1364
1365 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1367 Self { prefix, ..self }
1368 }
1369
1370 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1373 Self {
1374 start_after,
1375 ..self
1376 }
1377 }
1378}
1379
1380#[derive(Debug, Clone)]
1381#[non_exhaustive]
1382pub struct AccessTokenInfo {
1384 pub id: AccessTokenId,
1386 pub expires_at: S2DateTime,
1388 pub auto_prefix_streams: bool,
1391 pub scope: AccessTokenScope,
1393}
1394
1395impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1396 type Error = ValidationError;
1397
1398 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1399 let expires_at = value
1400 .expires_at
1401 .map(Into::into)
1402 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1403 Ok(Self {
1404 id: value.id,
1405 expires_at,
1406 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1407 scope: value.scope.into(),
1408 })
1409 }
1410}
1411
1412#[derive(Debug, Clone)]
1413pub enum BasinMatcher {
1417 None,
1419 Exact(BasinName),
1421 Prefix(BasinNamePrefix),
1423}
1424
1425#[derive(Debug, Clone)]
1426pub enum StreamMatcher {
1430 None,
1432 Exact(StreamName),
1434 Prefix(StreamNamePrefix),
1436}
1437
1438#[derive(Debug, Clone)]
1439pub enum AccessTokenMatcher {
1443 None,
1445 Exact(AccessTokenId),
1447 Prefix(AccessTokenIdPrefix),
1449}
1450
/// A pair of read and write permission flags; both are `false` by default.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Whether reading is permitted.
    pub read: bool,
    /// Whether writing is permitted.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Creates permissions with neither read nor write granted.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Permissions granting read only.
    pub fn read_only() -> Self {
        Self {
            read: true,
            ..Self::new()
        }
    }

    /// Permissions granting write only.
    pub fn write_only() -> Self {
        Self {
            write: true,
            ..Self::new()
        }
    }

    /// Permissions granting both read and write.
    pub fn read_write() -> Self {
        let (read, write) = (true, true);
        Self { read, write }
    }
}
1495
1496impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1497 fn from(value: ReadWritePermissions) -> Self {
1498 Self {
1499 read: Some(value.read),
1500 write: Some(value.write),
1501 }
1502 }
1503}
1504
1505impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1506 fn from(value: api::access::ReadWritePermissions) -> Self {
1507 Self {
1508 read: value.read.unwrap_or_default(),
1509 write: value.write.unwrap_or_default(),
1510 }
1511 }
1512}
1513
1514#[derive(Debug, Clone, Default)]
1515#[non_exhaustive]
1516pub struct OperationGroupPermissions {
1520 pub account: Option<ReadWritePermissions>,
1524 pub basin: Option<ReadWritePermissions>,
1528 pub stream: Option<ReadWritePermissions>,
1532}
1533
1534impl OperationGroupPermissions {
1535 pub fn new() -> Self {
1537 Self::default()
1538 }
1539
1540 pub fn read_only_all() -> Self {
1542 Self {
1543 account: Some(ReadWritePermissions::read_only()),
1544 basin: Some(ReadWritePermissions::read_only()),
1545 stream: Some(ReadWritePermissions::read_only()),
1546 }
1547 }
1548
1549 pub fn write_only_all() -> Self {
1551 Self {
1552 account: Some(ReadWritePermissions::write_only()),
1553 basin: Some(ReadWritePermissions::write_only()),
1554 stream: Some(ReadWritePermissions::write_only()),
1555 }
1556 }
1557
1558 pub fn read_write_all() -> Self {
1560 Self {
1561 account: Some(ReadWritePermissions::read_write()),
1562 basin: Some(ReadWritePermissions::read_write()),
1563 stream: Some(ReadWritePermissions::read_write()),
1564 }
1565 }
1566
1567 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1569 Self {
1570 account: Some(account),
1571 ..self
1572 }
1573 }
1574
1575 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1577 Self {
1578 basin: Some(basin),
1579 ..self
1580 }
1581 }
1582
1583 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1585 Self {
1586 stream: Some(stream),
1587 ..self
1588 }
1589 }
1590}
1591
1592impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1593 fn from(value: OperationGroupPermissions) -> Self {
1594 Self {
1595 account: value.account.map(Into::into),
1596 basin: value.basin.map(Into::into),
1597 stream: value.stream.map(Into::into),
1598 }
1599 }
1600}
1601
1602impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1603 fn from(value: api::access::PermittedOperationGroups) -> Self {
1604 Self {
1605 account: value.account.map(Into::into),
1606 basin: value.basin.map(Into::into),
1607 stream: value.stream.map(Into::into),
1608 }
1609 }
1610}
1611
/// Individual operations that can be referenced in an access token scope.
///
/// Every variant maps one-to-one onto a variant of `api::access::Operation`.
/// The three `Get*Metrics` variants correspond to the API variants named
/// `AccountMetrics`, `BasinMetrics` and `StreamMetrics`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    // Basin operations.
    ListBasins,
    CreateBasin,
    GetBasinConfig,
    DeleteBasin,
    ReconfigureBasin,
    // Access token operations.
    ListAccessTokens,
    IssueAccessToken,
    RevokeAccessToken,
    // Metrics operations.
    GetAccountMetrics,
    GetBasinMetrics,
    GetStreamMetrics,
    // Stream operations.
    ListStreams,
    CreateStream,
    GetStreamConfig,
    DeleteStream,
    ReconfigureStream,
    // Record-level operations.
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
1660
1661impl From<Operation> for api::access::Operation {
1662 fn from(value: Operation) -> Self {
1663 match value {
1664 Operation::ListBasins => api::access::Operation::ListBasins,
1665 Operation::CreateBasin => api::access::Operation::CreateBasin,
1666 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1667 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1668 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1669 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1670 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1671 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1672 Operation::ListStreams => api::access::Operation::ListStreams,
1673 Operation::CreateStream => api::access::Operation::CreateStream,
1674 Operation::DeleteStream => api::access::Operation::DeleteStream,
1675 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1676 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1677 Operation::CheckTail => api::access::Operation::CheckTail,
1678 Operation::Append => api::access::Operation::Append,
1679 Operation::Read => api::access::Operation::Read,
1680 Operation::Trim => api::access::Operation::Trim,
1681 Operation::Fence => api::access::Operation::Fence,
1682 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1683 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1684 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1685 }
1686 }
1687}
1688
1689impl From<api::access::Operation> for Operation {
1690 fn from(value: api::access::Operation) -> Self {
1691 match value {
1692 api::access::Operation::ListBasins => Operation::ListBasins,
1693 api::access::Operation::CreateBasin => Operation::CreateBasin,
1694 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1695 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1696 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1697 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1698 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1699 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1700 api::access::Operation::ListStreams => Operation::ListStreams,
1701 api::access::Operation::CreateStream => Operation::CreateStream,
1702 api::access::Operation::DeleteStream => Operation::DeleteStream,
1703 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1704 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1705 api::access::Operation::CheckTail => Operation::CheckTail,
1706 api::access::Operation::Append => Operation::Append,
1707 api::access::Operation::Read => Operation::Read,
1708 api::access::Operation::Trim => Operation::Trim,
1709 api::access::Operation::Fence => Operation::Fence,
1710 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1711 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1712 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1713 }
1714 }
1715}
1716
1717#[derive(Debug, Clone)]
1718#[non_exhaustive]
1719pub struct AccessTokenScopeInput {
1727 basins: Option<BasinMatcher>,
1728 streams: Option<StreamMatcher>,
1729 access_tokens: Option<AccessTokenMatcher>,
1730 op_group_perms: Option<OperationGroupPermissions>,
1731 ops: HashSet<Operation>,
1732}
1733
1734impl AccessTokenScopeInput {
1735 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1737 Self {
1738 basins: None,
1739 streams: None,
1740 access_tokens: None,
1741 op_group_perms: None,
1742 ops: ops.into_iter().collect(),
1743 }
1744 }
1745
1746 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1748 Self {
1749 basins: None,
1750 streams: None,
1751 access_tokens: None,
1752 op_group_perms: Some(op_group_perms),
1753 ops: HashSet::default(),
1754 }
1755 }
1756
1757 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1759 Self {
1760 ops: ops.into_iter().collect(),
1761 ..self
1762 }
1763 }
1764
1765 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1767 Self {
1768 op_group_perms: Some(op_group_perms),
1769 ..self
1770 }
1771 }
1772
1773 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1777 Self {
1778 basins: Some(basins),
1779 ..self
1780 }
1781 }
1782
1783 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1787 Self {
1788 streams: Some(streams),
1789 ..self
1790 }
1791 }
1792
1793 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1797 Self {
1798 access_tokens: Some(access_tokens),
1799 ..self
1800 }
1801 }
1802}
1803
/// Scope of an access token as reported by the API.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScope {
    /// Matcher for basins, if one was set.
    pub basins: Option<BasinMatcher>,
    /// Matcher for streams, if one was set.
    pub streams: Option<StreamMatcher>,
    /// Matcher for access tokens, if one was set.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions per operation group, if any were set.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Individual operations; empty when the API value carries none.
    pub ops: HashSet<Operation>,
}
1819
1820impl From<api::access::AccessTokenScope> for AccessTokenScope {
1821 fn from(value: api::access::AccessTokenScope) -> Self {
1822 Self {
1823 basins: value.basins.map(|rs| match rs {
1824 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1825 BasinMatcher::Exact(e)
1826 }
1827 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1828 BasinMatcher::None
1829 }
1830 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1831 }),
1832 streams: value.streams.map(|rs| match rs {
1833 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1834 StreamMatcher::Exact(e)
1835 }
1836 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1837 StreamMatcher::None
1838 }
1839 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1840 }),
1841 access_tokens: value.access_tokens.map(|rs| match rs {
1842 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1843 AccessTokenMatcher::Exact(e)
1844 }
1845 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1846 AccessTokenMatcher::None
1847 }
1848 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1849 }),
1850 op_group_perms: value.op_groups.map(Into::into),
1851 ops: value
1852 .ops
1853 .map(|ops| ops.into_iter().map(Into::into).collect())
1854 .unwrap_or_default(),
1855 }
1856 }
1857}
1858
1859impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1860 fn from(value: AccessTokenScopeInput) -> Self {
1861 Self {
1862 basins: value.basins.map(|rs| match rs {
1863 BasinMatcher::None => {
1864 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1865 }
1866 BasinMatcher::Exact(e) => {
1867 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1868 }
1869 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1870 }),
1871 streams: value.streams.map(|rs| match rs {
1872 StreamMatcher::None => {
1873 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1874 }
1875 StreamMatcher::Exact(e) => {
1876 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1877 }
1878 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1879 }),
1880 access_tokens: value.access_tokens.map(|rs| match rs {
1881 AccessTokenMatcher::None => {
1882 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1883 }
1884 AccessTokenMatcher::Exact(e) => {
1885 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1886 }
1887 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1888 }),
1889 op_groups: value.op_group_perms.map(Into::into),
1890 ops: if value.ops.is_empty() {
1891 None
1892 } else {
1893 Some(value.ops.into_iter().map(Into::into).collect())
1894 },
1895 }
1896 }
1897}
1898
1899#[derive(Debug, Clone)]
1900#[non_exhaustive]
1901pub struct IssueAccessTokenInput {
1903 pub id: AccessTokenId,
1905 pub expires_at: Option<S2DateTime>,
1910 pub auto_prefix_streams: bool,
1918 pub scope: AccessTokenScopeInput,
1920}
1921
1922impl IssueAccessTokenInput {
1923 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1925 Self {
1926 id,
1927 expires_at: None,
1928 auto_prefix_streams: false,
1929 scope,
1930 }
1931 }
1932
1933 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1935 Self {
1936 expires_at: Some(expires_at),
1937 ..self
1938 }
1939 }
1940
1941 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1944 Self {
1945 auto_prefix_streams,
1946 ..self
1947 }
1948 }
1949}
1950
1951impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1952 fn from(value: IssueAccessTokenInput) -> Self {
1953 Self {
1954 id: value.id,
1955 expires_at: value.expires_at.map(Into::into),
1956 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1957 scope: value.scope.into(),
1958 }
1959 }
1960}
1961
/// Aggregation interval for timeseries metrics.
///
/// Maps one-to-one onto `api::metrics::TimeseriesInterval`.
#[derive(Debug, Clone, Copy)]
pub enum TimeseriesInterval {
    /// Per-minute interval.
    Minute,
    /// Per-hour interval.
    Hour,
    /// Per-day interval.
    Day,
}
1972
1973impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
1974 fn from(value: TimeseriesInterval) -> Self {
1975 match value {
1976 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
1977 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
1978 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
1979 }
1980 }
1981}
1982
1983impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
1984 fn from(value: api::metrics::TimeseriesInterval) -> Self {
1985 match value {
1986 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
1987 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
1988 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
1989 }
1990 }
1991}
1992
/// Time range passed to metric requests that take no interval.
///
/// NOTE(review): the unit of `start`/`end` is not visible here — confirm
/// against the API documentation.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start of the range; sent as `start` in metric requests.
    pub start: u32,
    /// End of the range; sent as `end` in metric requests.
    pub end: u32,
}

impl TimeRange {
    /// Creates a range from its two bounds. No ordering check is performed.
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
2009
2010#[derive(Debug, Clone, Copy)]
2011#[non_exhaustive]
2012pub struct TimeRangeAndInterval {
2014 pub start: u32,
2016 pub end: u32,
2018 pub interval: Option<TimeseriesInterval>,
2022}
2023
2024impl TimeRangeAndInterval {
2025 pub fn new(start: u32, end: u32) -> Self {
2027 Self {
2028 start,
2029 end,
2030 interval: None,
2031 }
2032 }
2033
2034 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2036 Self {
2037 interval: Some(interval),
2038 ..self
2039 }
2040 }
2041}
2042
/// Account-level metric set to request, together with its time arguments.
#[derive(Debug, Clone, Copy)]
pub enum AccountMetricSet {
    /// The `ActiveBasins` set; takes a time range and no interval.
    ActiveBasins(TimeRange),
    /// The `AccountOps` set; takes a time range and an optional interval.
    AccountOps(TimeRangeAndInterval),
}
2057
/// Input for requesting account-level metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetAccountMetricsInput {
    /// Metric set to request.
    pub set: AccountMetricSet,
}

impl GetAccountMetricsInput {
    /// Creates an input for the given metric set.
    pub fn new(set: AccountMetricSet) -> Self {
        Self { set }
    }
}
2072
2073impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2074 fn from(value: GetAccountMetricsInput) -> Self {
2075 let (set, start, end, interval) = match value.set {
2076 AccountMetricSet::ActiveBasins(args) => (
2077 api::metrics::AccountMetricSet::ActiveBasins,
2078 args.start,
2079 args.end,
2080 None,
2081 ),
2082 AccountMetricSet::AccountOps(args) => (
2083 api::metrics::AccountMetricSet::AccountOps,
2084 args.start,
2085 args.end,
2086 args.interval,
2087 ),
2088 };
2089 Self {
2090 set,
2091 start: Some(start),
2092 end: Some(end),
2093 interval: interval.map(Into::into),
2094 }
2095 }
2096}
2097
/// Basin-level metric set to request, together with its time arguments.
///
/// Only `Storage` takes a plain time range; every other set also accepts an
/// optional aggregation interval.
#[derive(Debug, Clone, Copy)]
pub enum BasinMetricSet {
    /// The `Storage` set.
    Storage(TimeRange),
    /// The `AppendOps` set.
    AppendOps(TimeRangeAndInterval),
    /// The `ReadOps` set.
    ReadOps(TimeRangeAndInterval),
    /// The `ReadThroughput` set.
    ReadThroughput(TimeRangeAndInterval),
    /// The `AppendThroughput` set.
    AppendThroughput(TimeRangeAndInterval),
    /// The `BasinOps` set.
    BasinOps(TimeRangeAndInterval),
}
2142
/// Input for requesting basin-level metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetBasinMetricsInput {
    /// Name of the basin.
    pub name: BasinName,
    /// Metric set to request.
    pub set: BasinMetricSet,
}

impl GetBasinMetricsInput {
    /// Creates an input for the given basin and metric set.
    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
        Self { name, set }
    }
}
2159
2160impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2161 fn from(value: GetBasinMetricsInput) -> Self {
2162 let (set, start, end, interval) = match value.set {
2163 BasinMetricSet::Storage(args) => (
2164 api::metrics::BasinMetricSet::Storage,
2165 args.start,
2166 args.end,
2167 None,
2168 ),
2169 BasinMetricSet::AppendOps(args) => (
2170 api::metrics::BasinMetricSet::AppendOps,
2171 args.start,
2172 args.end,
2173 args.interval,
2174 ),
2175 BasinMetricSet::ReadOps(args) => (
2176 api::metrics::BasinMetricSet::ReadOps,
2177 args.start,
2178 args.end,
2179 args.interval,
2180 ),
2181 BasinMetricSet::ReadThroughput(args) => (
2182 api::metrics::BasinMetricSet::ReadThroughput,
2183 args.start,
2184 args.end,
2185 args.interval,
2186 ),
2187 BasinMetricSet::AppendThroughput(args) => (
2188 api::metrics::BasinMetricSet::AppendThroughput,
2189 args.start,
2190 args.end,
2191 args.interval,
2192 ),
2193 BasinMetricSet::BasinOps(args) => (
2194 api::metrics::BasinMetricSet::BasinOps,
2195 args.start,
2196 args.end,
2197 args.interval,
2198 ),
2199 };
2200 (
2201 value.name,
2202 api::metrics::BasinMetricSetRequest {
2203 set,
2204 start: Some(start),
2205 end: Some(end),
2206 interval: interval.map(Into::into),
2207 },
2208 )
2209 }
2210}
2211
/// Stream-level metric set to request, together with its time arguments.
#[derive(Debug, Clone, Copy)]
pub enum StreamMetricSet {
    /// The `Storage` set; takes a time range and no interval.
    Storage(TimeRange),
}
2219
/// Input for requesting stream-level metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetStreamMetricsInput {
    /// Name of the basin.
    pub basin_name: BasinName,
    /// Name of the stream.
    pub stream_name: StreamName,
    /// Metric set to request.
    pub set: StreamMetricSet,
}

impl GetStreamMetricsInput {
    /// Creates an input for the given basin, stream and metric set.
    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
        Self {
            basin_name,
            stream_name,
            set,
        }
    }
}
2243
2244impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2245 fn from(value: GetStreamMetricsInput) -> Self {
2246 let (set, start, end, interval) = match value.set {
2247 StreamMetricSet::Storage(args) => (
2248 api::metrics::StreamMetricSet::Storage,
2249 args.start,
2250 args.end,
2251 None,
2252 ),
2253 };
2254 (
2255 value.basin_name,
2256 value.stream_name,
2257 api::metrics::StreamMetricSetRequest {
2258 set,
2259 start: Some(start),
2260 end: Some(end),
2261 interval,
2262 },
2263 )
2264 }
2265}
2266
/// Unit in which a metric value is expressed.
#[derive(Debug, Clone, Copy)]
pub enum MetricUnit {
    /// The value counts bytes.
    Bytes,
    /// The value counts operations.
    Operations,
}
2275
2276impl From<api::metrics::MetricUnit> for MetricUnit {
2277 fn from(value: api::metrics::MetricUnit) -> Self {
2278 match value {
2279 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2280 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2281 }
2282 }
2283}
2284
/// Metric consisting of a single value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ScalarMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of `value`.
    pub unit: MetricUnit,
    /// The metric value.
    pub value: f64,
}
2296
/// Timeseries metric reported together with an aggregation interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccumulationMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Interval reported by the API for this series.
    pub interval: TimeseriesInterval,
    /// Data points as `(u32, f64)` pairs, passed through from the API unchanged.
    // NOTE(review): presumably (timestamp, value) — confirm against the API docs.
    pub values: Vec<(u32, f64)>,
}
2313
/// Timeseries metric reported without an aggregation interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GaugeMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Data points as `(u32, f64)` pairs, passed through from the API unchanged.
    // NOTE(review): presumably (timestamp, value) — confirm against the API docs.
    pub values: Vec<(u32, f64)>,
}
2326
/// Metric whose values are strings rather than numbers.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Name of the metric.
    pub name: String,
    /// Label values, passed through from the API unchanged.
    pub values: Vec<String>,
}
2336
/// A metric returned by the API, in one of four shapes.
#[derive(Debug, Clone)]
pub enum Metric {
    /// Single value.
    Scalar(ScalarMetric),
    /// Timeseries with an aggregation interval.
    Accumulation(AccumulationMetric),
    /// Timeseries without an aggregation interval.
    Gauge(GaugeMetric),
    /// List of string labels.
    Label(LabelMetric),
}
2350
2351impl From<api::metrics::Metric> for Metric {
2352 fn from(value: api::metrics::Metric) -> Self {
2353 match value {
2354 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2355 name: sm.name.into(),
2356 unit: sm.unit.into(),
2357 value: sm.value,
2358 }),
2359 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2360 name: am.name.into(),
2361 unit: am.unit.into(),
2362 interval: am.interval.into(),
2363 values: am.values,
2364 }),
2365 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2366 name: gm.name.into(),
2367 unit: gm.unit.into(),
2368 values: gm.values,
2369 }),
2370 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2371 name: lm.name.into(),
2372 values: lm.values,
2373 }),
2374 }
2375 }
2376}
2377
2378#[derive(Debug, Clone, Default)]
2379#[non_exhaustive]
2380pub struct ListStreamsInput {
2382 pub prefix: StreamNamePrefix,
2386 pub start_after: StreamNameStartAfter,
2392 pub limit: Option<usize>,
2396}
2397
2398impl ListStreamsInput {
2399 pub fn new() -> Self {
2401 Self::default()
2402 }
2403
2404 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2406 Self { prefix, ..self }
2407 }
2408
2409 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2412 Self {
2413 start_after,
2414 ..self
2415 }
2416 }
2417
2418 pub fn with_limit(self, limit: usize) -> Self {
2420 Self {
2421 limit: Some(limit),
2422 ..self
2423 }
2424 }
2425}
2426
2427impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2428 fn from(value: ListStreamsInput) -> Self {
2429 Self {
2430 prefix: Some(value.prefix),
2431 start_after: Some(value.start_after),
2432 limit: value.limit,
2433 }
2434 }
2435}
2436
2437#[derive(Debug, Clone, Default)]
2438pub struct ListAllStreamsInput {
2440 pub prefix: StreamNamePrefix,
2444 pub start_after: StreamNameStartAfter,
2450 pub ignore_pending_deletions: bool,
2454}
2455
2456impl ListAllStreamsInput {
2457 pub fn new() -> Self {
2459 Self::default()
2460 }
2461
2462 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2464 Self { prefix, ..self }
2465 }
2466
2467 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2470 Self {
2471 start_after,
2472 ..self
2473 }
2474 }
2475
2476 pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
2478 Self {
2479 ignore_pending_deletions,
2480 ..self
2481 }
2482 }
2483}
2484
/// Information about a stream as reported by the API.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct StreamInfo {
    /// Name of the stream.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time, if the API reports one.
    pub deleted_at: Option<S2DateTime>,
}
2496
2497impl From<api::stream::StreamInfo> for StreamInfo {
2498 fn from(value: api::stream::StreamInfo) -> Self {
2499 Self {
2500 name: value.name,
2501 created_at: value.created_at.into(),
2502 deleted_at: value.deleted_at.map(Into::into),
2503 }
2504 }
2505}
2506
2507#[derive(Debug, Clone)]
2508#[non_exhaustive]
2509pub struct CreateStreamInput {
2511 pub name: StreamName,
2513 pub config: Option<StreamConfig>,
2517 pub idempotency_token: String,
2525}
2526
2527impl CreateStreamInput {
2528 pub fn new(name: StreamName) -> Self {
2530 Self {
2531 name,
2532 config: None,
2533 idempotency_token: idempotency_token(),
2534 }
2535 }
2536
2537 pub fn with_config(self, config: StreamConfig) -> Self {
2539 Self {
2540 config: Some(config),
2541 ..self
2542 }
2543 }
2544
2545 pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
2547 Self {
2548 idempotency_token: idempotency_token.into(),
2549 ..self
2550 }
2551 }
2552}
2553
2554impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2555 fn from(value: CreateStreamInput) -> Self {
2556 (
2557 api::stream::CreateStreamRequest {
2558 stream: value.name,
2559 config: value.config.map(Into::into),
2560 },
2561 value.idempotency_token,
2562 )
2563 }
2564}
2565
2566#[derive(Debug, Clone)]
2567#[non_exhaustive]
2568pub struct DeleteStreamInput {
2570 pub name: StreamName,
2572 pub ignore_not_found: bool,
2574}
2575
2576impl DeleteStreamInput {
2577 pub fn new(name: StreamName) -> Self {
2579 Self {
2580 name,
2581 ignore_not_found: false,
2582 }
2583 }
2584
2585 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2587 Self {
2588 ignore_not_found,
2589 ..self
2590 }
2591 }
2592}
2593
/// Input for reconfiguring a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReconfigureStreamInput {
    /// Name of the stream.
    pub name: StreamName,
    /// Reconfiguration to apply.
    pub config: StreamReconfiguration,
}

impl ReconfigureStreamInput {
    /// Creates an input for the given stream and reconfiguration.
    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
        Self { name, config }
    }
}
2610
2611#[derive(Debug, Clone, PartialEq, Eq)]
2612pub struct FencingToken(String);
2618
2619impl FencingToken {
2620 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2622 rand::rng()
2623 .sample_iter(&rand::distr::Alphanumeric)
2624 .take(n)
2625 .map(char::from)
2626 .collect::<String>()
2627 .parse()
2628 }
2629}
2630
2631impl FromStr for FencingToken {
2632 type Err = ValidationError;
2633
2634 fn from_str(s: &str) -> Result<Self, Self::Err> {
2635 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2636 return Err(ValidationError(format!(
2637 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2638 )));
2639 }
2640 Ok(FencingToken(s.to_string()))
2641 }
2642}
2643
2644impl std::fmt::Display for FencingToken {
2645 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2646 write!(f, "{}", self.0)
2647 }
2648}
2649
2650impl Deref for FencingToken {
2651 type Target = str;
2652
2653 fn deref(&self) -> &Self::Target {
2654 &self.0
2655 }
2656}
2657
/// Position in a stream, given by a sequence number and a timestamp.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: u64,
    /// Timestamp.
    // NOTE(review): unit not visible here — confirm against the API docs.
    pub timestamp: u64,
}

/// Formats as `seq_num=<n>, timestamp=<t>`.
impl std::fmt::Display for StreamPosition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = self;
        write!(f, "seq_num={}, timestamp={}", seq_num, timestamp)
    }
}
2674
2675impl From<api::stream::proto::StreamPosition> for StreamPosition {
2676 fn from(value: api::stream::proto::StreamPosition) -> Self {
2677 Self {
2678 seq_num: value.seq_num,
2679 timestamp: value.timestamp,
2680 }
2681 }
2682}
2683
2684impl From<api::stream::StreamPosition> for StreamPosition {
2685 fn from(value: api::stream::StreamPosition) -> Self {
2686 Self {
2687 seq_num: value.seq_num,
2688 timestamp: value.timestamp,
2689 }
2690 }
2691}
2692
2693#[derive(Debug, Clone, PartialEq)]
2694#[non_exhaustive]
2695pub struct Header {
2697 pub name: Bytes,
2699 pub value: Bytes,
2701}
2702
2703impl Header {
2704 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2706 Self {
2707 name: name.into(),
2708 value: value.into(),
2709 }
2710 }
2711}
2712
2713impl From<Header> for api::stream::proto::Header {
2714 fn from(value: Header) -> Self {
2715 Self {
2716 name: value.name,
2717 value: value.value,
2718 }
2719 }
2720}
2721
2722impl From<api::stream::proto::Header> for Header {
2723 fn from(value: api::stream::proto::Header) -> Self {
2724 Self {
2725 name: value.name,
2726 value: value.value,
2727 }
2728 }
2729}
2730
2731#[derive(Debug, Clone, PartialEq)]
2732pub struct AppendRecord {
2734 body: Bytes,
2735 headers: Vec<Header>,
2736 timestamp: Option<u64>,
2737}
2738
2739impl AppendRecord {
2740 fn validate(self) -> Result<Self, ValidationError> {
2741 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2742 Err(ValidationError(format!(
2743 "metered_bytes: {} exceeds {}",
2744 self.metered_bytes(),
2745 RECORD_BATCH_MAX.bytes
2746 )))
2747 } else {
2748 Ok(self)
2749 }
2750 }
2751
2752 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2754 let record = Self {
2755 body: body.into(),
2756 headers: Vec::default(),
2757 timestamp: None,
2758 };
2759 record.validate()
2760 }
2761
2762 pub fn with_headers(
2764 self,
2765 headers: impl IntoIterator<Item = Header>,
2766 ) -> Result<Self, ValidationError> {
2767 let record = Self {
2768 headers: headers.into_iter().collect(),
2769 ..self
2770 };
2771 record.validate()
2772 }
2773
2774 pub fn with_timestamp(self, timestamp: u64) -> Self {
2778 Self {
2779 timestamp: Some(timestamp),
2780 ..self
2781 }
2782 }
2783}
2784
2785impl From<AppendRecord> for api::stream::proto::AppendRecord {
2786 fn from(value: AppendRecord) -> Self {
2787 Self {
2788 timestamp: value.timestamp,
2789 headers: value.headers.into_iter().map(Into::into).collect(),
2790 body: value.body,
2791 }
2792 }
2793}
2794
/// Types that can report their size in metered bytes.
pub trait MeteredBytes {
    /// Returns the size in metered bytes.
    fn metered_bytes(&self) -> usize;
}
2805
2806macro_rules! metered_bytes_impl {
2807 ($ty:ty) => {
2808 impl MeteredBytes for $ty {
2809 fn metered_bytes(&self) -> usize {
2810 8 + (2 * self.headers.len())
2811 + self
2812 .headers
2813 .iter()
2814 .map(|h| h.name.len() + h.value.len())
2815 .sum::<usize>()
2816 + self.body.len()
2817 }
2818 }
2819 };
2820}
2821
2822metered_bytes_impl!(AppendRecord);
2823
2824#[derive(Debug, Clone)]
2825pub struct AppendRecordBatch {
2834 records: Vec<AppendRecord>,
2835 metered_bytes: usize,
2836}
2837
2838impl AppendRecordBatch {
2839 pub(crate) fn with_capacity(capacity: usize) -> Self {
2840 Self {
2841 records: Vec::with_capacity(capacity),
2842 metered_bytes: 0,
2843 }
2844 }
2845
2846 pub(crate) fn push(&mut self, record: AppendRecord) {
2847 self.metered_bytes += record.metered_bytes();
2848 self.records.push(record);
2849 }
2850
2851 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2853 where
2854 I: IntoIterator<Item = AppendRecord>,
2855 {
2856 let mut records = Vec::new();
2857 let mut metered_bytes = 0;
2858
2859 for record in iter {
2860 metered_bytes += record.metered_bytes();
2861 records.push(record);
2862
2863 if metered_bytes > RECORD_BATCH_MAX.bytes {
2864 return Err(ValidationError(format!(
2865 "batch size in metered bytes ({metered_bytes}) exceeds {}",
2866 RECORD_BATCH_MAX.bytes
2867 )));
2868 }
2869
2870 if records.len() > RECORD_BATCH_MAX.count {
2871 return Err(ValidationError(format!(
2872 "number of records in the batch exceeds {}",
2873 RECORD_BATCH_MAX.count
2874 )));
2875 }
2876 }
2877
2878 if records.is_empty() {
2879 return Err(ValidationError("batch is empty".into()));
2880 }
2881
2882 Ok(Self {
2883 records,
2884 metered_bytes,
2885 })
2886 }
2887}
2888
2889impl Deref for AppendRecordBatch {
2890 type Target = [AppendRecord];
2891
2892 fn deref(&self) -> &Self::Target {
2893 &self.records
2894 }
2895}
2896
/// Reports the metered size tracked while the batch was built.
impl MeteredBytes for AppendRecordBatch {
    fn metered_bytes(&self) -> usize {
        // Stored total; records are not re-scanned.
        self.metered_bytes
    }
}
2902
/// Command that can be carried by a command record.
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command.
    Fence {
        /// Fencing token carried by the command.
        fencing_token: FencingToken,
    },
    /// Trim command.
    Trim {
        /// Trim point carried by the command.
        trim_point: u64,
    },
}
2917
2918#[derive(Debug, Clone)]
2919#[non_exhaustive]
2920pub struct CommandRecord {
2924 pub command: Command,
2926 pub timestamp: Option<u64>,
2928}
2929
2930impl CommandRecord {
2931 const FENCE: &[u8] = b"fence";
2932 const TRIM: &[u8] = b"trim";
2933
2934 pub fn fence(fencing_token: FencingToken) -> Self {
2939 Self {
2940 command: Command::Fence { fencing_token },
2941 timestamp: None,
2942 }
2943 }
2944
2945 pub fn trim(trim_point: u64) -> Self {
2952 Self {
2953 command: Command::Trim { trim_point },
2954 timestamp: None,
2955 }
2956 }
2957
2958 pub fn with_timestamp(self, timestamp: u64) -> Self {
2960 Self {
2961 timestamp: Some(timestamp),
2962 ..self
2963 }
2964 }
2965}
2966
2967impl From<CommandRecord> for AppendRecord {
2968 fn from(value: CommandRecord) -> Self {
2969 let (header_value, body) = match value.command {
2970 Command::Fence { fencing_token } => (
2971 CommandRecord::FENCE,
2972 Bytes::copy_from_slice(fencing_token.as_bytes()),
2973 ),
2974 Command::Trim { trim_point } => (
2975 CommandRecord::TRIM,
2976 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
2977 ),
2978 };
2979 Self {
2980 body,
2981 headers: vec![Header::new("", header_value)],
2982 timestamp: value.timestamp,
2983 }
2984 }
2985}
2986
2987#[derive(Debug, Clone)]
2988#[non_exhaustive]
2989pub struct AppendInput {
2992 pub records: AppendRecordBatch,
2994 pub match_seq_num: Option<u64>,
2998 pub fencing_token: Option<FencingToken>,
3003}
3004
3005impl AppendInput {
3006 pub fn new(records: AppendRecordBatch) -> Self {
3008 Self {
3009 records,
3010 match_seq_num: None,
3011 fencing_token: None,
3012 }
3013 }
3014
3015 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3017 Self {
3018 match_seq_num: Some(match_seq_num),
3019 ..self
3020 }
3021 }
3022
3023 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3025 Self {
3026 fencing_token: Some(fencing_token),
3027 ..self
3028 }
3029 }
3030}
3031
3032impl From<AppendInput> for api::stream::proto::AppendInput {
3033 fn from(value: AppendInput) -> Self {
3034 Self {
3035 records: value.records.iter().cloned().map(Into::into).collect(),
3036 match_seq_num: value.match_seq_num,
3037 fencing_token: value.fencing_token.map(|t| t.to_string()),
3038 }
3039 }
3040}
3041
/// Acknowledgement returned for an append.
///
/// Each position falls back to its default when the protobuf message omits it.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct AppendAck {
    /// The `start` position from the acknowledgement.
    pub start: StreamPosition,
    /// The `end` position from the acknowledgement.
    pub end: StreamPosition,
    /// The `tail` position from the acknowledgement.
    pub tail: StreamPosition,
}
3060
3061impl From<api::stream::proto::AppendAck> for AppendAck {
3062 fn from(value: api::stream::proto::AppendAck) -> Self {
3063 Self {
3064 start: value.start.unwrap_or_default().into(),
3065 end: value.end.unwrap_or_default().into(),
3066 tail: value.tail.unwrap_or_default().into(),
3067 }
3068 }
3069}
3070
/// Where a read starts. Exactly one of the three addressing modes is used.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start at a sequence number.
    SeqNum(u64),
    /// Start at a timestamp.
    Timestamp(u64),
    /// Start at an offset relative to the tail.
    TailOffset(u64),
}

/// The default start is sequence number `0`.
impl Default for ReadFrom {
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3087
3088#[derive(Debug, Default, Clone)]
3089#[non_exhaustive]
3090pub struct ReadStart {
3092 pub from: ReadFrom,
3096 pub clamp_to_tail: bool,
3100}
3101
3102impl ReadStart {
3103 pub fn new() -> Self {
3105 Self::default()
3106 }
3107
3108 pub fn with_from(self, from: ReadFrom) -> Self {
3110 Self { from, ..self }
3111 }
3112
3113 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3115 Self {
3116 clamp_to_tail,
3117 ..self
3118 }
3119 }
3120}
3121
3122impl From<ReadStart> for api::stream::ReadStart {
3123 fn from(value: ReadStart) -> Self {
3124 let (seq_num, timestamp, tail_offset) = match value.from {
3125 ReadFrom::SeqNum(n) => (Some(n), None, None),
3126 ReadFrom::Timestamp(t) => (None, Some(t), None),
3127 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3128 };
3129 Self {
3130 seq_num,
3131 timestamp,
3132 tail_offset,
3133 clamp: if value.clamp_to_tail {
3134 Some(true)
3135 } else {
3136 None
3137 },
3138 }
3139 }
3140}
3141
/// Optional limits on how much a read returns.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Limit on the number of records; unset when `None`.
    pub count: Option<usize>,
    /// Limit on the number of bytes; unset when `None`.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Creates limits with neither bound set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the record count limit.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Sets the byte limit.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3178
3179#[derive(Debug, Clone, Default)]
3180#[non_exhaustive]
3181pub struct ReadStop {
3183 pub limits: ReadLimits,
3185 pub until: Option<RangeTo<u64>>,
3187 pub wait: Option<u32>,
3197}
3198
3199impl ReadStop {
3200 pub fn new() -> Self {
3202 Self::default()
3203 }
3204
3205 pub fn with_limits(self, limits: ReadLimits) -> Self {
3207 Self { limits, ..self }
3208 }
3209
3210 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3212 Self {
3213 until: Some(until),
3214 ..self
3215 }
3216 }
3217
3218 pub fn with_wait(self, wait: u32) -> Self {
3220 Self {
3221 wait: Some(wait),
3222 ..self
3223 }
3224 }
3225}
3226
3227impl From<ReadStop> for api::stream::ReadEnd {
3228 fn from(value: ReadStop) -> Self {
3229 Self {
3230 count: value.limits.count,
3231 bytes: value.limits.bytes,
3232 until: value.until.map(|r| r.end),
3233 wait: value.wait,
3234 }
3235 }
3236}
3237
3238#[derive(Debug, Clone, Default)]
3239#[non_exhaustive]
3240pub struct ReadInput {
3243 pub start: ReadStart,
3245 pub stop: ReadStop,
3247 pub ignore_command_records: bool,
3251}
3252
3253impl ReadInput {
3254 pub fn new() -> Self {
3256 Self::default()
3257 }
3258
3259 pub fn with_start(self, start: ReadStart) -> Self {
3261 Self { start, ..self }
3262 }
3263
3264 pub fn with_stop(self, stop: ReadStop) -> Self {
3266 Self { stop, ..self }
3267 }
3268
3269 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3271 Self {
3272 ignore_command_records,
3273 ..self
3274 }
3275 }
3276}
3277
3278#[derive(Debug, Clone)]
3279#[non_exhaustive]
3280pub struct SequencedRecord {
3282 pub seq_num: u64,
3284 pub body: Bytes,
3286 pub headers: Vec<Header>,
3288 pub timestamp: u64,
3290}
3291
3292impl SequencedRecord {
3293 pub fn is_command_record(&self) -> bool {
3295 self.headers.len() == 1 && *self.headers[0].name == *b""
3296 }
3297}
3298
3299impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3300 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3301 Self {
3302 seq_num: value.seq_num,
3303 body: value.body,
3304 headers: value.headers.into_iter().map(Into::into).collect(),
3305 timestamp: value.timestamp,
3306 }
3307 }
3308}
3309
3310metered_bytes_impl!(SequencedRecord);
3311
3312#[derive(Debug, Clone)]
3313#[non_exhaustive]
3314pub struct ReadBatch {
3317 pub records: Vec<SequencedRecord>,
3324 pub tail: Option<StreamPosition>,
3329}
3330
3331impl ReadBatch {
3332 pub(crate) fn from_api(
3333 batch: api::stream::proto::ReadBatch,
3334 ignore_command_records: bool,
3335 ) -> Self {
3336 Self {
3337 records: batch
3338 .records
3339 .into_iter()
3340 .map(Into::into)
3341 .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3342 .collect(),
3343 tail: batch.tail.map(Into::into),
3344 }
3345 }
3346}
3347
3348pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3350
3351#[derive(Debug, Clone, thiserror::Error)]
3352pub enum AppendConditionFailed {
3354 #[error("fencing token mismatch, expected: {0}")]
3355 FencingTokenMismatch(FencingToken),
3357 #[error("sequence number mismatch, expected: {0}")]
3358 SeqNumMismatch(u64),
3360}
3361
3362impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3363 fn from(value: api::stream::AppendConditionFailed) -> Self {
3364 match value {
3365 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3366 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3367 }
3368 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3369 AppendConditionFailed::SeqNumMismatch(seq)
3370 }
3371 }
3372 }
3373}
3374
3375#[derive(Debug, Clone, thiserror::Error)]
3376pub enum S2Error {
3378 #[error("{0}")]
3379 Client(String),
3381 #[error(transparent)]
3382 Validation(#[from] ValidationError),
3384 #[error("{0}")]
3385 AppendConditionFailed(AppendConditionFailed),
3387 #[error("read from an unwritten position. current tail: {0}")]
3388 ReadUnwritten(StreamPosition),
3390 #[error("{0}")]
3391 Server(ErrorResponse),
3393}
3394
3395impl From<ApiError> for S2Error {
3396 fn from(err: ApiError) -> Self {
3397 match err {
3398 ApiError::ReadUnwritten(tail_response) => {
3399 Self::ReadUnwritten(tail_response.tail.into())
3400 }
3401 ApiError::AppendConditionFailed(condition_failed) => {
3402 Self::AppendConditionFailed(condition_failed.into())
3403 }
3404 ApiError::Server(_, response) => Self::Server(response.into()),
3405 other => Self::Client(other.to_string()),
3406 }
3407 }
3408}
3409
3410#[derive(Debug, Clone, thiserror::Error)]
3411#[error("{code}: {message}")]
3412#[non_exhaustive]
3413pub struct ErrorResponse {
3415 pub code: String,
3417 pub message: String,
3419}
3420
3421impl From<ApiErrorResponse> for ErrorResponse {
3422 fn from(response: ApiErrorResponse) -> Self {
3423 Self {
3424 code: response.code,
3425 message: response.message,
3426 }
3427 }
3428}
3429
3430fn idempotency_token() -> String {
3431 uuid::Uuid::new_v4().simple().to_string()
3432}