1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::types::ValidationError;
24pub use s2_common::types::access::AccessTokenId;
28pub use s2_common::types::access::AccessTokenIdPrefix;
30pub use s2_common::types::access::AccessTokenIdStartAfter;
32pub use s2_common::types::basin::BasinName;
37pub use s2_common::types::basin::BasinNamePrefix;
39pub use s2_common::types::basin::BasinNameStartAfter;
41pub use s2_common::types::stream::StreamName;
45pub use s2_common::types::stream::StreamNamePrefix;
47pub use s2_common::types::stream::StreamNameStartAfter;
49
50pub(crate) const ONE_MIB: u32 = 1024 * 1024;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl TryFrom<time::OffsetDateTime> for S2DateTime {
66 type Error = ValidationError;
67
68 fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
69 dt.format(&time::format_description::well_known::Rfc3339)
70 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
71 Ok(Self(dt))
72 }
73}
74
75impl From<S2DateTime> for time::OffsetDateTime {
76 fn from(dt: S2DateTime) -> Self {
77 dt.0
78 }
79}
80
81impl FromStr for S2DateTime {
82 type Err = ValidationError;
83
84 fn from_str(s: &str) -> Result<Self, Self::Err> {
85 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
86 .map(Self)
87 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
88 }
89}
90
91impl fmt::Display for S2DateTime {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 write!(
94 f,
95 "{}",
96 self.0
97 .format(&time::format_description::well_known::Rfc3339)
98 .expect("RFC3339 formatting should not fail for S2DateTime")
99 )
100 }
101}
102
103#[derive(Debug, Clone, PartialEq)]
105pub(crate) enum BasinAuthority {
106 ParentZone(Authority),
108 Direct(Authority),
110}
111
112#[derive(Debug, Clone)]
114pub struct AccountEndpoint {
115 scheme: Scheme,
116 authority: Authority,
117}
118
119impl AccountEndpoint {
120 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
122 endpoint.parse()
123 }
124}
125
126impl FromStr for AccountEndpoint {
127 type Err = ValidationError;
128
129 fn from_str(s: &str) -> Result<Self, Self::Err> {
130 let (scheme, authority) = match s.find("://") {
131 Some(idx) => {
132 let scheme: Scheme = s[..idx]
133 .parse()
134 .map_err(|_| "invalid account endpoint scheme".to_string())?;
135 (scheme, &s[idx + 3..])
136 }
137 None => (Scheme::HTTPS, s),
138 };
139 Ok(Self {
140 scheme,
141 authority: authority
142 .parse()
143 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
144 })
145 }
146}
147
148#[derive(Debug, Clone)]
150pub struct BasinEndpoint {
151 scheme: Scheme,
152 authority: BasinAuthority,
153}
154
155impl BasinEndpoint {
156 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
158 endpoint.parse()
159 }
160}
161
162impl FromStr for BasinEndpoint {
163 type Err = ValidationError;
164
165 fn from_str(s: &str) -> Result<Self, Self::Err> {
166 let (scheme, authority) = match s.find("://") {
167 Some(idx) => {
168 let scheme: Scheme = s[..idx]
169 .parse()
170 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
171 (scheme, &s[idx + 3..])
172 }
173 None => (Scheme::HTTPS, s),
174 };
175 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
176 BasinAuthority::ParentZone(
177 authority
178 .parse()
179 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
180 )
181 } else {
182 BasinAuthority::Direct(
183 authority
184 .parse()
185 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
186 )
187 };
188 Ok(Self { scheme, authority })
189 }
190}
191
192#[derive(Debug, Clone)]
193#[non_exhaustive]
194pub struct S2Endpoints {
196 pub(crate) scheme: Scheme,
197 pub(crate) account_authority: Authority,
198 pub(crate) basin_authority: BasinAuthority,
199}
200
201impl S2Endpoints {
202 pub fn new(
204 account_endpoint: AccountEndpoint,
205 basin_endpoint: BasinEndpoint,
206 ) -> Result<Self, ValidationError> {
207 if account_endpoint.scheme != basin_endpoint.scheme {
208 return Err("account and basin endpoints must have the same scheme".into());
209 }
210 Ok(Self {
211 scheme: account_endpoint.scheme,
212 account_authority: account_endpoint.authority,
213 basin_authority: basin_endpoint.authority,
214 })
215 }
216
217 pub fn from_env() -> Result<Self, ValidationError> {
223 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
224 Ok(endpoint) => endpoint.parse()?,
225 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
226 Err(VarError::NotUnicode(_)) => {
227 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
228 }
229 };
230
231 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
232 Ok(endpoint) => endpoint.parse()?,
233 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
234 Err(VarError::NotUnicode(_)) => {
235 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
236 }
237 };
238
239 if account_endpoint.scheme != basin_endpoint.scheme {
240 return Err(
241 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
242 );
243 }
244
245 Ok(Self {
246 scheme: account_endpoint.scheme,
247 account_authority: account_endpoint.authority,
248 basin_authority: basin_endpoint.authority,
249 })
250 }
251
252 pub(crate) fn for_aws() -> Self {
253 Self {
254 scheme: Scheme::HTTPS,
255 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
256 basin_authority: BasinAuthority::ParentZone(
257 "b.aws.s2.dev".try_into().expect("valid authority"),
258 ),
259 }
260 }
261}
262
263#[derive(Debug, Clone, Copy)]
264pub enum Compression {
266 None,
268 Gzip,
270 Zstd,
272}
273
274impl From<Compression> for CompressionAlgorithm {
275 fn from(value: Compression) -> Self {
276 match value {
277 Compression::None => CompressionAlgorithm::None,
278 Compression::Gzip => CompressionAlgorithm::Gzip,
279 Compression::Zstd => CompressionAlgorithm::Zstd,
280 }
281 }
282}
283
284#[derive(Debug, Clone, Copy, PartialEq)]
285#[non_exhaustive]
286pub enum AppendRetryPolicy {
289 All,
291 NoSideEffects,
293}
294
295impl AppendRetryPolicy {
296 pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
297 match self {
298 Self::All => true,
299 Self::NoSideEffects => input.match_seq_num.is_some(),
300 }
301 }
302}
303
304#[derive(Debug, Clone)]
305#[non_exhaustive]
306pub struct RetryConfig {
315 pub max_attempts: NonZeroU32,
319 pub min_base_delay: Duration,
323 pub max_base_delay: Duration,
327 pub append_retry_policy: AppendRetryPolicy,
332}
333
334impl Default for RetryConfig {
335 fn default() -> Self {
336 Self {
337 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
338 min_base_delay: Duration::from_millis(100),
339 max_base_delay: Duration::from_secs(1),
340 append_retry_policy: AppendRetryPolicy::All,
341 }
342 }
343}
344
345impl RetryConfig {
346 pub fn new() -> Self {
348 Self::default()
349 }
350
351 pub(crate) fn max_retries(&self) -> u32 {
352 self.max_attempts.get() - 1
353 }
354
355 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
357 Self {
358 max_attempts,
359 ..self
360 }
361 }
362
363 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
365 Self {
366 min_base_delay,
367 ..self
368 }
369 }
370
371 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
373 Self {
374 max_base_delay,
375 ..self
376 }
377 }
378
379 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
382 Self {
383 append_retry_policy,
384 ..self
385 }
386 }
387}
388
389#[derive(Debug, Clone)]
390#[non_exhaustive]
391pub struct S2Config {
393 pub(crate) access_token: SecretString,
394 pub(crate) endpoints: S2Endpoints,
395 pub(crate) connection_timeout: Duration,
396 pub(crate) request_timeout: Duration,
397 pub(crate) retry: RetryConfig,
398 pub(crate) compression: Compression,
399 pub(crate) user_agent: HeaderValue,
400 pub(crate) insecure_skip_cert_verification: bool,
401}
402
403impl S2Config {
404 pub fn new(access_token: impl Into<String>) -> Self {
406 Self {
407 access_token: access_token.into().into(),
408 endpoints: S2Endpoints::for_aws(),
409 connection_timeout: Duration::from_secs(3),
410 request_timeout: Duration::from_secs(5),
411 retry: RetryConfig::new(),
412 compression: Compression::None,
413 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
414 .parse()
415 .expect("valid user agent"),
416 insecure_skip_cert_verification: false,
417 }
418 }
419
420 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
422 Self { endpoints, ..self }
423 }
424
425 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
429 Self {
430 connection_timeout,
431 ..self
432 }
433 }
434
435 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
439 Self {
440 request_timeout,
441 ..self
442 }
443 }
444
445 pub fn with_retry(self, retry: RetryConfig) -> Self {
449 Self { retry, ..self }
450 }
451
452 pub fn with_compression(self, compression: Compression) -> Self {
456 Self {
457 compression,
458 ..self
459 }
460 }
461
462 pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
474 Self {
475 insecure_skip_cert_verification: skip,
476 ..self
477 }
478 }
479
480 #[doc(hidden)]
481 #[cfg(feature = "_hidden")]
482 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
483 let user_agent = user_agent
484 .into()
485 .parse()
486 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
487 Ok(Self { user_agent, ..self })
488 }
489}
490
491#[derive(Debug, Default, Clone, PartialEq, Eq)]
492#[non_exhaustive]
493pub struct Page<T> {
495 pub values: Vec<T>,
497 pub has_more: bool,
499}
500
501impl<T> Page<T> {
502 pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
503 Self {
504 values: values.into(),
505 has_more,
506 }
507 }
508}
509
510#[derive(Debug, Clone, Copy, PartialEq, Eq)]
511pub enum StorageClass {
513 Standard,
515 Express,
517}
518
519impl From<api::config::StorageClass> for StorageClass {
520 fn from(value: api::config::StorageClass) -> Self {
521 match value {
522 api::config::StorageClass::Standard => StorageClass::Standard,
523 api::config::StorageClass::Express => StorageClass::Express,
524 }
525 }
526}
527
528impl From<StorageClass> for api::config::StorageClass {
529 fn from(value: StorageClass) -> Self {
530 match value {
531 StorageClass::Standard => api::config::StorageClass::Standard,
532 StorageClass::Express => api::config::StorageClass::Express,
533 }
534 }
535}
536
537#[derive(Debug, Clone, Copy, PartialEq, Eq)]
538pub enum RetentionPolicy {
540 Age(u64),
542 Infinite,
544}
545
546impl From<api::config::RetentionPolicy> for RetentionPolicy {
547 fn from(value: api::config::RetentionPolicy) -> Self {
548 match value {
549 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
550 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
551 }
552 }
553}
554
555impl From<RetentionPolicy> for api::config::RetentionPolicy {
556 fn from(value: RetentionPolicy) -> Self {
557 match value {
558 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
559 RetentionPolicy::Infinite => {
560 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
561 }
562 }
563 }
564}
565
566#[derive(Debug, Clone, Copy, PartialEq, Eq)]
567pub enum TimestampingMode {
569 ClientPrefer,
571 ClientRequire,
573 Arrival,
575}
576
577impl From<api::config::TimestampingMode> for TimestampingMode {
578 fn from(value: api::config::TimestampingMode) -> Self {
579 match value {
580 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
581 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
582 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
583 }
584 }
585}
586
587impl From<TimestampingMode> for api::config::TimestampingMode {
588 fn from(value: TimestampingMode) -> Self {
589 match value {
590 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
591 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
592 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
593 }
594 }
595}
596
597#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
598#[non_exhaustive]
599pub struct TimestampingConfig {
601 pub mode: Option<TimestampingMode>,
605 pub uncapped: bool,
609}
610
611impl TimestampingConfig {
612 pub fn new() -> Self {
614 Self::default()
615 }
616
617 pub fn with_mode(self, mode: TimestampingMode) -> Self {
619 Self {
620 mode: Some(mode),
621 ..self
622 }
623 }
624
625 pub fn with_uncapped(self, uncapped: bool) -> Self {
627 Self { uncapped, ..self }
628 }
629}
630
631impl From<api::config::TimestampingConfig> for TimestampingConfig {
632 fn from(value: api::config::TimestampingConfig) -> Self {
633 Self {
634 mode: value.mode.map(Into::into),
635 uncapped: value.uncapped.unwrap_or_default(),
636 }
637 }
638}
639
640impl From<TimestampingConfig> for api::config::TimestampingConfig {
641 fn from(value: TimestampingConfig) -> Self {
642 Self {
643 mode: value.mode.map(Into::into),
644 uncapped: Some(value.uncapped),
645 }
646 }
647}
648
649#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
650#[non_exhaustive]
651pub struct DeleteOnEmptyConfig {
653 pub min_age_secs: u64,
657}
658
659impl DeleteOnEmptyConfig {
660 pub fn new() -> Self {
662 Self::default()
663 }
664
665 pub fn with_min_age(self, min_age: Duration) -> Self {
667 Self {
668 min_age_secs: min_age.as_secs(),
669 }
670 }
671}
672
673impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
674 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
675 Self {
676 min_age_secs: value.min_age_secs,
677 }
678 }
679}
680
681impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
682 fn from(value: DeleteOnEmptyConfig) -> Self {
683 Self {
684 min_age_secs: value.min_age_secs,
685 }
686 }
687}
688
689#[derive(Debug, Clone, Default, PartialEq, Eq)]
690#[non_exhaustive]
691pub struct StreamConfig {
693 pub storage_class: Option<StorageClass>,
697 pub retention_policy: Option<RetentionPolicy>,
701 pub timestamping: Option<TimestampingConfig>,
705 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
709}
710
711impl StreamConfig {
712 pub fn new() -> Self {
714 Self::default()
715 }
716
717 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
719 Self {
720 storage_class: Some(storage_class),
721 ..self
722 }
723 }
724
725 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
727 Self {
728 retention_policy: Some(retention_policy),
729 ..self
730 }
731 }
732
733 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
735 Self {
736 timestamping: Some(timestamping),
737 ..self
738 }
739 }
740
741 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
743 Self {
744 delete_on_empty: Some(delete_on_empty),
745 ..self
746 }
747 }
748}
749
750impl From<api::config::StreamConfig> for StreamConfig {
751 fn from(value: api::config::StreamConfig) -> Self {
752 Self {
753 storage_class: value.storage_class.map(Into::into),
754 retention_policy: value.retention_policy.map(Into::into),
755 timestamping: value.timestamping.map(Into::into),
756 delete_on_empty: value.delete_on_empty.map(Into::into),
757 }
758 }
759}
760
761impl From<StreamConfig> for api::config::StreamConfig {
762 fn from(value: StreamConfig) -> Self {
763 Self {
764 storage_class: value.storage_class.map(Into::into),
765 retention_policy: value.retention_policy.map(Into::into),
766 timestamping: value.timestamping.map(Into::into),
767 delete_on_empty: value.delete_on_empty.map(Into::into),
768 }
769 }
770}
771
772#[derive(Debug, Clone, Default)]
773#[non_exhaustive]
774pub struct BasinConfig {
776 pub default_stream_config: Option<StreamConfig>,
780 pub create_stream_on_append: bool,
784 pub create_stream_on_read: bool,
788}
789
790impl BasinConfig {
791 pub fn new() -> Self {
793 Self::default()
794 }
795
796 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
798 Self {
799 default_stream_config: Some(config),
800 ..self
801 }
802 }
803
804 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
807 Self {
808 create_stream_on_append,
809 ..self
810 }
811 }
812
813 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
815 Self {
816 create_stream_on_read,
817 ..self
818 }
819 }
820}
821
822impl From<api::config::BasinConfig> for BasinConfig {
823 fn from(value: api::config::BasinConfig) -> Self {
824 Self {
825 default_stream_config: value.default_stream_config.map(Into::into),
826 create_stream_on_append: value.create_stream_on_append,
827 create_stream_on_read: value.create_stream_on_read,
828 }
829 }
830}
831
832impl From<BasinConfig> for api::config::BasinConfig {
833 fn from(value: BasinConfig) -> Self {
834 Self {
835 default_stream_config: value.default_stream_config.map(Into::into),
836 create_stream_on_append: value.create_stream_on_append,
837 create_stream_on_read: value.create_stream_on_read,
838 }
839 }
840}
841
842#[derive(Debug, Clone, PartialEq, Eq)]
843pub enum BasinScope {
845 AwsUsEast1,
847}
848
849impl From<api::basin::BasinScope> for BasinScope {
850 fn from(value: api::basin::BasinScope) -> Self {
851 match value {
852 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
853 }
854 }
855}
856
857impl From<BasinScope> for api::basin::BasinScope {
858 fn from(value: BasinScope) -> Self {
859 match value {
860 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
861 }
862 }
863}
864
865#[doc(hidden)]
870#[cfg(feature = "_hidden")]
871#[derive(Debug, Clone, PartialEq, Eq)]
872pub enum CreateOrReconfigured<T> {
873 Created(T),
875 Reconfigured(T),
877}
878
879#[cfg(feature = "_hidden")]
880impl<T> CreateOrReconfigured<T> {
881 pub fn is_created(&self) -> bool {
883 matches!(self, Self::Created(_))
884 }
885
886 pub fn into_inner(self) -> T {
888 match self {
889 Self::Created(t) | Self::Reconfigured(t) => t,
890 }
891 }
892}
893
894#[derive(Debug, Clone)]
895#[non_exhaustive]
896pub struct CreateBasinInput {
898 pub name: BasinName,
900 pub config: Option<BasinConfig>,
904 pub scope: Option<BasinScope>,
908 idempotency_token: String,
909}
910
911impl CreateBasinInput {
912 pub fn new(name: BasinName) -> Self {
914 Self {
915 name,
916 config: None,
917 scope: None,
918 idempotency_token: idempotency_token(),
919 }
920 }
921
922 pub fn with_config(self, config: BasinConfig) -> Self {
924 Self {
925 config: Some(config),
926 ..self
927 }
928 }
929
930 pub fn with_scope(self, scope: BasinScope) -> Self {
932 Self {
933 scope: Some(scope),
934 ..self
935 }
936 }
937}
938
939impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
940 fn from(value: CreateBasinInput) -> Self {
941 (
942 api::basin::CreateBasinRequest {
943 basin: value.name,
944 config: value.config.map(Into::into),
945 scope: value.scope.map(Into::into),
946 },
947 value.idempotency_token,
948 )
949 }
950}
951
952#[derive(Debug, Clone)]
953#[non_exhaustive]
954#[doc(hidden)]
956#[cfg(feature = "_hidden")]
957pub struct CreateOrReconfigureBasinInput {
958 pub name: BasinName,
960 pub config: Option<BasinReconfiguration>,
964 pub scope: Option<BasinScope>,
968}
969
970#[cfg(feature = "_hidden")]
971impl CreateOrReconfigureBasinInput {
972 pub fn new(name: BasinName) -> Self {
974 Self {
975 name,
976 config: None,
977 scope: None,
978 }
979 }
980
981 pub fn with_config(self, config: BasinReconfiguration) -> Self {
983 Self {
984 config: Some(config),
985 ..self
986 }
987 }
988
989 pub fn with_scope(self, scope: BasinScope) -> Self {
991 Self {
992 scope: Some(scope),
993 ..self
994 }
995 }
996}
997
998#[cfg(feature = "_hidden")]
999impl From<CreateOrReconfigureBasinInput>
1000 for (
1001 BasinName,
1002 Option<api::basin::CreateOrReconfigureBasinRequest>,
1003 )
1004{
1005 fn from(value: CreateOrReconfigureBasinInput) -> Self {
1006 let request = if value.config.is_some() || value.scope.is_some() {
1007 Some(api::basin::CreateOrReconfigureBasinRequest {
1008 config: value.config.map(Into::into),
1009 scope: value.scope.map(Into::into),
1010 })
1011 } else {
1012 None
1013 };
1014 (value.name, request)
1015 }
1016}
1017
1018#[derive(Debug, Clone, Default)]
1019#[non_exhaustive]
1020pub struct ListBasinsInput {
1022 pub prefix: BasinNamePrefix,
1026 pub start_after: BasinNameStartAfter,
1032 pub limit: Option<usize>,
1036}
1037
1038impl ListBasinsInput {
1039 pub fn new() -> Self {
1041 Self::default()
1042 }
1043
1044 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1046 Self { prefix, ..self }
1047 }
1048
1049 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1052 Self {
1053 start_after,
1054 ..self
1055 }
1056 }
1057
1058 pub fn with_limit(self, limit: usize) -> Self {
1060 Self {
1061 limit: Some(limit),
1062 ..self
1063 }
1064 }
1065}
1066
1067impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1068 fn from(value: ListBasinsInput) -> Self {
1069 Self {
1070 prefix: Some(value.prefix),
1071 start_after: Some(value.start_after),
1072 limit: value.limit,
1073 }
1074 }
1075}
1076
1077#[derive(Debug, Clone, Default)]
1078pub struct ListAllBasinsInput {
1080 pub prefix: BasinNamePrefix,
1084 pub start_after: BasinNameStartAfter,
1090 pub include_deleted: bool,
1094}
1095
1096impl ListAllBasinsInput {
1097 pub fn new() -> Self {
1099 Self::default()
1100 }
1101
1102 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1104 Self { prefix, ..self }
1105 }
1106
1107 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1110 Self {
1111 start_after,
1112 ..self
1113 }
1114 }
1115
1116 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1118 Self {
1119 include_deleted,
1120 ..self
1121 }
1122 }
1123}
1124
1125#[derive(Debug, Clone, PartialEq, Eq)]
1126pub enum BasinState {
1128 Active,
1130 Creating,
1132 Deleting,
1134}
1135
1136impl From<api::basin::BasinState> for BasinState {
1137 fn from(value: api::basin::BasinState) -> Self {
1138 match value {
1139 api::basin::BasinState::Active => BasinState::Active,
1140 api::basin::BasinState::Creating => BasinState::Creating,
1141 api::basin::BasinState::Deleting => BasinState::Deleting,
1142 }
1143 }
1144}
1145
1146#[derive(Debug, Clone, PartialEq, Eq)]
1147#[non_exhaustive]
1148pub struct BasinInfo {
1150 pub name: BasinName,
1152 pub scope: Option<BasinScope>,
1154 pub state: BasinState,
1156}
1157
1158impl From<api::basin::BasinInfo> for BasinInfo {
1159 fn from(value: api::basin::BasinInfo) -> Self {
1160 Self {
1161 name: value.name,
1162 scope: value.scope.map(Into::into),
1163 state: value.state.into(),
1164 }
1165 }
1166}
1167
1168#[derive(Debug, Clone)]
1169#[non_exhaustive]
1170pub struct DeleteBasinInput {
1172 pub name: BasinName,
1174 pub ignore_not_found: bool,
1176}
1177
1178impl DeleteBasinInput {
1179 pub fn new(name: BasinName) -> Self {
1181 Self {
1182 name,
1183 ignore_not_found: false,
1184 }
1185 }
1186
1187 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1189 Self {
1190 ignore_not_found,
1191 ..self
1192 }
1193 }
1194}
1195
1196#[derive(Debug, Clone, Default)]
1197#[non_exhaustive]
1198pub struct TimestampingReconfiguration {
1200 pub mode: Maybe<Option<TimestampingMode>>,
1202 pub uncapped: Maybe<Option<bool>>,
1204}
1205
1206impl TimestampingReconfiguration {
1207 pub fn new() -> Self {
1209 Self::default()
1210 }
1211
1212 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1214 Self {
1215 mode: Maybe::Specified(Some(mode)),
1216 ..self
1217 }
1218 }
1219
1220 pub fn with_uncapped(self, uncapped: bool) -> Self {
1222 Self {
1223 uncapped: Maybe::Specified(Some(uncapped)),
1224 ..self
1225 }
1226 }
1227}
1228
1229impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1230 fn from(value: TimestampingReconfiguration) -> Self {
1231 Self {
1232 mode: value.mode.map(|m| m.map(Into::into)),
1233 uncapped: value.uncapped,
1234 }
1235 }
1236}
1237
1238#[derive(Debug, Clone, Default)]
1239#[non_exhaustive]
1240pub struct DeleteOnEmptyReconfiguration {
1242 pub min_age_secs: Maybe<Option<u64>>,
1244}
1245
1246impl DeleteOnEmptyReconfiguration {
1247 pub fn new() -> Self {
1249 Self::default()
1250 }
1251
1252 pub fn with_min_age(self, min_age: Duration) -> Self {
1254 Self {
1255 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1256 }
1257 }
1258}
1259
1260impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1261 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1262 Self {
1263 min_age_secs: value.min_age_secs,
1264 }
1265 }
1266}
1267
1268#[derive(Debug, Clone, Default)]
1269#[non_exhaustive]
1270pub struct StreamReconfiguration {
1272 pub storage_class: Maybe<Option<StorageClass>>,
1274 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1276 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1278 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1280}
1281
1282impl StreamReconfiguration {
1283 pub fn new() -> Self {
1285 Self::default()
1286 }
1287
1288 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1290 Self {
1291 storage_class: Maybe::Specified(Some(storage_class)),
1292 ..self
1293 }
1294 }
1295
1296 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1298 Self {
1299 retention_policy: Maybe::Specified(Some(retention_policy)),
1300 ..self
1301 }
1302 }
1303
1304 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1306 Self {
1307 timestamping: Maybe::Specified(Some(timestamping)),
1308 ..self
1309 }
1310 }
1311
1312 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1314 Self {
1315 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1316 ..self
1317 }
1318 }
1319}
1320
1321impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1322 fn from(value: StreamReconfiguration) -> Self {
1323 Self {
1324 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1325 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1326 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1327 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1328 }
1329 }
1330}
1331
1332#[derive(Debug, Clone, Default)]
1333#[non_exhaustive]
1334pub struct BasinReconfiguration {
1336 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1338 pub create_stream_on_append: Maybe<bool>,
1341 pub create_stream_on_read: Maybe<bool>,
1343}
1344
1345impl BasinReconfiguration {
1346 pub fn new() -> Self {
1348 Self::default()
1349 }
1350
1351 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1354 Self {
1355 default_stream_config: Maybe::Specified(Some(config)),
1356 ..self
1357 }
1358 }
1359
1360 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1363 Self {
1364 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1365 ..self
1366 }
1367 }
1368
1369 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1372 Self {
1373 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1374 ..self
1375 }
1376 }
1377}
1378
1379impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1380 fn from(value: BasinReconfiguration) -> Self {
1381 Self {
1382 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1383 create_stream_on_append: value.create_stream_on_append,
1384 create_stream_on_read: value.create_stream_on_read,
1385 }
1386 }
1387}
1388
1389#[derive(Debug, Clone)]
1390#[non_exhaustive]
1391pub struct ReconfigureBasinInput {
1393 pub name: BasinName,
1395 pub config: BasinReconfiguration,
1397}
1398
1399impl ReconfigureBasinInput {
1400 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1402 Self { name, config }
1403 }
1404}
1405
1406#[derive(Debug, Clone, Default)]
1407#[non_exhaustive]
1408pub struct ListAccessTokensInput {
1410 pub prefix: AccessTokenIdPrefix,
1414 pub start_after: AccessTokenIdStartAfter,
1420 pub limit: Option<usize>,
1424}
1425
1426impl ListAccessTokensInput {
1427 pub fn new() -> Self {
1429 Self::default()
1430 }
1431
1432 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1434 Self { prefix, ..self }
1435 }
1436
1437 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1440 Self {
1441 start_after,
1442 ..self
1443 }
1444 }
1445
1446 pub fn with_limit(self, limit: usize) -> Self {
1448 Self {
1449 limit: Some(limit),
1450 ..self
1451 }
1452 }
1453}
1454
1455impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1456 fn from(value: ListAccessTokensInput) -> Self {
1457 Self {
1458 prefix: Some(value.prefix),
1459 start_after: Some(value.start_after),
1460 limit: value.limit,
1461 }
1462 }
1463}
1464
1465#[derive(Debug, Clone, Default)]
1466pub struct ListAllAccessTokensInput {
1468 pub prefix: AccessTokenIdPrefix,
1472 pub start_after: AccessTokenIdStartAfter,
1478}
1479
1480impl ListAllAccessTokensInput {
1481 pub fn new() -> Self {
1483 Self::default()
1484 }
1485
1486 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1488 Self { prefix, ..self }
1489 }
1490
1491 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1494 Self {
1495 start_after,
1496 ..self
1497 }
1498 }
1499}
1500
1501#[derive(Debug, Clone)]
1502#[non_exhaustive]
1503pub struct AccessTokenInfo {
1505 pub id: AccessTokenId,
1507 pub expires_at: S2DateTime,
1509 pub auto_prefix_streams: bool,
1512 pub scope: AccessTokenScope,
1514}
1515
1516impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1517 type Error = ValidationError;
1518
1519 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1520 let expires_at = value
1521 .expires_at
1522 .map(S2DateTime::try_from)
1523 .transpose()?
1524 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1525 Ok(Self {
1526 id: value.id,
1527 expires_at,
1528 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1529 scope: value.scope.into(),
1530 })
1531 }
1532}
1533
1534#[derive(Debug, Clone)]
1535pub enum BasinMatcher {
1539 None,
1541 Exact(BasinName),
1543 Prefix(BasinNamePrefix),
1545}
1546
1547#[derive(Debug, Clone)]
1548pub enum StreamMatcher {
1552 None,
1554 Exact(StreamName),
1556 Prefix(StreamNamePrefix),
1558}
1559
1560#[derive(Debug, Clone)]
1561pub enum AccessTokenMatcher {
1565 None,
1567 Exact(AccessTokenId),
1569 Prefix(AccessTokenIdPrefix),
1571}
1572
1573#[derive(Debug, Clone, Default)]
1574#[non_exhaustive]
1575pub struct ReadWritePermissions {
1577 pub read: bool,
1581 pub write: bool,
1585}
1586
1587impl ReadWritePermissions {
1588 pub fn new() -> Self {
1590 Self::default()
1591 }
1592
1593 pub fn read_only() -> Self {
1595 Self {
1596 read: true,
1597 write: false,
1598 }
1599 }
1600
1601 pub fn write_only() -> Self {
1603 Self {
1604 read: false,
1605 write: true,
1606 }
1607 }
1608
1609 pub fn read_write() -> Self {
1611 Self {
1612 read: true,
1613 write: true,
1614 }
1615 }
1616}
1617
1618impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1619 fn from(value: ReadWritePermissions) -> Self {
1620 Self {
1621 read: Some(value.read),
1622 write: Some(value.write),
1623 }
1624 }
1625}
1626
1627impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1628 fn from(value: api::access::ReadWritePermissions) -> Self {
1629 Self {
1630 read: value.read.unwrap_or_default(),
1631 write: value.write.unwrap_or_default(),
1632 }
1633 }
1634}
1635
1636#[derive(Debug, Clone, Default)]
1637#[non_exhaustive]
1638pub struct OperationGroupPermissions {
1642 pub account: Option<ReadWritePermissions>,
1646 pub basin: Option<ReadWritePermissions>,
1650 pub stream: Option<ReadWritePermissions>,
1654}
1655
1656impl OperationGroupPermissions {
1657 pub fn new() -> Self {
1659 Self::default()
1660 }
1661
1662 pub fn read_only_all() -> Self {
1664 Self {
1665 account: Some(ReadWritePermissions::read_only()),
1666 basin: Some(ReadWritePermissions::read_only()),
1667 stream: Some(ReadWritePermissions::read_only()),
1668 }
1669 }
1670
1671 pub fn write_only_all() -> Self {
1673 Self {
1674 account: Some(ReadWritePermissions::write_only()),
1675 basin: Some(ReadWritePermissions::write_only()),
1676 stream: Some(ReadWritePermissions::write_only()),
1677 }
1678 }
1679
1680 pub fn read_write_all() -> Self {
1682 Self {
1683 account: Some(ReadWritePermissions::read_write()),
1684 basin: Some(ReadWritePermissions::read_write()),
1685 stream: Some(ReadWritePermissions::read_write()),
1686 }
1687 }
1688
1689 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1691 Self {
1692 account: Some(account),
1693 ..self
1694 }
1695 }
1696
1697 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1699 Self {
1700 basin: Some(basin),
1701 ..self
1702 }
1703 }
1704
1705 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1707 Self {
1708 stream: Some(stream),
1709 ..self
1710 }
1711 }
1712}
1713
1714impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1715 fn from(value: OperationGroupPermissions) -> Self {
1716 Self {
1717 account: value.account.map(Into::into),
1718 basin: value.basin.map(Into::into),
1719 stream: value.stream.map(Into::into),
1720 }
1721 }
1722}
1723
1724impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1725 fn from(value: api::access::PermittedOperationGroups) -> Self {
1726 Self {
1727 account: value.account.map(Into::into),
1728 basin: value.basin.map(Into::into),
1729 stream: value.stream.map(Into::into),
1730 }
1731 }
1732}
1733
1734#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1735pub enum Operation {
1739 ListBasins,
1741 CreateBasin,
1743 GetBasinConfig,
1745 DeleteBasin,
1747 ReconfigureBasin,
1749 ListAccessTokens,
1751 IssueAccessToken,
1753 RevokeAccessToken,
1755 GetAccountMetrics,
1757 GetBasinMetrics,
1759 GetStreamMetrics,
1761 ListStreams,
1763 CreateStream,
1765 GetStreamConfig,
1767 DeleteStream,
1769 ReconfigureStream,
1771 CheckTail,
1773 Append,
1775 Read,
1777 Trim,
1779 Fence,
1781}
1782
1783impl From<Operation> for api::access::Operation {
1784 fn from(value: Operation) -> Self {
1785 match value {
1786 Operation::ListBasins => api::access::Operation::ListBasins,
1787 Operation::CreateBasin => api::access::Operation::CreateBasin,
1788 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1789 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1790 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1791 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1792 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1793 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1794 Operation::ListStreams => api::access::Operation::ListStreams,
1795 Operation::CreateStream => api::access::Operation::CreateStream,
1796 Operation::DeleteStream => api::access::Operation::DeleteStream,
1797 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1798 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1799 Operation::CheckTail => api::access::Operation::CheckTail,
1800 Operation::Append => api::access::Operation::Append,
1801 Operation::Read => api::access::Operation::Read,
1802 Operation::Trim => api::access::Operation::Trim,
1803 Operation::Fence => api::access::Operation::Fence,
1804 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1805 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1806 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1807 }
1808 }
1809}
1810
1811impl From<api::access::Operation> for Operation {
1812 fn from(value: api::access::Operation) -> Self {
1813 match value {
1814 api::access::Operation::ListBasins => Operation::ListBasins,
1815 api::access::Operation::CreateBasin => Operation::CreateBasin,
1816 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1817 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1818 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1819 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1820 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1821 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1822 api::access::Operation::ListStreams => Operation::ListStreams,
1823 api::access::Operation::CreateStream => Operation::CreateStream,
1824 api::access::Operation::DeleteStream => Operation::DeleteStream,
1825 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1826 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1827 api::access::Operation::CheckTail => Operation::CheckTail,
1828 api::access::Operation::Append => Operation::Append,
1829 api::access::Operation::Read => Operation::Read,
1830 api::access::Operation::Trim => Operation::Trim,
1831 api::access::Operation::Fence => Operation::Fence,
1832 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1833 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1834 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1835 }
1836 }
1837}
1838
1839#[derive(Debug, Clone)]
1840#[non_exhaustive]
1841pub struct AccessTokenScopeInput {
1849 basins: Option<BasinMatcher>,
1850 streams: Option<StreamMatcher>,
1851 access_tokens: Option<AccessTokenMatcher>,
1852 op_group_perms: Option<OperationGroupPermissions>,
1853 ops: HashSet<Operation>,
1854}
1855
1856impl AccessTokenScopeInput {
1857 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1859 Self {
1860 basins: None,
1861 streams: None,
1862 access_tokens: None,
1863 op_group_perms: None,
1864 ops: ops.into_iter().collect(),
1865 }
1866 }
1867
1868 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1870 Self {
1871 basins: None,
1872 streams: None,
1873 access_tokens: None,
1874 op_group_perms: Some(op_group_perms),
1875 ops: HashSet::default(),
1876 }
1877 }
1878
1879 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1881 Self {
1882 ops: ops.into_iter().collect(),
1883 ..self
1884 }
1885 }
1886
1887 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1889 Self {
1890 op_group_perms: Some(op_group_perms),
1891 ..self
1892 }
1893 }
1894
1895 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1899 Self {
1900 basins: Some(basins),
1901 ..self
1902 }
1903 }
1904
1905 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1909 Self {
1910 streams: Some(streams),
1911 ..self
1912 }
1913 }
1914
1915 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1919 Self {
1920 access_tokens: Some(access_tokens),
1921 ..self
1922 }
1923 }
1924}
1925
1926#[derive(Debug, Clone)]
1927#[non_exhaustive]
1928pub struct AccessTokenScope {
1930 pub basins: Option<BasinMatcher>,
1932 pub streams: Option<StreamMatcher>,
1934 pub access_tokens: Option<AccessTokenMatcher>,
1936 pub op_group_perms: Option<OperationGroupPermissions>,
1938 pub ops: HashSet<Operation>,
1940}
1941
1942impl From<api::access::AccessTokenScope> for AccessTokenScope {
1943 fn from(value: api::access::AccessTokenScope) -> Self {
1944 Self {
1945 basins: value.basins.map(|rs| match rs {
1946 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1947 BasinMatcher::Exact(e)
1948 }
1949 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1950 BasinMatcher::None
1951 }
1952 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1953 }),
1954 streams: value.streams.map(|rs| match rs {
1955 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1956 StreamMatcher::Exact(e)
1957 }
1958 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1959 StreamMatcher::None
1960 }
1961 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1962 }),
1963 access_tokens: value.access_tokens.map(|rs| match rs {
1964 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1965 AccessTokenMatcher::Exact(e)
1966 }
1967 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1968 AccessTokenMatcher::None
1969 }
1970 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1971 }),
1972 op_group_perms: value.op_groups.map(Into::into),
1973 ops: value
1974 .ops
1975 .map(|ops| ops.into_iter().map(Into::into).collect())
1976 .unwrap_or_default(),
1977 }
1978 }
1979}
1980
1981impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1982 fn from(value: AccessTokenScopeInput) -> Self {
1983 Self {
1984 basins: value.basins.map(|rs| match rs {
1985 BasinMatcher::None => {
1986 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1987 }
1988 BasinMatcher::Exact(e) => {
1989 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1990 }
1991 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1992 }),
1993 streams: value.streams.map(|rs| match rs {
1994 StreamMatcher::None => {
1995 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1996 }
1997 StreamMatcher::Exact(e) => {
1998 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1999 }
2000 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2001 }),
2002 access_tokens: value.access_tokens.map(|rs| match rs {
2003 AccessTokenMatcher::None => {
2004 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2005 }
2006 AccessTokenMatcher::Exact(e) => {
2007 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2008 }
2009 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2010 }),
2011 op_groups: value.op_group_perms.map(Into::into),
2012 ops: if value.ops.is_empty() {
2013 None
2014 } else {
2015 Some(value.ops.into_iter().map(Into::into).collect())
2016 },
2017 }
2018 }
2019}
2020
2021#[derive(Debug, Clone)]
2022#[non_exhaustive]
2023pub struct IssueAccessTokenInput {
2025 pub id: AccessTokenId,
2027 pub expires_at: Option<S2DateTime>,
2032 pub auto_prefix_streams: bool,
2040 pub scope: AccessTokenScopeInput,
2042}
2043
2044impl IssueAccessTokenInput {
2045 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2047 Self {
2048 id,
2049 expires_at: None,
2050 auto_prefix_streams: false,
2051 scope,
2052 }
2053 }
2054
2055 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2057 Self {
2058 expires_at: Some(expires_at),
2059 ..self
2060 }
2061 }
2062
2063 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2066 Self {
2067 auto_prefix_streams,
2068 ..self
2069 }
2070 }
2071}
2072
2073impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2074 fn from(value: IssueAccessTokenInput) -> Self {
2075 Self {
2076 id: value.id,
2077 expires_at: value.expires_at.map(Into::into),
2078 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2079 scope: value.scope.into(),
2080 }
2081 }
2082}
2083
2084#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2085pub enum TimeseriesInterval {
2087 Minute,
2089 Hour,
2091 Day,
2093}
2094
2095impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2096 fn from(value: TimeseriesInterval) -> Self {
2097 match value {
2098 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2099 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2100 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2101 }
2102 }
2103}
2104
2105impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2106 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2107 match value {
2108 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2109 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2110 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2111 }
2112 }
2113}
2114
2115#[derive(Debug, Clone, Copy)]
2116#[non_exhaustive]
2117pub struct TimeRange {
2119 pub start: u32,
2121 pub end: u32,
2123}
2124
2125impl TimeRange {
2126 pub fn new(start: u32, end: u32) -> Self {
2128 Self { start, end }
2129 }
2130}
2131
2132#[derive(Debug, Clone, Copy)]
2133#[non_exhaustive]
2134pub struct TimeRangeAndInterval {
2136 pub start: u32,
2138 pub end: u32,
2140 pub interval: Option<TimeseriesInterval>,
2144}
2145
2146impl TimeRangeAndInterval {
2147 pub fn new(start: u32, end: u32) -> Self {
2149 Self {
2150 start,
2151 end,
2152 interval: None,
2153 }
2154 }
2155
2156 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2158 Self {
2159 interval: Some(interval),
2160 ..self
2161 }
2162 }
2163}
2164
2165#[derive(Debug, Clone, Copy)]
2166pub enum AccountMetricSet {
2168 ActiveBasins(TimeRange),
2171 AccountOps(TimeRangeAndInterval),
2178}
2179
2180#[derive(Debug, Clone)]
2181#[non_exhaustive]
2182pub struct GetAccountMetricsInput {
2184 pub set: AccountMetricSet,
2186}
2187
2188impl GetAccountMetricsInput {
2189 pub fn new(set: AccountMetricSet) -> Self {
2191 Self { set }
2192 }
2193}
2194
2195impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2196 fn from(value: GetAccountMetricsInput) -> Self {
2197 let (set, start, end, interval) = match value.set {
2198 AccountMetricSet::ActiveBasins(args) => (
2199 api::metrics::AccountMetricSet::ActiveBasins,
2200 args.start,
2201 args.end,
2202 None,
2203 ),
2204 AccountMetricSet::AccountOps(args) => (
2205 api::metrics::AccountMetricSet::AccountOps,
2206 args.start,
2207 args.end,
2208 args.interval,
2209 ),
2210 };
2211 Self {
2212 set,
2213 start: Some(start),
2214 end: Some(end),
2215 interval: interval.map(Into::into),
2216 }
2217 }
2218}
2219
2220#[derive(Debug, Clone, Copy)]
2221pub enum BasinMetricSet {
2223 Storage(TimeRange),
2226 AppendOps(TimeRangeAndInterval),
2234 ReadOps(TimeRangeAndInterval),
2242 ReadThroughput(TimeRangeAndInterval),
2249 AppendThroughput(TimeRangeAndInterval),
2256 BasinOps(TimeRangeAndInterval),
2263}
2264
2265#[derive(Debug, Clone)]
2266#[non_exhaustive]
2267pub struct GetBasinMetricsInput {
2269 pub name: BasinName,
2271 pub set: BasinMetricSet,
2273}
2274
2275impl GetBasinMetricsInput {
2276 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2278 Self { name, set }
2279 }
2280}
2281
2282impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2283 fn from(value: GetBasinMetricsInput) -> Self {
2284 let (set, start, end, interval) = match value.set {
2285 BasinMetricSet::Storage(args) => (
2286 api::metrics::BasinMetricSet::Storage,
2287 args.start,
2288 args.end,
2289 None,
2290 ),
2291 BasinMetricSet::AppendOps(args) => (
2292 api::metrics::BasinMetricSet::AppendOps,
2293 args.start,
2294 args.end,
2295 args.interval,
2296 ),
2297 BasinMetricSet::ReadOps(args) => (
2298 api::metrics::BasinMetricSet::ReadOps,
2299 args.start,
2300 args.end,
2301 args.interval,
2302 ),
2303 BasinMetricSet::ReadThroughput(args) => (
2304 api::metrics::BasinMetricSet::ReadThroughput,
2305 args.start,
2306 args.end,
2307 args.interval,
2308 ),
2309 BasinMetricSet::AppendThroughput(args) => (
2310 api::metrics::BasinMetricSet::AppendThroughput,
2311 args.start,
2312 args.end,
2313 args.interval,
2314 ),
2315 BasinMetricSet::BasinOps(args) => (
2316 api::metrics::BasinMetricSet::BasinOps,
2317 args.start,
2318 args.end,
2319 args.interval,
2320 ),
2321 };
2322 (
2323 value.name,
2324 api::metrics::BasinMetricSetRequest {
2325 set,
2326 start: Some(start),
2327 end: Some(end),
2328 interval: interval.map(Into::into),
2329 },
2330 )
2331 }
2332}
2333
2334#[derive(Debug, Clone, Copy)]
2335pub enum StreamMetricSet {
2337 Storage(TimeRange),
2340}
2341
2342#[derive(Debug, Clone)]
2343#[non_exhaustive]
2344pub struct GetStreamMetricsInput {
2346 pub basin_name: BasinName,
2348 pub stream_name: StreamName,
2350 pub set: StreamMetricSet,
2352}
2353
2354impl GetStreamMetricsInput {
2355 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2358 Self {
2359 basin_name,
2360 stream_name,
2361 set,
2362 }
2363 }
2364}
2365
2366impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2367 fn from(value: GetStreamMetricsInput) -> Self {
2368 let (set, start, end, interval) = match value.set {
2369 StreamMetricSet::Storage(args) => (
2370 api::metrics::StreamMetricSet::Storage,
2371 args.start,
2372 args.end,
2373 None,
2374 ),
2375 };
2376 (
2377 value.basin_name,
2378 value.stream_name,
2379 api::metrics::StreamMetricSetRequest {
2380 set,
2381 start: Some(start),
2382 end: Some(end),
2383 interval,
2384 },
2385 )
2386 }
2387}
2388
2389#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2390pub enum MetricUnit {
2392 Bytes,
2394 Operations,
2396}
2397
2398impl From<api::metrics::MetricUnit> for MetricUnit {
2399 fn from(value: api::metrics::MetricUnit) -> Self {
2400 match value {
2401 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2402 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2403 }
2404 }
2405}
2406
2407#[derive(Debug, Clone)]
2408#[non_exhaustive]
2409pub struct ScalarMetric {
2411 pub name: String,
2413 pub unit: MetricUnit,
2415 pub value: f64,
2417}
2418
2419#[derive(Debug, Clone)]
2420#[non_exhaustive]
2421pub struct AccumulationMetric {
2424 pub name: String,
2426 pub unit: MetricUnit,
2428 pub interval: TimeseriesInterval,
2430 pub values: Vec<(u32, f64)>,
2434}
2435
2436#[derive(Debug, Clone)]
2437#[non_exhaustive]
2438pub struct GaugeMetric {
2440 pub name: String,
2442 pub unit: MetricUnit,
2444 pub values: Vec<(u32, f64)>,
2447}
2448
2449#[derive(Debug, Clone)]
2450#[non_exhaustive]
2451pub struct LabelMetric {
2453 pub name: String,
2455 pub values: Vec<String>,
2457}
2458
2459#[derive(Debug, Clone)]
2460pub enum Metric {
2462 Scalar(ScalarMetric),
2464 Accumulation(AccumulationMetric),
2467 Gauge(GaugeMetric),
2469 Label(LabelMetric),
2471}
2472
2473impl From<api::metrics::Metric> for Metric {
2474 fn from(value: api::metrics::Metric) -> Self {
2475 match value {
2476 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2477 name: sm.name.into(),
2478 unit: sm.unit.into(),
2479 value: sm.value,
2480 }),
2481 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2482 name: am.name.into(),
2483 unit: am.unit.into(),
2484 interval: am.interval.into(),
2485 values: am.values,
2486 }),
2487 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2488 name: gm.name.into(),
2489 unit: gm.unit.into(),
2490 values: gm.values,
2491 }),
2492 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2493 name: lm.name.into(),
2494 values: lm.values,
2495 }),
2496 }
2497 }
2498}
2499
2500#[derive(Debug, Clone, Default)]
2501#[non_exhaustive]
2502pub struct ListStreamsInput {
2504 pub prefix: StreamNamePrefix,
2508 pub start_after: StreamNameStartAfter,
2514 pub limit: Option<usize>,
2518}
2519
2520impl ListStreamsInput {
2521 pub fn new() -> Self {
2523 Self::default()
2524 }
2525
2526 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2528 Self { prefix, ..self }
2529 }
2530
2531 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2534 Self {
2535 start_after,
2536 ..self
2537 }
2538 }
2539
2540 pub fn with_limit(self, limit: usize) -> Self {
2542 Self {
2543 limit: Some(limit),
2544 ..self
2545 }
2546 }
2547}
2548
2549impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2550 fn from(value: ListStreamsInput) -> Self {
2551 Self {
2552 prefix: Some(value.prefix),
2553 start_after: Some(value.start_after),
2554 limit: value.limit,
2555 }
2556 }
2557}
2558
2559#[derive(Debug, Clone, Default)]
2560pub struct ListAllStreamsInput {
2562 pub prefix: StreamNamePrefix,
2566 pub start_after: StreamNameStartAfter,
2572 pub include_deleted: bool,
2576}
2577
2578impl ListAllStreamsInput {
2579 pub fn new() -> Self {
2581 Self::default()
2582 }
2583
2584 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2586 Self { prefix, ..self }
2587 }
2588
2589 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2592 Self {
2593 start_after,
2594 ..self
2595 }
2596 }
2597
2598 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2600 Self {
2601 include_deleted,
2602 ..self
2603 }
2604 }
2605}
2606
2607#[derive(Debug, Clone, PartialEq)]
2608#[non_exhaustive]
2609pub struct StreamInfo {
2611 pub name: StreamName,
2613 pub created_at: S2DateTime,
2615 pub deleted_at: Option<S2DateTime>,
2617}
2618
2619impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2620 type Error = ValidationError;
2621
2622 fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2623 Ok(Self {
2624 name: value.name,
2625 created_at: value.created_at.try_into()?,
2626 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2627 })
2628 }
2629}
2630
2631#[derive(Debug, Clone)]
2632#[non_exhaustive]
2633pub struct CreateStreamInput {
2635 pub name: StreamName,
2637 pub config: Option<StreamConfig>,
2641 idempotency_token: String,
2642}
2643
2644impl CreateStreamInput {
2645 pub fn new(name: StreamName) -> Self {
2647 Self {
2648 name,
2649 config: None,
2650 idempotency_token: idempotency_token(),
2651 }
2652 }
2653
2654 pub fn with_config(self, config: StreamConfig) -> Self {
2656 Self {
2657 config: Some(config),
2658 ..self
2659 }
2660 }
2661}
2662
2663impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2664 fn from(value: CreateStreamInput) -> Self {
2665 (
2666 api::stream::CreateStreamRequest {
2667 stream: value.name,
2668 config: value.config.map(Into::into),
2669 },
2670 value.idempotency_token,
2671 )
2672 }
2673}
2674
2675#[derive(Debug, Clone)]
2676#[non_exhaustive]
2677#[doc(hidden)]
2680#[cfg(feature = "_hidden")]
2681pub struct CreateOrReconfigureStreamInput {
2682 pub name: StreamName,
2684 pub config: Option<StreamReconfiguration>,
2688}
2689
2690#[cfg(feature = "_hidden")]
2691impl CreateOrReconfigureStreamInput {
2692 pub fn new(name: StreamName) -> Self {
2694 Self { name, config: None }
2695 }
2696
2697 pub fn with_config(self, config: StreamReconfiguration) -> Self {
2699 Self {
2700 config: Some(config),
2701 ..self
2702 }
2703 }
2704}
2705
2706#[cfg(feature = "_hidden")]
2707impl From<CreateOrReconfigureStreamInput>
2708 for (StreamName, Option<api::config::StreamReconfiguration>)
2709{
2710 fn from(value: CreateOrReconfigureStreamInput) -> Self {
2711 (value.name, value.config.map(Into::into))
2712 }
2713}
2714
2715#[derive(Debug, Clone)]
2716#[non_exhaustive]
2717pub struct DeleteStreamInput {
2719 pub name: StreamName,
2721 pub ignore_not_found: bool,
2723}
2724
2725impl DeleteStreamInput {
2726 pub fn new(name: StreamName) -> Self {
2728 Self {
2729 name,
2730 ignore_not_found: false,
2731 }
2732 }
2733
2734 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2736 Self {
2737 ignore_not_found,
2738 ..self
2739 }
2740 }
2741}
2742
2743#[derive(Debug, Clone)]
2744#[non_exhaustive]
2745pub struct ReconfigureStreamInput {
2747 pub name: StreamName,
2749 pub config: StreamReconfiguration,
2751}
2752
2753impl ReconfigureStreamInput {
2754 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2756 Self { name, config }
2757 }
2758}
2759
2760#[derive(Debug, Clone, PartialEq, Eq)]
2761pub struct FencingToken(String);
2767
2768impl FencingToken {
2769 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2771 rand::rng()
2772 .sample_iter(&rand::distr::Alphanumeric)
2773 .take(n)
2774 .map(char::from)
2775 .collect::<String>()
2776 .parse()
2777 }
2778}
2779
2780impl FromStr for FencingToken {
2781 type Err = ValidationError;
2782
2783 fn from_str(s: &str) -> Result<Self, Self::Err> {
2784 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2785 return Err(ValidationError(format!(
2786 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2787 )));
2788 }
2789 Ok(FencingToken(s.to_string()))
2790 }
2791}
2792
2793impl std::fmt::Display for FencingToken {
2794 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2795 write!(f, "{}", self.0)
2796 }
2797}
2798
2799impl Deref for FencingToken {
2800 type Target = str;
2801
2802 fn deref(&self) -> &Self::Target {
2803 &self.0
2804 }
2805}
2806
2807#[derive(Debug, Clone, Copy, PartialEq)]
2808#[non_exhaustive]
2809pub struct StreamPosition {
2811 pub seq_num: u64,
2813 pub timestamp: u64,
2816}
2817
2818impl std::fmt::Display for StreamPosition {
2819 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2820 write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2821 }
2822}
2823
2824impl From<api::stream::proto::StreamPosition> for StreamPosition {
2825 fn from(value: api::stream::proto::StreamPosition) -> Self {
2826 Self {
2827 seq_num: value.seq_num,
2828 timestamp: value.timestamp,
2829 }
2830 }
2831}
2832
2833impl From<api::stream::StreamPosition> for StreamPosition {
2834 fn from(value: api::stream::StreamPosition) -> Self {
2835 Self {
2836 seq_num: value.seq_num,
2837 timestamp: value.timestamp,
2838 }
2839 }
2840}
2841
2842#[derive(Debug, Clone, PartialEq)]
2843#[non_exhaustive]
2844pub struct Header {
2846 pub name: Bytes,
2848 pub value: Bytes,
2850}
2851
2852impl Header {
2853 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2855 Self {
2856 name: name.into(),
2857 value: value.into(),
2858 }
2859 }
2860}
2861
2862impl From<Header> for api::stream::proto::Header {
2863 fn from(value: Header) -> Self {
2864 Self {
2865 name: value.name,
2866 value: value.value,
2867 }
2868 }
2869}
2870
2871impl From<api::stream::proto::Header> for Header {
2872 fn from(value: api::stream::proto::Header) -> Self {
2873 Self {
2874 name: value.name,
2875 value: value.value,
2876 }
2877 }
2878}
2879
2880#[derive(Debug, Clone, PartialEq)]
2881pub struct AppendRecord {
2883 body: Bytes,
2884 headers: Vec<Header>,
2885 timestamp: Option<u64>,
2886}
2887
2888impl AppendRecord {
2889 fn validate(self) -> Result<Self, ValidationError> {
2890 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2891 Err(ValidationError(format!(
2892 "metered_bytes: {} exceeds {}",
2893 self.metered_bytes(),
2894 RECORD_BATCH_MAX.bytes
2895 )))
2896 } else {
2897 Ok(self)
2898 }
2899 }
2900
2901 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2903 let record = Self {
2904 body: body.into(),
2905 headers: Vec::default(),
2906 timestamp: None,
2907 };
2908 record.validate()
2909 }
2910
2911 pub fn with_headers(
2913 self,
2914 headers: impl IntoIterator<Item = Header>,
2915 ) -> Result<Self, ValidationError> {
2916 let record = Self {
2917 headers: headers.into_iter().collect(),
2918 ..self
2919 };
2920 record.validate()
2921 }
2922
2923 pub fn with_timestamp(self, timestamp: u64) -> Self {
2927 Self {
2928 timestamp: Some(timestamp),
2929 ..self
2930 }
2931 }
2932
2933 pub fn body(&self) -> &[u8] {
2935 &self.body
2936 }
2937
2938 pub fn headers(&self) -> &[Header] {
2940 &self.headers
2941 }
2942
2943 pub fn timestamp(&self) -> Option<u64> {
2945 self.timestamp
2946 }
2947}
2948
2949impl From<AppendRecord> for api::stream::proto::AppendRecord {
2950 fn from(value: AppendRecord) -> Self {
2951 Self {
2952 timestamp: value.timestamp,
2953 headers: value.headers.into_iter().map(Into::into).collect(),
2954 body: value.body,
2955 }
2956 }
2957}
2958
2959pub trait MeteredBytes {
2966 fn metered_bytes(&self) -> usize;
2968}
2969
2970macro_rules! metered_bytes_impl {
2971 ($ty:ty) => {
2972 impl MeteredBytes for $ty {
2973 fn metered_bytes(&self) -> usize {
2974 8 + (2 * self.headers.len())
2975 + self
2976 .headers
2977 .iter()
2978 .map(|h| h.name.len() + h.value.len())
2979 .sum::<usize>()
2980 + self.body.len()
2981 }
2982 }
2983 };
2984}
2985
2986metered_bytes_impl!(AppendRecord);
2987
2988#[derive(Debug, Clone)]
2989pub struct AppendRecordBatch {
2998 records: Vec<AppendRecord>,
2999 metered_bytes: usize,
3000}
3001
3002impl AppendRecordBatch {
3003 pub(crate) fn with_capacity(capacity: usize) -> Self {
3004 Self {
3005 records: Vec::with_capacity(capacity),
3006 metered_bytes: 0,
3007 }
3008 }
3009
3010 pub(crate) fn push(&mut self, record: AppendRecord) {
3011 self.metered_bytes += record.metered_bytes();
3012 self.records.push(record);
3013 }
3014
3015 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3017 where
3018 I: IntoIterator<Item = AppendRecord>,
3019 {
3020 let mut records = Vec::new();
3021 let mut metered_bytes = 0;
3022
3023 for record in iter {
3024 metered_bytes += record.metered_bytes();
3025 records.push(record);
3026
3027 if metered_bytes > RECORD_BATCH_MAX.bytes {
3028 return Err(ValidationError(format!(
3029 "batch size in metered bytes ({metered_bytes}) exceeds {}",
3030 RECORD_BATCH_MAX.bytes
3031 )));
3032 }
3033
3034 if records.len() > RECORD_BATCH_MAX.count {
3035 return Err(ValidationError(format!(
3036 "number of records in the batch exceeds {}",
3037 RECORD_BATCH_MAX.count
3038 )));
3039 }
3040 }
3041
3042 if records.is_empty() {
3043 return Err(ValidationError("batch is empty".into()));
3044 }
3045
3046 Ok(Self {
3047 records,
3048 metered_bytes,
3049 })
3050 }
3051}
3052
3053impl Deref for AppendRecordBatch {
3054 type Target = [AppendRecord];
3055
3056 fn deref(&self) -> &Self::Target {
3057 &self.records
3058 }
3059}
3060
3061impl MeteredBytes for AppendRecordBatch {
3062 fn metered_bytes(&self) -> usize {
3063 self.metered_bytes
3064 }
3065}
3066
3067#[derive(Debug, Clone)]
3068pub enum Command {
3070 Fence {
3072 fencing_token: FencingToken,
3074 },
3075 Trim {
3077 trim_point: u64,
3079 },
3080}
3081
3082#[derive(Debug, Clone)]
3083#[non_exhaustive]
3084pub struct CommandRecord {
3088 pub command: Command,
3090 pub timestamp: Option<u64>,
3092}
3093
3094impl CommandRecord {
3095 const FENCE: &[u8] = b"fence";
3096 const TRIM: &[u8] = b"trim";
3097
3098 pub fn fence(fencing_token: FencingToken) -> Self {
3103 Self {
3104 command: Command::Fence { fencing_token },
3105 timestamp: None,
3106 }
3107 }
3108
3109 pub fn trim(trim_point: u64) -> Self {
3116 Self {
3117 command: Command::Trim { trim_point },
3118 timestamp: None,
3119 }
3120 }
3121
3122 pub fn with_timestamp(self, timestamp: u64) -> Self {
3124 Self {
3125 timestamp: Some(timestamp),
3126 ..self
3127 }
3128 }
3129}
3130
3131impl From<CommandRecord> for AppendRecord {
3132 fn from(value: CommandRecord) -> Self {
3133 let (header_value, body) = match value.command {
3134 Command::Fence { fencing_token } => (
3135 CommandRecord::FENCE,
3136 Bytes::copy_from_slice(fencing_token.as_bytes()),
3137 ),
3138 Command::Trim { trim_point } => (
3139 CommandRecord::TRIM,
3140 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3141 ),
3142 };
3143 Self {
3144 body,
3145 headers: vec![Header::new("", header_value)],
3146 timestamp: value.timestamp,
3147 }
3148 }
3149}
3150
3151#[derive(Debug, Clone)]
3152#[non_exhaustive]
3153pub struct AppendInput {
3156 pub records: AppendRecordBatch,
3158 pub match_seq_num: Option<u64>,
3162 pub fencing_token: Option<FencingToken>,
3167}
3168
3169impl AppendInput {
3170 pub fn new(records: AppendRecordBatch) -> Self {
3172 Self {
3173 records,
3174 match_seq_num: None,
3175 fencing_token: None,
3176 }
3177 }
3178
3179 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3181 Self {
3182 match_seq_num: Some(match_seq_num),
3183 ..self
3184 }
3185 }
3186
3187 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3189 Self {
3190 fencing_token: Some(fencing_token),
3191 ..self
3192 }
3193 }
3194}
3195
3196impl From<AppendInput> for api::stream::proto::AppendInput {
3197 fn from(value: AppendInput) -> Self {
3198 Self {
3199 records: value.records.iter().cloned().map(Into::into).collect(),
3200 match_seq_num: value.match_seq_num,
3201 fencing_token: value.fencing_token.map(|t| t.to_string()),
3202 }
3203 }
3204}
3205
3206#[derive(Debug, Clone, PartialEq)]
3207#[non_exhaustive]
3208pub struct AppendAck {
3210 pub start: StreamPosition,
3212 pub end: StreamPosition,
3218 pub tail: StreamPosition,
3223}
3224
3225impl From<api::stream::proto::AppendAck> for AppendAck {
3226 fn from(value: api::stream::proto::AppendAck) -> Self {
3227 Self {
3228 start: value.start.unwrap_or_default().into(),
3229 end: value.end.unwrap_or_default().into(),
3230 tail: value.tail.unwrap_or_default().into(),
3231 }
3232 }
3233}
3234
3235#[derive(Debug, Clone, Copy)]
3236pub enum ReadFrom {
3238 SeqNum(u64),
3240 Timestamp(u64),
3242 TailOffset(u64),
3244}
3245
3246impl Default for ReadFrom {
3247 fn default() -> Self {
3248 Self::SeqNum(0)
3249 }
3250}
3251
3252#[derive(Debug, Default, Clone)]
3253#[non_exhaustive]
3254pub struct ReadStart {
3256 pub from: ReadFrom,
3260 pub clamp_to_tail: bool,
3264}
3265
3266impl ReadStart {
3267 pub fn new() -> Self {
3269 Self::default()
3270 }
3271
3272 pub fn with_from(self, from: ReadFrom) -> Self {
3274 Self { from, ..self }
3275 }
3276
3277 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3279 Self {
3280 clamp_to_tail,
3281 ..self
3282 }
3283 }
3284}
3285
3286impl From<ReadStart> for api::stream::ReadStart {
3287 fn from(value: ReadStart) -> Self {
3288 let (seq_num, timestamp, tail_offset) = match value.from {
3289 ReadFrom::SeqNum(n) => (Some(n), None, None),
3290 ReadFrom::Timestamp(t) => (None, Some(t), None),
3291 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3292 };
3293 Self {
3294 seq_num,
3295 timestamp,
3296 tail_offset,
3297 clamp: if value.clamp_to_tail {
3298 Some(true)
3299 } else {
3300 None
3301 },
3302 }
3303 }
3304}
3305
3306#[derive(Debug, Clone, Default)]
3307#[non_exhaustive]
3308pub struct ReadLimits {
3310 pub count: Option<usize>,
3314 pub bytes: Option<usize>,
3318}
3319
3320impl ReadLimits {
3321 pub fn new() -> Self {
3323 Self::default()
3324 }
3325
3326 pub fn with_count(self, count: usize) -> Self {
3328 Self {
3329 count: Some(count),
3330 ..self
3331 }
3332 }
3333
3334 pub fn with_bytes(self, bytes: usize) -> Self {
3336 Self {
3337 bytes: Some(bytes),
3338 ..self
3339 }
3340 }
3341}
3342
3343#[derive(Debug, Clone, Default)]
3344#[non_exhaustive]
3345pub struct ReadStop {
3347 pub limits: ReadLimits,
3351 pub until: Option<RangeTo<u64>>,
3355 pub wait: Option<u32>,
3365}
3366
3367impl ReadStop {
3368 pub fn new() -> Self {
3370 Self::default()
3371 }
3372
3373 pub fn with_limits(self, limits: ReadLimits) -> Self {
3375 Self { limits, ..self }
3376 }
3377
3378 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3380 Self {
3381 until: Some(until),
3382 ..self
3383 }
3384 }
3385
3386 pub fn with_wait(self, wait: u32) -> Self {
3388 Self {
3389 wait: Some(wait),
3390 ..self
3391 }
3392 }
3393}
3394
3395impl From<ReadStop> for api::stream::ReadEnd {
3396 fn from(value: ReadStop) -> Self {
3397 Self {
3398 count: value.limits.count,
3399 bytes: value.limits.bytes,
3400 until: value.until.map(|r| r.end),
3401 wait: value.wait,
3402 }
3403 }
3404}
3405
3406#[derive(Debug, Clone, Default)]
3407#[non_exhaustive]
3408pub struct ReadInput {
3411 pub start: ReadStart,
3415 pub stop: ReadStop,
3419 pub ignore_command_records: bool,
3423}
3424
3425impl ReadInput {
3426 pub fn new() -> Self {
3428 Self::default()
3429 }
3430
3431 pub fn with_start(self, start: ReadStart) -> Self {
3433 Self { start, ..self }
3434 }
3435
3436 pub fn with_stop(self, stop: ReadStop) -> Self {
3438 Self { stop, ..self }
3439 }
3440
3441 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3443 Self {
3444 ignore_command_records,
3445 ..self
3446 }
3447 }
3448}
3449
3450#[derive(Debug, Clone)]
3451#[non_exhaustive]
3452pub struct SequencedRecord {
3454 pub seq_num: u64,
3456 pub body: Bytes,
3458 pub headers: Vec<Header>,
3460 pub timestamp: u64,
3462}
3463
3464impl SequencedRecord {
3465 pub fn is_command_record(&self) -> bool {
3467 self.headers.len() == 1 && *self.headers[0].name == *b""
3468 }
3469}
3470
3471impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3472 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3473 Self {
3474 seq_num: value.seq_num,
3475 body: value.body,
3476 headers: value.headers.into_iter().map(Into::into).collect(),
3477 timestamp: value.timestamp,
3478 }
3479 }
3480}
3481
3482metered_bytes_impl!(SequencedRecord);
3483
3484#[derive(Debug, Clone)]
3485#[non_exhaustive]
3486pub struct ReadBatch {
3489 pub records: Vec<SequencedRecord>,
3496 pub tail: Option<StreamPosition>,
3501}
3502
3503impl ReadBatch {
3504 pub(crate) fn from_api(
3505 batch: api::stream::proto::ReadBatch,
3506 ignore_command_records: bool,
3507 ) -> Self {
3508 Self {
3509 records: batch
3510 .records
3511 .into_iter()
3512 .map(Into::into)
3513 .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3514 .collect(),
3515 tail: batch.tail.map(Into::into),
3516 }
3517 }
3518}
3519
3520pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3522
3523#[derive(Debug, Clone, thiserror::Error)]
3524pub enum AppendConditionFailed {
3526 #[error("fencing token mismatch, expected: {0}")]
3527 FencingTokenMismatch(FencingToken),
3529 #[error("sequence number mismatch, expected: {0}")]
3530 SeqNumMismatch(u64),
3532}
3533
3534impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3535 fn from(value: api::stream::AppendConditionFailed) -> Self {
3536 match value {
3537 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3538 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3539 }
3540 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3541 AppendConditionFailed::SeqNumMismatch(seq)
3542 }
3543 }
3544 }
3545}
3546
3547#[derive(Debug, Clone, thiserror::Error)]
3548pub enum S2Error {
3550 #[error("{0}")]
3551 Client(String),
3553 #[error(transparent)]
3554 Validation(#[from] ValidationError),
3556 #[error("{0}")]
3557 AppendConditionFailed(AppendConditionFailed),
3559 #[error("read from an unwritten position. current tail: {0}")]
3560 ReadUnwritten(StreamPosition),
3562 #[error("{0}")]
3563 Server(ErrorResponse),
3565}
3566
3567impl From<ApiError> for S2Error {
3568 fn from(err: ApiError) -> Self {
3569 match err {
3570 ApiError::ReadUnwritten(tail_response) => {
3571 Self::ReadUnwritten(tail_response.tail.into())
3572 }
3573 ApiError::AppendConditionFailed(condition_failed) => {
3574 Self::AppendConditionFailed(condition_failed.into())
3575 }
3576 ApiError::Server(_, response) => Self::Server(response.into()),
3577 other => Self::Client(other.to_string()),
3578 }
3579 }
3580}
3581
3582#[derive(Debug, Clone, thiserror::Error)]
3583#[error("{code}: {message}")]
3584#[non_exhaustive]
3585pub struct ErrorResponse {
3587 pub code: String,
3589 pub message: String,
3591}
3592
3593impl From<ApiErrorResponse> for ErrorResponse {
3594 fn from(response: ApiErrorResponse) -> Self {
3595 Self {
3596 code: response.code,
3597 message: response.message,
3598 }
3599 }
3600}
3601
3602fn idempotency_token() -> String {
3603 uuid::Uuid::new_v4().simple().to_string()
3604}