1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::types::ValidationError;
24pub use s2_common::types::access::AccessTokenId;
28pub use s2_common::types::access::AccessTokenIdPrefix;
30pub use s2_common::types::access::AccessTokenIdStartAfter;
32pub use s2_common::types::basin::BasinName;
37pub use s2_common::types::basin::BasinNamePrefix;
39pub use s2_common::types::basin::BasinNameStartAfter;
41pub use s2_common::types::stream::StreamName;
45pub use s2_common::types::stream::StreamNamePrefix;
47pub use s2_common::types::stream::StreamNameStartAfter;
49
/// Number of bytes in one mebibyte (2^20).
pub(crate) const ONE_MIB: u32 = 1 << 20;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl TryFrom<time::OffsetDateTime> for S2DateTime {
66 type Error = ValidationError;
67
68 fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
69 dt.format(&time::format_description::well_known::Rfc3339)
70 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
71 Ok(Self(dt))
72 }
73}
74
75impl From<S2DateTime> for time::OffsetDateTime {
76 fn from(dt: S2DateTime) -> Self {
77 dt.0
78 }
79}
80
81impl FromStr for S2DateTime {
82 type Err = ValidationError;
83
84 fn from_str(s: &str) -> Result<Self, Self::Err> {
85 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
86 .map(Self)
87 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
88 }
89}
90
91impl fmt::Display for S2DateTime {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 write!(
94 f,
95 "{}",
96 self.0
97 .format(&time::format_description::well_known::Rfc3339)
98 .expect("RFC3339 formatting should not fail for S2DateTime")
99 )
100 }
101}
102
103#[derive(Debug, Clone, PartialEq)]
105pub(crate) enum BasinAuthority {
106 ParentZone(Authority),
108 Direct(Authority),
110}
111
112#[derive(Debug, Clone)]
114pub struct AccountEndpoint {
115 scheme: Scheme,
116 authority: Authority,
117}
118
119impl AccountEndpoint {
120 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
122 endpoint.parse()
123 }
124}
125
126impl FromStr for AccountEndpoint {
127 type Err = ValidationError;
128
129 fn from_str(s: &str) -> Result<Self, Self::Err> {
130 let (scheme, authority) = match s.find("://") {
131 Some(idx) => {
132 let scheme: Scheme = s[..idx]
133 .parse()
134 .map_err(|_| "invalid account endpoint scheme".to_string())?;
135 (scheme, &s[idx + 3..])
136 }
137 None => (Scheme::HTTPS, s),
138 };
139 Ok(Self {
140 scheme,
141 authority: authority
142 .parse()
143 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
144 })
145 }
146}
147
148#[derive(Debug, Clone)]
150pub struct BasinEndpoint {
151 scheme: Scheme,
152 authority: BasinAuthority,
153}
154
155impl BasinEndpoint {
156 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
158 endpoint.parse()
159 }
160}
161
162impl FromStr for BasinEndpoint {
163 type Err = ValidationError;
164
165 fn from_str(s: &str) -> Result<Self, Self::Err> {
166 let (scheme, authority) = match s.find("://") {
167 Some(idx) => {
168 let scheme: Scheme = s[..idx]
169 .parse()
170 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
171 (scheme, &s[idx + 3..])
172 }
173 None => (Scheme::HTTPS, s),
174 };
175 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
176 BasinAuthority::ParentZone(
177 authority
178 .parse()
179 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
180 )
181 } else {
182 BasinAuthority::Direct(
183 authority
184 .parse()
185 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
186 )
187 };
188 Ok(Self { scheme, authority })
189 }
190}
191
192#[derive(Debug, Clone)]
193#[non_exhaustive]
194pub struct S2Endpoints {
196 pub(crate) scheme: Scheme,
197 pub(crate) account_authority: Authority,
198 pub(crate) basin_authority: BasinAuthority,
199}
200
201impl S2Endpoints {
202 pub fn new(
204 account_endpoint: AccountEndpoint,
205 basin_endpoint: BasinEndpoint,
206 ) -> Result<Self, ValidationError> {
207 if account_endpoint.scheme != basin_endpoint.scheme {
208 return Err("account and basin endpoints must have the same scheme".into());
209 }
210 Ok(Self {
211 scheme: account_endpoint.scheme,
212 account_authority: account_endpoint.authority,
213 basin_authority: basin_endpoint.authority,
214 })
215 }
216
217 pub fn from_env() -> Result<Self, ValidationError> {
223 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
224 Ok(endpoint) => endpoint.parse()?,
225 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
226 Err(VarError::NotUnicode(_)) => {
227 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
228 }
229 };
230
231 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
232 Ok(endpoint) => endpoint.parse()?,
233 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
234 Err(VarError::NotUnicode(_)) => {
235 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
236 }
237 };
238
239 if account_endpoint.scheme != basin_endpoint.scheme {
240 return Err(
241 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
242 );
243 }
244
245 Ok(Self {
246 scheme: account_endpoint.scheme,
247 account_authority: account_endpoint.authority,
248 basin_authority: basin_endpoint.authority,
249 })
250 }
251
252 pub(crate) fn for_aws() -> Self {
253 Self {
254 scheme: Scheme::HTTPS,
255 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
256 basin_authority: BasinAuthority::ParentZone(
257 "b.aws.s2.dev".try_into().expect("valid authority"),
258 ),
259 }
260 }
261}
262
/// Compression algorithm selection.
///
/// Each variant converts to the same-named `CompressionAlgorithm` variant.
/// `PartialEq`/`Eq` are derived so values can be compared, matching the other plain
/// enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
273
274impl From<Compression> for CompressionAlgorithm {
275 fn from(value: Compression) -> Self {
276 match value {
277 Compression::None => CompressionAlgorithm::None,
278 Compression::Gzip => CompressionAlgorithm::Gzip,
279 Compression::Zstd => CompressionAlgorithm::Zstd,
280 }
281 }
282}
283
/// Policy selecting which append requests may be retried.
///
/// NOTE(review): the retry logic that interprets these variants is outside this view;
/// the variant descriptions are inferred from their names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AppendRetryPolicy {
    /// Presumably: every failed append is eligible for retry.
    All,
    /// Presumably: retry only when the failed attempt could not have had side effects.
    NoSideEffects,
}
302
303#[derive(Debug, Clone)]
304#[non_exhaustive]
305pub struct RetryConfig {
314 pub max_attempts: NonZeroU32,
318 pub min_base_delay: Duration,
322 pub max_base_delay: Duration,
326 pub append_retry_policy: AppendRetryPolicy,
331}
332
333impl Default for RetryConfig {
334 fn default() -> Self {
335 Self {
336 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
337 min_base_delay: Duration::from_millis(100),
338 max_base_delay: Duration::from_secs(1),
339 append_retry_policy: AppendRetryPolicy::All,
340 }
341 }
342}
343
344impl RetryConfig {
345 pub fn new() -> Self {
347 Self::default()
348 }
349
350 pub(crate) fn max_retries(&self) -> u32 {
351 self.max_attempts.get() - 1
352 }
353
354 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
356 Self {
357 max_attempts,
358 ..self
359 }
360 }
361
362 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
364 Self {
365 min_base_delay,
366 ..self
367 }
368 }
369
370 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
372 Self {
373 max_base_delay,
374 ..self
375 }
376 }
377
378 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
381 Self {
382 append_retry_policy,
383 ..self
384 }
385 }
386}
387
388#[derive(Debug, Clone)]
389#[non_exhaustive]
390pub struct S2Config {
392 pub(crate) access_token: SecretString,
393 pub(crate) endpoints: S2Endpoints,
394 pub(crate) connection_timeout: Duration,
395 pub(crate) request_timeout: Duration,
396 pub(crate) retry: RetryConfig,
397 pub(crate) compression: Compression,
398 pub(crate) user_agent: HeaderValue,
399 pub(crate) insecure_skip_cert_verification: bool,
400}
401
402impl S2Config {
403 pub fn new(access_token: impl Into<String>) -> Self {
405 Self {
406 access_token: access_token.into().into(),
407 endpoints: S2Endpoints::for_aws(),
408 connection_timeout: Duration::from_secs(3),
409 request_timeout: Duration::from_secs(5),
410 retry: RetryConfig::new(),
411 compression: Compression::None,
412 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
413 .parse()
414 .expect("valid user agent"),
415 insecure_skip_cert_verification: false,
416 }
417 }
418
419 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
421 Self { endpoints, ..self }
422 }
423
424 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
428 Self {
429 connection_timeout,
430 ..self
431 }
432 }
433
434 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
438 Self {
439 request_timeout,
440 ..self
441 }
442 }
443
444 pub fn with_retry(self, retry: RetryConfig) -> Self {
448 Self { retry, ..self }
449 }
450
451 pub fn with_compression(self, compression: Compression) -> Self {
455 Self {
456 compression,
457 ..self
458 }
459 }
460
461 pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
473 Self {
474 insecure_skip_cert_verification: skip,
475 ..self
476 }
477 }
478
479 #[doc(hidden)]
480 #[cfg(feature = "_hidden")]
481 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
482 let user_agent = user_agent
483 .into()
484 .parse()
485 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
486 Ok(Self { user_agent, ..self })
487 }
488}
489
/// One page of results from a list operation.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Items on this page.
    pub values: Vec<T>,
    /// Whether more items are available beyond this page.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Builds a page from anything convertible into a `Vec<T>` plus the `has_more` flag.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values: Vec<T> = values.into();
        Self { values, has_more }
    }
}
508
/// Storage class; mirrors `api::config::StorageClass` variant for variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    /// The `Standard` storage class.
    Standard,
    /// The `Express` storage class.
    Express,
}
517
518impl From<api::config::StorageClass> for StorageClass {
519 fn from(value: api::config::StorageClass) -> Self {
520 match value {
521 api::config::StorageClass::Standard => StorageClass::Standard,
522 api::config::StorageClass::Express => StorageClass::Express,
523 }
524 }
525}
526
527impl From<StorageClass> for api::config::StorageClass {
528 fn from(value: StorageClass) -> Self {
529 match value {
530 StorageClass::Standard => api::config::StorageClass::Standard,
531 StorageClass::Express => api::config::StorageClass::Express,
532 }
533 }
534}
535
/// Retention policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Age-based retention; the value is presumably in seconds (the conversion code
    /// names it `secs`) — confirm against the API definition.
    Age(u64),
    /// Infinite retention.
    Infinite,
}
544
545impl From<api::config::RetentionPolicy> for RetentionPolicy {
546 fn from(value: api::config::RetentionPolicy) -> Self {
547 match value {
548 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
549 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
550 }
551 }
552}
553
554impl From<RetentionPolicy> for api::config::RetentionPolicy {
555 fn from(value: RetentionPolicy) -> Self {
556 match value {
557 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
558 RetentionPolicy::Infinite => {
559 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
560 }
561 }
562 }
563}
564
/// Timestamping mode; mirrors `api::config::TimestampingMode` variant for variant.
///
/// NOTE(review): per-variant semantics are not visible here; descriptions restate the
/// names only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    /// The `ClientPrefer` mode.
    ClientPrefer,
    /// The `ClientRequire` mode.
    ClientRequire,
    /// The `Arrival` mode.
    Arrival,
}
575
576impl From<api::config::TimestampingMode> for TimestampingMode {
577 fn from(value: api::config::TimestampingMode) -> Self {
578 match value {
579 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
580 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
581 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
582 }
583 }
584}
585
586impl From<TimestampingMode> for api::config::TimestampingMode {
587 fn from(value: TimestampingMode) -> Self {
588 match value {
589 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
590 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
591 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
592 }
593 }
594}
595
596#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
597#[non_exhaustive]
598pub struct TimestampingConfig {
600 pub mode: Option<TimestampingMode>,
604 pub uncapped: bool,
608}
609
610impl TimestampingConfig {
611 pub fn new() -> Self {
613 Self::default()
614 }
615
616 pub fn with_mode(self, mode: TimestampingMode) -> Self {
618 Self {
619 mode: Some(mode),
620 ..self
621 }
622 }
623
624 pub fn with_uncapped(self, uncapped: bool) -> Self {
626 Self { uncapped, ..self }
627 }
628}
629
630impl From<api::config::TimestampingConfig> for TimestampingConfig {
631 fn from(value: api::config::TimestampingConfig) -> Self {
632 Self {
633 mode: value.mode.map(Into::into),
634 uncapped: value.uncapped.unwrap_or_default(),
635 }
636 }
637}
638
639impl From<TimestampingConfig> for api::config::TimestampingConfig {
640 fn from(value: TimestampingConfig) -> Self {
641 Self {
642 mode: value.mode.map(Into::into),
643 uncapped: Some(value.uncapped),
644 }
645 }
646}
647
/// Delete-on-empty settings.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in whole seconds; defaults to `0`.
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Creates a config with `min_age_secs` set to `0`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the minimum age; any sub-second part of `min_age` is dropped.
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
671
672impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
673 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
674 Self {
675 min_age_secs: value.min_age_secs,
676 }
677 }
678}
679
680impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
681 fn from(value: DeleteOnEmptyConfig) -> Self {
682 Self {
683 min_age_secs: value.min_age_secs,
684 }
685 }
686}
687
688#[derive(Debug, Clone, Default, PartialEq, Eq)]
689#[non_exhaustive]
690pub struct StreamConfig {
692 pub storage_class: Option<StorageClass>,
696 pub retention_policy: Option<RetentionPolicy>,
700 pub timestamping: Option<TimestampingConfig>,
704 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
708}
709
710impl StreamConfig {
711 pub fn new() -> Self {
713 Self::default()
714 }
715
716 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
718 Self {
719 storage_class: Some(storage_class),
720 ..self
721 }
722 }
723
724 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
726 Self {
727 retention_policy: Some(retention_policy),
728 ..self
729 }
730 }
731
732 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
734 Self {
735 timestamping: Some(timestamping),
736 ..self
737 }
738 }
739
740 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
742 Self {
743 delete_on_empty: Some(delete_on_empty),
744 ..self
745 }
746 }
747}
748
749impl From<api::config::StreamConfig> for StreamConfig {
750 fn from(value: api::config::StreamConfig) -> Self {
751 Self {
752 storage_class: value.storage_class.map(Into::into),
753 retention_policy: value.retention_policy.map(Into::into),
754 timestamping: value.timestamping.map(Into::into),
755 delete_on_empty: value.delete_on_empty.map(Into::into),
756 }
757 }
758}
759
760impl From<StreamConfig> for api::config::StreamConfig {
761 fn from(value: StreamConfig) -> Self {
762 Self {
763 storage_class: value.storage_class.map(Into::into),
764 retention_policy: value.retention_policy.map(Into::into),
765 timestamping: value.timestamping.map(Into::into),
766 delete_on_empty: value.delete_on_empty.map(Into::into),
767 }
768 }
769}
770
771#[derive(Debug, Clone, Default)]
772#[non_exhaustive]
773pub struct BasinConfig {
775 pub default_stream_config: Option<StreamConfig>,
779 pub create_stream_on_append: bool,
783 pub create_stream_on_read: bool,
787}
788
789impl BasinConfig {
790 pub fn new() -> Self {
792 Self::default()
793 }
794
795 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
797 Self {
798 default_stream_config: Some(config),
799 ..self
800 }
801 }
802
803 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
806 Self {
807 create_stream_on_append,
808 ..self
809 }
810 }
811
812 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
814 Self {
815 create_stream_on_read,
816 ..self
817 }
818 }
819}
820
821impl From<api::config::BasinConfig> for BasinConfig {
822 fn from(value: api::config::BasinConfig) -> Self {
823 Self {
824 default_stream_config: value.default_stream_config.map(Into::into),
825 create_stream_on_append: value.create_stream_on_append,
826 create_stream_on_read: value.create_stream_on_read,
827 }
828 }
829}
830
831impl From<BasinConfig> for api::config::BasinConfig {
832 fn from(value: BasinConfig) -> Self {
833 Self {
834 default_stream_config: value.default_stream_config.map(Into::into),
835 create_stream_on_append: value.create_stream_on_append,
836 create_stream_on_read: value.create_stream_on_read,
837 }
838 }
839}
840
/// Basin scope; mirrors `api::basin::BasinScope`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BasinScope {
    /// The `AwsUsEast1` scope (by name, AWS `us-east-1` — confirm against the API docs).
    AwsUsEast1,
}
847
848impl From<api::basin::BasinScope> for BasinScope {
849 fn from(value: api::basin::BasinScope) -> Self {
850 match value {
851 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
852 }
853 }
854}
855
856impl From<BasinScope> for api::basin::BasinScope {
857 fn from(value: BasinScope) -> Self {
858 match value {
859 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
860 }
861 }
862}
863
/// Outcome wrapper distinguishing a created value from a reconfigured one.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CreateOrReconfigured<T> {
    /// The value was created.
    Created(T),
    /// The value was reconfigured.
    Reconfigured(T),
}
877
#[cfg(feature = "_hidden")]
impl<T> CreateOrReconfigured<T> {
    /// Returns `true` for the `Created` variant and `false` for `Reconfigured`.
    pub fn is_created(&self) -> bool {
        match self {
            Self::Created(_) => true,
            Self::Reconfigured(_) => false,
        }
    }

