1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::Rng;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::types::ValidationError;
24pub use s2_common::types::access::AccessTokenId;
28pub use s2_common::types::access::AccessTokenIdPrefix;
30pub use s2_common::types::access::AccessTokenIdStartAfter;
32pub use s2_common::types::basin::BasinName;
37pub use s2_common::types::basin::BasinNamePrefix;
39pub use s2_common::types::basin::BasinNameStartAfter;
41pub use s2_common::types::stream::StreamName;
45pub use s2_common::types::stream::StreamNamePrefix;
47pub use s2_common::types::stream::StreamNameStartAfter;
49
/// Number of bytes in one mebibyte (2^20 = 1,048,576).
pub(crate) const ONE_MIB: u32 = 1 << 20;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl From<time::OffsetDateTime> for S2DateTime {
66 fn from(dt: time::OffsetDateTime) -> Self {
67 Self(dt)
68 }
69}
70
71impl From<S2DateTime> for time::OffsetDateTime {
72 fn from(dt: S2DateTime) -> Self {
73 dt.0
74 }
75}
76
77impl FromStr for S2DateTime {
78 type Err = ValidationError;
79
80 fn from_str(s: &str) -> Result<Self, Self::Err> {
81 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
82 .map(Self)
83 .map_err(|e| ValidationError(format!("invalid datetime: {e}")))
84 }
85}
86
87impl fmt::Display for S2DateTime {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 write!(
90 f,
91 "{}",
92 self.0
93 .format(&time::format_description::well_known::Rfc3339)
94 .expect("RFC3339 formatting should not fail for S2DateTime")
95 )
96 }
97}
98
/// Authority part of a basin endpoint, in one of two forms.
///
/// Which form is used is decided when a `BasinEndpoint` is parsed: an authority written with a
/// leading `{basin}.` placeholder becomes [`BasinAuthority::ParentZone`], anything else becomes
/// [`BasinAuthority::Direct`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// Authority that followed a `{basin}.` placeholder; the placeholder itself is not stored.
    // NOTE(review): presumably the basin name is prepended as a subdomain when requests are
    // built — confirm against the request-building code.
    ParentZone(Authority),
    /// Authority given without a `{basin}.` placeholder, stored as parsed.
    Direct(Authority),
}
107
108#[derive(Debug, Clone)]
110pub struct AccountEndpoint {
111 scheme: Scheme,
112 authority: Authority,
113}
114
115impl AccountEndpoint {
116 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
118 endpoint.parse()
119 }
120}
121
122impl FromStr for AccountEndpoint {
123 type Err = ValidationError;
124
125 fn from_str(s: &str) -> Result<Self, Self::Err> {
126 let (scheme, authority) = match s.find("://") {
127 Some(idx) => {
128 let scheme: Scheme = s[..idx]
129 .parse()
130 .map_err(|_| "invalid account endpoint scheme".to_string())?;
131 (scheme, &s[idx + 3..])
132 }
133 None => (Scheme::HTTPS, s),
134 };
135 Ok(Self {
136 scheme,
137 authority: authority
138 .parse()
139 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
140 })
141 }
142}
143
144#[derive(Debug, Clone)]
146pub struct BasinEndpoint {
147 scheme: Scheme,
148 authority: BasinAuthority,
149}
150
151impl BasinEndpoint {
152 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
154 endpoint.parse()
155 }
156}
157
158impl FromStr for BasinEndpoint {
159 type Err = ValidationError;
160
161 fn from_str(s: &str) -> Result<Self, Self::Err> {
162 let (scheme, authority) = match s.find("://") {
163 Some(idx) => {
164 let scheme: Scheme = s[..idx]
165 .parse()
166 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
167 (scheme, &s[idx + 3..])
168 }
169 None => (Scheme::HTTPS, s),
170 };
171 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
172 BasinAuthority::ParentZone(
173 authority
174 .parse()
175 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
176 )
177 } else {
178 BasinAuthority::Direct(
179 authority
180 .parse()
181 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
182 )
183 };
184 Ok(Self { scheme, authority })
185 }
186}
187
188#[derive(Debug, Clone)]
189#[non_exhaustive]
190pub struct S2Endpoints {
192 pub(crate) scheme: Scheme,
193 pub(crate) account_authority: Authority,
194 pub(crate) basin_authority: BasinAuthority,
195}
196
197impl S2Endpoints {
198 pub fn new(
200 account_endpoint: AccountEndpoint,
201 basin_endpoint: BasinEndpoint,
202 ) -> Result<Self, ValidationError> {
203 if account_endpoint.scheme != basin_endpoint.scheme {
204 return Err("account and basin endpoints must have the same scheme".into());
205 }
206 Ok(Self {
207 scheme: account_endpoint.scheme,
208 account_authority: account_endpoint.authority,
209 basin_authority: basin_endpoint.authority,
210 })
211 }
212
213 pub fn from_env() -> Result<Self, ValidationError> {
219 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
220 Ok(endpoint) => endpoint.parse()?,
221 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
222 Err(VarError::NotUnicode(_)) => {
223 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
224 }
225 };
226
227 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
228 Ok(endpoint) => endpoint.parse()?,
229 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
230 Err(VarError::NotUnicode(_)) => {
231 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
232 }
233 };
234
235 if account_endpoint.scheme != basin_endpoint.scheme {
236 return Err(
237 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
238 );
239 }
240
241 Ok(Self {
242 scheme: account_endpoint.scheme,
243 account_authority: account_endpoint.authority,
244 basin_authority: basin_endpoint.authority,
245 })
246 }
247
248 pub(crate) fn for_aws() -> Self {
249 Self {
250 scheme: Scheme::HTTPS,
251 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
252 basin_authority: BasinAuthority::ParentZone(
253 "b.aws.s2.dev".try_into().expect("valid authority"),
254 ),
255 }
256 }
257}
258
/// Compression algorithm selection.
///
/// Derives `PartialEq`/`Eq` so configured values can be compared directly, in line with the
/// other fieldless configuration enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
269
270impl From<Compression> for CompressionAlgorithm {
271 fn from(value: Compression) -> Self {
272 match value {
273 Compression::None => CompressionAlgorithm::None,
274 Compression::Gzip => CompressionAlgorithm::Gzip,
275 Compression::Zstd => CompressionAlgorithm::Zstd,
276 }
277 }
278}
279
280#[derive(Debug, Clone, Copy, PartialEq)]
281#[non_exhaustive]
282pub enum AppendRetryPolicy {
285 All,
287 NoSideEffects,
289}
290
291impl AppendRetryPolicy {
292 pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
293 match self {
294 Self::All => true,
295 Self::NoSideEffects => input.match_seq_num.is_some(),
296 }
297 }
298}
299
300#[derive(Debug, Clone)]
301#[non_exhaustive]
302pub struct RetryConfig {
311 pub max_attempts: NonZeroU32,
315 pub min_base_delay: Duration,
319 pub max_base_delay: Duration,
323 pub append_retry_policy: AppendRetryPolicy,
328}
329
330impl Default for RetryConfig {
331 fn default() -> Self {
332 Self {
333 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
334 min_base_delay: Duration::from_millis(100),
335 max_base_delay: Duration::from_secs(1),
336 append_retry_policy: AppendRetryPolicy::All,
337 }
338 }
339}
340
341impl RetryConfig {
342 pub fn new() -> Self {
344 Self::default()
345 }
346
347 pub(crate) fn max_retries(&self) -> u32 {
348 self.max_attempts.get() - 1
349 }
350
351 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
353 Self {
354 max_attempts,
355 ..self
356 }
357 }
358
359 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
361 Self {
362 min_base_delay,
363 ..self
364 }
365 }
366
367 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
369 Self {
370 max_base_delay,
371 ..self
372 }
373 }
374
375 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
378 Self {
379 append_retry_policy,
380 ..self
381 }
382 }
383}
384
385#[derive(Debug, Clone)]
386#[non_exhaustive]
387pub struct S2Config {
389 pub(crate) access_token: SecretString,
390 pub(crate) endpoints: S2Endpoints,
391 pub(crate) connection_timeout: Duration,
392 pub(crate) request_timeout: Duration,
393 pub(crate) retry: RetryConfig,
394 pub(crate) compression: Compression,
395 pub(crate) user_agent: HeaderValue,
396 pub(crate) insecure_skip_cert_verification: bool,
397}
398
399impl S2Config {
400 pub fn new(access_token: impl Into<String>) -> Self {
402 Self {
403 access_token: access_token.into().into(),
404 endpoints: S2Endpoints::for_aws(),
405 connection_timeout: Duration::from_secs(3),
406 request_timeout: Duration::from_secs(5),
407 retry: RetryConfig::new(),
408 compression: Compression::None,
409 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
410 .parse()
411 .expect("valid user agent"),
412 insecure_skip_cert_verification: false,
413 }
414 }
415
416 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
418 Self { endpoints, ..self }
419 }
420
421 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
425 Self {
426 connection_timeout,
427 ..self
428 }
429 }
430
431 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
435 Self {
436 request_timeout,
437 ..self
438 }
439 }
440
441 pub fn with_retry(self, retry: RetryConfig) -> Self {
445 Self { retry, ..self }
446 }
447
448 pub fn with_compression(self, compression: Compression) -> Self {
452 Self {
453 compression,
454 ..self
455 }
456 }
457
458 pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
470 Self {
471 insecure_skip_cert_verification: skip,
472 ..self
473 }
474 }
475
476 #[doc(hidden)]
477 #[cfg(feature = "_hidden")]
478 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
479 let user_agent = user_agent
480 .into()
481 .parse()
482 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
483 Ok(Self { user_agent, ..self })
484 }
485}
486
/// One page of values together with a flag telling whether more values are available.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether more values exist beyond this page.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Builds a page from anything convertible into a `Vec<T>`.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values: Vec<T> = values.into();
        Page { values, has_more }
    }
}
505
506#[derive(Debug, Clone, Copy, PartialEq, Eq)]
507pub enum StorageClass {
509 Standard,
511 Express,
513}
514
515impl From<api::config::StorageClass> for StorageClass {
516 fn from(value: api::config::StorageClass) -> Self {
517 match value {
518 api::config::StorageClass::Standard => StorageClass::Standard,
519 api::config::StorageClass::Express => StorageClass::Express,
520 }
521 }
522}
523
524impl From<StorageClass> for api::config::StorageClass {
525 fn from(value: StorageClass) -> Self {
526 match value {
527 StorageClass::Standard => api::config::StorageClass::Standard,
528 StorageClass::Express => api::config::StorageClass::Express,
529 }
530 }
531}
532
533#[derive(Debug, Clone, Copy, PartialEq, Eq)]
534pub enum RetentionPolicy {
536 Age(u64),
538 Infinite,
540}
541
542impl From<api::config::RetentionPolicy> for RetentionPolicy {
543 fn from(value: api::config::RetentionPolicy) -> Self {
544 match value {
545 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
546 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
547 }
548 }
549}
550
551impl From<RetentionPolicy> for api::config::RetentionPolicy {
552 fn from(value: RetentionPolicy) -> Self {
553 match value {
554 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
555 RetentionPolicy::Infinite => {
556 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
557 }
558 }
559 }
560}
561
562#[derive(Debug, Clone, Copy, PartialEq, Eq)]
563pub enum TimestampingMode {
565 ClientPrefer,
567 ClientRequire,
569 Arrival,
571}
572
573impl From<api::config::TimestampingMode> for TimestampingMode {
574 fn from(value: api::config::TimestampingMode) -> Self {
575 match value {
576 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
577 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
578 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
579 }
580 }
581}
582
583impl From<TimestampingMode> for api::config::TimestampingMode {
584 fn from(value: TimestampingMode) -> Self {
585 match value {
586 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
587 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
588 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
589 }
590 }
591}
592
593#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
594#[non_exhaustive]
595pub struct TimestampingConfig {
597 pub mode: Option<TimestampingMode>,
601 pub uncapped: bool,
605}
606
607impl TimestampingConfig {
608 pub fn new() -> Self {
610 Self::default()
611 }
612
613 pub fn with_mode(self, mode: TimestampingMode) -> Self {
615 Self {
616 mode: Some(mode),
617 ..self
618 }
619 }
620
621 pub fn with_uncapped(self, uncapped: bool) -> Self {
623 Self { uncapped, ..self }
624 }
625}
626
627impl From<api::config::TimestampingConfig> for TimestampingConfig {
628 fn from(value: api::config::TimestampingConfig) -> Self {
629 Self {
630 mode: value.mode.map(Into::into),
631 uncapped: value.uncapped.unwrap_or_default(),
632 }
633 }
634}
635
636impl From<TimestampingConfig> for api::config::TimestampingConfig {
637 fn from(value: TimestampingConfig) -> Self {
638 Self {
639 mode: value.mode.map(Into::into),
640 uncapped: Some(value.uncapped),
641 }
642 }
643}
644
/// Delete-on-empty configuration.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in whole seconds. Defaults to 0.
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Creates a configuration with `min_age_secs` set to 0.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Sets the minimum age; any sub-second part of `min_age` is dropped.
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
668
669impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
670 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
671 Self {
672 min_age_secs: value.min_age_secs,
673 }
674 }
675}
676
677impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
678 fn from(value: DeleteOnEmptyConfig) -> Self {
679 Self {
680 min_age_secs: value.min_age_secs,
681 }
682 }
683}
684
685#[derive(Debug, Clone, Default, PartialEq, Eq)]
686#[non_exhaustive]
687pub struct StreamConfig {
689 pub storage_class: Option<StorageClass>,
693 pub retention_policy: Option<RetentionPolicy>,
697 pub timestamping: Option<TimestampingConfig>,
701 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
705}
706
707impl StreamConfig {
708 pub fn new() -> Self {
710 Self::default()
711 }
712
713 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
715 Self {
716 storage_class: Some(storage_class),
717 ..self
718 }
719 }
720
721 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
723 Self {
724 retention_policy: Some(retention_policy),
725 ..self
726 }
727 }
728
729 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
731 Self {
732 timestamping: Some(timestamping),
733 ..self
734 }
735 }
736
737 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
739 Self {
740 delete_on_empty: Some(delete_on_empty),
741 ..self
742 }
743 }
744}
745
746impl From<api::config::StreamConfig> for StreamConfig {
747 fn from(value: api::config::StreamConfig) -> Self {
748 Self {
749 storage_class: value.storage_class.map(Into::into),
750 retention_policy: value.retention_policy.map(Into::into),
751 timestamping: value.timestamping.map(Into::into),
752 delete_on_empty: value.delete_on_empty.map(Into::into),
753 }
754 }
755}
756
757impl From<StreamConfig> for api::config::StreamConfig {
758 fn from(value: StreamConfig) -> Self {
759 Self {
760 storage_class: value.storage_class.map(Into::into),
761 retention_policy: value.retention_policy.map(Into::into),
762 timestamping: value.timestamping.map(Into::into),
763 delete_on_empty: value.delete_on_empty.map(Into::into),
764 }
765 }
766}
767
768#[derive(Debug, Clone, Default)]
769#[non_exhaustive]
770pub struct BasinConfig {
772 pub default_stream_config: Option<StreamConfig>,
776 pub create_stream_on_append: bool,
780 pub create_stream_on_read: bool,
784}
785
786impl BasinConfig {
787 pub fn new() -> Self {
789 Self::default()
790 }
791
792 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
794 Self {
795 default_stream_config: Some(config),
796 ..self
797 }
798 }
799
800 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
803 Self {
804 create_stream_on_append,
805 ..self
806 }
807 }
808
809 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
811 Self {
812 create_stream_on_read,
813 ..self
814 }
815 }
816}
817
818impl From<api::config::BasinConfig> for BasinConfig {
819 fn from(value: api::config::BasinConfig) -> Self {
820 Self {
821 default_stream_config: value.default_stream_config.map(Into::into),
822 create_stream_on_append: value.create_stream_on_append,
823 create_stream_on_read: value.create_stream_on_read,
824 }
825 }
826}
827
828impl From<BasinConfig> for api::config::BasinConfig {
829 fn from(value: BasinConfig) -> Self {
830 Self {
831 default_stream_config: value.default_stream_config.map(Into::into),
832 create_stream_on_append: value.create_stream_on_append,
833 create_stream_on_read: value.create_stream_on_read,
834 }
835 }
836}
837
838#[derive(Debug, Clone, PartialEq, Eq)]
839pub enum BasinScope {
841 AwsUsEast1,
843}
844
845impl From<api::basin::BasinScope> for BasinScope {
846 fn from(value: api::basin::BasinScope) -> Self {
847 match value {
848 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
849 }
850 }
851}
852
853impl From<BasinScope> for api::basin::BasinScope {
854 fn from(value: BasinScope) -> Self {
855 match value {
856 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
857 }
858 }
859}
860
861#[derive(Debug, Clone)]
862#[non_exhaustive]
863pub struct CreateBasinInput {
865 pub name: BasinName,
867 pub config: Option<BasinConfig>,
871 pub scope: Option<BasinScope>,
875 pub idempotency_token: String,
883}
884
885impl CreateBasinInput {
886 pub fn new(name: BasinName) -> Self {
888 Self {
889 name,
890 config: None,
891 scope: None,
892 idempotency_token: idempotency_token(),
893 }
894 }
895
896 pub fn with_config(self, config: BasinConfig) -> Self {
898 Self {
899 config: Some(config),
900 ..self
901 }
902 }
903
904 pub fn with_scope(self, scope: BasinScope) -> Self {
906 Self {
907 scope: Some(scope),
908 ..self
909 }
910 }
911
912 pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
914 Self {
915 idempotency_token: idempotency_token.into(),
916 ..self
917 }
918 }
919}
920
921impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
922 fn from(value: CreateBasinInput) -> Self {
923 (
924 api::basin::CreateBasinRequest {
925 basin: value.name,
926 config: value.config.map(Into::into),
927 scope: value.scope.map(Into::into),
928 },
929 value.idempotency_token,
930 )
931 }
932}
933
934#[derive(Debug, Clone, Default)]
935#[non_exhaustive]
936pub struct ListBasinsInput {
938 pub prefix: BasinNamePrefix,
942 pub start_after: BasinNameStartAfter,
948 pub limit: Option<usize>,
952}
953
954impl ListBasinsInput {
955 pub fn new() -> Self {
957 Self::default()
958 }
959
960 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
962 Self { prefix, ..self }
963 }
964
965 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
968 Self {
969 start_after,
970 ..self
971 }
972 }
973
974 pub fn with_limit(self, limit: usize) -> Self {
976 Self {
977 limit: Some(limit),
978 ..self
979 }
980 }
981}
982
983impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
984 fn from(value: ListBasinsInput) -> Self {
985 Self {
986 prefix: Some(value.prefix),
987 start_after: Some(value.start_after),
988 limit: value.limit,
989 }
990 }
991}
992
993#[derive(Debug, Clone, Default)]
994pub struct ListAllBasinsInput {
996 pub prefix: BasinNamePrefix,
1000 pub start_after: BasinNameStartAfter,
1006 pub include_deleted: bool,
1010}
1011
1012impl ListAllBasinsInput {
1013 pub fn new() -> Self {
1015 Self::default()
1016 }
1017
1018 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1020 Self { prefix, ..self }
1021 }
1022
1023 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1026 Self {
1027 start_after,
1028 ..self
1029 }
1030 }
1031
1032 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1034 Self {
1035 include_deleted,
1036 ..self
1037 }
1038 }
1039}
1040
1041#[derive(Debug, Clone, PartialEq, Eq)]
1042pub enum BasinState {
1044 Active,
1046 Creating,
1048 Deleting,
1050}
1051
1052impl From<api::basin::BasinState> for BasinState {
1053 fn from(value: api::basin::BasinState) -> Self {
1054 match value {
1055 api::basin::BasinState::Active => BasinState::Active,
1056 api::basin::BasinState::Creating => BasinState::Creating,
1057 api::basin::BasinState::Deleting => BasinState::Deleting,
1058 }
1059 }
1060}
1061
1062#[derive(Debug, Clone, PartialEq, Eq)]
1063#[non_exhaustive]
1064pub struct BasinInfo {
1066 pub name: BasinName,
1068 pub scope: Option<BasinScope>,
1070 pub state: BasinState,
1072}
1073
1074impl From<api::basin::BasinInfo> for BasinInfo {
1075 fn from(value: api::basin::BasinInfo) -> Self {
1076 Self {
1077 name: value.name,
1078 scope: value.scope.map(Into::into),
1079 state: value.state.into(),
1080 }
1081 }
1082}
1083
1084#[derive(Debug, Clone)]
1085#[non_exhaustive]
1086pub struct DeleteBasinInput {
1088 pub name: BasinName,
1090 pub ignore_not_found: bool,
1092}
1093
1094impl DeleteBasinInput {
1095 pub fn new(name: BasinName) -> Self {
1097 Self {
1098 name,
1099 ignore_not_found: false,
1100 }
1101 }
1102
1103 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1105 Self {
1106 ignore_not_found,
1107 ..self
1108 }
1109 }
1110}
1111
1112#[derive(Debug, Clone, Default)]
1113#[non_exhaustive]
1114pub struct TimestampingReconfiguration {
1116 pub mode: Maybe<Option<TimestampingMode>>,
1118 pub uncapped: Maybe<Option<bool>>,
1120}
1121
1122impl TimestampingReconfiguration {
1123 pub fn new() -> Self {
1125 Self::default()
1126 }
1127
1128 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1130 Self {
1131 mode: Maybe::Specified(Some(mode)),
1132 ..self
1133 }
1134 }
1135
1136 pub fn with_uncapped(self, uncapped: bool) -> Self {
1138 Self {
1139 uncapped: Maybe::Specified(Some(uncapped)),
1140 ..self
1141 }
1142 }
1143}
1144
1145impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1146 fn from(value: TimestampingReconfiguration) -> Self {
1147 Self {
1148 mode: value.mode.map(|m| m.map(Into::into)),
1149 uncapped: value.uncapped,
1150 }
1151 }
1152}
1153
1154#[derive(Debug, Clone, Default)]
1155#[non_exhaustive]
1156pub struct DeleteOnEmptyReconfiguration {
1158 pub min_age_secs: Maybe<Option<u64>>,
1160}
1161
1162impl DeleteOnEmptyReconfiguration {
1163 pub fn new() -> Self {
1165 Self::default()
1166 }
1167
1168 pub fn with_min_age(self, min_age: Duration) -> Self {
1170 Self {
1171 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1172 }
1173 }
1174}
1175
1176impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1177 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1178 Self {
1179 min_age_secs: value.min_age_secs,
1180 }
1181 }
1182}
1183
1184#[derive(Debug, Clone, Default)]
1185#[non_exhaustive]
1186pub struct StreamReconfiguration {
1188 pub storage_class: Maybe<Option<StorageClass>>,
1190 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1192 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1194 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1196}
1197
1198impl StreamReconfiguration {
1199 pub fn new() -> Self {
1201 Self::default()
1202 }
1203
1204 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1206 Self {
1207 storage_class: Maybe::Specified(Some(storage_class)),
1208 ..self
1209 }
1210 }
1211
1212 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1214 Self {
1215 retention_policy: Maybe::Specified(Some(retention_policy)),
1216 ..self
1217 }
1218 }
1219
1220 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1222 Self {
1223 timestamping: Maybe::Specified(Some(timestamping)),
1224 ..self
1225 }
1226 }
1227
1228 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1230 Self {
1231 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1232 ..self
1233 }
1234 }
1235}
1236
1237impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1238 fn from(value: StreamReconfiguration) -> Self {
1239 Self {
1240 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1241 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1242 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1243 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1244 }
1245 }
1246}
1247
1248#[derive(Debug, Clone, Default)]
1249#[non_exhaustive]
1250pub struct BasinReconfiguration {
1252 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1254 pub create_stream_on_append: Maybe<bool>,
1257 pub create_stream_on_read: Maybe<bool>,
1259}
1260
1261impl BasinReconfiguration {
1262 pub fn new() -> Self {
1264 Self::default()
1265 }
1266
1267 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1270 Self {
1271 default_stream_config: Maybe::Specified(Some(config)),
1272 ..self
1273 }
1274 }
1275
1276 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1279 Self {
1280 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1281 ..self
1282 }
1283 }
1284
1285 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1288 Self {
1289 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1290 ..self
1291 }
1292 }
1293}
1294
1295impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1296 fn from(value: BasinReconfiguration) -> Self {
1297 Self {
1298 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1299 create_stream_on_append: value.create_stream_on_append,
1300 create_stream_on_read: value.create_stream_on_read,
1301 }
1302 }
1303}
1304
1305#[derive(Debug, Clone)]
1306#[non_exhaustive]
1307pub struct ReconfigureBasinInput {
1309 pub name: BasinName,
1311 pub config: BasinReconfiguration,
1313}
1314
1315impl ReconfigureBasinInput {
1316 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1318 Self { name, config }
1319 }
1320}
1321
1322#[derive(Debug, Clone, Default)]
1323#[non_exhaustive]
1324pub struct ListAccessTokensInput {
1326 pub prefix: AccessTokenIdPrefix,
1330 pub start_after: AccessTokenIdStartAfter,
1336 pub limit: Option<usize>,
1340}
1341
1342impl ListAccessTokensInput {
1343 pub fn new() -> Self {
1345 Self::default()
1346 }
1347
1348 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1350 Self { prefix, ..self }
1351 }
1352
1353 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1356 Self {
1357 start_after,
1358 ..self
1359 }
1360 }
1361
1362 pub fn with_limit(self, limit: usize) -> Self {
1364 Self {
1365 limit: Some(limit),
1366 ..self
1367 }
1368 }
1369}
1370
1371impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1372 fn from(value: ListAccessTokensInput) -> Self {
1373 Self {
1374 prefix: Some(value.prefix),
1375 start_after: Some(value.start_after),
1376 limit: value.limit,
1377 }
1378 }
1379}
1380
1381#[derive(Debug, Clone, Default)]
1382pub struct ListAllAccessTokensInput {
1384 pub prefix: AccessTokenIdPrefix,
1388 pub start_after: AccessTokenIdStartAfter,
1394}
1395
1396impl ListAllAccessTokensInput {
1397 pub fn new() -> Self {
1399 Self::default()
1400 }
1401
1402 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1404 Self { prefix, ..self }
1405 }
1406
1407 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1410 Self {
1411 start_after,
1412 ..self
1413 }
1414 }
1415}
1416
1417#[derive(Debug, Clone)]
1418#[non_exhaustive]
1419pub struct AccessTokenInfo {
1421 pub id: AccessTokenId,
1423 pub expires_at: S2DateTime,
1425 pub auto_prefix_streams: bool,
1428 pub scope: AccessTokenScope,
1430}
1431
1432impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1433 type Error = ValidationError;
1434
1435 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1436 let expires_at = value
1437 .expires_at
1438 .map(Into::into)
1439 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1440 Ok(Self {
1441 id: value.id,
1442 expires_at,
1443 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1444 scope: value.scope.into(),
1445 })
1446 }
1447}
1448
/// Selects basins by name.
// NOTE(review): matching semantics are not defined in this part of the file; the variant names
// suggest "match nothing", "match one exact name" and "match by name prefix" — confirm.
#[derive(Debug, Clone)]
pub enum BasinMatcher {
    /// No basin name or prefix is given.
    None,
    /// Carries an exact basin name.
    Exact(BasinName),
    /// Carries a basin name prefix.
    Prefix(BasinNamePrefix),
}
1461
/// Selects streams by name.
// NOTE(review): matching semantics are not defined in this part of the file; the variant names
// suggest "match nothing", "match one exact name" and "match by name prefix" — confirm.
#[derive(Debug, Clone)]
pub enum StreamMatcher {
    /// No stream name or prefix is given.
    None,
    /// Carries an exact stream name.
    Exact(StreamName),
    /// Carries a stream name prefix.
    Prefix(StreamNamePrefix),
}
1474
/// Selects access tokens by ID.
// NOTE(review): matching semantics are not defined in this part of the file; the variant names
// suggest "match nothing", "match one exact ID" and "match by ID prefix" — confirm.
#[derive(Debug, Clone)]
pub enum AccessTokenMatcher {
    /// No access token ID or prefix is given.
    None,
    /// Carries an exact access token ID.
    Exact(AccessTokenId),
    /// Carries an access token ID prefix.
    Prefix(AccessTokenIdPrefix),
}
1487
/// A pair of read and write permission flags. Both default to `false`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Read permission flag.
    pub read: bool,
    /// Write permission flag.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Creates permissions with both flags `false`.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Read permission only.
    pub fn read_only() -> Self {
        let (read, write) = (true, false);
        Self { read, write }
    }

    /// Write permission only.
    pub fn write_only() -> Self {
        let (read, write) = (false, true);
        Self { read, write }
    }

    /// Both read and write permission.
    pub fn read_write() -> Self {
        let (read, write) = (true, true);
        Self { read, write }
    }
}
1532
1533impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1534 fn from(value: ReadWritePermissions) -> Self {
1535 Self {
1536 read: Some(value.read),
1537 write: Some(value.write),
1538 }
1539 }
1540}
1541
1542impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1543 fn from(value: api::access::ReadWritePermissions) -> Self {
1544 Self {
1545 read: value.read.unwrap_or_default(),
1546 write: value.write.unwrap_or_default(),
1547 }
1548 }
1549}
1550
1551#[derive(Debug, Clone, Default)]
1552#[non_exhaustive]
1553pub struct OperationGroupPermissions {
1557 pub account: Option<ReadWritePermissions>,
1561 pub basin: Option<ReadWritePermissions>,
1565 pub stream: Option<ReadWritePermissions>,
1569}
1570
1571impl OperationGroupPermissions {
1572 pub fn new() -> Self {
1574 Self::default()
1575 }
1576
1577 pub fn read_only_all() -> Self {
1579 Self {
1580 account: Some(ReadWritePermissions::read_only()),
1581 basin: Some(ReadWritePermissions::read_only()),
1582 stream: Some(ReadWritePermissions::read_only()),
1583 }
1584 }
1585
1586 pub fn write_only_all() -> Self {
1588 Self {
1589 account: Some(ReadWritePermissions::write_only()),
1590 basin: Some(ReadWritePermissions::write_only()),
1591 stream: Some(ReadWritePermissions::write_only()),
1592 }
1593 }
1594
1595 pub fn read_write_all() -> Self {
1597 Self {
1598 account: Some(ReadWritePermissions::read_write()),
1599 basin: Some(ReadWritePermissions::read_write()),
1600 stream: Some(ReadWritePermissions::read_write()),
1601 }
1602 }
1603
1604 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1606 Self {
1607 account: Some(account),
1608 ..self
1609 }
1610 }
1611
1612 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1614 Self {
1615 basin: Some(basin),
1616 ..self
1617 }
1618 }
1619
1620 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1622 Self {
1623 stream: Some(stream),
1624 ..self
1625 }
1626 }
1627}
1628
1629impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1630 fn from(value: OperationGroupPermissions) -> Self {
1631 Self {
1632 account: value.account.map(Into::into),
1633 basin: value.basin.map(Into::into),
1634 stream: value.stream.map(Into::into),
1635 }
1636 }
1637}
1638
1639impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1640 fn from(value: api::access::PermittedOperationGroups) -> Self {
1641 Self {
1642 account: value.account.map(Into::into),
1643 basin: value.basin.map(Into::into),
1644 stream: value.stream.map(Into::into),
1645 }
1646 }
1647}
1648
/// An individual operation that can be listed in an access token scope
/// (see the `ops` set of `AccessTokenScopeInput` / `AccessTokenScope`).
///
/// Each variant maps one-to-one onto a variant of `api::access::Operation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    ListBasins,
    CreateBasin,
    GetBasinConfig,
    DeleteBasin,
    ReconfigureBasin,
    ListAccessTokens,
    IssueAccessToken,
    RevokeAccessToken,
    // The three metrics variants map to the API's `AccountMetrics`,
    // `BasinMetrics` and `StreamMetrics` (no `Get` prefix on the wire type).
    GetAccountMetrics,
    GetBasinMetrics,
    GetStreamMetrics,
    ListStreams,
    CreateStream,
    GetStreamConfig,
    DeleteStream,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
