1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::types::ValidationError;
24pub use s2_common::types::access::AccessTokenId;
28pub use s2_common::types::access::AccessTokenIdPrefix;
30pub use s2_common::types::access::AccessTokenIdStartAfter;
32pub use s2_common::types::basin::BasinName;
37pub use s2_common::types::basin::BasinNamePrefix;
39pub use s2_common::types::basin::BasinNameStartAfter;
41pub use s2_common::types::stream::StreamName;
45pub use s2_common::types::stream::StreamNamePrefix;
47pub use s2_common::types::stream::StreamNameStartAfter;
49
50pub(crate) const ONE_MIB: u32 = 1024 * 1024;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl TryFrom<time::OffsetDateTime> for S2DateTime {
66 type Error = ValidationError;
67
68 fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
69 dt.format(&time::format_description::well_known::Rfc3339)
70 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
71 Ok(Self(dt))
72 }
73}
74
75impl From<S2DateTime> for time::OffsetDateTime {
76 fn from(dt: S2DateTime) -> Self {
77 dt.0
78 }
79}
80
81impl FromStr for S2DateTime {
82 type Err = ValidationError;
83
84 fn from_str(s: &str) -> Result<Self, Self::Err> {
85 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
86 .map(Self)
87 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
88 }
89}
90
91impl fmt::Display for S2DateTime {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 write!(
94 f,
95 "{}",
96 self.0
97 .format(&time::format_description::well_known::Rfc3339)
98 .expect("RFC3339 formatting should not fail for S2DateTime")
99 )
100 }
101}
102
103#[derive(Debug, Clone, PartialEq)]
105pub(crate) enum BasinAuthority {
106 ParentZone(Authority),
108 Direct(Authority),
110}
111
112#[derive(Debug, Clone)]
114pub struct AccountEndpoint {
115 scheme: Scheme,
116 authority: Authority,
117}
118
119impl AccountEndpoint {
120 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
122 endpoint.parse()
123 }
124}
125
126impl FromStr for AccountEndpoint {
127 type Err = ValidationError;
128
129 fn from_str(s: &str) -> Result<Self, Self::Err> {
130 let (scheme, authority) = match s.find("://") {
131 Some(idx) => {
132 let scheme: Scheme = s[..idx]
133 .parse()
134 .map_err(|_| "invalid account endpoint scheme".to_string())?;
135 (scheme, &s[idx + 3..])
136 }
137 None => (Scheme::HTTPS, s),
138 };
139 Ok(Self {
140 scheme,
141 authority: authority
142 .parse()
143 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
144 })
145 }
146}
147
148#[derive(Debug, Clone)]
150pub struct BasinEndpoint {
151 scheme: Scheme,
152 authority: BasinAuthority,
153}
154
155impl BasinEndpoint {
156 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
158 endpoint.parse()
159 }
160}
161
162impl FromStr for BasinEndpoint {
163 type Err = ValidationError;
164
165 fn from_str(s: &str) -> Result<Self, Self::Err> {
166 let (scheme, authority) = match s.find("://") {
167 Some(idx) => {
168 let scheme: Scheme = s[..idx]
169 .parse()
170 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
171 (scheme, &s[idx + 3..])
172 }
173 None => (Scheme::HTTPS, s),
174 };
175 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
176 BasinAuthority::ParentZone(
177 authority
178 .parse()
179 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
180 )
181 } else {
182 BasinAuthority::Direct(
183 authority
184 .parse()
185 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
186 )
187 };
188 Ok(Self { scheme, authority })
189 }
190}
191
192#[derive(Debug, Clone)]
193#[non_exhaustive]
194pub struct S2Endpoints {
196 pub(crate) scheme: Scheme,
197 pub(crate) account_authority: Authority,
198 pub(crate) basin_authority: BasinAuthority,
199}
200
201impl S2Endpoints {
202 pub fn new(
204 account_endpoint: AccountEndpoint,
205 basin_endpoint: BasinEndpoint,
206 ) -> Result<Self, ValidationError> {
207 if account_endpoint.scheme != basin_endpoint.scheme {
208 return Err("account and basin endpoints must have the same scheme".into());
209 }
210 Ok(Self {
211 scheme: account_endpoint.scheme,
212 account_authority: account_endpoint.authority,
213 basin_authority: basin_endpoint.authority,
214 })
215 }
216
217 pub fn from_env() -> Result<Self, ValidationError> {
223 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
224 Ok(endpoint) => endpoint.parse()?,
225 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
226 Err(VarError::NotUnicode(_)) => {
227 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
228 }
229 };
230
231 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
232 Ok(endpoint) => endpoint.parse()?,
233 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
234 Err(VarError::NotUnicode(_)) => {
235 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
236 }
237 };
238
239 if account_endpoint.scheme != basin_endpoint.scheme {
240 return Err(
241 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
242 );
243 }
244
245 Ok(Self {
246 scheme: account_endpoint.scheme,
247 account_authority: account_endpoint.authority,
248 basin_authority: basin_endpoint.authority,
249 })
250 }
251
252 pub(crate) fn for_aws() -> Self {
253 Self {
254 scheme: Scheme::HTTPS,
255 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
256 basin_authority: BasinAuthority::ParentZone(
257 "b.aws.s2.dev".try_into().expect("valid authority"),
258 ),
259 }
260 }
261}
262
263#[derive(Debug, Clone, Copy)]
264pub enum Compression {
266 None,
268 Gzip,
270 Zstd,
272}
273
274impl From<Compression> for CompressionAlgorithm {
275 fn from(value: Compression) -> Self {
276 match value {
277 Compression::None => CompressionAlgorithm::None,
278 Compression::Gzip => CompressionAlgorithm::Gzip,
279 Compression::Zstd => CompressionAlgorithm::Zstd,
280 }
281 }
282}
283
284#[derive(Debug, Clone, Copy, PartialEq)]
285#[non_exhaustive]
286pub enum AppendRetryPolicy {
289 All,
291 NoSideEffects,
293}
294
295impl AppendRetryPolicy {
296 pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
297 match self {
298 Self::All => true,
299 Self::NoSideEffects => input.match_seq_num.is_some(),
300 }
301 }
302}
303
304#[derive(Debug, Clone)]
305#[non_exhaustive]
306pub struct RetryConfig {
315 pub max_attempts: NonZeroU32,
319 pub min_base_delay: Duration,
323 pub max_base_delay: Duration,
327 pub append_retry_policy: AppendRetryPolicy,
332}
333
334impl Default for RetryConfig {
335 fn default() -> Self {
336 Self {
337 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
338 min_base_delay: Duration::from_millis(100),
339 max_base_delay: Duration::from_secs(1),
340 append_retry_policy: AppendRetryPolicy::All,
341 }
342 }
343}
344
345impl RetryConfig {
346 pub fn new() -> Self {
348 Self::default()
349 }
350
351 pub(crate) fn max_retries(&self) -> u32 {
352 self.max_attempts.get() - 1
353 }
354
355 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
357 Self {
358 max_attempts,
359 ..self
360 }
361 }
362
363 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
365 Self {
366 min_base_delay,
367 ..self
368 }
369 }
370
371 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
373 Self {
374 max_base_delay,
375 ..self
376 }
377 }
378
379 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
382 Self {
383 append_retry_policy,
384 ..self
385 }
386 }
387}
388
389#[derive(Debug, Clone)]
390#[non_exhaustive]
391pub struct S2Config {
393 pub(crate) access_token: SecretString,
394 pub(crate) endpoints: S2Endpoints,
395 pub(crate) connection_timeout: Duration,
396 pub(crate) request_timeout: Duration,
397 pub(crate) retry: RetryConfig,
398 pub(crate) compression: Compression,
399 pub(crate) user_agent: HeaderValue,
400 pub(crate) insecure_skip_cert_verification: bool,
401}
402
403impl S2Config {
404 pub fn new(access_token: impl Into<String>) -> Self {
406 Self {
407 access_token: access_token.into().into(),
408 endpoints: S2Endpoints::for_aws(),
409 connection_timeout: Duration::from_secs(3),
410 request_timeout: Duration::from_secs(5),
411 retry: RetryConfig::new(),
412 compression: Compression::None,
413 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
414 .parse()
415 .expect("valid user agent"),
416 insecure_skip_cert_verification: false,
417 }
418 }
419
420 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
422 Self { endpoints, ..self }
423 }
424
425 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
429 Self {
430 connection_timeout,
431 ..self
432 }
433 }
434
435 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
439 Self {
440 request_timeout,
441 ..self
442 }
443 }
444
445 pub fn with_retry(self, retry: RetryConfig) -> Self {
449 Self { retry, ..self }
450 }
451
452 pub fn with_compression(self, compression: Compression) -> Self {
456 Self {
457 compression,
458 ..self
459 }
460 }
461
462 pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
474 Self {
475 insecure_skip_cert_verification: skip,
476 ..self
477 }
478 }
479
480 #[doc(hidden)]
481 #[cfg(feature = "_hidden")]
482 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
483 let user_agent = user_agent
484 .into()
485 .parse()
486 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
487 Ok(Self { user_agent, ..self })
488 }
489}
490
491#[derive(Debug, Default, Clone, PartialEq, Eq)]
492#[non_exhaustive]
493pub struct Page<T> {
495 pub values: Vec<T>,
497 pub has_more: bool,
499}
500
501impl<T> Page<T> {
502 pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
503 Self {
504 values: values.into(),
505 has_more,
506 }
507 }
508}
509
510#[derive(Debug, Clone, Copy, PartialEq, Eq)]
511pub enum StorageClass {
513 Standard,
515 Express,
517}
518
519impl From<api::config::StorageClass> for StorageClass {
520 fn from(value: api::config::StorageClass) -> Self {
521 match value {
522 api::config::StorageClass::Standard => StorageClass::Standard,
523 api::config::StorageClass::Express => StorageClass::Express,
524 }
525 }
526}
527
528impl From<StorageClass> for api::config::StorageClass {
529 fn from(value: StorageClass) -> Self {
530 match value {
531 StorageClass::Standard => api::config::StorageClass::Standard,
532 StorageClass::Express => api::config::StorageClass::Express,
533 }
534 }
535}
536
537#[derive(Debug, Clone, Copy, PartialEq, Eq)]
538pub enum RetentionPolicy {
540 Age(u64),
542 Infinite,
544}
545
546impl From<api::config::RetentionPolicy> for RetentionPolicy {
547 fn from(value: api::config::RetentionPolicy) -> Self {
548 match value {
549 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
550 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
551 }
552 }
553}
554
555impl From<RetentionPolicy> for api::config::RetentionPolicy {
556 fn from(value: RetentionPolicy) -> Self {
557 match value {
558 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
559 RetentionPolicy::Infinite => {
560 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
561 }
562 }
563 }
564}
565
566#[derive(Debug, Clone, Copy, PartialEq, Eq)]
567pub enum TimestampingMode {
569 ClientPrefer,
571 ClientRequire,
573 Arrival,
575}
576
577impl From<api::config::TimestampingMode> for TimestampingMode {
578 fn from(value: api::config::TimestampingMode) -> Self {
579 match value {
580 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
581 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
582 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
583 }
584 }
585}
586
587impl From<TimestampingMode> for api::config::TimestampingMode {
588 fn from(value: TimestampingMode) -> Self {
589 match value {
590 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
591 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
592 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
593 }
594 }
595}
596
597#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
598#[non_exhaustive]
599pub struct TimestampingConfig {
601 pub mode: Option<TimestampingMode>,
605 pub uncapped: bool,
609}
610
611impl TimestampingConfig {
612 pub fn new() -> Self {
614 Self::default()
615 }
616
617 pub fn with_mode(self, mode: TimestampingMode) -> Self {
619 Self {
620 mode: Some(mode),
621 ..self
622 }
623 }
624
625 pub fn with_uncapped(self, uncapped: bool) -> Self {
627 Self { uncapped, ..self }
628 }
629}
630
631impl From<api::config::TimestampingConfig> for TimestampingConfig {
632 fn from(value: api::config::TimestampingConfig) -> Self {
633 Self {
634 mode: value.mode.map(Into::into),
635 uncapped: value.uncapped.unwrap_or_default(),
636 }
637 }
638}
639
640impl From<TimestampingConfig> for api::config::TimestampingConfig {
641 fn from(value: TimestampingConfig) -> Self {
642 Self {
643 mode: value.mode.map(Into::into),
644 uncapped: Some(value.uncapped),
645 }
646 }
647}
648
649#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
650#[non_exhaustive]
651pub struct DeleteOnEmptyConfig {
653 pub min_age_secs: u64,
657}
658
659impl DeleteOnEmptyConfig {
660 pub fn new() -> Self {
662 Self::default()
663 }
664
665 pub fn with_min_age(self, min_age: Duration) -> Self {
667 Self {
668 min_age_secs: min_age.as_secs(),
669 }
670 }
671}
672
673impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
674 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
675 Self {
676 min_age_secs: value.min_age_secs,
677 }
678 }
679}
680
681impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
682 fn from(value: DeleteOnEmptyConfig) -> Self {
683 Self {
684 min_age_secs: value.min_age_secs,
685 }
686 }
687}
688
689#[derive(Debug, Clone, Default, PartialEq, Eq)]
690#[non_exhaustive]
691pub struct StreamConfig {
693 pub storage_class: Option<StorageClass>,
697 pub retention_policy: Option<RetentionPolicy>,
701 pub timestamping: Option<TimestampingConfig>,
705 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
709}
710
711impl StreamConfig {
712 pub fn new() -> Self {
714 Self::default()
715 }
716
717 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
719 Self {
720 storage_class: Some(storage_class),
721 ..self
722 }
723 }
724
725 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
727 Self {
728 retention_policy: Some(retention_policy),
729 ..self
730 }
731 }
732
733 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
735 Self {
736 timestamping: Some(timestamping),
737 ..self
738 }
739 }
740
741 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
743 Self {
744 delete_on_empty: Some(delete_on_empty),
745 ..self
746 }
747 }
748}
749
750impl From<api::config::StreamConfig> for StreamConfig {
751 fn from(value: api::config::StreamConfig) -> Self {
752 Self {
753 storage_class: value.storage_class.map(Into::into),
754 retention_policy: value.retention_policy.map(Into::into),
755 timestamping: value.timestamping.map(Into::into),
756 delete_on_empty: value.delete_on_empty.map(Into::into),
757 }
758 }
759}
760
761impl From<StreamConfig> for api::config::StreamConfig {
762 fn from(value: StreamConfig) -> Self {
763 Self {
764 storage_class: value.storage_class.map(Into::into),
765 retention_policy: value.retention_policy.map(Into::into),
766 timestamping: value.timestamping.map(Into::into),
767 delete_on_empty: value.delete_on_empty.map(Into::into),
768 }
769 }
770}
771
772#[derive(Debug, Clone, Default)]
773#[non_exhaustive]
774pub struct BasinConfig {
776 pub default_stream_config: Option<StreamConfig>,
780 pub create_stream_on_append: bool,
784 pub create_stream_on_read: bool,
788}
789
790impl BasinConfig {
791 pub fn new() -> Self {
793 Self::default()
794 }
795
796 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
798 Self {
799 default_stream_config: Some(config),
800 ..self
801 }
802 }
803
804 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
807 Self {
808 create_stream_on_append,
809 ..self
810 }
811 }
812
813 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
815 Self {
816 create_stream_on_read,
817 ..self
818 }
819 }
820}
821
822impl From<api::config::BasinConfig> for BasinConfig {
823 fn from(value: api::config::BasinConfig) -> Self {
824 Self {
825 default_stream_config: value.default_stream_config.map(Into::into),
826 create_stream_on_append: value.create_stream_on_append,
827 create_stream_on_read: value.create_stream_on_read,
828 }
829 }
830}
831
832impl From<BasinConfig> for api::config::BasinConfig {
833 fn from(value: BasinConfig) -> Self {
834 Self {
835 default_stream_config: value.default_stream_config.map(Into::into),
836 create_stream_on_append: value.create_stream_on_append,
837 create_stream_on_read: value.create_stream_on_read,
838 }
839 }
840}
841
842#[derive(Debug, Clone, PartialEq, Eq)]
843pub enum BasinScope {
845 AwsUsEast1,
847}
848
849impl From<api::basin::BasinScope> for BasinScope {
850 fn from(value: api::basin::BasinScope) -> Self {
851 match value {
852 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
853 }
854 }
855}
856
857impl From<BasinScope> for api::basin::BasinScope {
858 fn from(value: BasinScope) -> Self {
859 match value {
860 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
861 }
862 }
863}
864
865#[derive(Debug, Clone)]
866#[non_exhaustive]
867pub struct CreateBasinInput {
869 pub name: BasinName,
871 pub config: Option<BasinConfig>,
875 pub scope: Option<BasinScope>,
879 idempotency_token: String,
880}
881
882impl CreateBasinInput {
883 pub fn new(name: BasinName) -> Self {
885 Self {
886 name,
887 config: None,
888 scope: None,
889 idempotency_token: idempotency_token(),
890 }
891 }
892
893 pub fn with_config(self, config: BasinConfig) -> Self {
895 Self {
896 config: Some(config),
897 ..self
898 }
899 }
900
901 pub fn with_scope(self, scope: BasinScope) -> Self {
903 Self {
904 scope: Some(scope),
905 ..self
906 }
907 }
908}
909
910impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
911 fn from(value: CreateBasinInput) -> Self {
912 (
913 api::basin::CreateBasinRequest {
914 basin: value.name,
915 config: value.config.map(Into::into),
916 scope: value.scope.map(Into::into),
917 },
918 value.idempotency_token,
919 )
920 }
921}
922
923#[derive(Debug, Clone, Default)]
924#[non_exhaustive]
925pub struct ListBasinsInput {
927 pub prefix: BasinNamePrefix,
931 pub start_after: BasinNameStartAfter,
937 pub limit: Option<usize>,
941}
942
943impl ListBasinsInput {
944 pub fn new() -> Self {
946 Self::default()
947 }
948
949 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
951 Self { prefix, ..self }
952 }
953
954 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
957 Self {
958 start_after,
959 ..self
960 }
961 }
962
963 pub fn with_limit(self, limit: usize) -> Self {
965 Self {
966 limit: Some(limit),
967 ..self
968 }
969 }
970}
971
972impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
973 fn from(value: ListBasinsInput) -> Self {
974 Self {
975 prefix: Some(value.prefix),
976 start_after: Some(value.start_after),
977 limit: value.limit,
978 }
979 }
980}
981
982#[derive(Debug, Clone, Default)]
983pub struct ListAllBasinsInput {
985 pub prefix: BasinNamePrefix,
989 pub start_after: BasinNameStartAfter,
995 pub include_deleted: bool,
999}
1000
1001impl ListAllBasinsInput {
1002 pub fn new() -> Self {
1004 Self::default()
1005 }
1006
1007 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1009 Self { prefix, ..self }
1010 }
1011
1012 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1015 Self {
1016 start_after,
1017 ..self
1018 }
1019 }
1020
1021 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1023 Self {
1024 include_deleted,
1025 ..self
1026 }
1027 }
1028}
1029
1030#[derive(Debug, Clone, PartialEq, Eq)]
1031pub enum BasinState {
1033 Active,
1035 Creating,
1037 Deleting,
1039}
1040
1041impl From<api::basin::BasinState> for BasinState {
1042 fn from(value: api::basin::BasinState) -> Self {
1043 match value {
1044 api::basin::BasinState::Active => BasinState::Active,
1045 api::basin::BasinState::Creating => BasinState::Creating,
1046 api::basin::BasinState::Deleting => BasinState::Deleting,
1047 }
1048 }
1049}
1050
1051#[derive(Debug, Clone, PartialEq, Eq)]
1052#[non_exhaustive]
1053pub struct BasinInfo {
1055 pub name: BasinName,
1057 pub scope: Option<BasinScope>,
1059 pub state: BasinState,
1061}
1062
1063impl From<api::basin::BasinInfo> for BasinInfo {
1064 fn from(value: api::basin::BasinInfo) -> Self {
1065 Self {
1066 name: value.name,
1067 scope: value.scope.map(Into::into),
1068 state: value.state.into(),
1069 }
1070 }
1071}
1072
1073#[derive(Debug, Clone)]
1074#[non_exhaustive]
1075pub struct DeleteBasinInput {
1077 pub name: BasinName,
1079 pub ignore_not_found: bool,
1081}
1082
1083impl DeleteBasinInput {
1084 pub fn new(name: BasinName) -> Self {
1086 Self {
1087 name,
1088 ignore_not_found: false,
1089 }
1090 }
1091
1092 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1094 Self {
1095 ignore_not_found,
1096 ..self
1097 }
1098 }
1099}
1100
1101#[derive(Debug, Clone, Default)]
1102#[non_exhaustive]
1103pub struct TimestampingReconfiguration {
1105 pub mode: Maybe<Option<TimestampingMode>>,
1107 pub uncapped: Maybe<Option<bool>>,
1109}
1110
1111impl TimestampingReconfiguration {
1112 pub fn new() -> Self {
1114 Self::default()
1115 }
1116
1117 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1119 Self {
1120 mode: Maybe::Specified(Some(mode)),
1121 ..self
1122 }
1123 }
1124
1125 pub fn with_uncapped(self, uncapped: bool) -> Self {
1127 Self {
1128 uncapped: Maybe::Specified(Some(uncapped)),
1129 ..self
1130 }
1131 }
1132}
1133
1134impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1135 fn from(value: TimestampingReconfiguration) -> Self {
1136 Self {
1137 mode: value.mode.map(|m| m.map(Into::into)),
1138 uncapped: value.uncapped,
1139 }
1140 }
1141}
1142
1143#[derive(Debug, Clone, Default)]
1144#[non_exhaustive]
1145pub struct DeleteOnEmptyReconfiguration {
1147 pub min_age_secs: Maybe<Option<u64>>,
1149}
1150
1151impl DeleteOnEmptyReconfiguration {
1152 pub fn new() -> Self {
1154 Self::default()
1155 }
1156
1157 pub fn with_min_age(self, min_age: Duration) -> Self {
1159 Self {
1160 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1161 }
1162 }
1163}
1164
1165impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1166 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1167 Self {
1168 min_age_secs: value.min_age_secs,
1169 }
1170 }
1171}
1172
1173#[derive(Debug, Clone, Default)]
1174#[non_exhaustive]
1175pub struct StreamReconfiguration {
1177 pub storage_class: Maybe<Option<StorageClass>>,
1179 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1181 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1183 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1185}
1186
1187impl StreamReconfiguration {
1188 pub fn new() -> Self {
1190 Self::default()
1191 }
1192
1193 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1195 Self {
1196 storage_class: Maybe::Specified(Some(storage_class)),
1197 ..self
1198 }
1199 }
1200
1201 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1203 Self {
1204 retention_policy: Maybe::Specified(Some(retention_policy)),
1205 ..self
1206 }
1207 }
1208
1209 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1211 Self {
1212 timestamping: Maybe::Specified(Some(timestamping)),
1213 ..self
1214 }
1215 }
1216
1217 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1219 Self {
1220 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1221 ..self
1222 }
1223 }
1224}
1225
1226impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1227 fn from(value: StreamReconfiguration) -> Self {
1228 Self {
1229 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1230 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1231 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1232 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1233 }
1234 }
1235}
1236
1237#[derive(Debug, Clone, Default)]
1238#[non_exhaustive]
1239pub struct BasinReconfiguration {
1241 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1243 pub create_stream_on_append: Maybe<bool>,
1246 pub create_stream_on_read: Maybe<bool>,
1248}
1249
1250impl BasinReconfiguration {
1251 pub fn new() -> Self {
1253 Self::default()
1254 }
1255
1256 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1259 Self {
1260 default_stream_config: Maybe::Specified(Some(config)),
1261 ..self
1262 }
1263 }
1264
1265 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1268 Self {
1269 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1270 ..self
1271 }
1272 }
1273
1274 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1277 Self {
1278 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1279 ..self
1280 }
1281 }
1282}
1283
1284impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1285 fn from(value: BasinReconfiguration) -> Self {
1286 Self {
1287 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1288 create_stream_on_append: value.create_stream_on_append,
1289 create_stream_on_read: value.create_stream_on_read,
1290 }
1291 }
1292}
1293
1294#[derive(Debug, Clone)]
1295#[non_exhaustive]
1296pub struct ReconfigureBasinInput {
1298 pub name: BasinName,
1300 pub config: BasinReconfiguration,
1302}
1303
1304impl ReconfigureBasinInput {
1305 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1307 Self { name, config }
1308 }
1309}
1310
1311#[derive(Debug, Clone, Default)]
1312#[non_exhaustive]
1313pub struct ListAccessTokensInput {
1315 pub prefix: AccessTokenIdPrefix,
1319 pub start_after: AccessTokenIdStartAfter,
1325 pub limit: Option<usize>,
1329}
1330
1331impl ListAccessTokensInput {
1332 pub fn new() -> Self {
1334 Self::default()
1335 }
1336
1337 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1339 Self { prefix, ..self }
1340 }
1341
1342 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1345 Self {
1346 start_after,
1347 ..self
1348 }
1349 }
1350
1351 pub fn with_limit(self, limit: usize) -> Self {
1353 Self {
1354 limit: Some(limit),
1355 ..self
1356 }
1357 }
1358}
1359
1360impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1361 fn from(value: ListAccessTokensInput) -> Self {
1362 Self {
1363 prefix: Some(value.prefix),
1364 start_after: Some(value.start_after),
1365 limit: value.limit,
1366 }
1367 }
1368}
1369
1370#[derive(Debug, Clone, Default)]
1371pub struct ListAllAccessTokensInput {
1373 pub prefix: AccessTokenIdPrefix,
1377 pub start_after: AccessTokenIdStartAfter,
1383}
1384
1385impl ListAllAccessTokensInput {
1386 pub fn new() -> Self {
1388 Self::default()
1389 }
1390
1391 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1393 Self { prefix, ..self }
1394 }
1395
1396 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1399 Self {
1400 start_after,
1401 ..self
1402 }
1403 }
1404}
1405
1406#[derive(Debug, Clone)]
1407#[non_exhaustive]
1408pub struct AccessTokenInfo {
1410 pub id: AccessTokenId,
1412 pub expires_at: S2DateTime,
1414 pub auto_prefix_streams: bool,
1417 pub scope: AccessTokenScope,
1419}
1420
1421impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1422 type Error = ValidationError;
1423
1424 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1425 let expires_at = value
1426 .expires_at
1427 .map(S2DateTime::try_from)
1428 .transpose()?
1429 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1430 Ok(Self {
1431 id: value.id,
1432 expires_at,
1433 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1434 scope: value.scope.into(),
1435 })
1436 }
1437}
1438
1439#[derive(Debug, Clone)]
1440pub enum BasinMatcher {
1444 None,
1446 Exact(BasinName),
1448 Prefix(BasinNamePrefix),
1450}
1451
1452#[derive(Debug, Clone)]
1453pub enum StreamMatcher {
1457 None,
1459 Exact(StreamName),
1461 Prefix(StreamNamePrefix),
1463}
1464
1465#[derive(Debug, Clone)]
1466pub enum AccessTokenMatcher {
1470 None,
1472 Exact(AccessTokenId),
1474 Prefix(AccessTokenIdPrefix),
1476}
1477
1478#[derive(Debug, Clone, Default)]
1479#[non_exhaustive]
1480pub struct ReadWritePermissions {
1482 pub read: bool,
1486 pub write: bool,
1490}
1491
1492impl ReadWritePermissions {
1493 pub fn new() -> Self {
1495 Self::default()
1496 }
1497
1498 pub fn read_only() -> Self {
1500 Self {
1501 read: true,
1502 write: false,
1503 }
1504 }
1505
1506 pub fn write_only() -> Self {
1508 Self {
1509 read: false,
1510 write: true,
1511 }
1512 }
1513
1514 pub fn read_write() -> Self {
1516 Self {
1517 read: true,
1518 write: true,
1519 }
1520 }
1521}
1522
1523impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1524 fn from(value: ReadWritePermissions) -> Self {
1525 Self {
1526 read: Some(value.read),
1527 write: Some(value.write),
1528 }
1529 }
1530}
1531
1532impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1533 fn from(value: api::access::ReadWritePermissions) -> Self {
1534 Self {
1535 read: value.read.unwrap_or_default(),
1536 write: value.write.unwrap_or_default(),
1537 }
1538 }
1539}
1540
1541#[derive(Debug, Clone, Default)]
1542#[non_exhaustive]
1543pub struct OperationGroupPermissions {
1547 pub account: Option<ReadWritePermissions>,
1551 pub basin: Option<ReadWritePermissions>,
1555 pub stream: Option<ReadWritePermissions>,
1559}
1560
1561impl OperationGroupPermissions {
1562 pub fn new() -> Self {
1564 Self::default()
1565 }
1566
1567 pub fn read_only_all() -> Self {
1569 Self {
1570 account: Some(ReadWritePermissions::read_only()),
1571 basin: Some(ReadWritePermissions::read_only()),
1572 stream: Some(ReadWritePermissions::read_only()),
1573 }
1574 }
1575
1576 pub fn write_only_all() -> Self {
1578 Self {
1579 account: Some(ReadWritePermissions::write_only()),
1580 basin: Some(ReadWritePermissions::write_only()),
1581 stream: Some(ReadWritePermissions::write_only()),
1582 }
1583 }
1584
1585 pub fn read_write_all() -> Self {
1587 Self {
1588 account: Some(ReadWritePermissions::read_write()),
1589 basin: Some(ReadWritePermissions::read_write()),
1590 stream: Some(ReadWritePermissions::read_write()),
1591 }
1592 }
1593
1594 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1596 Self {
1597 account: Some(account),
1598 ..self
1599 }
1600 }
1601
1602 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1604 Self {
1605 basin: Some(basin),
1606 ..self
1607 }
1608 }
1609
1610 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1612 Self {
1613 stream: Some(stream),
1614 ..self
1615 }
1616 }
1617}
1618
1619impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1620 fn from(value: OperationGroupPermissions) -> Self {
1621 Self {
1622 account: value.account.map(Into::into),
1623 basin: value.basin.map(Into::into),
1624 stream: value.stream.map(Into::into),
1625 }
1626 }
1627}
1628
1629impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1630 fn from(value: api::access::PermittedOperationGroups) -> Self {
1631 Self {
1632 account: value.account.map(Into::into),
1633 basin: value.basin.map(Into::into),
1634 stream: value.stream.map(Into::into),
1635 }
1636 }
1637}
1638
1639#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1640pub enum Operation {
1644 ListBasins,
1646 CreateBasin,
1648 GetBasinConfig,
1650 DeleteBasin,
1652 ReconfigureBasin,
1654 ListAccessTokens,
1656 IssueAccessToken,
1658 RevokeAccessToken,
1660 GetAccountMetrics,
1662 GetBasinMetrics,
1664 GetStreamMetrics,
1666 ListStreams,
1668 CreateStream,
1670 GetStreamConfig,
1672 DeleteStream,
1674 ReconfigureStream,
1676 CheckTail,
1678 Append,
1680 Read,
1682 Trim,
1684 Fence,
1686}
1687
1688impl From<Operation> for api::access::Operation {
1689 fn from(value: Operation) -> Self {
1690 match value {
1691 Operation::ListBasins => api::access::Operation::ListBasins,
1692 Operation::CreateBasin => api::access::Operation::CreateBasin,
1693 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1694 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1695 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1696 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1697 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1698 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1699 Operation::ListStreams => api::access::Operation::ListStreams,
1700 Operation::CreateStream => api::access::Operation::CreateStream,
1701 Operation::DeleteStream => api::access::Operation::DeleteStream,
1702 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1703 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1704 Operation::CheckTail => api::access::Operation::CheckTail,
1705 Operation::Append => api::access::Operation::Append,
1706 Operation::Read => api::access::Operation::Read,
1707 Operation::Trim => api::access::Operation::Trim,
1708 Operation::Fence => api::access::Operation::Fence,
1709 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1710 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1711 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1712 }
1713 }
1714}
1715
1716impl From<api::access::Operation> for Operation {
1717 fn from(value: api::access::Operation) -> Self {
1718 match value {
1719 api::access::Operation::ListBasins => Operation::ListBasins,
1720 api::access::Operation::CreateBasin => Operation::CreateBasin,
1721 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1722 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1723 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1724 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1725 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1726 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1727 api::access::Operation::ListStreams => Operation::ListStreams,
1728 api::access::Operation::CreateStream => Operation::CreateStream,
1729 api::access::Operation::DeleteStream => Operation::DeleteStream,
1730 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1731 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1732 api::access::Operation::CheckTail => Operation::CheckTail,
1733 api::access::Operation::Append => Operation::Append,
1734 api::access::Operation::Read => Operation::Read,
1735 api::access::Operation::Trim => Operation::Trim,
1736 api::access::Operation::Fence => Operation::Fence,
1737 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1738 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1739 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1740 }
1741 }
1742}
1743
1744#[derive(Debug, Clone)]
1745#[non_exhaustive]
1746pub struct AccessTokenScopeInput {
1754 basins: Option<BasinMatcher>,
1755 streams: Option<StreamMatcher>,
1756 access_tokens: Option<AccessTokenMatcher>,
1757 op_group_perms: Option<OperationGroupPermissions>,
1758 ops: HashSet<Operation>,
1759}
1760
1761impl AccessTokenScopeInput {
1762 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1764 Self {
1765 basins: None,
1766 streams: None,
1767 access_tokens: None,
1768 op_group_perms: None,
1769 ops: ops.into_iter().collect(),
1770 }
1771 }
1772
1773 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1775 Self {
1776 basins: None,
1777 streams: None,
1778 access_tokens: None,
1779 op_group_perms: Some(op_group_perms),
1780 ops: HashSet::default(),
1781 }
1782 }
1783
1784 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1786 Self {
1787 ops: ops.into_iter().collect(),
1788 ..self
1789 }
1790 }
1791
1792 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1794 Self {
1795 op_group_perms: Some(op_group_perms),
1796 ..self
1797 }
1798 }
1799
1800 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1804 Self {
1805 basins: Some(basins),
1806 ..self
1807 }
1808 }
1809
1810 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1814 Self {
1815 streams: Some(streams),
1816 ..self
1817 }
1818 }
1819
1820 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1824 Self {
1825 access_tokens: Some(access_tokens),
1826 ..self
1827 }
1828 }
1829}
1830
1831#[derive(Debug, Clone)]
1832#[non_exhaustive]
1833pub struct AccessTokenScope {
1835 pub basins: Option<BasinMatcher>,
1837 pub streams: Option<StreamMatcher>,
1839 pub access_tokens: Option<AccessTokenMatcher>,
1841 pub op_group_perms: Option<OperationGroupPermissions>,
1843 pub ops: HashSet<Operation>,
1845}
1846
1847impl From<api::access::AccessTokenScope> for AccessTokenScope {
1848 fn from(value: api::access::AccessTokenScope) -> Self {
1849 Self {
1850 basins: value.basins.map(|rs| match rs {
1851 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1852 BasinMatcher::Exact(e)
1853 }
1854 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1855 BasinMatcher::None
1856 }
1857 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1858 }),
1859 streams: value.streams.map(|rs| match rs {
1860 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1861 StreamMatcher::Exact(e)
1862 }
1863 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1864 StreamMatcher::None
1865 }
1866 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1867 }),
1868 access_tokens: value.access_tokens.map(|rs| match rs {
1869 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1870 AccessTokenMatcher::Exact(e)
1871 }
1872 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1873 AccessTokenMatcher::None
1874 }
1875 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1876 }),
1877 op_group_perms: value.op_groups.map(Into::into),
1878 ops: value
1879 .ops
1880 .map(|ops| ops.into_iter().map(Into::into).collect())
1881 .unwrap_or_default(),
1882 }
1883 }
1884}
1885
1886impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1887 fn from(value: AccessTokenScopeInput) -> Self {
1888 Self {
1889 basins: value.basins.map(|rs| match rs {
1890 BasinMatcher::None => {
1891 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1892 }
1893 BasinMatcher::Exact(e) => {
1894 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1895 }
1896 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1897 }),
1898 streams: value.streams.map(|rs| match rs {
1899 StreamMatcher::None => {
1900 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1901 }
1902 StreamMatcher::Exact(e) => {
1903 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1904 }
1905 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1906 }),
1907 access_tokens: value.access_tokens.map(|rs| match rs {
1908 AccessTokenMatcher::None => {
1909 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1910 }
1911 AccessTokenMatcher::Exact(e) => {
1912 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1913 }
1914 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1915 }),
1916 op_groups: value.op_group_perms.map(Into::into),
1917 ops: if value.ops.is_empty() {
1918 None
1919 } else {
1920 Some(value.ops.into_iter().map(Into::into).collect())
1921 },
1922 }
1923 }
1924}
1925
1926#[derive(Debug, Clone)]
1927#[non_exhaustive]
1928pub struct IssueAccessTokenInput {
1930 pub id: AccessTokenId,
1932 pub expires_at: Option<S2DateTime>,
1937 pub auto_prefix_streams: bool,
1945 pub scope: AccessTokenScopeInput,
1947}
1948
1949impl IssueAccessTokenInput {
1950 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1952 Self {
1953 id,
1954 expires_at: None,
1955 auto_prefix_streams: false,
1956 scope,
1957 }
1958 }
1959
1960 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1962 Self {
1963 expires_at: Some(expires_at),
1964 ..self
1965 }
1966 }
1967
1968 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1971 Self {
1972 auto_prefix_streams,
1973 ..self
1974 }
1975 }
1976}
1977
1978impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1979 fn from(value: IssueAccessTokenInput) -> Self {
1980 Self {
1981 id: value.id,
1982 expires_at: value.expires_at.map(Into::into),
1983 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1984 scope: value.scope.into(),
1985 }
1986 }
1987}
1988
1989#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1990pub enum TimeseriesInterval {
1992 Minute,
1994 Hour,
1996 Day,
1998}
1999
2000impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2001 fn from(value: TimeseriesInterval) -> Self {
2002 match value {
2003 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2004 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2005 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2006 }
2007 }
2008}
2009
2010impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2011 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2012 match value {
2013 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2014 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2015 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2016 }
2017 }
2018}
2019
2020#[derive(Debug, Clone, Copy)]
2021#[non_exhaustive]
2022pub struct TimeRange {
2024 pub start: u32,
2026 pub end: u32,
2028}
2029
2030impl TimeRange {
2031 pub fn new(start: u32, end: u32) -> Self {
2033 Self { start, end }
2034 }
2035}
2036
2037#[derive(Debug, Clone, Copy)]
2038#[non_exhaustive]
2039pub struct TimeRangeAndInterval {
2041 pub start: u32,
2043 pub end: u32,
2045 pub interval: Option<TimeseriesInterval>,
2049}
2050
2051impl TimeRangeAndInterval {
2052 pub fn new(start: u32, end: u32) -> Self {
2054 Self {
2055 start,
2056 end,
2057 interval: None,
2058 }
2059 }
2060
2061 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2063 Self {
2064 interval: Some(interval),
2065 ..self
2066 }
2067 }
2068}
2069
2070#[derive(Debug, Clone, Copy)]
2071pub enum AccountMetricSet {
2073 ActiveBasins(TimeRange),
2076 AccountOps(TimeRangeAndInterval),
2083}
2084
2085#[derive(Debug, Clone)]
2086#[non_exhaustive]
2087pub struct GetAccountMetricsInput {
2089 pub set: AccountMetricSet,
2091}
2092
2093impl GetAccountMetricsInput {
2094 pub fn new(set: AccountMetricSet) -> Self {
2096 Self { set }
2097 }
2098}
2099
2100impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2101 fn from(value: GetAccountMetricsInput) -> Self {
2102 let (set, start, end, interval) = match value.set {
2103 AccountMetricSet::ActiveBasins(args) => (
2104 api::metrics::AccountMetricSet::ActiveBasins,
2105 args.start,
2106 args.end,
2107 None,
2108 ),
2109 AccountMetricSet::AccountOps(args) => (
2110 api::metrics::AccountMetricSet::AccountOps,
2111 args.start,
2112 args.end,
2113 args.interval,
2114 ),
2115 };
2116 Self {
2117 set,
2118 start: Some(start),
2119 end: Some(end),
2120 interval: interval.map(Into::into),
2121 }
2122 }
2123}
2124
2125#[derive(Debug, Clone, Copy)]
2126pub enum BasinMetricSet {
2128 Storage(TimeRange),
2131 AppendOps(TimeRangeAndInterval),
2139 ReadOps(TimeRangeAndInterval),
2147 ReadThroughput(TimeRangeAndInterval),
2154 AppendThroughput(TimeRangeAndInterval),
2161 BasinOps(TimeRangeAndInterval),
2168}
2169
2170#[derive(Debug, Clone)]
2171#[non_exhaustive]
2172pub struct GetBasinMetricsInput {
2174 pub name: BasinName,
2176 pub set: BasinMetricSet,
2178}
2179
2180impl GetBasinMetricsInput {
2181 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2183 Self { name, set }
2184 }
2185}
2186
2187impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2188 fn from(value: GetBasinMetricsInput) -> Self {
2189 let (set, start, end, interval) = match value.set {
2190 BasinMetricSet::Storage(args) => (
2191 api::metrics::BasinMetricSet::Storage,
2192 args.start,
2193 args.end,
2194 None,
2195 ),
2196 BasinMetricSet::AppendOps(args) => (
2197 api::metrics::BasinMetricSet::AppendOps,
2198 args.start,
2199 args.end,
2200 args.interval,
2201 ),
2202 BasinMetricSet::ReadOps(args) => (
2203 api::metrics::BasinMetricSet::ReadOps,
2204 args.start,
2205 args.end,
2206 args.interval,
2207 ),
2208 BasinMetricSet::ReadThroughput(args) => (
2209 api::metrics::BasinMetricSet::ReadThroughput,
2210 args.start,
2211 args.end,
2212 args.interval,
2213 ),
2214 BasinMetricSet::AppendThroughput(args) => (
2215 api::metrics::BasinMetricSet::AppendThroughput,
2216 args.start,
2217 args.end,
2218 args.interval,
2219 ),
2220 BasinMetricSet::BasinOps(args) => (
2221 api::metrics::BasinMetricSet::BasinOps,
2222 args.start,
2223 args.end,
2224 args.interval,
2225 ),
2226 };
2227 (
2228 value.name,
2229 api::metrics::BasinMetricSetRequest {
2230 set,
2231 start: Some(start),
2232 end: Some(end),
2233 interval: interval.map(Into::into),
2234 },
2235 )
2236 }
2237}
2238
2239#[derive(Debug, Clone, Copy)]
2240pub enum StreamMetricSet {
2242 Storage(TimeRange),
2245}
2246
2247#[derive(Debug, Clone)]
2248#[non_exhaustive]
2249pub struct GetStreamMetricsInput {
2251 pub basin_name: BasinName,
2253 pub stream_name: StreamName,
2255 pub set: StreamMetricSet,
2257}
2258
2259impl GetStreamMetricsInput {
2260 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2263 Self {
2264 basin_name,
2265 stream_name,
2266 set,
2267 }
2268 }
2269}
2270
2271impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2272 fn from(value: GetStreamMetricsInput) -> Self {
2273 let (set, start, end, interval) = match value.set {
2274 StreamMetricSet::Storage(args) => (
2275 api::metrics::StreamMetricSet::Storage,
2276 args.start,
2277 args.end,
2278 None,
2279 ),
2280 };
2281 (
2282 value.basin_name,
2283 value.stream_name,
2284 api::metrics::StreamMetricSetRequest {
2285 set,
2286 start: Some(start),
2287 end: Some(end),
2288 interval,
2289 },
2290 )
2291 }
2292}
2293
2294#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2295pub enum MetricUnit {
2297 Bytes,
2299 Operations,
2301}
2302
2303impl From<api::metrics::MetricUnit> for MetricUnit {
2304 fn from(value: api::metrics::MetricUnit) -> Self {
2305 match value {
2306 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2307 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2308 }
2309 }
2310}
2311
2312#[derive(Debug, Clone)]
2313#[non_exhaustive]
2314pub struct ScalarMetric {
2316 pub name: String,
2318 pub unit: MetricUnit,
2320 pub value: f64,
2322}
2323
2324#[derive(Debug, Clone)]
2325#[non_exhaustive]
2326pub struct AccumulationMetric {
2329 pub name: String,
2331 pub unit: MetricUnit,
2333 pub interval: TimeseriesInterval,
2335 pub values: Vec<(u32, f64)>,
2339}
2340
2341#[derive(Debug, Clone)]
2342#[non_exhaustive]
2343pub struct GaugeMetric {
2345 pub name: String,
2347 pub unit: MetricUnit,
2349 pub values: Vec<(u32, f64)>,
2352}
2353
2354#[derive(Debug, Clone)]
2355#[non_exhaustive]
2356pub struct LabelMetric {
2358 pub name: String,
2360 pub values: Vec<String>,
2362}
2363
2364#[derive(Debug, Clone)]
2365pub enum Metric {
2367 Scalar(ScalarMetric),
2369 Accumulation(AccumulationMetric),
2372 Gauge(GaugeMetric),
2374 Label(LabelMetric),
2376}
2377
2378impl From<api::metrics::Metric> for Metric {
2379 fn from(value: api::metrics::Metric) -> Self {
2380 match value {
2381 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2382 name: sm.name.into(),
2383 unit: sm.unit.into(),
2384 value: sm.value,
2385 }),
2386 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2387 name: am.name.into(),
2388 unit: am.unit.into(),
2389 interval: am.interval.into(),
2390 values: am.values,
2391 }),
2392 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2393 name: gm.name.into(),
2394 unit: gm.unit.into(),
2395 values: gm.values,
2396 }),
2397 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2398 name: lm.name.into(),
2399 values: lm.values,
2400 }),
2401 }
2402 }
2403}
2404
2405#[derive(Debug, Clone, Default)]
2406#[non_exhaustive]
2407pub struct ListStreamsInput {
2409 pub prefix: StreamNamePrefix,
2413 pub start_after: StreamNameStartAfter,
2419 pub limit: Option<usize>,
2423}
2424
2425impl ListStreamsInput {
2426 pub fn new() -> Self {
2428 Self::default()
2429 }
2430
2431 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2433 Self { prefix, ..self }
2434 }
2435
2436 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2439 Self {
2440 start_after,
2441 ..self
2442 }
2443 }
2444
2445 pub fn with_limit(self, limit: usize) -> Self {
2447 Self {
2448 limit: Some(limit),
2449 ..self
2450 }
2451 }
2452}
2453
2454impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2455 fn from(value: ListStreamsInput) -> Self {
2456 Self {
2457 prefix: Some(value.prefix),
2458 start_after: Some(value.start_after),
2459 limit: value.limit,
2460 }
2461 }
2462}
2463
2464#[derive(Debug, Clone, Default)]
2465pub struct ListAllStreamsInput {
2467 pub prefix: StreamNamePrefix,
2471 pub start_after: StreamNameStartAfter,
2477 pub include_deleted: bool,
2481}
2482
2483impl ListAllStreamsInput {
2484 pub fn new() -> Self {
2486 Self::default()
2487 }
2488
2489 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2491 Self { prefix, ..self }
2492 }
2493
2494 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2497 Self {
2498 start_after,
2499 ..self
2500 }
2501 }
2502
2503 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2505 Self {
2506 include_deleted,
2507 ..self
2508 }
2509 }
2510}
2511
2512#[derive(Debug, Clone, PartialEq)]
2513#[non_exhaustive]
2514pub struct StreamInfo {
2516 pub name: StreamName,
2518 pub created_at: S2DateTime,
2520 pub deleted_at: Option<S2DateTime>,
2522}
2523
2524impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2525 type Error = ValidationError;
2526
2527 fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2528 Ok(Self {
2529 name: value.name,
2530 created_at: value.created_at.try_into()?,
2531 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2532 })
2533 }
2534}
2535
2536#[derive(Debug, Clone)]
2537#[non_exhaustive]
2538pub struct CreateStreamInput {
2540 pub name: StreamName,
2542 pub config: Option<StreamConfig>,
2546 idempotency_token: String,
2547}
2548
2549impl CreateStreamInput {
2550 pub fn new(name: StreamName) -> Self {
2552 Self {
2553 name,
2554 config: None,
2555 idempotency_token: idempotency_token(),
2556 }
2557 }
2558
2559 pub fn with_config(self, config: StreamConfig) -> Self {
2561 Self {
2562 config: Some(config),
2563 ..self
2564 }
2565 }
2566}
2567
2568impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2569 fn from(value: CreateStreamInput) -> Self {
2570 (
2571 api::stream::CreateStreamRequest {
2572 stream: value.name,
2573 config: value.config.map(Into::into),
2574 },
2575 value.idempotency_token,
2576 )
2577 }
2578}
2579
2580#[derive(Debug, Clone)]
2581#[non_exhaustive]
2582pub struct DeleteStreamInput {
2584 pub name: StreamName,
2586 pub ignore_not_found: bool,
2588}
2589
2590impl DeleteStreamInput {
2591 pub fn new(name: StreamName) -> Self {
2593 Self {
2594 name,
2595 ignore_not_found: false,
2596 }
2597 }
2598
2599 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2601 Self {
2602 ignore_not_found,
2603 ..self
2604 }
2605 }
2606}
2607
2608#[derive(Debug, Clone)]
2609#[non_exhaustive]
2610pub struct ReconfigureStreamInput {
2612 pub name: StreamName,
2614 pub config: StreamReconfiguration,
2616}
2617
2618impl ReconfigureStreamInput {
2619 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2621 Self { name, config }
2622 }
2623}
2624
2625#[derive(Debug, Clone, PartialEq, Eq)]
2626pub struct FencingToken(String);
2632
2633impl FencingToken {
2634 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2636 rand::rng()
2637 .sample_iter(&rand::distr::Alphanumeric)
2638 .take(n)
2639 .map(char::from)
2640 .collect::<String>()
2641 .parse()
2642 }
2643}
2644
2645impl FromStr for FencingToken {
2646 type Err = ValidationError;
2647
2648 fn from_str(s: &str) -> Result<Self, Self::Err> {
2649 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2650 return Err(ValidationError(format!(
2651 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2652 )));
2653 }
2654 Ok(FencingToken(s.to_string()))
2655 }
2656}
2657
2658impl std::fmt::Display for FencingToken {
2659 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2660 write!(f, "{}", self.0)
2661 }
2662}
2663
2664impl Deref for FencingToken {
2665 type Target = str;
2666
2667 fn deref(&self) -> &Self::Target {
2668 &self.0
2669 }
2670}
2671
2672#[derive(Debug, Clone, Copy, PartialEq)]
2673#[non_exhaustive]
2674pub struct StreamPosition {
2676 pub seq_num: u64,
2678 pub timestamp: u64,
2681}
2682
2683impl std::fmt::Display for StreamPosition {
2684 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2685 write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2686 }
2687}
2688
2689impl From<api::stream::proto::StreamPosition> for StreamPosition {
2690 fn from(value: api::stream::proto::StreamPosition) -> Self {
2691 Self {
2692 seq_num: value.seq_num,
2693 timestamp: value.timestamp,
2694 }
2695 }
2696}
2697
2698impl From<api::stream::StreamPosition> for StreamPosition {
2699 fn from(value: api::stream::StreamPosition) -> Self {
2700 Self {
2701 seq_num: value.seq_num,
2702 timestamp: value.timestamp,
2703 }
2704 }
2705}
2706
2707#[derive(Debug, Clone, PartialEq)]
2708#[non_exhaustive]
2709pub struct Header {
2711 pub name: Bytes,
2713 pub value: Bytes,
2715}
2716
2717impl Header {
2718 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2720 Self {
2721 name: name.into(),
2722 value: value.into(),
2723 }
2724 }
2725}
2726
2727impl From<Header> for api::stream::proto::Header {
2728 fn from(value: Header) -> Self {
2729 Self {
2730 name: value.name,
2731 value: value.value,
2732 }
2733 }
2734}
2735
2736impl From<api::stream::proto::Header> for Header {
2737 fn from(value: api::stream::proto::Header) -> Self {
2738 Self {
2739 name: value.name,
2740 value: value.value,
2741 }
2742 }
2743}
2744
2745#[derive(Debug, Clone, PartialEq)]
2746pub struct AppendRecord {
2748 body: Bytes,
2749 headers: Vec<Header>,
2750 timestamp: Option<u64>,
2751}
2752
2753impl AppendRecord {
2754 fn validate(self) -> Result<Self, ValidationError> {
2755 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2756 Err(ValidationError(format!(
2757 "metered_bytes: {} exceeds {}",
2758 self.metered_bytes(),
2759 RECORD_BATCH_MAX.bytes
2760 )))
2761 } else {
2762 Ok(self)
2763 }
2764 }
2765
2766 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2768 let record = Self {
2769 body: body.into(),
2770 headers: Vec::default(),
2771 timestamp: None,
2772 };
2773 record.validate()
2774 }
2775
2776 pub fn with_headers(
2778 self,
2779 headers: impl IntoIterator<Item = Header>,
2780 ) -> Result<Self, ValidationError> {
2781 let record = Self {
2782 headers: headers.into_iter().collect(),
2783 ..self
2784 };
2785 record.validate()
2786 }
2787
2788 pub fn with_timestamp(self, timestamp: u64) -> Self {
2792 Self {
2793 timestamp: Some(timestamp),
2794 ..self
2795 }
2796 }
2797
2798 pub fn body(&self) -> &[u8] {
2800 &self.body
2801 }
2802
2803 pub fn headers(&self) -> &[Header] {
2805 &self.headers
2806 }
2807
2808 pub fn timestamp(&self) -> Option<u64> {
2810 self.timestamp
2811 }
2812}
2813
2814impl From<AppendRecord> for api::stream::proto::AppendRecord {
2815 fn from(value: AppendRecord) -> Self {
2816 Self {
2817 timestamp: value.timestamp,
2818 headers: value.headers.into_iter().map(Into::into).collect(),
2819 body: value.body,
2820 }
2821 }
2822}
2823
2824pub trait MeteredBytes {
2831 fn metered_bytes(&self) -> usize;
2833}
2834
2835macro_rules! metered_bytes_impl {
2836 ($ty:ty) => {
2837 impl MeteredBytes for $ty {
2838 fn metered_bytes(&self) -> usize {
2839 8 + (2 * self.headers.len())
2840 + self
2841 .headers
2842 .iter()
2843 .map(|h| h.name.len() + h.value.len())
2844 .sum::<usize>()
2845 + self.body.len()
2846 }
2847 }
2848 };
2849}
2850
2851metered_bytes_impl!(AppendRecord);
2852
2853#[derive(Debug, Clone)]
2854pub struct AppendRecordBatch {
2863 records: Vec<AppendRecord>,
2864 metered_bytes: usize,
2865}
2866
2867impl AppendRecordBatch {
2868 pub(crate) fn with_capacity(capacity: usize) -> Self {
2869 Self {
2870 records: Vec::with_capacity(capacity),
2871 metered_bytes: 0,
2872 }
2873 }
2874
2875 pub(crate) fn push(&mut self, record: AppendRecord) {
2876 self.metered_bytes += record.metered_bytes();
2877 self.records.push(record);
2878 }
2879
2880 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2882 where
2883 I: IntoIterator<Item = AppendRecord>,
2884 {
2885 let mut records = Vec::new();
2886 let mut metered_bytes = 0;
2887
2888 for record in iter {
2889 metered_bytes += record.metered_bytes();
2890 records.push(record);
2891
2892 if metered_bytes > RECORD_BATCH_MAX.bytes {
2893 return Err(ValidationError(format!(
2894 "batch size in metered bytes ({metered_bytes}) exceeds {}",
2895 RECORD_BATCH_MAX.bytes
2896 )));
2897 }
2898
2899 if records.len() > RECORD_BATCH_MAX.count {
2900 return Err(ValidationError(format!(
2901 "number of records in the batch exceeds {}",
2902 RECORD_BATCH_MAX.count
2903 )));
2904 }
2905 }
2906
2907 if records.is_empty() {
2908 return Err(ValidationError("batch is empty".into()));
2909 }
2910
2911 Ok(Self {
2912 records,
2913 metered_bytes,
2914 })
2915 }
2916}
2917
2918impl Deref for AppendRecordBatch {
2919 type Target = [AppendRecord];
2920
2921 fn deref(&self) -> &Self::Target {
2922 &self.records
2923 }
2924}
2925
2926impl MeteredBytes for AppendRecordBatch {
2927 fn metered_bytes(&self) -> usize {
2928 self.metered_bytes
2929 }
2930}
2931
2932#[derive(Debug, Clone)]
2933pub enum Command {
2935 Fence {
2937 fencing_token: FencingToken,
2939 },
2940 Trim {
2942 trim_point: u64,
2944 },
2945}
2946
2947#[derive(Debug, Clone)]
2948#[non_exhaustive]
2949pub struct CommandRecord {
2953 pub command: Command,
2955 pub timestamp: Option<u64>,
2957}
2958
2959impl CommandRecord {
2960 const FENCE: &[u8] = b"fence";
2961 const TRIM: &[u8] = b"trim";
2962
2963 pub fn fence(fencing_token: FencingToken) -> Self {
2968 Self {
2969 command: Command::Fence { fencing_token },
2970 timestamp: None,
2971 }
2972 }
2973
2974 pub fn trim(trim_point: u64) -> Self {
2981 Self {
2982 command: Command::Trim { trim_point },
2983 timestamp: None,
2984 }
2985 }
2986
2987 pub fn with_timestamp(self, timestamp: u64) -> Self {
2989 Self {
2990 timestamp: Some(timestamp),
2991 ..self
2992 }
2993 }
2994}
2995
2996impl From<CommandRecord> for AppendRecord {
2997 fn from(value: CommandRecord) -> Self {
2998 let (header_value, body) = match value.command {
2999 Command::Fence { fencing_token } => (
3000 CommandRecord::FENCE,
3001 Bytes::copy_from_slice(fencing_token.as_bytes()),
3002 ),
3003 Command::Trim { trim_point } => (
3004 CommandRecord::TRIM,
3005 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3006 ),
3007 };
3008 Self {
3009 body,
3010 headers: vec![Header::new("", header_value)],
3011 timestamp: value.timestamp,
3012 }
3013 }
3014}
3015
3016#[derive(Debug, Clone)]
3017#[non_exhaustive]
3018pub struct AppendInput {
3021 pub records: AppendRecordBatch,
3023 pub match_seq_num: Option<u64>,
3027 pub fencing_token: Option<FencingToken>,
3032}
3033
3034impl AppendInput {
3035 pub fn new(records: AppendRecordBatch) -> Self {
3037 Self {
3038 records,
3039 match_seq_num: None,
3040 fencing_token: None,
3041 }
3042 }
3043
3044 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3046 Self {
3047 match_seq_num: Some(match_seq_num),
3048 ..self
3049 }
3050 }
3051
3052 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3054 Self {
3055 fencing_token: Some(fencing_token),
3056 ..self
3057 }
3058 }
3059}
3060
3061impl From<AppendInput> for api::stream::proto::AppendInput {
3062 fn from(value: AppendInput) -> Self {
3063 Self {
3064 records: value.records.iter().cloned().map(Into::into).collect(),
3065 match_seq_num: value.match_seq_num,
3066 fencing_token: value.fencing_token.map(|t| t.to_string()),
3067 }
3068 }
3069}
3070
3071#[derive(Debug, Clone, PartialEq)]
3072#[non_exhaustive]
3073pub struct AppendAck {
3075 pub start: StreamPosition,
3077 pub end: StreamPosition,
3083 pub tail: StreamPosition,
3088}
3089
3090impl From<api::stream::proto::AppendAck> for AppendAck {
3091 fn from(value: api::stream::proto::AppendAck) -> Self {
3092 Self {
3093 start: value.start.unwrap_or_default().into(),
3094 end: value.end.unwrap_or_default().into(),
3095 tail: value.tail.unwrap_or_default().into(),
3096 }
3097 }
3098}
3099
3100#[derive(Debug, Clone, Copy)]
3101pub enum ReadFrom {
3103 SeqNum(u64),
3105 Timestamp(u64),
3107 TailOffset(u64),
3109}
3110
3111impl Default for ReadFrom {
3112 fn default() -> Self {
3113 Self::SeqNum(0)
3114 }
3115}
3116
3117#[derive(Debug, Default, Clone)]
3118#[non_exhaustive]
3119pub struct ReadStart {
3121 pub from: ReadFrom,
3125 pub clamp_to_tail: bool,
3129}
3130
3131impl ReadStart {
3132 pub fn new() -> Self {
3134 Self::default()
3135 }
3136
3137 pub fn with_from(self, from: ReadFrom) -> Self {
3139 Self { from, ..self }
3140 }
3141
3142 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3144 Self {
3145 clamp_to_tail,
3146 ..self
3147 }
3148 }
3149}
3150
3151impl From<ReadStart> for api::stream::ReadStart {
3152 fn from(value: ReadStart) -> Self {
3153 let (seq_num, timestamp, tail_offset) = match value.from {
3154 ReadFrom::SeqNum(n) => (Some(n), None, None),
3155 ReadFrom::Timestamp(t) => (None, Some(t), None),
3156 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3157 };
3158 Self {
3159 seq_num,
3160 timestamp,
3161 tail_offset,
3162 clamp: if value.clamp_to_tail {
3163 Some(true)
3164 } else {
3165 None
3166 },
3167 }
3168 }
3169}
3170
3171#[derive(Debug, Clone, Default)]
3172#[non_exhaustive]
3173pub struct ReadLimits {
3175 pub count: Option<usize>,
3179 pub bytes: Option<usize>,
3183}
3184
3185impl ReadLimits {
3186 pub fn new() -> Self {
3188 Self::default()
3189 }
3190
3191 pub fn with_count(self, count: usize) -> Self {
3193 Self {
3194 count: Some(count),
3195 ..self
3196 }
3197 }
3198
3199 pub fn with_bytes(self, bytes: usize) -> Self {
3201 Self {
3202 bytes: Some(bytes),
3203 ..self
3204 }
3205 }
3206}
3207
3208#[derive(Debug, Clone, Default)]
3209#[non_exhaustive]
3210pub struct ReadStop {
3212 pub limits: ReadLimits,
3216 pub until: Option<RangeTo<u64>>,
3220 pub wait: Option<u32>,
3230}
3231
3232impl ReadStop {
3233 pub fn new() -> Self {
3235 Self::default()
3236 }
3237
3238 pub fn with_limits(self, limits: ReadLimits) -> Self {
3240 Self { limits, ..self }
3241 }
3242
3243 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3245 Self {
3246 until: Some(until),
3247 ..self
3248 }
3249 }
3250
3251 pub fn with_wait(self, wait: u32) -> Self {
3253 Self {
3254 wait: Some(wait),
3255 ..self
3256 }
3257 }
3258}
3259
3260impl From<ReadStop> for api::stream::ReadEnd {
3261 fn from(value: ReadStop) -> Self {
3262 Self {
3263 count: value.limits.count,
3264 bytes: value.limits.bytes,
3265 until: value.until.map(|r| r.end),
3266 wait: value.wait,
3267 }
3268 }
3269}
3270
3271#[derive(Debug, Clone, Default)]
3272#[non_exhaustive]
3273pub struct ReadInput {
3276 pub start: ReadStart,
3280 pub stop: ReadStop,
3284 pub ignore_command_records: bool,
3288}
3289
3290impl ReadInput {
3291 pub fn new() -> Self {
3293 Self::default()
3294 }
3295
3296 pub fn with_start(self, start: ReadStart) -> Self {
3298 Self { start, ..self }
3299 }
3300
3301 pub fn with_stop(self, stop: ReadStop) -> Self {
3303 Self { stop, ..self }
3304 }
3305
3306 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3308 Self {
3309 ignore_command_records,
3310 ..self
3311 }
3312 }
3313}
3314
3315#[derive(Debug, Clone)]
3316#[non_exhaustive]
3317pub struct SequencedRecord {
3319 pub seq_num: u64,
3321 pub body: Bytes,
3323 pub headers: Vec<Header>,
3325 pub timestamp: u64,
3327}
3328
3329impl SequencedRecord {
3330 pub fn is_command_record(&self) -> bool {
3332 self.headers.len() == 1 && *self.headers[0].name == *b""
3333 }
3334}
3335
3336impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3337 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3338 Self {
3339 seq_num: value.seq_num,
3340 body: value.body,
3341 headers: value.headers.into_iter().map(Into::into).collect(),
3342 timestamp: value.timestamp,
3343 }
3344 }
3345}
3346
3347metered_bytes_impl!(SequencedRecord);
3348
3349#[derive(Debug, Clone)]
3350#[non_exhaustive]
3351pub struct ReadBatch {
3354 pub records: Vec<SequencedRecord>,
3361 pub tail: Option<StreamPosition>,
3366}
3367
3368impl ReadBatch {
3369 pub(crate) fn from_api(
3370 batch: api::stream::proto::ReadBatch,
3371 ignore_command_records: bool,
3372 ) -> Self {
3373 Self {
3374 records: batch
3375 .records
3376 .into_iter()
3377 .map(Into::into)
3378 .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3379 .collect(),
3380 tail: batch.tail.map(Into::into),
3381 }
3382 }
3383}
3384
3385pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3387
3388#[derive(Debug, Clone, thiserror::Error)]
3389pub enum AppendConditionFailed {
3391 #[error("fencing token mismatch, expected: {0}")]
3392 FencingTokenMismatch(FencingToken),
3394 #[error("sequence number mismatch, expected: {0}")]
3395 SeqNumMismatch(u64),
3397}
3398
3399impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3400 fn from(value: api::stream::AppendConditionFailed) -> Self {
3401 match value {
3402 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3403 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3404 }
3405 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3406 AppendConditionFailed::SeqNumMismatch(seq)
3407 }
3408 }
3409 }
3410}
3411
3412#[derive(Debug, Clone, thiserror::Error)]
3413pub enum S2Error {
3415 #[error("{0}")]
3416 Client(String),
3418 #[error(transparent)]
3419 Validation(#[from] ValidationError),
3421 #[error("{0}")]
3422 AppendConditionFailed(AppendConditionFailed),
3424 #[error("read from an unwritten position. current tail: {0}")]
3425 ReadUnwritten(StreamPosition),
3427 #[error("{0}")]
3428 Server(ErrorResponse),
3430}
3431
3432impl From<ApiError> for S2Error {
3433 fn from(err: ApiError) -> Self {
3434 match err {
3435 ApiError::ReadUnwritten(tail_response) => {
3436 Self::ReadUnwritten(tail_response.tail.into())
3437 }
3438 ApiError::AppendConditionFailed(condition_failed) => {
3439 Self::AppendConditionFailed(condition_failed.into())
3440 }
3441 ApiError::Server(_, response) => Self::Server(response.into()),
3442 other => Self::Client(other.to_string()),
3443 }
3444 }
3445}
3446
3447#[derive(Debug, Clone, thiserror::Error)]
3448#[error("{code}: {message}")]
3449#[non_exhaustive]
3450pub struct ErrorResponse {
3452 pub code: String,
3454 pub message: String,
3456}
3457
3458impl From<ApiErrorResponse> for ErrorResponse {
3459 fn from(response: ApiErrorResponse) -> Self {
3460 Self {
3461 code: response.code,
3462 message: response.message,
3463 }
3464 }
3465}
3466
3467fn idempotency_token() -> String {
3468 uuid::Uuid::new_v4().simple().to_string()
3469}