    /// Returns the wrapped value regardless of variant.
    pub fn into_inner(self) -> T {
        let (Self::Created(inner) | Self::Reconfigured(inner)) = self;
        inner
    }
}
892
893#[derive(Debug, Clone)]
894#[non_exhaustive]
895pub struct CreateBasinInput {
897 pub name: BasinName,
899 pub config: Option<BasinConfig>,
903 pub scope: Option<BasinScope>,
907 idempotency_token: String,
908}
909
910impl CreateBasinInput {
911 pub fn new(name: BasinName) -> Self {
913 Self {
914 name,
915 config: None,
916 scope: None,
917 idempotency_token: idempotency_token(),
918 }
919 }
920
921 pub fn with_config(self, config: BasinConfig) -> Self {
923 Self {
924 config: Some(config),
925 ..self
926 }
927 }
928
929 pub fn with_scope(self, scope: BasinScope) -> Self {
931 Self {
932 scope: Some(scope),
933 ..self
934 }
935 }
936}
937
938impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
939 fn from(value: CreateBasinInput) -> Self {
940 (
941 api::basin::CreateBasinRequest {
942 basin: value.name,
943 config: value.config.map(Into::into),
944 scope: value.scope.map(Into::into),
945 },
946 value.idempotency_token,
947 )
948 }
949}
950
/// Input for creating or reconfiguring a basin.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureBasinInput {
    /// Name of the basin.
    pub name: BasinName,
    /// Reconfiguration to apply, if set.
    pub config: Option<BasinReconfiguration>,
    /// Basin scope, if set.
    pub scope: Option<BasinScope>,
}
968
#[cfg(feature = "_hidden")]
impl CreateOrReconfigureBasinInput {
    /// Creates an input for `name` with no config or scope.
    pub fn new(name: BasinName) -> Self {
        let (config, scope) = (None, None);
        Self {
            name,
            config,
            scope,
        }
    }