1697
1698impl From<Operation> for api::access::Operation {
1699 fn from(value: Operation) -> Self {
1700 match value {
1701 Operation::ListBasins => api::access::Operation::ListBasins,
1702 Operation::CreateBasin => api::access::Operation::CreateBasin,
1703 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1704 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1705 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1706 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1707 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1708 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1709 Operation::ListStreams => api::access::Operation::ListStreams,
1710 Operation::CreateStream => api::access::Operation::CreateStream,
1711 Operation::DeleteStream => api::access::Operation::DeleteStream,
1712 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1713 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1714 Operation::CheckTail => api::access::Operation::CheckTail,
1715 Operation::Append => api::access::Operation::Append,
1716 Operation::Read => api::access::Operation::Read,
1717 Operation::Trim => api::access::Operation::Trim,
1718 Operation::Fence => api::access::Operation::Fence,
1719 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1720 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1721 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1722 }
1723 }
1724}
1725
1726impl From<api::access::Operation> for Operation {
1727 fn from(value: api::access::Operation) -> Self {
1728 match value {
1729 api::access::Operation::ListBasins => Operation::ListBasins,
1730 api::access::Operation::CreateBasin => Operation::CreateBasin,
1731 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1732 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1733 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1734 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1735 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1736 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1737 api::access::Operation::ListStreams => Operation::ListStreams,
1738 api::access::Operation::CreateStream => Operation::CreateStream,
1739 api::access::Operation::DeleteStream => Operation::DeleteStream,
1740 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1741 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1742 api::access::Operation::CheckTail => Operation::CheckTail,
1743 api::access::Operation::Append => Operation::Append,
1744 api::access::Operation::Read => Operation::Read,
1745 api::access::Operation::Trim => Operation::Trim,
1746 api::access::Operation::Fence => Operation::Fence,
1747 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1748 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1749 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1750 }
1751 }
1752}
1753
1754#[derive(Debug, Clone)]
1755#[non_exhaustive]
1756pub struct AccessTokenScopeInput {
1764 basins: Option<BasinMatcher>,
1765 streams: Option<StreamMatcher>,
1766 access_tokens: Option<AccessTokenMatcher>,
1767 op_group_perms: Option<OperationGroupPermissions>,
1768 ops: HashSet<Operation>,
1769}
1770
1771impl AccessTokenScopeInput {
1772 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1774 Self {
1775 basins: None,
1776 streams: None,
1777 access_tokens: None,
1778 op_group_perms: None,
1779 ops: ops.into_iter().collect(),
1780 }
1781 }
1782
1783 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1785 Self {
1786 basins: None,
1787 streams: None,
1788 access_tokens: None,
1789 op_group_perms: Some(op_group_perms),
1790 ops: HashSet::default(),
1791 }
1792 }
1793
1794 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1796 Self {
1797 ops: ops.into_iter().collect(),
1798 ..self
1799 }
1800 }
1801
1802 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1804 Self {
1805 op_group_perms: Some(op_group_perms),
1806 ..self
1807 }
1808 }
1809
1810 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1814 Self {
1815 basins: Some(basins),
1816 ..self
1817 }
1818 }
1819
1820 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1824 Self {
1825 streams: Some(streams),
1826 ..self
1827 }
1828 }
1829
1830 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1834 Self {
1835 access_tokens: Some(access_tokens),
1836 ..self
1837 }
1838 }
1839}
1840
1841#[derive(Debug, Clone)]
1842#[non_exhaustive]
1843pub struct AccessTokenScope {
1845 pub basins: Option<BasinMatcher>,
1847 pub streams: Option<StreamMatcher>,
1849 pub access_tokens: Option<AccessTokenMatcher>,
1851 pub op_group_perms: Option<OperationGroupPermissions>,
1853 pub ops: HashSet<Operation>,
1855}
1856
1857impl From<api::access::AccessTokenScope> for AccessTokenScope {
1858 fn from(value: api::access::AccessTokenScope) -> Self {
1859 Self {
1860 basins: value.basins.map(|rs| match rs {
1861 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1862 BasinMatcher::Exact(e)
1863 }
1864 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1865 BasinMatcher::None
1866 }
1867 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1868 }),
1869 streams: value.streams.map(|rs| match rs {
1870 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1871 StreamMatcher::Exact(e)
1872 }
1873 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1874 StreamMatcher::None
1875 }
1876 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1877 }),
1878 access_tokens: value.access_tokens.map(|rs| match rs {
1879 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1880 AccessTokenMatcher::Exact(e)
1881 }
1882 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1883 AccessTokenMatcher::None
1884 }
1885 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1886 }),
1887 op_group_perms: value.op_groups.map(Into::into),
1888 ops: value
1889 .ops
1890 .map(|ops| ops.into_iter().map(Into::into).collect())
1891 .unwrap_or_default(),
1892 }
1893 }
1894}
1895
1896impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1897 fn from(value: AccessTokenScopeInput) -> Self {
1898 Self {
1899 basins: value.basins.map(|rs| match rs {
1900 BasinMatcher::None => {
1901 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1902 }
1903 BasinMatcher::Exact(e) => {
1904 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1905 }
1906 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1907 }),
1908 streams: value.streams.map(|rs| match rs {
1909 StreamMatcher::None => {
1910 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1911 }
1912 StreamMatcher::Exact(e) => {
1913 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1914 }
1915 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1916 }),
1917 access_tokens: value.access_tokens.map(|rs| match rs {
1918 AccessTokenMatcher::None => {
1919 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1920 }
1921 AccessTokenMatcher::Exact(e) => {
1922 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1923 }
1924 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1925 }),
1926 op_groups: value.op_group_perms.map(Into::into),
1927 ops: if value.ops.is_empty() {
1928 None
1929 } else {
1930 Some(value.ops.into_iter().map(Into::into).collect())
1931 },
1932 }
1933 }
1934}
1935
1936#[derive(Debug, Clone)]
1937#[non_exhaustive]
1938pub struct IssueAccessTokenInput {
1940 pub id: AccessTokenId,
1942 pub expires_at: Option<S2DateTime>,
1947 pub auto_prefix_streams: bool,
1955 pub scope: AccessTokenScopeInput,
1957}
1958
1959impl IssueAccessTokenInput {
1960 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1962 Self {
1963 id,
1964 expires_at: None,
1965 auto_prefix_streams: false,
1966 scope,
1967 }
1968 }
1969
1970 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1972 Self {
1973 expires_at: Some(expires_at),
1974 ..self
1975 }
1976 }
1977
1978 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1981 Self {
1982 auto_prefix_streams,
1983 ..self
1984 }
1985 }
1986}
1987
1988impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1989 fn from(value: IssueAccessTokenInput) -> Self {
1990 Self {
1991 id: value.id,
1992 expires_at: value.expires_at.map(Into::into),
1993 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1994 scope: value.scope.into(),
1995 }
1996 }
1997}
1998
1999#[derive(Debug, Clone, Copy)]
2000pub enum TimeseriesInterval {
2002 Minute,
2004 Hour,
2006 Day,
2008}
2009
2010impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2011 fn from(value: TimeseriesInterval) -> Self {
2012 match value {
2013 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2014 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2015 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2016 }
2017 }
2018}
2019
2020impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2021 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2022 match value {
2023 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2024 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2025 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2026 }
2027 }
2028}
2029
/// Time range for a metrics query.
///
/// NOTE(review): the unit of `start`/`end` is not visible here (presumably
/// Unix epoch seconds) — confirm against the API documentation.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start of the range.
    pub start: u32,
    /// End of the range.
    pub end: u32,
}

