1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::Rng;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::types::ValidationError;
24pub use s2_common::types::access::AccessTokenId;
28pub use s2_common::types::access::AccessTokenIdPrefix;
30pub use s2_common::types::access::AccessTokenIdStartAfter;
32pub use s2_common::types::basin::BasinName;
37pub use s2_common::types::basin::BasinNamePrefix;
39pub use s2_common::types::basin::BasinNameStartAfter;
41pub use s2_common::types::stream::StreamName;
45pub use s2_common::types::stream::StreamNamePrefix;
47pub use s2_common::types::stream::StreamNameStartAfter;
49
/// Number of bytes in one mebibyte (2^20).
pub(crate) const ONE_MIB: u32 = 1 << 20;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl From<time::OffsetDateTime> for S2DateTime {
66 fn from(dt: time::OffsetDateTime) -> Self {
67 Self(dt)
68 }
69}
70
71impl From<S2DateTime> for time::OffsetDateTime {
72 fn from(dt: S2DateTime) -> Self {
73 dt.0
74 }
75}
76
77impl FromStr for S2DateTime {
78 type Err = ValidationError;
79
80 fn from_str(s: &str) -> Result<Self, Self::Err> {
81 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
82 .map(Self)
83 .map_err(|e| ValidationError(format!("invalid datetime: {e}")))
84 }
85}
86
87impl fmt::Display for S2DateTime {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 write!(
90 f,
91 "{}",
92 self.0
93 .format(&time::format_description::well_known::Rfc3339)
94 .expect("RFC3339 formatting should not fail for S2DateTime")
95 )
96 }
97}
98
99#[derive(Debug, Clone, PartialEq)]
101pub(crate) enum BasinAuthority {
102 ParentZone(Authority),
104 Direct(Authority),
106}
107
108#[derive(Debug, Clone)]
110pub struct AccountEndpoint {
111 scheme: Scheme,
112 authority: Authority,
113}
114
115impl AccountEndpoint {
116 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
118 endpoint.parse()
119 }
120}
121
122impl FromStr for AccountEndpoint {
123 type Err = ValidationError;
124
125 fn from_str(s: &str) -> Result<Self, Self::Err> {
126 let (scheme, authority) = match s.find("://") {
127 Some(idx) => {
128 let scheme: Scheme = s[..idx]
129 .parse()
130 .map_err(|_| "invalid account endpoint scheme".to_string())?;
131 (scheme, &s[idx + 3..])
132 }
133 None => (Scheme::HTTPS, s),
134 };
135 Ok(Self {
136 scheme,
137 authority: authority
138 .parse()
139 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
140 })
141 }
142}
143
144#[derive(Debug, Clone)]
146pub struct BasinEndpoint {
147 scheme: Scheme,
148 authority: BasinAuthority,
149}
150
151impl BasinEndpoint {
152 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
154 endpoint.parse()
155 }
156}
157
158impl FromStr for BasinEndpoint {
159 type Err = ValidationError;
160
161 fn from_str(s: &str) -> Result<Self, Self::Err> {
162 let (scheme, authority) = match s.find("://") {
163 Some(idx) => {
164 let scheme: Scheme = s[..idx]
165 .parse()
166 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
167 (scheme, &s[idx + 3..])
168 }
169 None => (Scheme::HTTPS, s),
170 };
171 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
172 BasinAuthority::ParentZone(
173 authority
174 .parse()
175 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
176 )
177 } else {
178 BasinAuthority::Direct(
179 authority
180 .parse()
181 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
182 )
183 };
184 Ok(Self { scheme, authority })
185 }
186}
187
188#[derive(Debug, Clone)]
189#[non_exhaustive]
190pub struct S2Endpoints {
192 pub(crate) scheme: Scheme,
193 pub(crate) account_authority: Authority,
194 pub(crate) basin_authority: BasinAuthority,
195}
196
197impl S2Endpoints {
198 pub fn new(
200 account_endpoint: AccountEndpoint,
201 basin_endpoint: BasinEndpoint,
202 ) -> Result<Self, ValidationError> {
203 if account_endpoint.scheme != basin_endpoint.scheme {
204 return Err("account and basin endpoints must have the same scheme".into());
205 }
206 Ok(Self {
207 scheme: account_endpoint.scheme,
208 account_authority: account_endpoint.authority,
209 basin_authority: basin_endpoint.authority,
210 })
211 }
212
213 pub fn from_env() -> Result<Self, ValidationError> {
219 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
220 Ok(endpoint) => endpoint.parse()?,
221 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
222 Err(VarError::NotUnicode(_)) => {
223 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
224 }
225 };
226
227 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
228 Ok(endpoint) => endpoint.parse()?,
229 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
230 Err(VarError::NotUnicode(_)) => {
231 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
232 }
233 };
234
235 if account_endpoint.scheme != basin_endpoint.scheme {
236 return Err(
237 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
238 );
239 }
240
241 Ok(Self {
242 scheme: account_endpoint.scheme,
243 account_authority: account_endpoint.authority,
244 basin_authority: basin_endpoint.authority,
245 })
246 }
247
248 pub(crate) fn for_aws() -> Self {
249 Self {
250 scheme: Scheme::HTTPS,
251 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
252 basin_authority: BasinAuthority::ParentZone(
253 "b.aws.s2.dev".try_into().expect("valid authority"),
254 ),
255 }
256 }
257}
258
/// Compression choice; each variant converts to the same-named
/// `CompressionAlgorithm` variant.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly, in line with
/// the other plain enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// Maps to `CompressionAlgorithm::None`.
    None,
    /// Maps to `CompressionAlgorithm::Gzip`.
    Gzip,
    /// Maps to `CompressionAlgorithm::Zstd`.
    Zstd,
}
269
270impl From<Compression> for CompressionAlgorithm {
271 fn from(value: Compression) -> Self {
272 match value {
273 Compression::None => CompressionAlgorithm::None,
274 Compression::Gzip => CompressionAlgorithm::Gzip,
275 Compression::Zstd => CompressionAlgorithm::Zstd,
276 }
277 }
278}
279
280#[derive(Debug, Clone, Copy, PartialEq)]
281#[non_exhaustive]
282pub enum AppendRetryPolicy {
285 All,
287 NoSideEffects,
289}
290
291impl AppendRetryPolicy {
292 pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
293 match self {
294 Self::All => true,
295 Self::NoSideEffects => input.match_seq_num.is_some(),
296 }
297 }
298}
299
300#[derive(Debug, Clone)]
301#[non_exhaustive]
302pub struct RetryConfig {
311 pub max_attempts: NonZeroU32,
315 pub min_base_delay: Duration,
319 pub max_base_delay: Duration,
323 pub append_retry_policy: AppendRetryPolicy,
328}
329
330impl Default for RetryConfig {
331 fn default() -> Self {
332 Self {
333 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
334 min_base_delay: Duration::from_millis(100),
335 max_base_delay: Duration::from_secs(1),
336 append_retry_policy: AppendRetryPolicy::All,
337 }
338 }
339}
340
341impl RetryConfig {
342 pub fn new() -> Self {
344 Self::default()
345 }
346
347 pub(crate) fn max_retries(&self) -> u32 {
348 self.max_attempts.get() - 1
349 }
350
351 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
353 Self {
354 max_attempts,
355 ..self
356 }
357 }
358
359 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
361 Self {
362 min_base_delay,
363 ..self
364 }
365 }
366
367 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
369 Self {
370 max_base_delay,
371 ..self
372 }
373 }
374
375 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
378 Self {
379 append_retry_policy,
380 ..self
381 }
382 }
383}
384
385#[derive(Debug, Clone)]
386#[non_exhaustive]
387pub struct S2Config {
389 pub(crate) access_token: SecretString,
390 pub(crate) endpoints: S2Endpoints,
391 pub(crate) connection_timeout: Duration,
392 pub(crate) request_timeout: Duration,
393 pub(crate) retry: RetryConfig,
394 pub(crate) compression: Compression,
395 pub(crate) user_agent: HeaderValue,
396}
397
398impl S2Config {
399 pub fn new(access_token: impl Into<String>) -> Self {
401 Self {
402 access_token: access_token.into().into(),
403 endpoints: S2Endpoints::for_aws(),
404 connection_timeout: Duration::from_secs(3),
405 request_timeout: Duration::from_secs(5),
406 retry: RetryConfig::new(),
407 compression: Compression::None,
408 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
409 .parse()
410 .expect("valid user agent"),
411 }
412 }
413
414 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
416 Self { endpoints, ..self }
417 }
418
419 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
423 Self {
424 connection_timeout,
425 ..self
426 }
427 }
428
429 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
433 Self {
434 request_timeout,
435 ..self
436 }
437 }
438
439 pub fn with_retry(self, retry: RetryConfig) -> Self {
443 Self { retry, ..self }
444 }
445
446 pub fn with_compression(self, compression: Compression) -> Self {
450 Self {
451 compression,
452 ..self
453 }
454 }
455
456 #[doc(hidden)]
457 #[cfg(feature = "_hidden")]
458 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
459 let user_agent = user_agent
460 .into()
461 .parse()
462 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
463 Ok(Self { user_agent, ..self })
464 }
465}
466
/// One page of values plus a flag saying whether more are available.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// `has_more` flag supplied at construction.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Builds a page from anything convertible into a `Vec<T>`.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values: Vec<T> = values.into();
        Page { values, has_more }
    }
}
485
486#[derive(Debug, Clone, Copy, PartialEq, Eq)]
487pub enum StorageClass {
489 Standard,
491 Express,
493}
494
495impl From<api::config::StorageClass> for StorageClass {
496 fn from(value: api::config::StorageClass) -> Self {
497 match value {
498 api::config::StorageClass::Standard => StorageClass::Standard,
499 api::config::StorageClass::Express => StorageClass::Express,
500 }
501 }
502}
503
504impl From<StorageClass> for api::config::StorageClass {
505 fn from(value: StorageClass) -> Self {
506 match value {
507 StorageClass::Standard => api::config::StorageClass::Standard,
508 StorageClass::Express => api::config::StorageClass::Express,
509 }
510 }
511}
512
513#[derive(Debug, Clone, Copy, PartialEq, Eq)]
514pub enum RetentionPolicy {
516 Age(u64),
518 Infinite,
520}
521
522impl From<api::config::RetentionPolicy> for RetentionPolicy {
523 fn from(value: api::config::RetentionPolicy) -> Self {
524 match value {
525 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
526 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
527 }
528 }
529}
530
531impl From<RetentionPolicy> for api::config::RetentionPolicy {
532 fn from(value: RetentionPolicy) -> Self {
533 match value {
534 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
535 RetentionPolicy::Infinite => {
536 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
537 }
538 }
539 }
540}
541
542#[derive(Debug, Clone, Copy, PartialEq, Eq)]
543pub enum TimestampingMode {
545 ClientPrefer,
547 ClientRequire,
549 Arrival,
551}
552
553impl From<api::config::TimestampingMode> for TimestampingMode {
554 fn from(value: api::config::TimestampingMode) -> Self {
555 match value {
556 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
557 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
558 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
559 }
560 }
561}
562
563impl From<TimestampingMode> for api::config::TimestampingMode {
564 fn from(value: TimestampingMode) -> Self {
565 match value {
566 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
567 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
568 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
569 }
570 }
571}
572
573#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
574#[non_exhaustive]
575pub struct TimestampingConfig {
577 pub mode: Option<TimestampingMode>,
581 pub uncapped: bool,
585}
586
587impl TimestampingConfig {
588 pub fn new() -> Self {
590 Self::default()
591 }
592
593 pub fn with_mode(self, mode: TimestampingMode) -> Self {
595 Self {
596 mode: Some(mode),
597 ..self
598 }
599 }
600
601 pub fn with_uncapped(self, uncapped: bool) -> Self {
603 Self { uncapped, ..self }
604 }
605}
606
607impl From<api::config::TimestampingConfig> for TimestampingConfig {
608 fn from(value: api::config::TimestampingConfig) -> Self {
609 Self {
610 mode: value.mode.map(Into::into),
611 uncapped: value.uncapped.unwrap_or_default(),
612 }
613 }
614}
615
616impl From<TimestampingConfig> for api::config::TimestampingConfig {
617 fn from(value: TimestampingConfig) -> Self {
618 Self {
619 mode: value.mode.map(Into::into),
620 uncapped: Some(value.uncapped),
621 }
622 }
623}
624
/// Delete-on-empty settings.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in whole seconds; `0` by default.
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Creates a configuration with the default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the minimum age from `min_age`, keeping only its whole seconds
    /// (any sub-second part is dropped).
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
648
649impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
650 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
651 Self {
652 min_age_secs: value.min_age_secs,
653 }
654 }
655}
656
657impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
658 fn from(value: DeleteOnEmptyConfig) -> Self {
659 Self {
660 min_age_secs: value.min_age_secs,
661 }
662 }
663}
664
665#[derive(Debug, Clone, Default, PartialEq, Eq)]
666#[non_exhaustive]
667pub struct StreamConfig {
669 pub storage_class: Option<StorageClass>,
673 pub retention_policy: Option<RetentionPolicy>,
677 pub timestamping: Option<TimestampingConfig>,
681 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
685}
686
687impl StreamConfig {
688 pub fn new() -> Self {
690 Self::default()
691 }
692
693 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
695 Self {
696 storage_class: Some(storage_class),
697 ..self
698 }
699 }
700
701 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
703 Self {
704 retention_policy: Some(retention_policy),
705 ..self
706 }
707 }
708
709 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
711 Self {
712 timestamping: Some(timestamping),
713 ..self
714 }
715 }
716
717 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
719 Self {
720 delete_on_empty: Some(delete_on_empty),
721 ..self
722 }
723 }
724}
725
726impl From<api::config::StreamConfig> for StreamConfig {
727 fn from(value: api::config::StreamConfig) -> Self {
728 Self {
729 storage_class: value.storage_class.map(Into::into),
730 retention_policy: value.retention_policy.map(Into::into),
731 timestamping: value.timestamping.map(Into::into),
732 delete_on_empty: value.delete_on_empty.map(Into::into),
733 }
734 }
735}
736
737impl From<StreamConfig> for api::config::StreamConfig {
738 fn from(value: StreamConfig) -> Self {
739 Self {
740 storage_class: value.storage_class.map(Into::into),
741 retention_policy: value.retention_policy.map(Into::into),
742 timestamping: value.timestamping.map(Into::into),
743 delete_on_empty: value.delete_on_empty.map(Into::into),
744 }
745 }
746}
747
748#[derive(Debug, Clone, Default)]
749#[non_exhaustive]
750pub struct BasinConfig {
752 pub default_stream_config: Option<StreamConfig>,
756 pub create_stream_on_append: bool,
760 pub create_stream_on_read: bool,
764}
765
766impl BasinConfig {
767 pub fn new() -> Self {
769 Self::default()
770 }
771
772 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
774 Self {
775 default_stream_config: Some(config),
776 ..self
777 }
778 }
779
780 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
783 Self {
784 create_stream_on_append,
785 ..self
786 }
787 }
788
789 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
791 Self {
792 create_stream_on_read,
793 ..self
794 }
795 }
796}
797
798impl From<api::config::BasinConfig> for BasinConfig {
799 fn from(value: api::config::BasinConfig) -> Self {
800 Self {
801 default_stream_config: value.default_stream_config.map(Into::into),
802 create_stream_on_append: value.create_stream_on_append,
803 create_stream_on_read: value.create_stream_on_read,
804 }
805 }
806}
807
808impl From<BasinConfig> for api::config::BasinConfig {
809 fn from(value: BasinConfig) -> Self {
810 Self {
811 default_stream_config: value.default_stream_config.map(Into::into),
812 create_stream_on_append: value.create_stream_on_append,
813 create_stream_on_read: value.create_stream_on_read,
814 }
815 }
816}
817
818#[derive(Debug, Clone, PartialEq, Eq)]
819pub enum BasinScope {
821 AwsUsEast1,
823}
824
825impl From<api::basin::BasinScope> for BasinScope {
826 fn from(value: api::basin::BasinScope) -> Self {
827 match value {
828 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
829 }
830 }
831}
832
833impl From<BasinScope> for api::basin::BasinScope {
834 fn from(value: BasinScope) -> Self {
835 match value {
836 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
837 }
838 }
839}
840
841#[derive(Debug, Clone)]
842#[non_exhaustive]
843pub struct CreateBasinInput {
845 pub name: BasinName,
847 pub config: Option<BasinConfig>,
851 pub scope: Option<BasinScope>,
855 pub idempotency_token: String,
863}
864
865impl CreateBasinInput {
866 pub fn new(name: BasinName) -> Self {
868 Self {
869 name,
870 config: None,
871 scope: None,
872 idempotency_token: idempotency_token(),
873 }
874 }
875
876 pub fn with_config(self, config: BasinConfig) -> Self {
878 Self {
879 config: Some(config),
880 ..self
881 }
882 }
883
884 pub fn with_scope(self, scope: BasinScope) -> Self {
886 Self {
887 scope: Some(scope),
888 ..self
889 }
890 }
891
892 pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
894 Self {
895 idempotency_token: idempotency_token.into(),
896 ..self
897 }
898 }
899}
900
901impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
902 fn from(value: CreateBasinInput) -> Self {
903 (
904 api::basin::CreateBasinRequest {
905 basin: value.name,
906 config: value.config.map(Into::into),
907 scope: value.scope.map(Into::into),
908 },
909 value.idempotency_token,
910 )
911 }
912}
913
914#[derive(Debug, Clone, Default)]
915#[non_exhaustive]
916pub struct ListBasinsInput {
918 pub prefix: BasinNamePrefix,
922 pub start_after: BasinNameStartAfter,
928 pub limit: Option<usize>,
932}
933
934impl ListBasinsInput {
935 pub fn new() -> Self {
937 Self::default()
938 }
939
940 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
942 Self { prefix, ..self }
943 }
944
945 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
948 Self {
949 start_after,
950 ..self
951 }
952 }
953
954 pub fn with_limit(self, limit: usize) -> Self {
956 Self {
957 limit: Some(limit),
958 ..self
959 }
960 }
961}
962
963impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
964 fn from(value: ListBasinsInput) -> Self {
965 Self {
966 prefix: Some(value.prefix),
967 start_after: Some(value.start_after),
968 limit: value.limit,
969 }
970 }
971}
972
973#[derive(Debug, Clone, Default)]
974pub struct ListAllBasinsInput {
976 pub prefix: BasinNamePrefix,
980 pub start_after: BasinNameStartAfter,
986 pub include_deleted: bool,
990}
991
992impl ListAllBasinsInput {
993 pub fn new() -> Self {
995 Self::default()
996 }
997
998 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1000 Self { prefix, ..self }
1001 }
1002
1003 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1006 Self {
1007 start_after,
1008 ..self
1009 }
1010 }
1011
1012 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1014 Self {
1015 include_deleted,
1016 ..self
1017 }
1018 }
1019}
1020
1021#[derive(Debug, Clone, PartialEq, Eq)]
1022pub enum BasinState {
1024 Active,
1026 Creating,
1028 Deleting,
1030}
1031
1032impl From<api::basin::BasinState> for BasinState {
1033 fn from(value: api::basin::BasinState) -> Self {
1034 match value {
1035 api::basin::BasinState::Active => BasinState::Active,
1036 api::basin::BasinState::Creating => BasinState::Creating,
1037 api::basin::BasinState::Deleting => BasinState::Deleting,
1038 }
1039 }
1040}
1041
1042#[derive(Debug, Clone, PartialEq, Eq)]
1043#[non_exhaustive]
1044pub struct BasinInfo {
1046 pub name: BasinName,
1048 pub scope: Option<BasinScope>,
1050 pub state: BasinState,
1052}
1053
1054impl From<api::basin::BasinInfo> for BasinInfo {
1055 fn from(value: api::basin::BasinInfo) -> Self {
1056 Self {
1057 name: value.name,
1058 scope: value.scope.map(Into::into),
1059 state: value.state.into(),
1060 }
1061 }
1062}
1063
1064#[derive(Debug, Clone)]
1065#[non_exhaustive]
1066pub struct DeleteBasinInput {
1068 pub name: BasinName,
1070 pub ignore_not_found: bool,
1072}
1073
1074impl DeleteBasinInput {
1075 pub fn new(name: BasinName) -> Self {
1077 Self {
1078 name,
1079 ignore_not_found: false,
1080 }
1081 }
1082
1083 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1085 Self {
1086 ignore_not_found,
1087 ..self
1088 }
1089 }
1090}
1091
1092#[derive(Debug, Clone, Default)]
1093#[non_exhaustive]
1094pub struct TimestampingReconfiguration {
1096 pub mode: Maybe<Option<TimestampingMode>>,
1098 pub uncapped: Maybe<Option<bool>>,
1100}
1101
1102impl TimestampingReconfiguration {
1103 pub fn new() -> Self {
1105 Self::default()
1106 }
1107
1108 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1110 Self {
1111 mode: Maybe::Specified(Some(mode)),
1112 ..self
1113 }
1114 }
1115
1116 pub fn with_uncapped(self, uncapped: bool) -> Self {
1118 Self {
1119 uncapped: Maybe::Specified(Some(uncapped)),
1120 ..self
1121 }
1122 }
1123}
1124
1125impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1126 fn from(value: TimestampingReconfiguration) -> Self {
1127 Self {
1128 mode: value.mode.map(|m| m.map(Into::into)),
1129 uncapped: value.uncapped,
1130 }
1131 }
1132}
1133
1134#[derive(Debug, Clone, Default)]
1135#[non_exhaustive]
1136pub struct DeleteOnEmptyReconfiguration {
1138 pub min_age_secs: Maybe<Option<u64>>,
1140}
1141
1142impl DeleteOnEmptyReconfiguration {
1143 pub fn new() -> Self {
1145 Self::default()
1146 }
1147
1148 pub fn with_min_age(self, min_age: Duration) -> Self {
1150 Self {
1151 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1152 }
1153 }
1154}
1155
1156impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1157 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1158 Self {
1159 min_age_secs: value.min_age_secs,
1160 }
1161 }
1162}
1163
1164#[derive(Debug, Clone, Default)]
1165#[non_exhaustive]
1166pub struct StreamReconfiguration {
1168 pub storage_class: Maybe<Option<StorageClass>>,
1170 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1172 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1174 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1176}
1177
1178impl StreamReconfiguration {
1179 pub fn new() -> Self {
1181 Self::default()
1182 }
1183
1184 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1186 Self {
1187 storage_class: Maybe::Specified(Some(storage_class)),
1188 ..self
1189 }
1190 }
1191
1192 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1194 Self {
1195 retention_policy: Maybe::Specified(Some(retention_policy)),
1196 ..self
1197 }
1198 }
1199
1200 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1202 Self {
1203 timestamping: Maybe::Specified(Some(timestamping)),
1204 ..self
1205 }
1206 }
1207
1208 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1210 Self {
1211 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1212 ..self
1213 }
1214 }
1215}
1216
1217impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1218 fn from(value: StreamReconfiguration) -> Self {
1219 Self {
1220 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1221 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1222 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1223 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1224 }
1225 }
1226}
1227
1228#[derive(Debug, Clone, Default)]
1229#[non_exhaustive]
1230pub struct BasinReconfiguration {
1232 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1234 pub create_stream_on_append: Maybe<bool>,
1237 pub create_stream_on_read: Maybe<bool>,
1239}
1240
1241impl BasinReconfiguration {
1242 pub fn new() -> Self {
1244 Self::default()
1245 }
1246
1247 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1250 Self {
1251 default_stream_config: Maybe::Specified(Some(config)),
1252 ..self
1253 }
1254 }
1255
1256 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1259 Self {
1260 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1261 ..self
1262 }
1263 }
1264
1265 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1268 Self {
1269 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1270 ..self
1271 }
1272 }
1273}
1274
1275impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1276 fn from(value: BasinReconfiguration) -> Self {
1277 Self {
1278 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1279 create_stream_on_append: value.create_stream_on_append,
1280 create_stream_on_read: value.create_stream_on_read,
1281 }
1282 }
1283}
1284
1285#[derive(Debug, Clone)]
1286#[non_exhaustive]
1287pub struct ReconfigureBasinInput {
1289 pub name: BasinName,
1291 pub config: BasinReconfiguration,
1293}
1294
1295impl ReconfigureBasinInput {
1296 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1298 Self { name, config }
1299 }
1300}
1301
1302#[derive(Debug, Clone, Default)]
1303#[non_exhaustive]
1304pub struct ListAccessTokensInput {
1306 pub prefix: AccessTokenIdPrefix,
1310 pub start_after: AccessTokenIdStartAfter,
1316 pub limit: Option<usize>,
1320}
1321
1322impl ListAccessTokensInput {
1323 pub fn new() -> Self {
1325 Self::default()
1326 }
1327
1328 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1330 Self { prefix, ..self }
1331 }
1332
1333 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1336 Self {
1337 start_after,
1338 ..self
1339 }
1340 }
1341
1342 pub fn with_limit(self, limit: usize) -> Self {
1344 Self {
1345 limit: Some(limit),
1346 ..self
1347 }
1348 }
1349}
1350
1351impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1352 fn from(value: ListAccessTokensInput) -> Self {
1353 Self {
1354 prefix: Some(value.prefix),
1355 start_after: Some(value.start_after),
1356 limit: value.limit,
1357 }
1358 }
1359}
1360
1361#[derive(Debug, Clone, Default)]
1362pub struct ListAllAccessTokensInput {
1364 pub prefix: AccessTokenIdPrefix,
1368 pub start_after: AccessTokenIdStartAfter,
1374}
1375
1376impl ListAllAccessTokensInput {
1377 pub fn new() -> Self {
1379 Self::default()
1380 }
1381
1382 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1384 Self { prefix, ..self }
1385 }
1386
1387 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1390 Self {
1391 start_after,
1392 ..self
1393 }
1394 }
1395}
1396
1397#[derive(Debug, Clone)]
1398#[non_exhaustive]
1399pub struct AccessTokenInfo {
1401 pub id: AccessTokenId,
1403 pub expires_at: S2DateTime,
1405 pub auto_prefix_streams: bool,
1408 pub scope: AccessTokenScope,
1410}
1411
1412impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1413 type Error = ValidationError;
1414
1415 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1416 let expires_at = value
1417 .expires_at
1418 .map(Into::into)
1419 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1420 Ok(Self {
1421 id: value.id,
1422 expires_at,
1423 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1424 scope: value.scope.into(),
1425 })
1426 }
1427}
1428
1429#[derive(Debug, Clone)]
1430pub enum BasinMatcher {
1434 None,
1436 Exact(BasinName),
1438 Prefix(BasinNamePrefix),
1440}
1441
1442#[derive(Debug, Clone)]
1443pub enum StreamMatcher {
1447 None,
1449 Exact(StreamName),
1451 Prefix(StreamNamePrefix),
1453}
1454
1455#[derive(Debug, Clone)]
1456pub enum AccessTokenMatcher {
1460 None,
1462 Exact(AccessTokenId),
1464 Prefix(AccessTokenIdPrefix),
1466}
1467
/// Pair of read/write permission flags.
///
/// Derives `PartialEq`/`Eq` so permission sets can be compared directly, in
/// line with the other plain configuration types in this module.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Read permission; `false` by default.
    pub read: bool,
    /// Write permission; `false` by default.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Creates permissions with both flags `false`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Read allowed, write denied.
    pub fn read_only() -> Self {
        Self {
            read: true,
            write: false,
        }
    }

    /// Write allowed, read denied.
    pub fn write_only() -> Self {
        Self {
            read: false,
            write: true,
        }
    }

    /// Both read and write allowed.
    pub fn read_write() -> Self {
        Self {
            read: true,
            write: true,
        }
    }
}
1512
1513impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1514 fn from(value: ReadWritePermissions) -> Self {
1515 Self {
1516 read: Some(value.read),
1517 write: Some(value.write),
1518 }
1519 }
1520}
1521
1522impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1523 fn from(value: api::access::ReadWritePermissions) -> Self {
1524 Self {
1525 read: value.read.unwrap_or_default(),
1526 write: value.write.unwrap_or_default(),
1527 }
1528 }
1529}
1530
1531#[derive(Debug, Clone, Default)]
1532#[non_exhaustive]
1533pub struct OperationGroupPermissions {
1537 pub account: Option<ReadWritePermissions>,
1541 pub basin: Option<ReadWritePermissions>,
1545 pub stream: Option<ReadWritePermissions>,
1549}
1550
1551impl OperationGroupPermissions {
1552 pub fn new() -> Self {
1554 Self::default()
1555 }
1556
1557 pub fn read_only_all() -> Self {
1559 Self {
1560 account: Some(ReadWritePermissions::read_only()),
1561 basin: Some(ReadWritePermissions::read_only()),
1562 stream: Some(ReadWritePermissions::read_only()),
1563 }
1564 }
1565
1566 pub fn write_only_all() -> Self {
1568 Self {
1569 account: Some(ReadWritePermissions::write_only()),
1570 basin: Some(ReadWritePermissions::write_only()),
1571 stream: Some(ReadWritePermissions::write_only()),
1572 }
1573 }
1574
1575 pub fn read_write_all() -> Self {
1577 Self {
1578 account: Some(ReadWritePermissions::read_write()),
1579 basin: Some(ReadWritePermissions::read_write()),
1580 stream: Some(ReadWritePermissions::read_write()),
1581 }
1582 }
1583
1584 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1586 Self {
1587 account: Some(account),
1588 ..self
1589 }
1590 }
1591
1592 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1594 Self {
1595 basin: Some(basin),
1596 ..self
1597 }
1598 }
1599
1600 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1602 Self {
1603 stream: Some(stream),
1604 ..self
1605 }
1606 }
1607}
1608
1609impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1610 fn from(value: OperationGroupPermissions) -> Self {
1611 Self {
1612 account: value.account.map(Into::into),
1613 basin: value.basin.map(Into::into),
1614 stream: value.stream.map(Into::into),
1615 }
1616 }
1617}
1618
1619impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1620 fn from(value: api::access::PermittedOperationGroups) -> Self {
1621 Self {
1622 account: value.account.map(Into::into),
1623 basin: value.basin.map(Into::into),
1624 stream: value.stream.map(Into::into),
1625 }
1626 }
1627}
1628
/// An individual operation that can be listed in an access token scope.
///
/// Each variant corresponds to exactly one `api::access::Operation` variant. Names are
/// identical on both sides except for the three metrics variants noted below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    ListBasins,
    CreateBasin,
    GetBasinConfig,
    DeleteBasin,
    ReconfigureBasin,
    ListAccessTokens,
    IssueAccessToken,
    RevokeAccessToken,
    /// Maps to `api::access::Operation::AccountMetrics`.
    GetAccountMetrics,
    /// Maps to `api::access::Operation::BasinMetrics`.
    GetBasinMetrics,
    /// Maps to `api::access::Operation::StreamMetrics`.
    GetStreamMetrics,
    ListStreams,
    CreateStream,
    GetStreamConfig,
    DeleteStream,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