    /// Sets the reconfiguration.
    pub fn with_config(mut self, config: BasinReconfiguration) -> Self {
        self.config = Some(config);
        self
    }

    /// Sets the basin scope.
    pub fn with_scope(mut self, scope: BasinScope) -> Self {
        self.scope = Some(scope);
        self
    }
}
996
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureBasinInput>
    for (
        BasinName,
        Option<api::basin::CreateOrReconfigureBasinRequest>,
    )
{
    /// Splits the input into the basin name and an optional request body.
    ///
    /// The body is `None` only when neither `config` nor `scope` is set.
    fn from(value: CreateOrReconfigureBasinInput) -> Self {
        let request = match (value.config, value.scope) {
            (None, None) => None,
            (config, scope) => Some(api::basin::CreateOrReconfigureBasinRequest {
                config: config.map(Into::into),
                scope: scope.map(Into::into),
            }),
        };
        (value.name, request)
    }
}
1016
1017#[derive(Debug, Clone, Default)]
1018#[non_exhaustive]
1019pub struct ListBasinsInput {
1021 pub prefix: BasinNamePrefix,
1025 pub start_after: BasinNameStartAfter,
1031 pub limit: Option<usize>,
1035}
1036
1037impl ListBasinsInput {
1038 pub fn new() -> Self {
1040 Self::default()
1041 }
1042
1043 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1045 Self { prefix, ..self }
1046 }
1047
1048 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1051 Self {
1052 start_after,
1053 ..self
1054 }
1055 }
1056
1057 pub fn with_limit(self, limit: usize) -> Self {
1059 Self {
1060 limit: Some(limit),
1061 ..self
1062 }
1063 }
1064}
1065
1066impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1067 fn from(value: ListBasinsInput) -> Self {
1068 Self {
1069 prefix: Some(value.prefix),
1070 start_after: Some(value.start_after),
1071 limit: value.limit,
1072 }
1073 }
1074}
1075
1076#[derive(Debug, Clone, Default)]
1077pub struct ListAllBasinsInput {
1079 pub prefix: BasinNamePrefix,
1083 pub start_after: BasinNameStartAfter,
1089 pub include_deleted: bool,
1093}
1094
1095impl ListAllBasinsInput {
1096 pub fn new() -> Self {
1098 Self::default()
1099 }
1100
1101 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1103 Self { prefix, ..self }
1104 }
1105
1106 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1109 Self {
1110 start_after,
1111 ..self
1112 }
1113 }
1114
1115 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1117 Self {
1118 include_deleted,
1119 ..self
1120 }
1121 }
1122}
1123
/// Basin state; mirrors `api::basin::BasinState` variant for variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BasinState {
    /// The `Active` state.
    Active,
    /// The `Creating` state.
    Creating,
    /// The `Deleting` state.
    Deleting,
}
1134
1135impl From<api::basin::BasinState> for BasinState {
1136 fn from(value: api::basin::BasinState) -> Self {
1137 match value {
1138 api::basin::BasinState::Active => BasinState::Active,
1139 api::basin::BasinState::Creating => BasinState::Creating,
1140 api::basin::BasinState::Deleting => BasinState::Deleting,
1141 }
1142 }
1143}
1144
1145#[derive(Debug, Clone, PartialEq, Eq)]
1146#[non_exhaustive]
1147pub struct BasinInfo {
1149 pub name: BasinName,
1151 pub scope: Option<BasinScope>,
1153 pub state: BasinState,
1155}
1156
1157impl From<api::basin::BasinInfo> for BasinInfo {
1158 fn from(value: api::basin::BasinInfo) -> Self {
1159 Self {
1160 name: value.name,
1161 scope: value.scope.map(Into::into),
1162 state: value.state.into(),
1163 }
1164 }
1165}
1166
1167#[derive(Debug, Clone)]
1168#[non_exhaustive]
1169pub struct DeleteBasinInput {
1171 pub name: BasinName,
1173 pub ignore_not_found: bool,
1175}
1176
1177impl DeleteBasinInput {
1178 pub fn new(name: BasinName) -> Self {
1180 Self {
1181 name,
1182 ignore_not_found: false,
1183 }
1184 }
1185
1186 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1188 Self {
1189 ignore_not_found,
1190 ..self
1191 }
1192 }
1193}
1194
1195#[derive(Debug, Clone, Default)]
1196#[non_exhaustive]
1197pub struct TimestampingReconfiguration {
1199 pub mode: Maybe<Option<TimestampingMode>>,
1201 pub uncapped: Maybe<Option<bool>>,
1203}
1204
1205impl TimestampingReconfiguration {
1206 pub fn new() -> Self {
1208 Self::default()
1209 }
1210
1211 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1213 Self {
1214 mode: Maybe::Specified(Some(mode)),
1215 ..self
1216 }
1217 }
1218
1219 pub fn with_uncapped(self, uncapped: bool) -> Self {
1221 Self {
1222 uncapped: Maybe::Specified(Some(uncapped)),
1223 ..self
1224 }
1225 }
1226}
1227
1228impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1229 fn from(value: TimestampingReconfiguration) -> Self {
1230 Self {
1231 mode: value.mode.map(|m| m.map(Into::into)),
1232 uncapped: value.uncapped,
1233 }
1234 }
1235}
1236
1237#[derive(Debug, Clone, Default)]
1238#[non_exhaustive]
1239pub struct DeleteOnEmptyReconfiguration {
1241 pub min_age_secs: Maybe<Option<u64>>,
1243}
1244
1245impl DeleteOnEmptyReconfiguration {
1246 pub fn new() -> Self {
1248 Self::default()
1249 }
1250
1251 pub fn with_min_age(self, min_age: Duration) -> Self {
1253 Self {
1254 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1255 }
1256 }
1257}
1258
1259impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1260 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1261 Self {
1262 min_age_secs: value.min_age_secs,
1263 }
1264 }
1265}
1266
1267#[derive(Debug, Clone, Default)]
1268#[non_exhaustive]
1269pub struct StreamReconfiguration {
1271 pub storage_class: Maybe<Option<StorageClass>>,
1273 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1275 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1277 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1279}
1280
1281impl StreamReconfiguration {
1282 pub fn new() -> Self {
1284 Self::default()
1285 }
1286
1287 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1289 Self {
1290 storage_class: Maybe::Specified(Some(storage_class)),
1291 ..self
1292 }
1293 }
1294
1295 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1297 Self {
1298 retention_policy: Maybe::Specified(Some(retention_policy)),
1299 ..self
1300 }
1301 }
1302
1303 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1305 Self {
1306 timestamping: Maybe::Specified(Some(timestamping)),
1307 ..self
1308 }
1309 }
1310
1311 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1313 Self {
1314 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1315 ..self
1316 }
1317 }
1318}
1319
1320impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1321 fn from(value: StreamReconfiguration) -> Self {
1322 Self {
1323 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1324 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1325 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1326 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1327 }
1328 }
1329}
1330
1331#[derive(Debug, Clone, Default)]
1332#[non_exhaustive]
1333pub struct BasinReconfiguration {
1335 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1337 pub create_stream_on_append: Maybe<bool>,
1340 pub create_stream_on_read: Maybe<bool>,
1342}
1343
1344impl BasinReconfiguration {
1345 pub fn new() -> Self {
1347 Self::default()
1348 }
1349
1350 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1353 Self {
1354 default_stream_config: Maybe::Specified(Some(config)),
1355 ..self
1356 }
1357 }
1358
1359 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1362 Self {
1363 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1364 ..self
1365 }
1366 }
1367
1368 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1371 Self {
1372 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1373 ..self
1374 }
1375 }
1376}
1377
1378impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1379 fn from(value: BasinReconfiguration) -> Self {
1380 Self {
1381 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1382 create_stream_on_append: value.create_stream_on_append,
1383 create_stream_on_read: value.create_stream_on_read,
1384 }
1385 }
1386}
1387
1388#[derive(Debug, Clone)]
1389#[non_exhaustive]
1390pub struct ReconfigureBasinInput {
1392 pub name: BasinName,
1394 pub config: BasinReconfiguration,
1396}
1397
1398impl ReconfigureBasinInput {
1399 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1401 Self { name, config }
1402 }
1403}
1404
1405#[derive(Debug, Clone, Default)]
1406#[non_exhaustive]
1407pub struct ListAccessTokensInput {
1409 pub prefix: AccessTokenIdPrefix,
1413 pub start_after: AccessTokenIdStartAfter,
1419 pub limit: Option<usize>,
1423}
1424
1425impl ListAccessTokensInput {
1426 pub fn new() -> Self {
1428 Self::default()
1429 }
1430
1431 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1433 Self { prefix, ..self }
1434 }
1435
1436 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1439 Self {
1440 start_after,
1441 ..self
1442 }
1443 }
1444
1445 pub fn with_limit(self, limit: usize) -> Self {
1447 Self {
1448 limit: Some(limit),
1449 ..self
1450 }
1451 }
1452}
1453
1454impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1455 fn from(value: ListAccessTokensInput) -> Self {
1456 Self {
1457 prefix: Some(value.prefix),
1458 start_after: Some(value.start_after),
1459 limit: value.limit,
1460 }
1461 }
1462}
1463
1464#[derive(Debug, Clone, Default)]
1465pub struct ListAllAccessTokensInput {
1467 pub prefix: AccessTokenIdPrefix,
1471 pub start_after: AccessTokenIdStartAfter,
1477}
1478
1479impl ListAllAccessTokensInput {
1480 pub fn new() -> Self {
1482 Self::default()
1483 }
1484
1485 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1487 Self { prefix, ..self }
1488 }
1489
1490 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1493 Self {
1494 start_after,
1495 ..self
1496 }
1497 }
1498}
1499
1500#[derive(Debug, Clone)]
1501#[non_exhaustive]
1502pub struct AccessTokenInfo {
1504 pub id: AccessTokenId,
1506 pub expires_at: S2DateTime,
1508 pub auto_prefix_streams: bool,
1511 pub scope: AccessTokenScope,
1513}
1514
1515impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1516 type Error = ValidationError;
1517
1518 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1519 let expires_at = value
1520 .expires_at
1521 .map(S2DateTime::try_from)
1522 .transpose()?
1523 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1524 Ok(Self {
1525 id: value.id,
1526 expires_at,
1527 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1528 scope: value.scope.into(),
1529 })
1530 }
1531}
1532
1533#[derive(Debug, Clone)]
1534pub enum BasinMatcher {
1538 None,
1540 Exact(BasinName),
1542 Prefix(BasinNamePrefix),
1544}
1545
1546#[derive(Debug, Clone)]
1547pub enum StreamMatcher {
1551 None,
1553 Exact(StreamName),
1555 Prefix(StreamNamePrefix),
1557}
1558
1559#[derive(Debug, Clone)]
1560pub enum AccessTokenMatcher {
1564 None,
1566 Exact(AccessTokenId),
1568 Prefix(AccessTokenIdPrefix),
1570}
1571
/// Pair of read and write permission flags; both default to `false`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Read permission flag.
    pub read: bool,
    /// Write permission flag.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Creates permissions with both flags `false`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Read permission only.
    pub fn read_only() -> Self {
        Self {
            read: true,
            ..Self::default()
        }
    }

    /// Write permission only.
    pub fn write_only() -> Self {
        Self {
            write: true,
            ..Self::default()
        }
    }

    /// Both read and write permission.
    pub fn read_write() -> Self {
        let (read, write) = (true, true);
        Self { read, write }
    }
}
1616
1617impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1618 fn from(value: ReadWritePermissions) -> Self {
1619 Self {
1620 read: Some(value.read),
1621 write: Some(value.write),
1622 }
1623 }
1624}
1625
1626impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1627 fn from(value: api::access::ReadWritePermissions) -> Self {
1628 Self {
1629 read: value.read.unwrap_or_default(),
1630 write: value.write.unwrap_or_default(),
1631 }
1632 }
1633}
1634
/// Read/write permissions per operation group (account, basin, stream).
///
/// The derived [`Default`] leaves every group unset (`None`).
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct OperationGroupPermissions {
    /// Permissions for the account group, if set.
    pub account: Option<ReadWritePermissions>,
    /// Permissions for the basin group, if set.
    pub basin: Option<ReadWritePermissions>,
    /// Permissions for the stream group, if set.
    pub stream: Option<ReadWritePermissions>,
}
1654
1655impl OperationGroupPermissions {
1656 pub fn new() -> Self {
1658 Self::default()
1659 }
1660
1661 pub fn read_only_all() -> Self {
1663 Self {
1664 account: Some(ReadWritePermissions::read_only()),
1665 basin: Some(ReadWritePermissions::read_only()),
1666 stream: Some(ReadWritePermissions::read_only()),
1667 }
1668 }
1669
1670 pub fn write_only_all() -> Self {
1672 Self {
1673 account: Some(ReadWritePermissions::write_only()),
1674 basin: Some(ReadWritePermissions::write_only()),
1675 stream: Some(ReadWritePermissions::write_only()),
1676 }
1677 }
1678
1679 pub fn read_write_all() -> Self {
1681 Self {
1682 account: Some(ReadWritePermissions::read_write()),
1683 basin: Some(ReadWritePermissions::read_write()),
1684 stream: Some(ReadWritePermissions::read_write()),
1685 }
1686 }
1687
1688 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1690 Self {
1691 account: Some(account),
1692 ..self
1693 }
1694 }
1695
1696 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1698 Self {
1699 basin: Some(basin),
1700 ..self
1701 }
1702 }
1703
1704 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1706 Self {
1707 stream: Some(stream),
1708 ..self
1709 }
1710 }
1711}
1712
1713impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1714 fn from(value: OperationGroupPermissions) -> Self {
1715 Self {
1716 account: value.account.map(Into::into),
1717 basin: value.basin.map(Into::into),
1718 stream: value.stream.map(Into::into),
1719 }
1720 }
1721}
1722
1723impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1724 fn from(value: api::access::PermittedOperationGroups) -> Self {
1725 Self {
1726 account: value.account.map(Into::into),
1727 basin: value.basin.map(Into::into),
1728 stream: value.stream.map(Into::into),
1729 }
1730 }
1731}
1732
/// Individual operations that can be listed in an access token scope.
///
/// Each variant maps one-to-one onto an `api::access::Operation` variant.
/// The three `Get*Metrics` variants map to the API's `AccountMetrics`,
/// `BasinMetrics` and `StreamMetrics`; every other variant keeps its name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    // Basin operations.
    ListBasins,
    CreateBasin,
    GetBasinConfig,
    DeleteBasin,
    ReconfigureBasin,
    // Access token operations.
    ListAccessTokens,
    IssueAccessToken,
    RevokeAccessToken,
    // Metrics operations.
    GetAccountMetrics,
    GetBasinMetrics,
    GetStreamMetrics,
    // Stream operations.
    ListStreams,
    CreateStream,
    GetStreamConfig,
    DeleteStream,
    ReconfigureStream,
    // Record-level operations on a stream.
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
1781
1782impl From<Operation> for api::access::Operation {
1783 fn from(value: Operation) -> Self {
1784 match value {
1785 Operation::ListBasins => api::access::Operation::ListBasins,
1786 Operation::CreateBasin => api::access::Operation::CreateBasin,
1787 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1788 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1789 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1790 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1791 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1792 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1793 Operation::ListStreams => api::access::Operation::ListStreams,
1794 Operation::CreateStream => api::access::Operation::CreateStream,
1795 Operation::DeleteStream => api::access::Operation::DeleteStream,
1796 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1797 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1798 Operation::CheckTail => api::access::Operation::CheckTail,
1799 Operation::Append => api::access::Operation::Append,
1800 Operation::Read => api::access::Operation::Read,
1801 Operation::Trim => api::access::Operation::Trim,
1802 Operation::Fence => api::access::Operation::Fence,
1803 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1804 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1805 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1806 }
1807 }
1808}
1809
1810impl From<api::access::Operation> for Operation {
1811 fn from(value: api::access::Operation) -> Self {
1812 match value {
1813 api::access::Operation::ListBasins => Operation::ListBasins,
1814 api::access::Operation::CreateBasin => Operation::CreateBasin,
1815 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1816 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1817 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1818 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1819 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1820 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1821 api::access::Operation::ListStreams => Operation::ListStreams,
1822 api::access::Operation::CreateStream => Operation::CreateStream,
1823 api::access::Operation::DeleteStream => Operation::DeleteStream,
1824 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1825 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1826 api::access::Operation::CheckTail => Operation::CheckTail,
1827 api::access::Operation::Append => Operation::Append,
1828 api::access::Operation::Read => Operation::Read,
1829 api::access::Operation::Trim => Operation::Trim,
1830 api::access::Operation::Fence => Operation::Fence,
1831 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1832 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1833 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1834 }
1835 }
1836}
1837
/// Scope to request when issuing an access token.
///
/// Fields are private; build a value with
/// [`AccessTokenScopeInput::from_ops`] or
/// [`AccessTokenScopeInput::from_op_group_perms`] and refine it with the
/// `with_*` methods.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScopeInput {
    // Basin matcher, if any.
    basins: Option<BasinMatcher>,
    // Stream matcher, if any.
    streams: Option<StreamMatcher>,
    // Access token matcher, if any.
    access_tokens: Option<AccessTokenMatcher>,
    // Permissions per operation group, if any.
    op_group_perms: Option<OperationGroupPermissions>,
    // Individual operations; an empty set is sent to the API as "not set".
    ops: HashSet<Operation>,
}
1854
1855impl AccessTokenScopeInput {
1856 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1858 Self {
1859 basins: None,
1860 streams: None,
1861 access_tokens: None,
1862 op_group_perms: None,
1863 ops: ops.into_iter().collect(),
1864 }
1865 }
1866
1867 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1869 Self {
1870 basins: None,
1871 streams: None,
1872 access_tokens: None,
1873 op_group_perms: Some(op_group_perms),
1874 ops: HashSet::default(),
1875 }
1876 }
1877
1878 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1880 Self {
1881 ops: ops.into_iter().collect(),
1882 ..self
1883 }
1884 }
1885
1886 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1888 Self {
1889 op_group_perms: Some(op_group_perms),
1890 ..self
1891 }
1892 }
1893
1894 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1898 Self {
1899 basins: Some(basins),
1900 ..self
1901 }
1902 }
1903
1904 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1908 Self {
1909 streams: Some(streams),
1910 ..self
1911 }
1912 }
1913
1914 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1918 Self {
1919 access_tokens: Some(access_tokens),
1920 ..self
1921 }
1922 }
1923}
1924
/// Scope of an access token, as converted from the API's `AccessTokenScope`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScope {
    /// Basin matcher, if the API returned one.
    pub basins: Option<BasinMatcher>,
    /// Stream matcher, if the API returned one.
    pub streams: Option<StreamMatcher>,
    /// Access token matcher, if the API returned one.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions per operation group, if the API returned any.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Individual operations; empty when the API returned none.
    pub ops: HashSet<Operation>,
}
1940
1941impl From<api::access::AccessTokenScope> for AccessTokenScope {
1942 fn from(value: api::access::AccessTokenScope) -> Self {
1943 Self {
1944 basins: value.basins.map(|rs| match rs {
1945 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1946 BasinMatcher::Exact(e)
1947 }
1948 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1949 BasinMatcher::None
1950 }
1951 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1952 }),
1953 streams: value.streams.map(|rs| match rs {
1954 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1955 StreamMatcher::Exact(e)
1956 }
1957 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1958 StreamMatcher::None
1959 }
1960 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1961 }),
1962 access_tokens: value.access_tokens.map(|rs| match rs {
1963 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1964 AccessTokenMatcher::Exact(e)
1965 }
1966 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1967 AccessTokenMatcher::None
1968 }
1969 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1970 }),
1971 op_group_perms: value.op_groups.map(Into::into),
1972 ops: value
1973 .ops
1974 .map(|ops| ops.into_iter().map(Into::into).collect())
1975 .unwrap_or_default(),
1976 }
1977 }
1978}
1979
1980impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1981 fn from(value: AccessTokenScopeInput) -> Self {
1982 Self {
1983 basins: value.basins.map(|rs| match rs {
1984 BasinMatcher::None => {
1985 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1986 }
1987 BasinMatcher::Exact(e) => {
1988 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1989 }
1990 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1991 }),
1992 streams: value.streams.map(|rs| match rs {
1993 StreamMatcher::None => {
1994 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1995 }
1996 StreamMatcher::Exact(e) => {
1997 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1998 }
1999 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2000 }),
2001 access_tokens: value.access_tokens.map(|rs| match rs {
2002 AccessTokenMatcher::None => {
2003 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2004 }
2005 AccessTokenMatcher::Exact(e) => {
2006 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2007 }
2008 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2009 }),
2010 op_groups: value.op_group_perms.map(Into::into),
2011 ops: if value.ops.is_empty() {
2012 None
2013 } else {
2014 Some(value.ops.into_iter().map(Into::into).collect())
2015 },
2016 }
2017 }
2018}
2019
/// Input for issuing an access token.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct IssueAccessTokenInput {
    /// Identifier of the access token to issue.
    pub id: AccessTokenId,
    /// Expiration time, if one is requested. `None` is passed through to the
    /// API as an unset field.
    pub expires_at: Option<S2DateTime>,
    /// Stream auto-prefixing flag. Only sent to the API when `true`.
    pub auto_prefix_streams: bool,
    /// Scope requested for the token.
    pub scope: AccessTokenScopeInput,
}
2042
2043impl IssueAccessTokenInput {
2044 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2046 Self {
2047 id,
2048 expires_at: None,
2049 auto_prefix_streams: false,
2050 scope,
2051 }
2052 }
2053
2054 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2056 Self {
2057 expires_at: Some(expires_at),
2058 ..self
2059 }
2060 }
2061
2062 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2065 Self {
2066 auto_prefix_streams,
2067 ..self
2068 }
2069 }
2070}
2071
2072impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2073 fn from(value: IssueAccessTokenInput) -> Self {
2074 Self {
2075 id: value.id,
2076 expires_at: value.expires_at.map(Into::into),
2077 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2078 scope: value.scope.into(),
2079 }
2080 }
2081}
2082
/// Interval granularity for timeseries metrics.
///
/// Maps one-to-one onto `api::metrics::TimeseriesInterval`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeseriesInterval {
    /// Minute.
    Minute,
    /// Hour.
    Hour,
    /// Day.
    Day,
}
2093
2094impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2095 fn from(value: TimeseriesInterval) -> Self {
2096 match value {
2097 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2098 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2099 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2100 }
2101 }
2102}
2103
2104impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2105 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2106 match value {
2107 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2108 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2109 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2110 }
2111 }
2112}
2113
/// A time range used when requesting metrics.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start of the range.
    // NOTE(review): presumably a Unix timestamp in seconds; confirm against
    // the metrics API.
    pub start: u32,
    /// End of the range (same unit as `start`).
    pub end: u32,
}