impl TimeRange {
    /// Creates a range from `start` to `end`; the values are stored as given.
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
2046
2047#[derive(Debug, Clone, Copy)]
2048#[non_exhaustive]
2049pub struct TimeRangeAndInterval {
2051 pub start: u32,
2053 pub end: u32,
2055 pub interval: Option<TimeseriesInterval>,
2059}
2060
2061impl TimeRangeAndInterval {
2062 pub fn new(start: u32, end: u32) -> Self {
2064 Self {
2065 start,
2066 end,
2067 interval: None,
2068 }
2069 }
2070
2071 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2073 Self {
2074 interval: Some(interval),
2075 ..self
2076 }
2077 }
2078}
2079
/// Metric set to request for an account, together with the time range to query.
#[derive(Debug, Clone, Copy)]
pub enum AccountMetricSet {
    /// Requested as `api::metrics::AccountMetricSet::ActiveBasins`; takes a plain range, so no interval is sent.
    ActiveBasins(TimeRange),
    /// Requested as `api::metrics::AccountMetricSet::AccountOps`; the optional interval is forwarded.
    AccountOps(TimeRangeAndInterval),
}
2094
2095#[derive(Debug, Clone)]
2096#[non_exhaustive]
2097pub struct GetAccountMetricsInput {
2099 pub set: AccountMetricSet,
2101}
2102
2103impl GetAccountMetricsInput {
2104 pub fn new(set: AccountMetricSet) -> Self {
2106 Self { set }
2107 }
2108}
2109
2110impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2111 fn from(value: GetAccountMetricsInput) -> Self {
2112 let (set, start, end, interval) = match value.set {
2113 AccountMetricSet::ActiveBasins(args) => (
2114 api::metrics::AccountMetricSet::ActiveBasins,
2115 args.start,
2116 args.end,
2117 None,
2118 ),
2119 AccountMetricSet::AccountOps(args) => (
2120 api::metrics::AccountMetricSet::AccountOps,
2121 args.start,
2122 args.end,
2123 args.interval,
2124 ),
2125 };
2126 Self {
2127 set,
2128 start: Some(start),
2129 end: Some(end),
2130 interval: interval.map(Into::into),
2131 }
2132 }
2133}
2134
/// Metric set to request for a basin, together with the time range to query.
///
/// Each variant is requested as the same-named variant of `api::metrics::BasinMetricSet`.
#[derive(Debug, Clone, Copy)]
pub enum BasinMetricSet {
    /// Takes a plain range, so no interval is sent.
    Storage(TimeRange),
    // The remaining variants forward their optional interval.
    AppendOps(TimeRangeAndInterval),
    ReadOps(TimeRangeAndInterval),
    ReadThroughput(TimeRangeAndInterval),
    AppendThroughput(TimeRangeAndInterval),
    BasinOps(TimeRangeAndInterval),
}
2179
2180#[derive(Debug, Clone)]
2181#[non_exhaustive]
2182pub struct GetBasinMetricsInput {
2184 pub name: BasinName,
2186 pub set: BasinMetricSet,
2188}
2189
2190impl GetBasinMetricsInput {
2191 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2193 Self { name, set }
2194 }
2195}
2196
2197impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2198 fn from(value: GetBasinMetricsInput) -> Self {
2199 let (set, start, end, interval) = match value.set {
2200 BasinMetricSet::Storage(args) => (
2201 api::metrics::BasinMetricSet::Storage,
2202 args.start,
2203 args.end,
2204 None,
2205 ),
2206 BasinMetricSet::AppendOps(args) => (
2207 api::metrics::BasinMetricSet::AppendOps,
2208 args.start,
2209 args.end,
2210 args.interval,
2211 ),
2212 BasinMetricSet::ReadOps(args) => (
2213 api::metrics::BasinMetricSet::ReadOps,
2214 args.start,
2215 args.end,
2216 args.interval,
2217 ),
2218 BasinMetricSet::ReadThroughput(args) => (
2219 api::metrics::BasinMetricSet::ReadThroughput,
2220 args.start,
2221 args.end,
2222 args.interval,
2223 ),
2224 BasinMetricSet::AppendThroughput(args) => (
2225 api::metrics::BasinMetricSet::AppendThroughput,
2226 args.start,
2227 args.end,
2228 args.interval,
2229 ),
2230 BasinMetricSet::BasinOps(args) => (
2231 api::metrics::BasinMetricSet::BasinOps,
2232 args.start,
2233 args.end,
2234 args.interval,
2235 ),
2236 };
2237 (
2238 value.name,
2239 api::metrics::BasinMetricSetRequest {
2240 set,
2241 start: Some(start),
2242 end: Some(end),
2243 interval: interval.map(Into::into),
2244 },
2245 )
2246 }
2247}
2248
/// Metric set to request for a stream, together with the time range to query.
#[derive(Debug, Clone, Copy)]
pub enum StreamMetricSet {
    /// Requested as `api::metrics::StreamMetricSet::Storage`; no interval is sent.
    Storage(TimeRange),
}
2256
2257#[derive(Debug, Clone)]
2258#[non_exhaustive]
2259pub struct GetStreamMetricsInput {
2261 pub basin_name: BasinName,
2263 pub stream_name: StreamName,
2265 pub set: StreamMetricSet,
2267}
2268
2269impl GetStreamMetricsInput {
2270 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2273 Self {
2274 basin_name,
2275 stream_name,
2276 set,
2277 }
2278 }
2279}
2280
2281impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2282 fn from(value: GetStreamMetricsInput) -> Self {
2283 let (set, start, end, interval) = match value.set {
2284 StreamMetricSet::Storage(args) => (
2285 api::metrics::StreamMetricSet::Storage,
2286 args.start,
2287 args.end,
2288 None,
2289 ),
2290 };
2291 (
2292 value.basin_name,
2293 value.stream_name,
2294 api::metrics::StreamMetricSetRequest {
2295 set,
2296 start: Some(start),
2297 end: Some(end),
2298 interval,
2299 },
2300 )
2301 }
2302}
2303
2304#[derive(Debug, Clone, Copy)]
2305pub enum MetricUnit {
2307 Bytes,
2309 Operations,
2311}
2312
2313impl From<api::metrics::MetricUnit> for MetricUnit {
2314 fn from(value: api::metrics::MetricUnit) -> Self {
2315 match value {
2316 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2317 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2318 }
2319 }
2320}
2321
/// A metric consisting of a single value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ScalarMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of `value`.
    pub unit: MetricUnit,
    /// The metric value.
    pub value: f64,
}
2333
/// A timeseries metric reported together with an interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccumulationMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Interval reported by the API for this series.
    pub interval: TimeseriesInterval,
    /// Data points, copied as-is from the API response.
    // NOTE(review): presumably `(timestamp, value)` pairs — confirm against the API docs.
    pub values: Vec<(u32, f64)>,
}
2350
/// A timeseries metric reported without an interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GaugeMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Data points, copied as-is from the API response.
    // NOTE(review): presumably `(timestamp, value)` pairs — confirm against the API docs.
    pub values: Vec<(u32, f64)>,
}
2363
/// A metric whose values are strings rather than numbers.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Name of the metric.
    pub name: String,
    /// Label values, copied as-is from the API response.
    pub values: Vec<String>,
}
2373
2374#[derive(Debug, Clone)]
2375pub enum Metric {
2377 Scalar(ScalarMetric),
2379 Accumulation(AccumulationMetric),
2382 Gauge(GaugeMetric),
2384 Label(LabelMetric),
2386}
2387
2388impl From<api::metrics::Metric> for Metric {
2389 fn from(value: api::metrics::Metric) -> Self {
2390 match value {
2391 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2392 name: sm.name.into(),
2393 unit: sm.unit.into(),
2394 value: sm.value,
2395 }),
2396 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2397 name: am.name.into(),
2398 unit: am.unit.into(),
2399 interval: am.interval.into(),
2400 values: am.values,
2401 }),
2402 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2403 name: gm.name.into(),
2404 unit: gm.unit.into(),
2405 values: gm.values,
2406 }),
2407 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2408 name: lm.name.into(),
2409 values: lm.values,
2410 }),
2411 }
2412 }
2413}
2414
2415#[derive(Debug, Clone, Default)]
2416#[non_exhaustive]
2417pub struct ListStreamsInput {
2419 pub prefix: StreamNamePrefix,
2423 pub start_after: StreamNameStartAfter,
2429 pub limit: Option<usize>,
2433}
2434
2435impl ListStreamsInput {
2436 pub fn new() -> Self {
2438 Self::default()
2439 }
2440
2441 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2443 Self { prefix, ..self }
2444 }
2445
2446 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2449 Self {
2450 start_after,
2451 ..self
2452 }
2453 }
2454
2455 pub fn with_limit(self, limit: usize) -> Self {
2457 Self {
2458 limit: Some(limit),
2459 ..self
2460 }
2461 }
2462}
2463
2464impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2465 fn from(value: ListStreamsInput) -> Self {
2466 Self {
2467 prefix: Some(value.prefix),
2468 start_after: Some(value.start_after),
2469 limit: value.limit,
2470 }
2471 }
2472}
2473
2474#[derive(Debug, Clone, Default)]
2475pub struct ListAllStreamsInput {
2477 pub prefix: StreamNamePrefix,
2481 pub start_after: StreamNameStartAfter,
2487 pub include_deleted: bool,
2491}
2492
2493impl ListAllStreamsInput {
2494 pub fn new() -> Self {
2496 Self::default()
2497 }
2498
2499 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2501 Self { prefix, ..self }
2502 }
2503
2504 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2507 Self {
2508 start_after,
2509 ..self
2510 }
2511 }
2512
2513 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2515 Self {
2516 include_deleted,
2517 ..self
2518 }
2519 }
2520}
2521
2522#[derive(Debug, Clone, PartialEq)]
2523#[non_exhaustive]
2524pub struct StreamInfo {
2526 pub name: StreamName,
2528 pub created_at: S2DateTime,
2530 pub deleted_at: Option<S2DateTime>,
2532}
2533
2534impl From<api::stream::StreamInfo> for StreamInfo {
2535 fn from(value: api::stream::StreamInfo) -> Self {
2536 Self {
2537 name: value.name,
2538 created_at: value.created_at.into(),
2539 deleted_at: value.deleted_at.map(Into::into),
2540 }
2541 }
2542}
2543
2544#[derive(Debug, Clone)]
2545#[non_exhaustive]
2546pub struct CreateStreamInput {
2548 pub name: StreamName,
2550 pub config: Option<StreamConfig>,
2554 pub idempotency_token: String,
2562}
2563
2564impl CreateStreamInput {
2565 pub fn new(name: StreamName) -> Self {
2567 Self {
2568 name,
2569 config: None,
2570 idempotency_token: idempotency_token(),
2571 }
2572 }
2573
2574 pub fn with_config(self, config: StreamConfig) -> Self {
2576 Self {
2577 config: Some(config),
2578 ..self
2579 }
2580 }
2581
2582 pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
2584 Self {
2585 idempotency_token: idempotency_token.into(),
2586 ..self
2587 }
2588 }
2589}
2590
2591impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2592 fn from(value: CreateStreamInput) -> Self {
2593 (
2594 api::stream::CreateStreamRequest {
2595 stream: value.name,
2596 config: value.config.map(Into::into),
2597 },
2598 value.idempotency_token,
2599 )
2600 }
2601}
2602
2603#[derive(Debug, Clone)]
2604#[non_exhaustive]
2605pub struct DeleteStreamInput {
2607 pub name: StreamName,
2609 pub ignore_not_found: bool,
2611}
2612
2613impl DeleteStreamInput {
2614 pub fn new(name: StreamName) -> Self {
2616 Self {
2617 name,
2618 ignore_not_found: false,
2619 }
2620 }
2621
2622 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2624 Self {
2625 ignore_not_found,
2626 ..self
2627 }
2628 }
2629}
2630
2631#[derive(Debug, Clone)]
2632#[non_exhaustive]
2633pub struct ReconfigureStreamInput {
2635 pub name: StreamName,
2637 pub config: StreamReconfiguration,
2639}
2640
2641impl ReconfigureStreamInput {
2642 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2644 Self { name, config }
2645 }
2646}
2647
2648#[derive(Debug, Clone, PartialEq, Eq)]
2649pub struct FencingToken(String);
2655
2656impl FencingToken {
2657 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2659 rand::rng()
2660 .sample_iter(&rand::distr::Alphanumeric)
2661 .take(n)
2662 .map(char::from)
2663 .collect::<String>()
2664 .parse()
2665 }
2666}
2667
2668impl FromStr for FencingToken {
2669 type Err = ValidationError;
2670
2671 fn from_str(s: &str) -> Result<Self, Self::Err> {
2672 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2673 return Err(ValidationError(format!(
2674 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2675 )));
2676 }
2677 Ok(FencingToken(s.to_string()))
2678 }
2679}
2680
2681impl std::fmt::Display for FencingToken {
2682 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2683 write!(f, "{}", self.0)
2684 }
2685}
2686
2687impl Deref for FencingToken {
2688 type Target = str;
2689
2690 fn deref(&self) -> &Self::Target {
2691 &self.0
2692 }
2693}
2694
/// A position in a stream: a sequence number together with a timestamp.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: u64,
    /// Timestamp.
    pub timestamp: u64,
}