1677
1678impl From<Operation> for api::access::Operation {
1679 fn from(value: Operation) -> Self {
1680 match value {
1681 Operation::ListBasins => api::access::Operation::ListBasins,
1682 Operation::CreateBasin => api::access::Operation::CreateBasin,
1683 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1684 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1685 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1686 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1687 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1688 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1689 Operation::ListStreams => api::access::Operation::ListStreams,
1690 Operation::CreateStream => api::access::Operation::CreateStream,
1691 Operation::DeleteStream => api::access::Operation::DeleteStream,
1692 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1693 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1694 Operation::CheckTail => api::access::Operation::CheckTail,
1695 Operation::Append => api::access::Operation::Append,
1696 Operation::Read => api::access::Operation::Read,
1697 Operation::Trim => api::access::Operation::Trim,
1698 Operation::Fence => api::access::Operation::Fence,
1699 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1700 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1701 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1702 }
1703 }
1704}
1705
1706impl From<api::access::Operation> for Operation {
1707 fn from(value: api::access::Operation) -> Self {
1708 match value {
1709 api::access::Operation::ListBasins => Operation::ListBasins,
1710 api::access::Operation::CreateBasin => Operation::CreateBasin,
1711 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1712 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1713 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1714 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1715 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1716 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1717 api::access::Operation::ListStreams => Operation::ListStreams,
1718 api::access::Operation::CreateStream => Operation::CreateStream,
1719 api::access::Operation::DeleteStream => Operation::DeleteStream,
1720 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1721 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1722 api::access::Operation::CheckTail => Operation::CheckTail,
1723 api::access::Operation::Append => Operation::Append,
1724 api::access::Operation::Read => Operation::Read,
1725 api::access::Operation::Trim => Operation::Trim,
1726 api::access::Operation::Fence => Operation::Fence,
1727 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1728 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1729 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1730 }
1731 }
1732}
1733
/// Scope requested for an access token; it is the `scope` of an `IssueAccessTokenInput`.
///
/// Fields are private: start from `from_ops` or `from_op_group_perms` and refine with the
/// `with_*` builders. When converted into `api::access::AccessTokenScope`, unset matchers
/// and unset group permissions are sent as `None`, and an empty `ops` set is sent as `None`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScopeInput {
    // Basin matcher, if any.
    basins: Option<BasinMatcher>,
    // Stream matcher, if any.
    streams: Option<StreamMatcher>,
    // Access token matcher, if any.
    access_tokens: Option<AccessTokenMatcher>,
    // Permissions at operation-group level, if any.
    op_group_perms: Option<OperationGroupPermissions>,
    // Individually listed operations; may be empty.
    ops: HashSet<Operation>,
}
1750
1751impl AccessTokenScopeInput {
1752 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1754 Self {
1755 basins: None,
1756 streams: None,
1757 access_tokens: None,
1758 op_group_perms: None,
1759 ops: ops.into_iter().collect(),
1760 }
1761 }
1762
1763 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1765 Self {
1766 basins: None,
1767 streams: None,
1768 access_tokens: None,
1769 op_group_perms: Some(op_group_perms),
1770 ops: HashSet::default(),
1771 }
1772 }
1773
1774 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1776 Self {
1777 ops: ops.into_iter().collect(),
1778 ..self
1779 }
1780 }
1781
1782 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1784 Self {
1785 op_group_perms: Some(op_group_perms),
1786 ..self
1787 }
1788 }
1789
1790 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1794 Self {
1795 basins: Some(basins),
1796 ..self
1797 }
1798 }
1799
1800 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1804 Self {
1805 streams: Some(streams),
1806 ..self
1807 }
1808 }
1809
1810 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1814 Self {
1815 access_tokens: Some(access_tokens),
1816 ..self
1817 }
1818 }
1819}
1820
/// Scope of an access token, as obtained from an `api::access::AccessTokenScope`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScope {
    /// Basin matcher, if the API value carried one.
    pub basins: Option<BasinMatcher>,
    /// Stream matcher, if the API value carried one.
    pub streams: Option<StreamMatcher>,
    /// Access token matcher, if the API value carried one.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions at operation-group level, if the API value carried them.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Individually listed operations; empty when the API value carried no `ops`.
    pub ops: HashSet<Operation>,
}
1836
1837impl From<api::access::AccessTokenScope> for AccessTokenScope {
1838 fn from(value: api::access::AccessTokenScope) -> Self {
1839 Self {
1840 basins: value.basins.map(|rs| match rs {
1841 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1842 BasinMatcher::Exact(e)
1843 }
1844 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1845 BasinMatcher::None
1846 }
1847 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1848 }),
1849 streams: value.streams.map(|rs| match rs {
1850 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1851 StreamMatcher::Exact(e)
1852 }
1853 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1854 StreamMatcher::None
1855 }
1856 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1857 }),
1858 access_tokens: value.access_tokens.map(|rs| match rs {
1859 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1860 AccessTokenMatcher::Exact(e)
1861 }
1862 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1863 AccessTokenMatcher::None
1864 }
1865 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1866 }),
1867 op_group_perms: value.op_groups.map(Into::into),
1868 ops: value
1869 .ops
1870 .map(|ops| ops.into_iter().map(Into::into).collect())
1871 .unwrap_or_default(),
1872 }
1873 }
1874}
1875
1876impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1877 fn from(value: AccessTokenScopeInput) -> Self {
1878 Self {
1879 basins: value.basins.map(|rs| match rs {
1880 BasinMatcher::None => {
1881 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1882 }
1883 BasinMatcher::Exact(e) => {
1884 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1885 }
1886 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1887 }),
1888 streams: value.streams.map(|rs| match rs {
1889 StreamMatcher::None => {
1890 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1891 }
1892 StreamMatcher::Exact(e) => {
1893 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1894 }
1895 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1896 }),
1897 access_tokens: value.access_tokens.map(|rs| match rs {
1898 AccessTokenMatcher::None => {
1899 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1900 }
1901 AccessTokenMatcher::Exact(e) => {
1902 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1903 }
1904 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1905 }),
1906 op_groups: value.op_group_perms.map(Into::into),
1907 ops: if value.ops.is_empty() {
1908 None
1909 } else {
1910 Some(value.ops.into_iter().map(Into::into).collect())
1911 },
1912 }
1913 }
1914}
1915
/// Input for issuing an access token; converts into `api::access::AccessTokenInfo`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct IssueAccessTokenInput {
    /// Identifier of the access token.
    pub id: AccessTokenId,
    /// Expiration time, if any. `None` is forwarded to the API as `None`.
    pub expires_at: Option<S2DateTime>,
    /// Auto-prefix-streams flag. Only `true` is sent to the API; `false` is encoded as
    /// an absent value.
    // NOTE(review): the server-side meaning of this flag is not visible here — confirm.
    pub auto_prefix_streams: bool,
    /// Scope requested for the token.
    pub scope: AccessTokenScopeInput,
}
1938
1939impl IssueAccessTokenInput {
1940 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1942 Self {
1943 id,
1944 expires_at: None,
1945 auto_prefix_streams: false,
1946 scope,
1947 }
1948 }
1949
1950 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1952 Self {
1953 expires_at: Some(expires_at),
1954 ..self
1955 }
1956 }
1957
1958 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1961 Self {
1962 auto_prefix_streams,
1963 ..self
1964 }
1965 }
1966}
1967
1968impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1969 fn from(value: IssueAccessTokenInput) -> Self {
1970 Self {
1971 id: value.id,
1972 expires_at: value.expires_at.map(Into::into),
1973 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1974 scope: value.scope.into(),
1975 }
1976 }
1977}
1978
/// Interval of a timeseries; mirrors `api::metrics::TimeseriesInterval` variant for variant.
#[derive(Debug, Clone, Copy)]
pub enum TimeseriesInterval {
    /// Minute interval.
    Minute,
    /// Hour interval.
    Hour,
    /// Day interval.
    Day,
}
1989
1990impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
1991 fn from(value: TimeseriesInterval) -> Self {
1992 match value {
1993 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
1994 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
1995 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
1996 }
1997 }
1998}
1999
2000impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2001 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2002 match value {
2003 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2004 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2005 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2006 }
2007 }
2008}
2009
/// A `start`/`end` pair bounding a metrics query.
// NOTE(review): units and inclusivity are not visible here — presumably Unix epoch
// seconds; confirm against the API documentation.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start of the range; sent to the API as `start`.
    pub start: u32,
    /// End of the range; sent to the API as `end`.
    pub end: u32,
}
2019
2020impl TimeRange {
2021 pub fn new(start: u32, end: u32) -> Self {
2023 Self { start, end }
2024 }
2025}
2026
/// A `start`/`end` pair plus an optional aggregation interval for a metrics query.
// NOTE(review): units and inclusivity of `start`/`end` are not visible here —
// presumably Unix epoch seconds; confirm against the API documentation.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRangeAndInterval {
    /// Start of the range; sent to the API as `start`.
    pub start: u32,
    /// End of the range; sent to the API as `end`.
    pub end: u32,
    /// Interval to request; `None` is forwarded as `None`.
    pub interval: Option<TimeseriesInterval>,
}
2040
2041impl TimeRangeAndInterval {
2042 pub fn new(start: u32, end: u32) -> Self {
2044 Self {
2045 start,
2046 end,
2047 interval: None,
2048 }
2049 }
2050
2051 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2053 Self {
2054 interval: Some(interval),
2055 ..self
2056 }
2057 }
2058}
2059
/// Account-level metric set to request, together with its query arguments.
#[derive(Debug, Clone, Copy)]
pub enum AccountMetricSet {
    /// Requests `api::metrics::AccountMetricSet::ActiveBasins` over a time range; no
    /// interval is sent.
    ActiveBasins(TimeRange),
    /// Requests `api::metrics::AccountMetricSet::AccountOps` over a time range with an
    /// optional interval.
    AccountOps(TimeRangeAndInterval),
}
2074
/// Input for an account metrics request; converts into
/// `api::metrics::AccountMetricSetRequest`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetAccountMetricsInput {
    /// Metric set to request, including its time range.
    pub set: AccountMetricSet,
}
2082
2083impl GetAccountMetricsInput {
2084 pub fn new(set: AccountMetricSet) -> Self {
2086 Self { set }
2087 }
2088}
2089
2090impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2091 fn from(value: GetAccountMetricsInput) -> Self {
2092 let (set, start, end, interval) = match value.set {
2093 AccountMetricSet::ActiveBasins(args) => (
2094 api::metrics::AccountMetricSet::ActiveBasins,
2095 args.start,
2096 args.end,
2097 None,
2098 ),
2099 AccountMetricSet::AccountOps(args) => (
2100 api::metrics::AccountMetricSet::AccountOps,
2101 args.start,
2102 args.end,
2103 args.interval,
2104 ),
2105 };
2106 Self {
2107 set,
2108 start: Some(start),
2109 end: Some(end),
2110 interval: interval.map(Into::into),
2111 }
2112 }
2113}
2114
/// Basin-level metric set to request, together with its query arguments.
///
/// Each variant selects the identically named `api::metrics::BasinMetricSet` variant.
#[derive(Debug, Clone, Copy)]
pub enum BasinMetricSet {
    /// Storage over a time range; no interval is sent.
    Storage(TimeRange),
    /// Append operations over a time range with an optional interval.
    AppendOps(TimeRangeAndInterval),
    /// Read operations over a time range with an optional interval.
    ReadOps(TimeRangeAndInterval),
    /// Read throughput over a time range with an optional interval.
    ReadThroughput(TimeRangeAndInterval),
    /// Append throughput over a time range with an optional interval.
    AppendThroughput(TimeRangeAndInterval),
    /// Basin operations over a time range with an optional interval.
    BasinOps(TimeRangeAndInterval),
}
2159
/// Input for a basin metrics request; converts into a
/// `(BasinName, api::metrics::BasinMetricSetRequest)` pair.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetBasinMetricsInput {
    /// Name of the basin.
    pub name: BasinName,
    /// Metric set to request, including its time range.
    pub set: BasinMetricSet,
}
2169
2170impl GetBasinMetricsInput {
2171 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2173 Self { name, set }
2174 }
2175}
2176
2177impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2178 fn from(value: GetBasinMetricsInput) -> Self {
2179 let (set, start, end, interval) = match value.set {
2180 BasinMetricSet::Storage(args) => (
2181 api::metrics::BasinMetricSet::Storage,
2182 args.start,
2183 args.end,
2184 None,
2185 ),
2186 BasinMetricSet::AppendOps(args) => (
2187 api::metrics::BasinMetricSet::AppendOps,
2188 args.start,
2189 args.end,
2190 args.interval,
2191 ),
2192 BasinMetricSet::ReadOps(args) => (
2193 api::metrics::BasinMetricSet::ReadOps,
2194 args.start,
2195 args.end,
2196 args.interval,
2197 ),
2198 BasinMetricSet::ReadThroughput(args) => (
2199 api::metrics::BasinMetricSet::ReadThroughput,
2200 args.start,
2201 args.end,
2202 args.interval,
2203 ),
2204 BasinMetricSet::AppendThroughput(args) => (
2205 api::metrics::BasinMetricSet::AppendThroughput,
2206 args.start,
2207 args.end,
2208 args.interval,
2209 ),
2210 BasinMetricSet::BasinOps(args) => (
2211 api::metrics::BasinMetricSet::BasinOps,
2212 args.start,
2213 args.end,
2214 args.interval,
2215 ),
2216 };
2217 (
2218 value.name,
2219 api::metrics::BasinMetricSetRequest {
2220 set,
2221 start: Some(start),
2222 end: Some(end),
2223 interval: interval.map(Into::into),
2224 },
2225 )
2226 }
2227}
2228
/// Stream-level metric set to request, together with its query arguments.
#[derive(Debug, Clone, Copy)]
pub enum StreamMetricSet {
    /// Requests `api::metrics::StreamMetricSet::Storage` over a time range; no interval
    /// is sent.
    Storage(TimeRange),
}
2236
/// Input for a stream metrics request; converts into a
/// `(BasinName, StreamName, api::metrics::StreamMetricSetRequest)` triple.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetStreamMetricsInput {
    /// Name of the basin.
    pub basin_name: BasinName,
    /// Name of the stream.
    pub stream_name: StreamName,
    /// Metric set to request, including its time range.
    pub set: StreamMetricSet,
}
2248
2249impl GetStreamMetricsInput {
2250 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2253 Self {
2254 basin_name,
2255 stream_name,
2256 set,
2257 }
2258 }
2259}
2260
2261impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2262 fn from(value: GetStreamMetricsInput) -> Self {
2263 let (set, start, end, interval) = match value.set {
2264 StreamMetricSet::Storage(args) => (
2265 api::metrics::StreamMetricSet::Storage,
2266 args.start,
2267 args.end,
2268 None,
2269 ),
2270 };
2271 (
2272 value.basin_name,
2273 value.stream_name,
2274 api::metrics::StreamMetricSetRequest {
2275 set,
2276 start: Some(start),
2277 end: Some(end),
2278 interval,
2279 },
2280 )
2281 }
2282}
2283
/// Unit of a metric; mirrors `api::metrics::MetricUnit` variant for variant.
#[derive(Debug, Clone, Copy)]
pub enum MetricUnit {
    /// The metric is measured in bytes.
    Bytes,
    /// The metric is measured in operations.
    Operations,
}
2292
2293impl From<api::metrics::MetricUnit> for MetricUnit {
2294 fn from(value: api::metrics::MetricUnit) -> Self {
2295 match value {
2296 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2297 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2298 }
2299 }
2300}
2301
/// A metric consisting of a single value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ScalarMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of `value`.
    pub unit: MetricUnit,
    /// The metric's value.
    pub value: f64,
}
2313
/// A timeseries metric reported together with the interval it was accumulated over.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccumulationMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Interval reported by the API for this series.
    pub interval: TimeseriesInterval,
    /// Data points as `(u32, f64)` pairs, taken verbatim from the API response.
    // NOTE(review): presumably (timestamp, value) with the timestamp in epoch seconds —
    // confirm against the API documentation.
    pub values: Vec<(u32, f64)>,
}
2330
/// A timeseries metric without an associated interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GaugeMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Data points as `(u32, f64)` pairs, taken verbatim from the API response.
    // NOTE(review): presumably (timestamp, value) with the timestamp in epoch seconds —
    // confirm against the API documentation.
    pub values: Vec<(u32, f64)>,
}
2343
/// A metric whose values are strings rather than numbers.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Name of the metric.
    pub name: String,
    /// String values, taken verbatim from the API response.
    pub values: Vec<String>,
}
2353
/// A single metric returned by a metrics request; mirrors `api::metrics::Metric`.
#[derive(Debug, Clone)]
pub enum Metric {
    /// A single named value.
    Scalar(ScalarMetric),
    /// A timeseries reported with an accumulation interval.
    Accumulation(AccumulationMetric),
    /// A timeseries reported without an interval.
    Gauge(GaugeMetric),
    /// A set of string values.
    Label(LabelMetric),
}
2367
2368impl From<api::metrics::Metric> for Metric {
2369 fn from(value: api::metrics::Metric) -> Self {
2370 match value {
2371 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2372 name: sm.name.into(),
2373 unit: sm.unit.into(),
2374 value: sm.value,
2375 }),
2376 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2377 name: am.name.into(),
2378 unit: am.unit.into(),
2379 interval: am.interval.into(),
2380 values: am.values,
2381 }),
2382 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2383 name: gm.name.into(),
2384 unit: gm.unit.into(),
2385 values: gm.values,
2386 }),
2387 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2388 name: lm.name.into(),
2389 values: lm.values,
2390 }),
2391 }
2392 }
2393}
2394
/// Input for listing streams; converts into `api::stream::ListStreamsRequest`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ListStreamsInput {
    /// Stream name prefix; always sent to the API, including its default value.
    pub prefix: StreamNamePrefix,
    /// Start-after marker; always sent to the API, including its default value.
    pub start_after: StreamNameStartAfter,
    /// Limit, if any; `None` is forwarded as `None`.
    pub limit: Option<usize>,
}
2414
2415impl ListStreamsInput {
2416 pub fn new() -> Self {
2418 Self::default()
2419 }
2420
2421 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2423 Self { prefix, ..self }
2424 }
2425
2426 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2429 Self {
2430 start_after,
2431 ..self
2432 }
2433 }
2434
2435 pub fn with_limit(self, limit: usize) -> Self {
2437 Self {
2438 limit: Some(limit),
2439 ..self
2440 }
2441 }
2442}
2443
2444impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2445 fn from(value: ListStreamsInput) -> Self {
2446 Self {
2447 prefix: Some(value.prefix),
2448 start_after: Some(value.start_after),
2449 limit: value.limit,
2450 }
2451 }
2452}
2453
/// Input for listing all streams.
// NOTE(review): how this input is consumed (e.g. automatic pagination) is not visible in
// this part of the file — confirm at the call site.
#[derive(Debug, Clone, Default)]
pub struct ListAllStreamsInput {
    /// Stream name prefix.
    pub prefix: StreamNamePrefix,
    /// Start-after marker.
    pub start_after: StreamNameStartAfter,
    /// Whether deleted streams should be included. Defaults to `false`.
    pub include_deleted: bool,
}
2472
2473impl ListAllStreamsInput {
2474 pub fn new() -> Self {
2476 Self::default()
2477 }
2478
2479 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2481 Self { prefix, ..self }
2482 }
2483
2484 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2487 Self {
2488 start_after,
2489 ..self
2490 }
2491 }
2492
2493 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2495 Self {
2496 include_deleted,
2497 ..self
2498 }
2499 }
2500}
2501
/// Information about a stream, as obtained from an `api::stream::StreamInfo`.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct StreamInfo {
    /// Name of the stream.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time, if the API reported one.
    pub deleted_at: Option<S2DateTime>,
}
2513
2514impl From<api::stream::StreamInfo> for StreamInfo {
2515 fn from(value: api::stream::StreamInfo) -> Self {
2516 Self {
2517 name: value.name,
2518 created_at: value.created_at.into(),
2519 deleted_at: value.deleted_at.map(Into::into),
2520 }
2521 }
2522}
2523
/// Input for creating a stream; converts into an
/// `(api::stream::CreateStreamRequest, String)` pair where the string is the idempotency
/// token.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CreateStreamInput {
    /// Name of the stream.
    pub name: StreamName,
    /// Configuration for the stream, if any; `None` is forwarded as `None`.
    pub config: Option<StreamConfig>,
    /// Idempotency token. `CreateStreamInput::new` fills it from `idempotency_token()`.
    // NOTE(review): presumably lets retried requests be de-duplicated — confirm.
    pub idempotency_token: String,
}
2543
2544impl CreateStreamInput {
2545 pub fn new(name: StreamName) -> Self {
2547 Self {
2548 name,
2549 config: None,
2550 idempotency_token: idempotency_token(),
2551 }
2552 }
2553
2554 pub fn with_config(self, config: StreamConfig) -> Self {
2556 Self {
2557 config: Some(config),
2558 ..self
2559 }
2560 }
2561
2562 pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
2564 Self {
2565 idempotency_token: idempotency_token.into(),
2566 ..self
2567 }
2568 }
2569}
2570
2571impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2572 fn from(value: CreateStreamInput) -> Self {
2573 (
2574 api::stream::CreateStreamRequest {
2575 stream: value.name,
2576 config: value.config.map(Into::into),
2577 },
2578 value.idempotency_token,
2579 )
2580 }
2581}
2582
/// Input for deleting a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct DeleteStreamInput {
    /// Name of the stream.
    pub name: StreamName,
    /// Whether a not-found outcome should be ignored. Defaults to `false`.
    // NOTE(review): the handling of this flag is not visible in this part of the file.
    pub ignore_not_found: bool,
}
2592
2593impl DeleteStreamInput {
2594 pub fn new(name: StreamName) -> Self {
2596 Self {
2597 name,
2598 ignore_not_found: false,
2599 }
2600 }
2601
2602 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2604 Self {
2605 ignore_not_found,
2606 ..self
2607 }
2608 }
2609}
2610
/// Input for reconfiguring a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReconfigureStreamInput {
    /// Name of the stream.
    pub name: StreamName,
    /// Reconfiguration to apply.
    pub config: StreamReconfiguration,
}
2620
2621impl ReconfigureStreamInput {
2622 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2624 Self { name, config }
2625 }
2626}
2627
/// A fencing token: a string of at most `MAX_FENCING_TOKEN_LENGTH` bytes.
///
/// Obtain one by parsing a string (`FromStr`) or via `FencingToken::generate`; the
/// length limit is enforced when parsing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FencingToken(String);
2635
2636impl FencingToken {
2637 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2639 rand::rng()
2640 .sample_iter(&rand::distr::Alphanumeric)
2641 .take(n)
2642 .map(char::from)
2643 .collect::<String>()
2644 .parse()
2645 }
2646}
2647
2648impl FromStr for FencingToken {
2649 type Err = ValidationError;
2650
2651 fn from_str(s: &str) -> Result<Self, Self::Err> {
2652 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2653 return Err(ValidationError(format!(
2654 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2655 )));
2656 }
2657 Ok(FencingToken(s.to_string()))
2658 }
2659}
2660
2661impl std::fmt::Display for FencingToken {
2662 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2663 write!(f, "{}", self.0)
2664 }
2665}
2666
2667impl Deref for FencingToken {
2668 type Target = str;
2669
2670 fn deref(&self) -> &Self::Target {
2671 &self.0
2672 }
2673}
2674
/// A position in a stream, made of a sequence number and a timestamp.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: u64,
    /// Timestamp.
    // NOTE(review): the unit is not visible here — presumably milliseconds since the
    // Unix epoch; confirm.
    pub timestamp: u64,
}
2685
2686impl std::fmt::Display for StreamPosition {
2687 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2688 write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2689 }
2690}
2691
2692impl From<api::stream::proto::StreamPosition> for StreamPosition {
2693 fn from(value: api::stream::proto::StreamPosition) -> Self {
2694 Self {
2695 seq_num: value.seq_num,
2696 timestamp: value.timestamp,
2697 }
2698 }
2699}
2700
2701impl From<api::stream::StreamPosition> for StreamPosition {
2702 fn from(value: api::stream::StreamPosition) -> Self {
2703 Self {
2704 seq_num: value.seq_num,
2705 timestamp: value.timestamp,
2706 }
2707 }
2708}
2709
/// A name/value pair attached to a record; both parts are arbitrary bytes.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct Header {
    /// Name of the header.
    pub name: Bytes,
    /// Value of the header.
    pub value: Bytes,
}
2719
2720impl Header {
2721 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2723 Self {
2724 name: name.into(),
2725 value: value.into(),
2726 }
2727 }
2728}
2729
2730impl From<Header> for api::stream::proto::Header {
2731 fn from(value: Header) -> Self {
2732 Self {
2733 name: value.name,
2734 value: value.value,
2735 }
2736 }
2737}
2738
2739impl From<api::stream::proto::Header> for Header {
2740 fn from(value: api::stream::proto::Header) -> Self {
2741 Self {
2742 name: value.name,
2743 value: value.value,
2744 }
2745 }
2746}
2747
/// A record to append to a stream.
///
/// Fields are private; build one with `AppendRecord::new` and the `with_*` builders.
/// `new` and `with_headers` reject records whose metered size exceeds
/// `RECORD_BATCH_MAX.bytes`.
#[derive(Debug, Clone, PartialEq)]
pub struct AppendRecord {
    // Body of the record.
    body: Bytes,
    // Headers of the record; empty unless set via `with_headers`.
    headers: Vec<Header>,
    // Optional timestamp; forwarded as-is to the API.
    timestamp: Option<u64>,
}
2755
2756impl AppendRecord {
2757 fn validate(self) -> Result<Self, ValidationError> {
2758 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2759 Err(ValidationError(format!(
2760 "metered_bytes: {} exceeds {}",
2761 self.metered_bytes(),
2762 RECORD_BATCH_MAX.bytes
2763 )))
2764 } else {
2765 Ok(self)
2766 }
2767 }
2768
2769 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2771 let record = Self {
2772 body: body.into(),
2773 headers: Vec::default(),
2774 timestamp: None,
2775 };
2776 record.validate()
2777 }
2778
2779 pub fn with_headers(
2781 self,
2782 headers: impl IntoIterator<Item = Header>,
2783 ) -> Result<Self, ValidationError> {
2784 let record = Self {
2785 headers: headers.into_iter().collect(),
2786 ..self
2787 };
2788 record.validate()
2789 }
2790
2791 pub fn with_timestamp(self, timestamp: u64) -> Self {
2795 Self {
2796 timestamp: Some(timestamp),
2797 ..self
2798 }
2799 }
2800}
2801
2802impl From<AppendRecord> for api::stream::proto::AppendRecord {
2803 fn from(value: AppendRecord) -> Self {
2804 Self {
2805 timestamp: value.timestamp,
2806 headers: value.headers.into_iter().map(Into::into).collect(),
2807 body: value.body,
2808 }
2809 }
2810}
2811
/// Types that can report their size in metered bytes.
///
/// Metered bytes are the unit in which `RECORD_BATCH_MAX.bytes` limits are checked.
pub trait MeteredBytes {
    /// Returns the size of `self` in metered bytes.
    fn metered_bytes(&self) -> usize;
}
2822
/// Generates a `MeteredBytes` impl for a record type exposing `headers` and `body` fields.
macro_rules! metered_bytes_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> usize {
                // Fixed 8 bytes per record plus 2 bytes per header, then the raw
                // lengths of every header name/value and of the body.
                let fixed_overhead = 8 + (2 * self.headers.len());
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                fixed_overhead + header_bytes + self.body.len()
            }
        }
    };
}
2838
// Implements `MeteredBytes` for `AppendRecord` via `metered_bytes_impl!`.
metered_bytes_impl!(AppendRecord);
2840
/// A batch of [`AppendRecord`]s together with its cached metered size.
///
/// `try_from_iter` only produces batches that are non-empty, hold at most
/// `RECORD_BATCH_MAX.count` records and at most `RECORD_BATCH_MAX.bytes` metered bytes.
/// The crate-internal `with_capacity`/`push` path performs no such checks.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    // Records in the batch, in insertion order.
    records: Vec<AppendRecord>,
    // Sum of `metered_bytes()` over `records`, maintained on every insertion.
    metered_bytes: usize,
}
2854
2855impl AppendRecordBatch {
2856 pub(crate) fn with_capacity(capacity: usize) -> Self {
2857 Self {
2858 records: Vec::with_capacity(capacity),
2859 metered_bytes: 0,
2860 }
2861 }
2862
2863 pub(crate) fn push(&mut self, record: AppendRecord) {
2864 self.metered_bytes += record.metered_bytes();
2865 self.records.push(record);
2866 }
2867
2868 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2870 where
2871 I: IntoIterator<Item = AppendRecord>,
2872 {
2873 let mut records = Vec::new();
2874 let mut metered_bytes = 0;
2875
2876 for record in iter {
2877 metered_bytes += record.metered_bytes();
2878 records.push(record);
2879
2880 if metered_bytes > RECORD_BATCH_MAX.bytes {
2881 return Err(ValidationError(format!(
2882 "batch size in metered bytes ({metered_bytes}) exceeds {}",
2883 RECORD_BATCH_MAX.bytes
2884 )));
2885 }
2886
2887 if records.len() > RECORD_BATCH_MAX.count {
2888 return Err(ValidationError(format!(
2889 "number of records in the batch exceeds {}",
2890 RECORD_BATCH_MAX.count
2891 )));
2892 }
2893 }
2894
2895 if records.is_empty() {
2896 return Err(ValidationError("batch is empty".into()));
2897 }
2898
2899 Ok(Self {
2900 records,
2901 metered_bytes,
2902 })
2903 }
2904}
2905
2906impl Deref for AppendRecordBatch {
2907 type Target = [AppendRecord];
2908
2909 fn deref(&self) -> &Self::Target {
2910 &self.records
2911 }
2912}
2913
2914impl MeteredBytes for AppendRecordBatch {
2915 fn metered_bytes(&self) -> usize {
2916 self.metered_bytes
2917 }
2918}
2919
/// A command that can be carried by a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command. Encoded as a record whose body is the fencing token's bytes.
    Fence {
        /// Fencing token carried by the command.
        fencing_token: FencingToken,
    },
    /// Trim command. Encoded as a record whose body is `trim_point` as 8 big-endian bytes.
    Trim {
        /// Trim point carried by the command.
        // NOTE(review): presumably a sequence number — confirm.
        trim_point: u64,
    },
}
2934
/// A command together with an optional timestamp.
///
/// Converts into an [`AppendRecord`] with a single header whose name is empty and whose
/// value identifies the command (`fence` or `trim`).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CommandRecord {
    /// The command.
    pub command: Command,
    /// Optional timestamp; carried over to the resulting append record.
    pub timestamp: Option<u64>,
}
2946
2947impl CommandRecord {
2948 const FENCE: &[u8] = b"fence";
2949 const TRIM: &[u8] = b"trim";
2950
2951 pub fn fence(fencing_token: FencingToken) -> Self {
2956 Self {
2957 command: Command::Fence { fencing_token },
2958 timestamp: None,
2959 }
2960 }
2961
2962 pub fn trim(trim_point: u64) -> Self {
2969 Self {
2970 command: Command::Trim { trim_point },
2971 timestamp: None,
2972 }
2973 }
2974
2975 pub fn with_timestamp(self, timestamp: u64) -> Self {
2977 Self {
2978 timestamp: Some(timestamp),
2979 ..self
2980 }
2981 }
2982}
2983
2984impl From<CommandRecord> for AppendRecord {
2985 fn from(value: CommandRecord) -> Self {
2986 let (header_value, body) = match value.command {
2987 Command::Fence { fencing_token } => (
2988 CommandRecord::FENCE,
2989 Bytes::copy_from_slice(fencing_token.as_bytes()),
2990 ),
2991 Command::Trim { trim_point } => (
2992 CommandRecord::TRIM,
2993 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
2994 ),
2995 };
2996 Self {
2997 body,
2998 headers: vec![Header::new("", header_value)],
2999 timestamp: value.timestamp,
3000 }
3001 }
3002}
3003
/// Input for an append; converts into `api::stream::proto::AppendInput`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AppendInput {
    /// Batch of records to append.
    pub records: AppendRecordBatch,
    /// Optional sequence number to match; forwarded as-is to the API.
    // NOTE(review): presumably the append is rejected unless this equals the sequence
    // number that would be assigned to the first record — confirm.
    pub match_seq_num: Option<u64>,
    /// Optional fencing token; forwarded to the API as a string.
    // NOTE(review): presumably the append is rejected unless this matches the stream's
    // current fencing token — confirm.
    pub fencing_token: Option<FencingToken>,
}
3021
3022impl AppendInput {
3023 pub fn new(records: AppendRecordBatch) -> Self {
3025 Self {
3026 records,
3027 match_seq_num: None,
3028 fencing_token: None,
3029 }
3030 }
3031
3032 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3034 Self {
3035 match_seq_num: Some(match_seq_num),
3036 ..self
3037 }
3038 }
3039
3040 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3042 Self {
3043 fencing_token: Some(fencing_token),
3044 ..self
3045 }
3046 }
3047}
3048
3049impl From<AppendInput> for api::stream::proto::AppendInput {
3050 fn from(value: AppendInput) -> Self {
3051 Self {
3052 records: value.records.iter().cloned().map(Into::into).collect(),
3053 match_seq_num: value.match_seq_num,
3054 fencing_token: value.fencing_token.map(|t| t.to_string()),
3055 }
3056 }
3057}
3058
/// Acknowledgement of an append, as obtained from an `api::stream::proto::AppendAck`.
///
/// A position missing from the API message is replaced by the default position of the
/// protobuf type.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct AppendAck {
    /// Start position reported by the API.
    // NOTE(review): presumably the position of the first appended record — confirm.
    pub start: StreamPosition,
    /// End position reported by the API.
    // NOTE(review): presumably one past the last appended record — confirm.
    pub end: StreamPosition,
    /// Tail position reported by the API.
    // NOTE(review): presumably the stream's tail after the append — confirm.
    pub tail: StreamPosition,
}
3077
3078impl From<api::stream::proto::AppendAck> for AppendAck {
3079 fn from(value: api::stream::proto::AppendAck) -> Self {
3080 Self {
3081 start: value.start.unwrap_or_default().into(),
3082 end: value.end.unwrap_or_default().into(),
3083 tail: value.tail.unwrap_or_default().into(),
3084 }
3085 }
3086}
3087
/// Where a read starts. Exactly one of the API's `seq_num`, `timestamp` or `tail_offset`
/// start fields is populated, depending on the variant.
///
/// The default is `SeqNum(0)`.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start from this sequence number.
    SeqNum(u64),
    /// Start from this timestamp.
    Timestamp(u64),
    /// Start from this offset relative to the tail.
    // NOTE(review): presumably a number of records before the tail — confirm.
    TailOffset(u64),
}
3098
3099impl Default for ReadFrom {
3100 fn default() -> Self {
3101 Self::SeqNum(0)
3102 }
3103}
3104
/// Start of a read; converts into `api::stream::ReadStart`.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct ReadStart {
    /// Where to start reading from. Defaults to `ReadFrom::SeqNum(0)`.
    pub from: ReadFrom,
    /// Clamp flag. Only `true` is sent to the API (as `clamp`); `false` is encoded as
    /// an absent value. Defaults to `false`.
    // NOTE(review): presumably clamps a start position beyond the tail to the tail —
    // confirm.
    pub clamp_to_tail: bool,
}
3118
3119impl ReadStart {
3120 pub fn new() -> Self {
3122 Self::default()
3123 }
3124
3125 pub fn with_from(self, from: ReadFrom) -> Self {
3127 Self { from, ..self }
3128 }
3129
3130 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3132 Self {
3133 clamp_to_tail,
3134 ..self
3135 }
3136 }
3137}
3138
3139impl From<ReadStart> for api::stream::ReadStart {
3140 fn from(value: ReadStart) -> Self {
3141 let (seq_num, timestamp, tail_offset) = match value.from {
3142 ReadFrom::SeqNum(n) => (Some(n), None, None),
3143 ReadFrom::Timestamp(t) => (None, Some(t), None),
3144 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3145 };
3146 Self {
3147 seq_num,
3148 timestamp,
3149 tail_offset,
3150 clamp: if value.clamp_to_tail {
3151 Some(true)
3152 } else {
3153 None
3154 },
3155 }
3156 }
3157}
3158
/// Limits on how much a read returns. Both limits are optional and forwarded as-is to
/// the API's `count` and `bytes` fields.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Limit on the number of records, if any.
    pub count: Option<usize>,
    /// Limit on the number of bytes, if any.
    // NOTE(review): presumably counted in metered bytes — confirm.
    pub bytes: Option<usize>,
}
3172
3173impl ReadLimits {
3174 pub fn new() -> Self {
3176 Self::default()
3177 }
3178
3179 pub fn with_count(self, count: usize) -> Self {
3181 Self {
3182 count: Some(count),
3183 ..self
3184 }
3185 }
3186
3187 pub fn with_bytes(self, bytes: usize) -> Self {
3189 Self {
3190 bytes: Some(bytes),
3191 ..self
3192 }
3193 }
3194}
3195
3196#[derive(Debug, Clone, Default)]
3197#[non_exhaustive]
3198pub struct ReadStop {
3200 pub limits: ReadLimits,
3202 pub until: Option<RangeTo<u64>>,
3204 pub wait: Option<u32>,
3214}
3215
3216impl ReadStop {
3217 pub fn new() -> Self {
3219 Self::default()
3220 }
3221
3222 pub fn with_limits(self, limits: ReadLimits) -> Self {
3224 Self { limits, ..self }
3225 }
3226
3227 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3229 Self {
3230 until: Some(until),
3231 ..self
3232 }
3233 }
3234
3235 pub fn with_wait(self, wait: u32) -> Self {
3237 Self {
3238 wait: Some(wait),
3239 ..self
3240 }
3241 }
3242}
3243
3244impl From<ReadStop> for api::stream::ReadEnd {
3245 fn from(value: ReadStop) -> Self {
3246 Self {
3247 count: value.limits.count,
3248 bytes: value.limits.bytes,
3249 until: value.until.map(|r| r.end),
3250 wait: value.wait,
3251 }
3252 }
3253}
3254
3255#[derive(Debug, Clone, Default)]
3256#[non_exhaustive]
3257pub struct ReadInput {
3260 pub start: ReadStart,
3262 pub stop: ReadStop,
3264 pub ignore_command_records: bool,
3268}
3269
3270impl ReadInput {
3271 pub fn new() -> Self {
3273 Self::default()
3274 }
3275
3276 pub fn with_start(self, start: ReadStart) -> Self {
3278 Self { start, ..self }
3279 }
3280
3281 pub fn with_stop(self, stop: ReadStop) -> Self {
3283 Self { stop, ..self }
3284 }
3285
3286 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3288 Self {
3289 ignore_command_records,
3290 ..self
3291 }
3292 }
3293}
3294
3295#[derive(Debug, Clone)]
3296#[non_exhaustive]
3297pub struct SequencedRecord {
3299 pub seq_num: u64,
3301 pub body: Bytes,
3303 pub headers: Vec<Header>,
3305 pub timestamp: u64,
3307}
3308
3309impl SequencedRecord {
3310 pub fn is_command_record(&self) -> bool {
3312 self.headers.len() == 1 && *self.headers[0].name == *b""
3313 }
3314}
3315
3316impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3317 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3318 Self {
3319 seq_num: value.seq_num,
3320 body: value.body,
3321 headers: value.headers.into_iter().map(Into::into).collect(),
3322 timestamp: value.timestamp,
3323 }
3324 }
3325}
3326
3327metered_bytes_impl!(SequencedRecord);
3328
3329#[derive(Debug, Clone)]
3330#[non_exhaustive]
3331pub struct ReadBatch {
3334 pub records: Vec<SequencedRecord>,
3341 pub tail: Option<StreamPosition>,
3346}
3347
3348impl ReadBatch {
3349 pub(crate) fn from_api(
3350 batch: api::stream::proto::ReadBatch,
3351 ignore_command_records: bool,
3352 ) -> Self {
3353 Self {
3354 records: batch
3355 .records
3356 .into_iter()
3357 .map(Into::into)
3358 .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3359 .collect(),
3360 tail: batch.tail.map(Into::into),
3361 }
3362 }
3363}
3364
/// A pinned, boxed, `Send` stream whose items are `Result<T, S2Error>`.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3367
3368#[derive(Debug, Clone, thiserror::Error)]
3369pub enum AppendConditionFailed {
3371 #[error("fencing token mismatch, expected: {0}")]
3372 FencingTokenMismatch(FencingToken),
3374 #[error("sequence number mismatch, expected: {0}")]
3375 SeqNumMismatch(u64),
3377}
3378
3379impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3380 fn from(value: api::stream::AppendConditionFailed) -> Self {
3381 match value {
3382 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3383 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3384 }
3385 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3386 AppendConditionFailed::SeqNumMismatch(seq)
3387 }
3388 }
3389 }
3390}
3391
3392#[derive(Debug, Clone, thiserror::Error)]
3393pub enum S2Error {
3395 #[error("{0}")]
3396 Client(String),
3398 #[error(transparent)]
3399 Validation(#[from] ValidationError),
3401 #[error("{0}")]
3402 AppendConditionFailed(AppendConditionFailed),
3404 #[error("read from an unwritten position. current tail: {0}")]
3405 ReadUnwritten(StreamPosition),
3407 #[error("{0}")]
3408 Server(ErrorResponse),
3410}
3411
3412impl From<ApiError> for S2Error {
3413 fn from(err: ApiError) -> Self {
3414 match err {
3415 ApiError::ReadUnwritten(tail_response) => {
3416 Self::ReadUnwritten(tail_response.tail.into())
3417 }
3418 ApiError::AppendConditionFailed(condition_failed) => {
3419 Self::AppendConditionFailed(condition_failed.into())
3420 }
3421 ApiError::Server(_, response) => Self::Server(response.into()),
3422 other => Self::Client(other.to_string()),
3423 }
3424 }
3425}
3426
3427#[derive(Debug, Clone, thiserror::Error)]
3428#[error("{code}: {message}")]
3429#[non_exhaustive]
3430pub struct ErrorResponse {
3432 pub code: String,
3434 pub message: String,
3436}
3437
3438impl From<ApiErrorResponse> for ErrorResponse {
3439 fn from(response: ApiErrorResponse) -> Self {
3440 Self {
3441 code: response.code,
3442 message: response.message,
3443 }
3444 }
3445}
3446
3447fn idempotency_token() -> String {
3448 uuid::Uuid::new_v4().simple().to_string()
3449}