impl TimeRange {
    /// Creates a range from `start` and `end`. The values are stored as
    /// given; no ordering check is performed here.
    pub fn new(start: u32, end: u32) -> Self {
        Self { start, end }
    }
}
2130
/// A time range with an optional aggregation interval, used when requesting
/// timeseries metrics.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRangeAndInterval {
    /// Start of the range.
    // NOTE(review): presumably a Unix timestamp in seconds; confirm against
    // the metrics API.
    pub start: u32,
    /// End of the range (same unit as `start`).
    pub end: u32,
    /// Interval to aggregate over; `None` leaves the field unset in the
    /// request.
    pub interval: Option<TimeseriesInterval>,
}
2144
2145impl TimeRangeAndInterval {
2146 pub fn new(start: u32, end: u32) -> Self {
2148 Self {
2149 start,
2150 end,
2151 interval: None,
2152 }
2153 }
2154
2155 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2157 Self {
2158 interval: Some(interval),
2159 ..self
2160 }
2161 }
2162}
2163
/// Account-level metric sets that can be requested.
#[derive(Debug, Clone, Copy)]
pub enum AccountMetricSet {
    /// The `ActiveBasins` metric set over a time range (no interval is sent).
    ActiveBasins(TimeRange),
    /// The `AccountOps` metric set over a time range, with an optional
    /// interval.
    AccountOps(TimeRangeAndInterval),
}
2178
/// Input for requesting account-level metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetAccountMetricsInput {
    /// Metric set to request.
    pub set: AccountMetricSet,
}