/// Formats the position as `seq_num=<n>, timestamp=<t>`.
impl std::fmt::Display for StreamPosition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = *self;
        write!(f, "seq_num={seq_num}, timestamp={timestamp}")
    }
}
2711
2712impl From<api::stream::proto::StreamPosition> for StreamPosition {
2713 fn from(value: api::stream::proto::StreamPosition) -> Self {
2714 Self {
2715 seq_num: value.seq_num,
2716 timestamp: value.timestamp,
2717 }
2718 }
2719}
2720
2721impl From<api::stream::StreamPosition> for StreamPosition {
2722 fn from(value: api::stream::StreamPosition) -> Self {
2723 Self {
2724 seq_num: value.seq_num,
2725 timestamp: value.timestamp,
2726 }
2727 }
2728}
2729
2730#[derive(Debug, Clone, PartialEq)]
2731#[non_exhaustive]
2732pub struct Header {
2734 pub name: Bytes,
2736 pub value: Bytes,
2738}
2739
2740impl Header {
2741 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2743 Self {
2744 name: name.into(),
2745 value: value.into(),
2746 }
2747 }
2748}
2749
2750impl From<Header> for api::stream::proto::Header {
2751 fn from(value: Header) -> Self {
2752 Self {
2753 name: value.name,
2754 value: value.value,
2755 }
2756 }
2757}
2758
2759impl From<api::stream::proto::Header> for Header {
2760 fn from(value: api::stream::proto::Header) -> Self {
2761 Self {
2762 name: value.name,
2763 value: value.value,
2764 }
2765 }
2766}
2767
2768#[derive(Debug, Clone, PartialEq)]
2769pub struct AppendRecord {
2771 body: Bytes,
2772 headers: Vec<Header>,
2773 timestamp: Option<u64>,
2774}
2775
2776impl AppendRecord {
2777 fn validate(self) -> Result<Self, ValidationError> {
2778 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2779 Err(ValidationError(format!(
2780 "metered_bytes: {} exceeds {}",
2781 self.metered_bytes(),
2782 RECORD_BATCH_MAX.bytes
2783 )))
2784 } else {
2785 Ok(self)
2786 }
2787 }
2788
2789 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2791 let record = Self {
2792 body: body.into(),
2793 headers: Vec::default(),
2794 timestamp: None,
2795 };
2796 record.validate()
2797 }
2798
2799 pub fn with_headers(
2801 self,
2802 headers: impl IntoIterator<Item = Header>,
2803 ) -> Result<Self, ValidationError> {
2804 let record = Self {
2805 headers: headers.into_iter().collect(),
2806 ..self
2807 };
2808 record.validate()
2809 }
2810
2811 pub fn with_timestamp(self, timestamp: u64) -> Self {
2815 Self {
2816 timestamp: Some(timestamp),
2817 ..self
2818 }
2819 }
2820}
2821
2822impl From<AppendRecord> for api::stream::proto::AppendRecord {
2823 fn from(value: AppendRecord) -> Self {
2824 Self {
2825 timestamp: value.timestamp,
2826 headers: value.headers.into_iter().map(Into::into).collect(),
2827 body: value.body,
2828 }
2829 }
2830}
2831
2832pub trait MeteredBytes {
2839 fn metered_bytes(&self) -> usize;
2841}
2842
2843macro_rules! metered_bytes_impl {
2844 ($ty:ty) => {
2845 impl MeteredBytes for $ty {
2846 fn metered_bytes(&self) -> usize {
2847 8 + (2 * self.headers.len())
2848 + self
2849 .headers
2850 .iter()
2851 .map(|h| h.name.len() + h.value.len())
2852 .sum::<usize>()
2853 + self.body.len()
2854 }
2855 }
2856 };
2857}
2858
2859metered_bytes_impl!(AppendRecord);
2860
2861#[derive(Debug, Clone)]
2862pub struct AppendRecordBatch {
2871 records: Vec<AppendRecord>,
2872 metered_bytes: usize,
2873}
2874
2875impl AppendRecordBatch {
2876 pub(crate) fn with_capacity(capacity: usize) -> Self {
2877 Self {
2878 records: Vec::with_capacity(capacity),
2879 metered_bytes: 0,
2880 }
2881 }
2882
2883 pub(crate) fn push(&mut self, record: AppendRecord) {
2884 self.metered_bytes += record.metered_bytes();
2885 self.records.push(record);
2886 }
2887
2888 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2890 where
2891 I: IntoIterator<Item = AppendRecord>,
2892 {
2893 let mut records = Vec::new();
2894 let mut metered_bytes = 0;
2895
2896 for record in iter {
2897 metered_bytes += record.metered_bytes();
2898 records.push(record);
2899
2900 if metered_bytes > RECORD_BATCH_MAX.bytes {
2901 return Err(ValidationError(format!(
2902 "batch size in metered bytes ({metered_bytes}) exceeds {}",
2903 RECORD_BATCH_MAX.bytes
2904 )));
2905 }
2906
2907 if records.len() > RECORD_BATCH_MAX.count {
2908 return Err(ValidationError(format!(
2909 "number of records in the batch exceeds {}",
2910 RECORD_BATCH_MAX.count
2911 )));
2912 }
2913 }
2914
2915 if records.is_empty() {
2916 return Err(ValidationError("batch is empty".into()));
2917 }
2918
2919 Ok(Self {
2920 records,
2921 metered_bytes,
2922 })
2923 }
2924}
2925
2926impl Deref for AppendRecordBatch {
2927 type Target = [AppendRecord];
2928
2929 fn deref(&self) -> &Self::Target {
2930 &self.records
2931 }
2932}
2933
2934impl MeteredBytes for AppendRecordBatch {
2935 fn metered_bytes(&self) -> usize {
2936 self.metered_bytes
2937 }
2938}
2939
/// A command that can be carried by a `CommandRecord`.
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command carrying a fencing token.
    Fence {
        /// The fencing token.
        fencing_token: FencingToken,
    },
    /// Trim command carrying a trim point.
    Trim {
        /// The trim point.
        // NOTE(review): presumably a sequence number — confirm against the API docs.
        trim_point: u64,
    },
}
2954
2955#[derive(Debug, Clone)]
2956#[non_exhaustive]
2957pub struct CommandRecord {
2961 pub command: Command,
2963 pub timestamp: Option<u64>,
2965}
2966
2967impl CommandRecord {
2968 const FENCE: &[u8] = b"fence";
2969 const TRIM: &[u8] = b"trim";
2970
2971 pub fn fence(fencing_token: FencingToken) -> Self {
2976 Self {
2977 command: Command::Fence { fencing_token },
2978 timestamp: None,
2979 }
2980 }
2981
2982 pub fn trim(trim_point: u64) -> Self {
2989 Self {
2990 command: Command::Trim { trim_point },
2991 timestamp: None,
2992 }
2993 }
2994
2995 pub fn with_timestamp(self, timestamp: u64) -> Self {
2997 Self {
2998 timestamp: Some(timestamp),
2999 ..self
3000 }
3001 }
3002}
3003
3004impl From<CommandRecord> for AppendRecord {
3005 fn from(value: CommandRecord) -> Self {
3006 let (header_value, body) = match value.command {
3007 Command::Fence { fencing_token } => (
3008 CommandRecord::FENCE,
3009 Bytes::copy_from_slice(fencing_token.as_bytes()),
3010 ),
3011 Command::Trim { trim_point } => (
3012 CommandRecord::TRIM,
3013 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3014 ),
3015 };
3016 Self {
3017 body,
3018 headers: vec![Header::new("", header_value)],
3019 timestamp: value.timestamp,
3020 }
3021 }
3022}
3023
3024#[derive(Debug, Clone)]
3025#[non_exhaustive]
3026pub struct AppendInput {
3029 pub records: AppendRecordBatch,
3031 pub match_seq_num: Option<u64>,
3035 pub fencing_token: Option<FencingToken>,
3040}
3041
3042impl AppendInput {
3043 pub fn new(records: AppendRecordBatch) -> Self {
3045 Self {
3046 records,
3047 match_seq_num: None,
3048 fencing_token: None,
3049 }
3050 }
3051
3052 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3054 Self {
3055 match_seq_num: Some(match_seq_num),
3056 ..self
3057 }
3058 }
3059
3060 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3062 Self {
3063 fencing_token: Some(fencing_token),
3064 ..self
3065 }
3066 }
3067}
3068
3069impl From<AppendInput> for api::stream::proto::AppendInput {
3070 fn from(value: AppendInput) -> Self {
3071 Self {
3072 records: value.records.iter().cloned().map(Into::into).collect(),
3073 match_seq_num: value.match_seq_num,
3074 fencing_token: value.fencing_token.map(|t| t.to_string()),
3075 }
3076 }
3077}
3078
3079#[derive(Debug, Clone, PartialEq)]
3080#[non_exhaustive]
3081pub struct AppendAck {
3083 pub start: StreamPosition,
3085 pub end: StreamPosition,
3091 pub tail: StreamPosition,
3096}
3097
3098impl From<api::stream::proto::AppendAck> for AppendAck {
3099 fn from(value: api::stream::proto::AppendAck) -> Self {
3100 Self {
3101 start: value.start.unwrap_or_default().into(),
3102 end: value.end.unwrap_or_default().into(),
3103 tail: value.tail.unwrap_or_default().into(),
3104 }
3105 }
3106}
3107
/// Where a read starts: a sequence number, a timestamp, or a tail offset.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start from this sequence number.
    SeqNum(u64),
    /// Start from this timestamp.
    Timestamp(u64),
    /// Start from this tail offset.
    TailOffset(u64),
}

/// The default start is sequence number 0.
impl Default for ReadFrom {
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3124
3125#[derive(Debug, Default, Clone)]
3126#[non_exhaustive]
3127pub struct ReadStart {
3129 pub from: ReadFrom,
3133 pub clamp_to_tail: bool,
3137}
3138
3139impl ReadStart {
3140 pub fn new() -> Self {
3142 Self::default()
3143 }
3144
3145 pub fn with_from(self, from: ReadFrom) -> Self {
3147 Self { from, ..self }
3148 }
3149
3150 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3152 Self {
3153 clamp_to_tail,
3154 ..self
3155 }
3156 }
3157}
3158
3159impl From<ReadStart> for api::stream::ReadStart {
3160 fn from(value: ReadStart) -> Self {
3161 let (seq_num, timestamp, tail_offset) = match value.from {
3162 ReadFrom::SeqNum(n) => (Some(n), None, None),
3163 ReadFrom::Timestamp(t) => (None, Some(t), None),
3164 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3165 };
3166 Self {
3167 seq_num,
3168 timestamp,
3169 tail_offset,
3170 clamp: if value.clamp_to_tail {
3171 Some(true)
3172 } else {
3173 None
3174 },
3175 }
3176 }
3177}
3178
/// Optional limits for a read.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Optional count limit; unset when `None`.
    pub count: Option<usize>,
    /// Optional bytes limit; unset when `None`.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Creates limits with neither a count nor a bytes limit.
    pub fn new() -> Self {
        Self {
            count: None,
            bytes: None,
        }
    }

    /// Sets the count limit.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Sets the bytes limit.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3215