impl GetAccountMetricsInput {
    /// Creates an input for the given metric set.
    pub fn new(set: AccountMetricSet) -> Self {
        Self { set }
    }
}
2193
2194impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2195 fn from(value: GetAccountMetricsInput) -> Self {
2196 let (set, start, end, interval) = match value.set {
2197 AccountMetricSet::ActiveBasins(args) => (
2198 api::metrics::AccountMetricSet::ActiveBasins,
2199 args.start,
2200 args.end,
2201 None,
2202 ),
2203 AccountMetricSet::AccountOps(args) => (
2204 api::metrics::AccountMetricSet::AccountOps,
2205 args.start,
2206 args.end,
2207 args.interval,
2208 ),
2209 };
2210 Self {
2211 set,
2212 start: Some(start),
2213 end: Some(end),
2214 interval: interval.map(Into::into),
2215 }
2216 }
2217}
2218
/// Basin-level metric sets that can be requested.
///
/// Each variant maps to the `api::metrics::BasinMetricSet` variant of the
/// same name. Only `Storage` takes a plain [`TimeRange`]; the others also
/// accept an optional interval.
#[derive(Debug, Clone, Copy)]
pub enum BasinMetricSet {
    /// The `Storage` metric set over a time range (no interval is sent).
    Storage(TimeRange),
    /// The `AppendOps` metric set.
    AppendOps(TimeRangeAndInterval),
    /// The `ReadOps` metric set.
    ReadOps(TimeRangeAndInterval),
    /// The `ReadThroughput` metric set.
    ReadThroughput(TimeRangeAndInterval),
    /// The `AppendThroughput` metric set.
    AppendThroughput(TimeRangeAndInterval),
    /// The `BasinOps` metric set.
    BasinOps(TimeRangeAndInterval),
}
2263
/// Input for requesting basin-level metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetBasinMetricsInput {
    /// Name of the basin.
    pub name: BasinName,
    /// Metric set to request.
    pub set: BasinMetricSet,
}

impl GetBasinMetricsInput {
    /// Creates an input for the given basin and metric set.
    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
        Self { name, set }
    }
}
2280
2281impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2282 fn from(value: GetBasinMetricsInput) -> Self {
2283 let (set, start, end, interval) = match value.set {
2284 BasinMetricSet::Storage(args) => (
2285 api::metrics::BasinMetricSet::Storage,
2286 args.start,
2287 args.end,
2288 None,
2289 ),
2290 BasinMetricSet::AppendOps(args) => (
2291 api::metrics::BasinMetricSet::AppendOps,
2292 args.start,
2293 args.end,
2294 args.interval,
2295 ),
2296 BasinMetricSet::ReadOps(args) => (
2297 api::metrics::BasinMetricSet::ReadOps,
2298 args.start,
2299 args.end,
2300 args.interval,
2301 ),
2302 BasinMetricSet::ReadThroughput(args) => (
2303 api::metrics::BasinMetricSet::ReadThroughput,
2304 args.start,
2305 args.end,
2306 args.interval,
2307 ),
2308 BasinMetricSet::AppendThroughput(args) => (
2309 api::metrics::BasinMetricSet::AppendThroughput,
2310 args.start,
2311 args.end,
2312 args.interval,
2313 ),
2314 BasinMetricSet::BasinOps(args) => (
2315 api::metrics::BasinMetricSet::BasinOps,
2316 args.start,
2317 args.end,
2318 args.interval,
2319 ),
2320 };
2321 (
2322 value.name,
2323 api::metrics::BasinMetricSetRequest {
2324 set,
2325 start: Some(start),
2326 end: Some(end),
2327 interval: interval.map(Into::into),
2328 },
2329 )
2330 }
2331}
2332
/// Stream-level metric sets that can be requested.
#[derive(Debug, Clone, Copy)]
pub enum StreamMetricSet {
    /// The `Storage` metric set over a time range (no interval is sent).
    Storage(TimeRange),
}
2340
/// Input for requesting stream-level metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetStreamMetricsInput {
    /// Name of the basin.
    pub basin_name: BasinName,
    /// Name of the stream.
    pub stream_name: StreamName,
    /// Metric set to request.
    pub set: StreamMetricSet,
}