/// Describes when a read stops.
///
/// Defaults to no limits, no `until` bound and no `wait`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadStop {
    /// Count and byte limits for the read.
    pub limits: ReadLimits,
    /// Optional exclusive upper bound; only the range's `end` is sent in the request.
    // NOTE(review): the quantity being bounded (e.g. timestamp) is not visible here — confirm.
    pub until: Option<RangeTo<u64>>,
    /// Optional wait value, forwarded unchanged as the request's `wait` field.
    // NOTE(review): unit (presumably seconds) is not visible in this chunk — confirm.
    pub wait: Option<u32>,
}
3235
3236impl ReadStop {
3237 pub fn new() -> Self {
3239 Self::default()
3240 }
3241
3242 pub fn with_limits(self, limits: ReadLimits) -> Self {
3244 Self { limits, ..self }
3245 }
3246
3247 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3249 Self {
3250 until: Some(until),
3251 ..self
3252 }
3253 }
3254
3255 pub fn with_wait(self, wait: u32) -> Self {
3257 Self {
3258 wait: Some(wait),
3259 ..self
3260 }
3261 }
3262}
3263
3264impl From<ReadStop> for api::stream::ReadEnd {
3265 fn from(value: ReadStop) -> Self {
3266 Self {
3267 count: value.limits.count,
3268 bytes: value.limits.bytes,
3269 until: value.until.map(|r| r.end),
3270 wait: value.wait,
3271 }
3272 }
3273}
3274
/// Input for a read: where to start, when to stop, and a command-record flag.
///
/// Defaults to `ReadStart::default()`, `ReadStop::default()` and
/// `ignore_command_records = false`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadInput {
    /// Where the read begins.
    pub start: ReadStart,
    /// When the read stops.
    pub stop: ReadStop,
    /// Whether command records should be ignored.
    // NOTE(review): presumably passed to `ReadBatch::from_api`, which filters such records out — confirm at the call site.
    pub ignore_command_records: bool,
}
3289
3290impl ReadInput {
3291 pub fn new() -> Self {
3293 Self::default()
3294 }
3295
3296 pub fn with_start(self, start: ReadStart) -> Self {
3298 Self { start, ..self }
3299 }
3300
3301 pub fn with_stop(self, stop: ReadStop) -> Self {
3303 Self { stop, ..self }
3304 }
3305
3306 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3308 Self {
3309 ignore_command_records,
3310 ..self
3311 }
3312 }
3313}
3314
/// A record as read back from a stream, carrying its sequence number and timestamp.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct SequencedRecord {
    /// Sequence number of the record.
    pub seq_num: u64,
    /// Body of the record.
    pub body: Bytes,
    /// Headers of the record.
    pub headers: Vec<Header>,
    /// Timestamp of the record.
    // NOTE(review): unit/epoch is not visible in this chunk — confirm.
    pub timestamp: u64,
}
3328
3329impl SequencedRecord {
3330 pub fn is_command_record(&self) -> bool {
3332 self.headers.len() == 1 && *self.headers[0].name == *b""
3333 }
3334}
3335
3336impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3337 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3338 Self {
3339 seq_num: value.seq_num,
3340 body: value.body,
3341 headers: value.headers.into_iter().map(Into::into).collect(),
3342 timestamp: value.timestamp,
3343 }
3344 }
3345}
3346
// Invokes `metered_bytes_impl!` (defined outside this chunk) for `SequencedRecord`.
// NOTE(review): presumably generates the metered-size implementation for the type — confirm at the macro definition.
metered_bytes_impl!(SequencedRecord);
3348
/// A batch of records returned by a read.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReadBatch {
    /// Records in the batch. When built via `ReadBatch::from_api` with
    /// `ignore_command_records = true`, command records are filtered out.
    pub records: Vec<SequencedRecord>,
    /// Tail position, present only when the API batch carried one.
    pub tail: Option<StreamPosition>,
}
3367
3368impl ReadBatch {
3369 pub(crate) fn from_api(
3370 batch: api::stream::proto::ReadBatch,
3371 ignore_command_records: bool,
3372 ) -> Self {
3373 Self {
3374 records: batch
3375 .records
3376 .into_iter()
3377 .map(Into::into)
3378 .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3379 .collect(),
3380 tail: batch.tail.map(Into::into),
3381 }
3382 }
3383}
3384
/// A pinned, boxed, `Send` stream whose items are `Result<T, S2Error>`.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3387
/// Reason an append was rejected because one of its conditions did not hold.
///
/// Each variant carries the value reported as "expected" in its error message.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendConditionFailed {
    /// The fencing token did not match; carries the expected token.
    #[error("fencing token mismatch, expected: {0}")]
    FencingTokenMismatch(FencingToken),
    /// The sequence number did not match; carries the expected sequence number.
    #[error("sequence number mismatch, expected: {0}")]
    SeqNumMismatch(u64),
}
3398
3399impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3400 fn from(value: api::stream::AppendConditionFailed) -> Self {
3401 match value {
3402 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3403 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3404 }
3405 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3406 AppendConditionFailed::SeqNumMismatch(seq)
3407 }
3408 }
3409 }
3410}
3411
/// Error type surfaced by the SDK.
#[derive(Debug, Clone, thiserror::Error)]
pub enum S2Error {
    /// Client-side error described by a message. Also used as the fallback for
    /// any `ApiError` that has no more specific variant here.
    #[error("{0}")]
    Client(String),
    /// A validation error; displayed transparently as the wrapped error.
    #[error(transparent)]
    Validation(#[from] ValidationError),
    /// An append condition did not hold.
    #[error("{0}")]
    AppendConditionFailed(AppendConditionFailed),
    /// A read targeted an unwritten position; carries the current tail.
    #[error("read from an unwritten position. current tail: {0}")]
    ReadUnwritten(StreamPosition),
    /// An error response returned by the server.
    #[error("{0}")]
    Server(ErrorResponse),
}
3431
3432impl From<ApiError> for S2Error {
3433 fn from(err: ApiError) -> Self {
3434 match err {
3435 ApiError::ReadUnwritten(tail_response) => {
3436 Self::ReadUnwritten(tail_response.tail.into())
3437 }
3438 ApiError::AppendConditionFailed(condition_failed) => {
3439 Self::AppendConditionFailed(condition_failed.into())
3440 }
3441 ApiError::Server(_, response) => Self::Server(response.into()),
3442 other => Self::Client(other.to_string()),
3443 }
3444 }
3445}
3446
/// Error response with a code and a message; displayed as `"{code}: {message}"`.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
pub struct ErrorResponse {
    /// Error code taken from the API error response.
    pub code: String,
    /// Error message taken from the API error response.
    pub message: String,
}
3457
3458impl From<ApiErrorResponse> for ErrorResponse {
3459 fn from(response: ApiErrorResponse) -> Self {
3460 Self {
3461 code: response.code,
3462 message: response.message,
3463 }
3464 }
3465}
3466
3467fn idempotency_token() -> String {
3468 uuid::Uuid::new_v4().simple().to_string()
3469}