impl GetStreamMetricsInput {
    /// Creates an input for the given basin, stream and metric set.
    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
        Self {
            basin_name,
            stream_name,
            set,
        }
    }
}
2364
2365impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2366 fn from(value: GetStreamMetricsInput) -> Self {
2367 let (set, start, end, interval) = match value.set {
2368 StreamMetricSet::Storage(args) => (
2369 api::metrics::StreamMetricSet::Storage,
2370 args.start,
2371 args.end,
2372 None,
2373 ),
2374 };
2375 (
2376 value.basin_name,
2377 value.stream_name,
2378 api::metrics::StreamMetricSetRequest {
2379 set,
2380 start: Some(start),
2381 end: Some(end),
2382 interval,
2383 },
2384 )
2385 }
2386}
2387
/// Unit in which a metric value is expressed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricUnit {
    /// Bytes.
    Bytes,
    /// Operations.
    Operations,
}
2396
2397impl From<api::metrics::MetricUnit> for MetricUnit {
2398 fn from(value: api::metrics::MetricUnit) -> Self {
2399 match value {
2400 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2401 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2402 }
2403 }
2404}
2405
/// A metric consisting of a single value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ScalarMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of `value`.
    pub unit: MetricUnit,
    /// The metric value.
    pub value: f64,
}
2417
/// A timeseries metric reported together with an interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccumulationMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Interval reported by the API for this metric.
    pub interval: TimeseriesInterval,
    /// Data points, passed through from the API unchanged.
    // NOTE(review): presumably `(timestamp, value)` pairs; confirm against
    // the metrics API.
    pub values: Vec<(u32, f64)>,
}
2434
/// A timeseries metric reported without an interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GaugeMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Data points, passed through from the API unchanged.
    // NOTE(review): presumably `(timestamp, value)` pairs; confirm against
    // the metrics API.
    pub values: Vec<(u32, f64)>,
}
2447
/// A metric whose values are strings rather than numbers.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Name of the metric.
    pub name: String,
    /// String values, passed through from the API unchanged.
    pub values: Vec<String>,
}
2457
/// A single metric returned by the metrics API, in one of four shapes.
#[derive(Debug, Clone)]
pub enum Metric {
    /// A single value.
    Scalar(ScalarMetric),
    /// A timeseries with an associated interval.
    Accumulation(AccumulationMetric),
    /// A timeseries without an interval.
    Gauge(GaugeMetric),
    /// A list of string values.
    Label(LabelMetric),
}
2471
2472impl From<api::metrics::Metric> for Metric {
2473 fn from(value: api::metrics::Metric) -> Self {
2474 match value {
2475 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2476 name: sm.name.into(),
2477 unit: sm.unit.into(),
2478 value: sm.value,
2479 }),
2480 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2481 name: am.name.into(),
2482 unit: am.unit.into(),
2483 interval: am.interval.into(),
2484 values: am.values,
2485 }),
2486 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2487 name: gm.name.into(),
2488 unit: gm.unit.into(),
2489 values: gm.values,
2490 }),
2491 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2492 name: lm.name.into(),
2493 values: lm.values,
2494 }),
2495 }
2496 }
2497}
2498
/// Input for listing streams (a single request).
///
/// The derived [`Default`] uses the default prefix and `start_after` values
/// and no limit.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ListStreamsInput {
    /// Stream name prefix to filter by.
    pub prefix: StreamNamePrefix,
    /// Stream name to start listing after.
    pub start_after: StreamNameStartAfter,
    /// Limit on the number of results, if any.
    pub limit: Option<usize>,
}
2518
2519impl ListStreamsInput {
2520 pub fn new() -> Self {
2522 Self::default()
2523 }
2524
2525 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2527 Self { prefix, ..self }
2528 }
2529
2530 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2533 Self {
2534 start_after,
2535 ..self
2536 }
2537 }
2538
2539 pub fn with_limit(self, limit: usize) -> Self {
2541 Self {
2542 limit: Some(limit),
2543 ..self
2544 }
2545 }
2546}
2547
2548impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2549 fn from(value: ListStreamsInput) -> Self {
2550 Self {
2551 prefix: Some(value.prefix),
2552 start_after: Some(value.start_after),
2553 limit: value.limit,
2554 }
2555 }
2556}
2557
/// Input for listing all streams.
///
/// The derived [`Default`] uses the default prefix and `start_after` values
/// and `include_deleted = false`.
#[derive(Debug, Clone, Default)]
pub struct ListAllStreamsInput {
    /// Stream name prefix to filter by.
    pub prefix: StreamNamePrefix,
    /// Stream name to start listing after.
    pub start_after: StreamNameStartAfter,
    /// Whether to include deleted streams.
    // NOTE(review): how this flag is applied is not visible here; confirm in
    // the code that consumes this input.
    pub include_deleted: bool,
}
2576
2577impl ListAllStreamsInput {
2578 pub fn new() -> Self {
2580 Self::default()
2581 }
2582
2583 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2585 Self { prefix, ..self }
2586 }
2587
2588 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2591 Self {
2592 start_after,
2593 ..self
2594 }
2595 }
2596
2597 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2599 Self {
2600 include_deleted,
2601 ..self
2602 }
2603 }
2604}
2605
/// Information about a stream, as converted from the API's `StreamInfo`.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct StreamInfo {
    /// Name of the stream.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time, if the API reported one.
    pub deleted_at: Option<S2DateTime>,
}
2617
2618impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2619 type Error = ValidationError;
2620
2621 fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2622 Ok(Self {
2623 name: value.name,
2624 created_at: value.created_at.try_into()?,
2625 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2626 })
2627 }
2628}
2629
/// Input for creating a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CreateStreamInput {
    /// Name of the stream to create.
    pub name: StreamName,
    /// Configuration for the stream; `None` sends no configuration.
    pub config: Option<StreamConfig>,
    // Generated once when the input is created and handed over alongside the
    // request on conversion.
    idempotency_token: String,
}
2642
2643impl CreateStreamInput {
2644 pub fn new(name: StreamName) -> Self {
2646 Self {
2647 name,
2648 config: None,
2649 idempotency_token: idempotency_token(),
2650 }
2651 }
2652
2653 pub fn with_config(self, config: StreamConfig) -> Self {
2655 Self {
2656 config: Some(config),
2657 ..self
2658 }
2659 }
2660}
2661
2662impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2663 fn from(value: CreateStreamInput) -> Self {
2664 (
2665 api::stream::CreateStreamRequest {
2666 stream: value.name,
2667 config: value.config.map(Into::into),
2668 },
2669 value.idempotency_token,
2670 )
2671 }
2672}
2673
/// Input for creating a stream or reconfiguring it if it already exists.
///
/// Hidden from the docs and only compiled with the `_hidden` feature.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureStreamInput {
    /// Name of the stream.
    pub name: StreamName,
    /// Reconfiguration to apply; `None` sends no configuration.
    pub config: Option<StreamReconfiguration>,
}
2688
#[cfg(feature = "_hidden")]
impl CreateOrReconfigureStreamInput {
    /// Creates an input for the given stream name with no configuration.
    pub fn new(name: StreamName) -> Self {
        Self { config: None, name }
    }

    /// Sets the reconfiguration to apply.
    pub fn with_config(mut self, config: StreamReconfiguration) -> Self {
        self.config = Some(config);
        self
    }
}
2704
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureStreamInput>
    for (StreamName, Option<api::config::StreamReconfiguration>)
{
    /// Splits the input into the stream name and the optional API
    /// reconfiguration.
    fn from(value: CreateOrReconfigureStreamInput) -> Self {
        let CreateOrReconfigureStreamInput { name, config } = value;
        let config = config.map(Into::into);
        (name, config)
    }
}
2713
/// Input for deleting a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct DeleteStreamInput {
    /// Name of the stream to delete.
    pub name: StreamName,
    /// Whether a "not found" outcome should be ignored. Defaults to `false`
    /// in [`DeleteStreamInput::new`].
    // NOTE(review): the handling of this flag lives in the code that
    // consumes this input; confirm the exact semantics there.
    pub ignore_not_found: bool,
}
2723
2724impl DeleteStreamInput {
2725 pub fn new(name: StreamName) -> Self {
2727 Self {
2728 name,
2729 ignore_not_found: false,
2730 }
2731 }
2732
2733 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2735 Self {
2736 ignore_not_found,
2737 ..self
2738 }
2739 }
2740}
2741
/// Input for reconfiguring a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReconfigureStreamInput {
    /// Name of the stream to reconfigure.
    pub name: StreamName,
    /// Reconfiguration to apply.
    pub config: StreamReconfiguration,
}

impl ReconfigureStreamInput {
    /// Creates an input for the given stream name and reconfiguration.
    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
        Self { name, config }
    }
}
2758
/// A fencing token: a string of at most `MAX_FENCING_TOKEN_LENGTH` bytes.
///
/// The length limit is enforced by the [`FromStr`] implementation, which
/// [`FencingToken::generate`] also goes through.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FencingToken(String);
2766
2767impl FencingToken {
2768 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2770 rand::rng()
2771 .sample_iter(&rand::distr::Alphanumeric)
2772 .take(n)
2773 .map(char::from)
2774 .collect::<String>()
2775 .parse()
2776 }
2777}
2778
2779impl FromStr for FencingToken {
2780 type Err = ValidationError;
2781
2782 fn from_str(s: &str) -> Result<Self, Self::Err> {
2783 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2784 return Err(ValidationError(format!(
2785 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2786 )));
2787 }
2788 Ok(FencingToken(s.to_string()))
2789 }
2790}
2791
2792impl std::fmt::Display for FencingToken {
2793 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2794 write!(f, "{}", self.0)
2795 }
2796}
2797
2798impl Deref for FencingToken {
2799 type Target = str;
2800
2801 fn deref(&self) -> &Self::Target {
2802 &self.0
2803 }
2804}
2805
/// A position in a stream: a sequence number and a timestamp.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: u64,
    /// Timestamp.
    // NOTE(review): the unit is not visible here (presumably milliseconds
    // since the Unix epoch); confirm against the API.
    pub timestamp: u64,
}
2816
2817impl std::fmt::Display for StreamPosition {
2818 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2819 write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2820 }
2821}
2822
2823impl From<api::stream::proto::StreamPosition> for StreamPosition {
2824 fn from(value: api::stream::proto::StreamPosition) -> Self {
2825 Self {
2826 seq_num: value.seq_num,
2827 timestamp: value.timestamp,
2828 }
2829 }
2830}
2831
2832impl From<api::stream::StreamPosition> for StreamPosition {
2833 fn from(value: api::stream::StreamPosition) -> Self {
2834 Self {
2835 seq_num: value.seq_num,
2836 timestamp: value.timestamp,
2837 }
2838 }
2839}
2840
/// A record header: a name/value pair of raw bytes.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct Header {
    /// Header name.
    pub name: Bytes,
    /// Header value.
    pub value: Bytes,
}
2850
impl Header {
    /// Creates a header from anything convertible into [`Bytes`] for the
    /// name and the value.
    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }
}
2860
2861impl From<Header> for api::stream::proto::Header {
2862 fn from(value: Header) -> Self {
2863 Self {
2864 name: value.name,
2865 value: value.value,
2866 }
2867 }
2868}
2869
2870impl From<api::stream::proto::Header> for Header {
2871 fn from(value: api::stream::proto::Header) -> Self {
2872 Self {
2873 name: value.name,
2874 value: value.value,
2875 }
2876 }
2877}
2878
/// A record to append to a stream.
///
/// Fields are private. [`AppendRecord::new`] and
/// [`AppendRecord::with_headers`] reject a record whose metered size exceeds
/// `RECORD_BATCH_MAX.bytes`.
#[derive(Debug, Clone, PartialEq)]
pub struct AppendRecord {
    // Record payload.
    body: Bytes,
    // Record headers.
    headers: Vec<Header>,
    // Optional timestamp; `None` leaves the field unset on the wire.
    timestamp: Option<u64>,
}
2886
2887impl AppendRecord {
2888 fn validate(self) -> Result<Self, ValidationError> {
2889 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2890 Err(ValidationError(format!(
2891 "metered_bytes: {} exceeds {}",
2892 self.metered_bytes(),
2893 RECORD_BATCH_MAX.bytes
2894 )))
2895 } else {
2896 Ok(self)
2897 }
2898 }
2899
2900 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2902 let record = Self {
2903 body: body.into(),
2904 headers: Vec::default(),
2905 timestamp: None,
2906 };
2907 record.validate()
2908 }
2909
2910 pub fn with_headers(
2912 self,
2913 headers: impl IntoIterator<Item = Header>,
2914 ) -> Result<Self, ValidationError> {
2915 let record = Self {
2916 headers: headers.into_iter().collect(),
2917 ..self
2918 };
2919 record.validate()
2920 }
2921
2922 pub fn with_timestamp(self, timestamp: u64) -> Self {
2926 Self {
2927 timestamp: Some(timestamp),
2928 ..self
2929 }
2930 }
2931
2932 pub fn body(&self) -> &[u8] {
2934 &self.body
2935 }
2936
2937 pub fn headers(&self) -> &[Header] {
2939 &self.headers
2940 }
2941
2942 pub fn timestamp(&self) -> Option<u64> {
2944 self.timestamp
2945 }
2946}
2947
2948impl From<AppendRecord> for api::stream::proto::AppendRecord {
2949 fn from(value: AppendRecord) -> Self {
2950 Self {
2951 timestamp: value.timestamp,
2952 headers: value.headers.into_iter().map(Into::into).collect(),
2953 body: value.body,
2954 }
2955 }
2956}
2957
/// Types that can report their size in metered bytes.
pub trait MeteredBytes {
    /// Returns the size of `self` in metered bytes.
    fn metered_bytes(&self) -> usize;
}
2968
// Implements `MeteredBytes` for a record-like type with `headers` and `body`
// fields.
//
// The metered size is 8 bytes of fixed overhead, plus 2 bytes per header,
// plus the lengths of every header name and value, plus the body length.
macro_rules! metered_bytes_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> usize {
                let header_payload = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum::<usize>();
                8 + (2 * self.headers.len()) + header_payload + self.body.len()
            }
        }
    };
}
2984
// Generate the `MeteredBytes` implementation for `AppendRecord`.
metered_bytes_impl!(AppendRecord);
2986
/// A batch of [`AppendRecord`]s together with its running metered size.
///
/// [`AppendRecordBatch::try_from_iter`] enforces that a batch is non-empty
/// and stays within `RECORD_BATCH_MAX.count` records and
/// `RECORD_BATCH_MAX.bytes` metered bytes.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    // Records in insertion order.
    records: Vec<AppendRecord>,
    // Sum of the metered sizes of `records`, maintained on every push.
    metered_bytes: usize,
}
3000
3001impl AppendRecordBatch {
3002 pub(crate) fn with_capacity(capacity: usize) -> Self {
3003 Self {
3004 records: Vec::with_capacity(capacity),
3005 metered_bytes: 0,
3006 }
3007 }
3008
3009 pub(crate) fn push(&mut self, record: AppendRecord) {
3010 self.metered_bytes += record.metered_bytes();
3011 self.records.push(record);
3012 }
3013
3014 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3016 where
3017 I: IntoIterator<Item = AppendRecord>,
3018 {
3019 let mut records = Vec::new();
3020 let mut metered_bytes = 0;
3021
3022 for record in iter {
3023 metered_bytes += record.metered_bytes();
3024 records.push(record);
3025
3026 if metered_bytes > RECORD_BATCH_MAX.bytes {
3027 return Err(ValidationError(format!(
3028 "batch size in metered bytes ({metered_bytes}) exceeds {}",
3029 RECORD_BATCH_MAX.bytes
3030 )));
3031 }
3032
3033 if records.len() > RECORD_BATCH_MAX.count {
3034 return Err(ValidationError(format!(
3035 "number of records in the batch exceeds {}",
3036 RECORD_BATCH_MAX.count
3037 )));
3038 }
3039 }
3040
3041 if records.is_empty() {
3042 return Err(ValidationError("batch is empty".into()));
3043 }
3044
3045 Ok(Self {
3046 records,
3047 metered_bytes,
3048 })
3049 }
3050}
3051
3052impl Deref for AppendRecordBatch {
3053 type Target = [AppendRecord];
3054
3055 fn deref(&self) -> &Self::Target {
3056 &self.records
3057 }
3058}
3059
3060impl MeteredBytes for AppendRecordBatch {
3061 fn metered_bytes(&self) -> usize {
3062 self.metered_bytes
3063 }
3064}
3065
/// A command that can be carried by a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command.
    Fence {
        /// Fencing token carried by the command; its bytes become the record
        /// body.
        fencing_token: FencingToken,
    },
    /// Trim command.
    Trim {
        /// Trim point carried by the command; encoded as 8 big-endian bytes
        /// in the record body.
        trim_point: u64,
    },
}
3080
/// A command together with an optional timestamp, convertible into an
/// [`AppendRecord`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CommandRecord {
    /// The command.
    pub command: Command,
    /// Timestamp for the resulting record, if any.
    pub timestamp: Option<u64>,
}
3092
3093impl CommandRecord {
3094 const FENCE: &[u8] = b"fence";
3095 const TRIM: &[u8] = b"trim";
3096
3097 pub fn fence(fencing_token: FencingToken) -> Self {
3102 Self {
3103 command: Command::Fence { fencing_token },
3104 timestamp: None,
3105 }
3106 }
3107
3108 pub fn trim(trim_point: u64) -> Self {
3115 Self {
3116 command: Command::Trim { trim_point },
3117 timestamp: None,
3118 }
3119 }
3120
3121 pub fn with_timestamp(self, timestamp: u64) -> Self {
3123 Self {
3124 timestamp: Some(timestamp),
3125 ..self
3126 }
3127 }
3128}
3129
3130impl From<CommandRecord> for AppendRecord {
3131 fn from(value: CommandRecord) -> Self {
3132 let (header_value, body) = match value.command {
3133 Command::Fence { fencing_token } => (
3134 CommandRecord::FENCE,
3135 Bytes::copy_from_slice(fencing_token.as_bytes()),
3136 ),
3137 Command::Trim { trim_point } => (
3138 CommandRecord::TRIM,
3139 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3140 ),
3141 };
3142 Self {
3143 body,
3144 headers: vec![Header::new("", header_value)],
3145 timestamp: value.timestamp,
3146 }
3147 }
3148}
3149
/// Input for appending a batch of records to a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AppendInput {
    /// Batch of records to append.
    pub records: AppendRecordBatch,
    /// Sequence number to match, if any; passed through to the API
    /// unchanged.
    pub match_seq_num: Option<u64>,
    /// Fencing token to send with the append, if any.
    pub fencing_token: Option<FencingToken>,
}
3167
3168impl AppendInput {
3169 pub fn new(records: AppendRecordBatch) -> Self {
3171 Self {
3172 records,
3173 match_seq_num: None,
3174 fencing_token: None,
3175 }
3176 }
3177
3178 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3180 Self {
3181 match_seq_num: Some(match_seq_num),
3182 ..self
3183 }
3184 }
3185
3186 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3188 Self {
3189 fencing_token: Some(fencing_token),
3190 ..self
3191 }
3192 }
3193}
3194
3195impl From<AppendInput> for api::stream::proto::AppendInput {
3196 fn from(value: AppendInput) -> Self {
3197 Self {
3198 records: value.records.iter().cloned().map(Into::into).collect(),
3199 match_seq_num: value.match_seq_num,
3200 fencing_token: value.fencing_token.map(|t| t.to_string()),
3201 }
3202 }
3203}
3204
3205#[derive(Debug, Clone, PartialEq)]
3206#[non_exhaustive]
3207pub struct AppendAck {
3209 pub start: StreamPosition,
3211 pub end: StreamPosition,
3217 pub tail: StreamPosition,
3222}
3223
3224impl From<api::stream::proto::AppendAck> for AppendAck {
3225 fn from(value: api::stream::proto::AppendAck) -> Self {
3226 Self {
3227 start: value.start.unwrap_or_default().into(),
3228 end: value.end.unwrap_or_default().into(),
3229 tail: value.tail.unwrap_or_default().into(),
3230 }
3231 }
3232}
3233
#[derive(Debug, Clone, Copy)]
/// Where a read starts. Exactly one of these is sent to the API.
pub enum ReadFrom {
    /// Start at this sequence number.
    SeqNum(u64),
    /// Start at this timestamp (units not visible here — confirm).
    Timestamp(u64),
    /// Start at this offset relative to the tail (presumably a number of
    /// records before the tail — confirm).
    TailOffset(u64),
}

impl Default for ReadFrom {
    /// Defaults to sequence number `0`.
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3250
3251#[derive(Debug, Default, Clone)]
3252#[non_exhaustive]
3253pub struct ReadStart {
3255 pub from: ReadFrom,
3259 pub clamp_to_tail: bool,
3263}
3264
3265impl ReadStart {
3266 pub fn new() -> Self {
3268 Self::default()
3269 }
3270
3271 pub fn with_from(self, from: ReadFrom) -> Self {
3273 Self { from, ..self }
3274 }
3275
3276 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3278 Self {
3279 clamp_to_tail,
3280 ..self
3281 }
3282 }
3283}
3284
3285impl From<ReadStart> for api::stream::ReadStart {
3286 fn from(value: ReadStart) -> Self {
3287 let (seq_num, timestamp, tail_offset) = match value.from {
3288 ReadFrom::SeqNum(n) => (Some(n), None, None),
3289 ReadFrom::Timestamp(t) => (None, Some(t), None),
3290 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3291 };
3292 Self {
3293 seq_num,
3294 timestamp,
3295 tail_offset,
3296 clamp: if value.clamp_to_tail {
3297 Some(true)
3298 } else {
3299 None
3300 },
3301 }
3302 }
3303}
3304
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Optional limits applied to a read. Both are unset by default.
pub struct ReadLimits {
    /// Optional `count` limit (presumably a number of records — confirm).
    pub count: Option<usize>,
    /// Optional `bytes` limit (presumably metered bytes — confirm).
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Creates limits with neither `count` nor `bytes` set.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the limits with `count` set, leaving `bytes` untouched.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Returns the limits with `bytes` set, leaving `count` untouched.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3341
3342#[derive(Debug, Clone, Default)]
3343#[non_exhaustive]
3344pub struct ReadStop {
3346 pub limits: ReadLimits,
3350 pub until: Option<RangeTo<u64>>,
3354 pub wait: Option<u32>,
3364}
3365
3366impl ReadStop {
3367 pub fn new() -> Self {
3369 Self::default()
3370 }
3371
3372 pub fn with_limits(self, limits: ReadLimits) -> Self {
3374 Self { limits, ..self }
3375 }
3376
3377 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3379 Self {
3380 until: Some(until),
3381 ..self
3382 }
3383 }
3384
3385 pub fn with_wait(self, wait: u32) -> Self {
3387 Self {
3388 wait: Some(wait),
3389 ..self
3390 }
3391 }
3392}
3393
3394impl From<ReadStop> for api::stream::ReadEnd {
3395 fn from(value: ReadStop) -> Self {
3396 Self {
3397 count: value.limits.count,
3398 bytes: value.limits.bytes,
3399 until: value.until.map(|r| r.end),
3400 wait: value.wait,
3401 }
3402 }
3403}
3404
3405#[derive(Debug, Clone, Default)]
3406#[non_exhaustive]
3407pub struct ReadInput {
3410 pub start: ReadStart,
3414 pub stop: ReadStop,
3418 pub ignore_command_records: bool,
3422}
3423
3424impl ReadInput {
3425 pub fn new() -> Self {
3427 Self::default()
3428 }
3429
3430 pub fn with_start(self, start: ReadStart) -> Self {
3432 Self { start, ..self }
3433 }
3434
3435 pub fn with_stop(self, stop: ReadStop) -> Self {
3437 Self { stop, ..self }
3438 }
3439
3440 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3442 Self {
3443 ignore_command_records,
3444 ..self
3445 }
3446 }
3447}
3448
3449#[derive(Debug, Clone)]
3450#[non_exhaustive]
3451pub struct SequencedRecord {
3453 pub seq_num: u64,
3455 pub body: Bytes,
3457 pub headers: Vec<Header>,
3459 pub timestamp: u64,
3461}
3462
3463impl SequencedRecord {
3464 pub fn is_command_record(&self) -> bool {
3466 self.headers.len() == 1 && *self.headers[0].name == *b""
3467 }
3468}
3469
3470impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3471 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3472 Self {
3473 seq_num: value.seq_num,
3474 body: value.body,
3475 headers: value.headers.into_iter().map(Into::into).collect(),
3476 timestamp: value.timestamp,
3477 }
3478 }
3479}
3480
3481metered_bytes_impl!(SequencedRecord);
3482
3483#[derive(Debug, Clone)]
3484#[non_exhaustive]
3485pub struct ReadBatch {
3488 pub records: Vec<SequencedRecord>,
3495 pub tail: Option<StreamPosition>,
3500}
3501
3502impl ReadBatch {
3503 pub(crate) fn from_api(
3504 batch: api::stream::proto::ReadBatch,
3505 ignore_command_records: bool,
3506 ) -> Self {
3507 Self {
3508 records: batch
3509 .records
3510 .into_iter()
3511 .map(Into::into)
3512 .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3513 .collect(),
3514 tail: batch.tail.map(Into::into),
3515 }
3516 }
3517}
3518
3519pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3521
3522#[derive(Debug, Clone, thiserror::Error)]
3523pub enum AppendConditionFailed {
3525 #[error("fencing token mismatch, expected: {0}")]
3526 FencingTokenMismatch(FencingToken),
3528 #[error("sequence number mismatch, expected: {0}")]
3529 SeqNumMismatch(u64),
3531}
3532
3533impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3534 fn from(value: api::stream::AppendConditionFailed) -> Self {
3535 match value {
3536 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3537 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3538 }
3539 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3540 AppendConditionFailed::SeqNumMismatch(seq)
3541 }
3542 }
3543 }
3544}
3545
3546#[derive(Debug, Clone, thiserror::Error)]
3547pub enum S2Error {
3549 #[error("{0}")]
3550 Client(String),
3552 #[error(transparent)]
3553 Validation(#[from] ValidationError),
3555 #[error("{0}")]
3556 AppendConditionFailed(AppendConditionFailed),
3558 #[error("read from an unwritten position. current tail: {0}")]
3559 ReadUnwritten(StreamPosition),
3561 #[error("{0}")]
3562 Server(ErrorResponse),
3564}
3565
3566impl From<ApiError> for S2Error {
3567 fn from(err: ApiError) -> Self {
3568 match err {
3569 ApiError::ReadUnwritten(tail_response) => {
3570 Self::ReadUnwritten(tail_response.tail.into())
3571 }
3572 ApiError::AppendConditionFailed(condition_failed) => {
3573 Self::AppendConditionFailed(condition_failed.into())
3574 }
3575 ApiError::Server(_, response) => Self::Server(response.into()),
3576 other => Self::Client(other.to_string()),
3577 }
3578 }
3579}
3580
3581#[derive(Debug, Clone, thiserror::Error)]
3582#[error("{code}: {message}")]
3583#[non_exhaustive]
3584pub struct ErrorResponse {
3586 pub code: String,
3588 pub message: String,
3590}
3591
3592impl From<ApiErrorResponse> for ErrorResponse {
3593 fn from(response: ApiErrorResponse) -> Self {
3594 Self {
3595 code: response.code,
3596 message: response.message,
3597 }
3598 }
3599}
3600
3601fn idempotency_token() -> String {
3602 uuid::Uuid::new_v4().simple().to_string()
3603}