1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16 header::HeaderValue,
17 uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22pub use s2_common::encryption::EncryptionAlgorithm;
24pub use s2_common::encryption::EncryptionKey;
26pub use s2_common::types::ValidationError;
28pub use s2_common::types::access::AccessTokenId;
32pub use s2_common::types::access::AccessTokenIdPrefix;
34pub use s2_common::types::access::AccessTokenIdStartAfter;
36pub use s2_common::types::basin::BasinName;
41pub use s2_common::types::basin::BasinNamePrefix;
43pub use s2_common::types::basin::BasinNameStartAfter;
45#[doc(hidden)]
47#[cfg(feature = "_hidden")]
48pub use s2_common::types::resources::ProvisionResult;
49pub use s2_common::types::stream::StreamName;
53pub use s2_common::types::stream::StreamNamePrefix;
55pub use s2_common::types::stream::StreamNameStartAfter;
57
58pub(crate) const ONE_MIB: u32 = 1024 * 1024;
59
60use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
61use secrecy::SecretString;
62
63use crate::api::{ApiError, ApiErrorResponse};
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
71pub struct S2DateTime(time::OffsetDateTime);
72
73impl TryFrom<time::OffsetDateTime> for S2DateTime {
74 type Error = ValidationError;
75
76 fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
77 dt.format(&time::format_description::well_known::Rfc3339)
78 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
79 Ok(Self(dt))
80 }
81}
82
83impl From<S2DateTime> for time::OffsetDateTime {
84 fn from(dt: S2DateTime) -> Self {
85 dt.0
86 }
87}
88
89impl FromStr for S2DateTime {
90 type Err = ValidationError;
91
92 fn from_str(s: &str) -> Result<Self, Self::Err> {
93 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
94 .map(Self)
95 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
96 }
97}
98
99impl fmt::Display for S2DateTime {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 write!(
102 f,
103 "{}",
104 self.0
105 .format(&time::format_description::well_known::Rfc3339)
106 .expect("RFC3339 formatting should not fail for S2DateTime")
107 )
108 }
109}
110
111#[derive(Debug, Clone, PartialEq)]
113pub(crate) enum BasinAuthority {
114 ParentZone(Authority),
116 Direct(Authority),
118}
119
120#[derive(Debug, Clone)]
122pub struct AccountEndpoint {
123 scheme: Scheme,
124 authority: Authority,
125}
126
127impl AccountEndpoint {
128 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
130 endpoint.parse()
131 }
132}
133
134impl FromStr for AccountEndpoint {
135 type Err = ValidationError;
136
137 fn from_str(s: &str) -> Result<Self, Self::Err> {
138 let (scheme, authority) = match s.find("://") {
139 Some(idx) => {
140 let scheme: Scheme = s[..idx]
141 .parse()
142 .map_err(|_| "invalid account endpoint scheme".to_string())?;
143 (scheme, &s[idx + 3..])
144 }
145 None => (Scheme::HTTPS, s),
146 };
147 Ok(Self {
148 scheme,
149 authority: authority
150 .parse()
151 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
152 })
153 }
154}
155
156#[derive(Debug, Clone)]
158pub struct BasinEndpoint {
159 scheme: Scheme,
160 authority: BasinAuthority,
161}
162
163impl BasinEndpoint {
164 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
166 endpoint.parse()
167 }
168}
169
170impl FromStr for BasinEndpoint {
171 type Err = ValidationError;
172
173 fn from_str(s: &str) -> Result<Self, Self::Err> {
174 let (scheme, authority) = match s.find("://") {
175 Some(idx) => {
176 let scheme: Scheme = s[..idx]
177 .parse()
178 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
179 (scheme, &s[idx + 3..])
180 }
181 None => (Scheme::HTTPS, s),
182 };
183 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
184 BasinAuthority::ParentZone(
185 authority
186 .parse()
187 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
188 )
189 } else {
190 BasinAuthority::Direct(
191 authority
192 .parse()
193 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
194 )
195 };
196 Ok(Self { scheme, authority })
197 }
198}
199
200#[derive(Debug, Clone)]
201#[non_exhaustive]
202pub struct S2Endpoints {
204 pub(crate) scheme: Scheme,
205 pub(crate) account_authority: Authority,
206 pub(crate) basin_authority: BasinAuthority,
207}
208
209impl S2Endpoints {
210 pub fn new(
212 account_endpoint: AccountEndpoint,
213 basin_endpoint: BasinEndpoint,
214 ) -> Result<Self, ValidationError> {
215 if account_endpoint.scheme != basin_endpoint.scheme {
216 return Err("account and basin endpoints must have the same scheme".into());
217 }
218 Ok(Self {
219 scheme: account_endpoint.scheme,
220 account_authority: account_endpoint.authority,
221 basin_authority: basin_endpoint.authority,
222 })
223 }
224
225 pub fn from_env() -> Result<Self, ValidationError> {
231 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
232 Ok(endpoint) => endpoint.parse()?,
233 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
234 Err(VarError::NotUnicode(_)) => {
235 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
236 }
237 };
238
239 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
240 Ok(endpoint) => endpoint.parse()?,
241 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
242 Err(VarError::NotUnicode(_)) => {
243 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
244 }
245 };
246
247 if account_endpoint.scheme != basin_endpoint.scheme {
248 return Err(
249 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
250 );
251 }
252
253 Ok(Self {
254 scheme: account_endpoint.scheme,
255 account_authority: account_endpoint.authority,
256 basin_authority: basin_endpoint.authority,
257 })
258 }
259
260 pub(crate) fn for_aws() -> Self {
261 Self {
262 scheme: Scheme::HTTPS,
263 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
264 basin_authority: BasinAuthority::ParentZone(
265 "b.s2.dev".try_into().expect("valid authority"),
266 ),
267 }
268 }
269}
270
271#[derive(Debug, Clone, Copy)]
272pub enum Compression {
274 None,
276 Gzip,
278 Zstd,
280}
281
282impl From<Compression> for CompressionAlgorithm {
283 fn from(value: Compression) -> Self {
284 match value {
285 Compression::None => CompressionAlgorithm::None,
286 Compression::Gzip => CompressionAlgorithm::Gzip,
287 Compression::Zstd => CompressionAlgorithm::Zstd,
288 }
289 }
290}
291
292#[derive(Debug, Clone, Copy, PartialEq)]
293#[non_exhaustive]
294pub enum AppendRetryPolicy {
297 All,
299 NoSideEffects,
309}
310
311#[derive(Debug, Clone)]
312#[non_exhaustive]
313pub struct RetryConfig {
322 pub max_attempts: NonZeroU32,
326 pub min_base_delay: Duration,
330 pub max_base_delay: Duration,
334 pub append_retry_policy: AppendRetryPolicy,
339}
340
341impl Default for RetryConfig {
342 fn default() -> Self {
343 Self {
344 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
345 min_base_delay: Duration::from_millis(100),
346 max_base_delay: Duration::from_secs(1),
347 append_retry_policy: AppendRetryPolicy::All,
348 }
349 }
350}
351
352impl RetryConfig {
353 pub fn new() -> Self {
355 Self::default()
356 }
357
358 pub(crate) fn max_retries(&self) -> u32 {
359 self.max_attempts.get() - 1
360 }
361
362 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
364 Self {
365 max_attempts,
366 ..self
367 }
368 }
369
370 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
372 Self {
373 min_base_delay,
374 ..self
375 }
376 }
377
378 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
380 Self {
381 max_base_delay,
382 ..self
383 }
384 }
385
386 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
389 Self {
390 append_retry_policy,
391 ..self
392 }
393 }
394}
395
396#[derive(Debug, Clone)]
397#[non_exhaustive]
398pub struct S2Config {
400 pub(crate) access_token: SecretString,
401 pub(crate) endpoints: S2Endpoints,
402 pub(crate) connection_timeout: Duration,
403 pub(crate) request_timeout: Duration,
404 pub(crate) retry: RetryConfig,
405 pub(crate) compression: Compression,
406 pub(crate) user_agent: HeaderValue,
407 pub(crate) insecure_skip_cert_verification: bool,
408}
409
410impl S2Config {
411 pub fn new(access_token: impl Into<String>) -> Self {
413 Self {
414 access_token: access_token.into().into(),
415 endpoints: S2Endpoints::for_aws(),
416 connection_timeout: Duration::from_secs(3),
417 request_timeout: Duration::from_secs(5),
418 retry: RetryConfig::new(),
419 compression: Compression::None,
420 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
421 .parse()
422 .expect("valid user agent"),
423 insecure_skip_cert_verification: false,
424 }
425 }
426
427 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
429 Self { endpoints, ..self }
430 }
431
432 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
436 Self {
437 connection_timeout,
438 ..self
439 }
440 }
441
442 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
446 Self {
447 request_timeout,
448 ..self
449 }
450 }
451
452 pub fn with_retry(self, retry: RetryConfig) -> Self {
456 Self { retry, ..self }
457 }
458
459 pub fn with_compression(self, compression: Compression) -> Self {
463 Self {
464 compression,
465 ..self
466 }
467 }
468
469 pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
481 Self {
482 insecure_skip_cert_verification: skip,
483 ..self
484 }
485 }
486
487 #[doc(hidden)]
488 #[cfg(feature = "_hidden")]
489 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
490 let user_agent = user_agent
491 .into()
492 .parse()
493 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
494 Ok(Self { user_agent, ..self })
495 }
496}
497
498#[derive(Debug, Default, Clone, PartialEq, Eq)]
499#[non_exhaustive]
500pub struct Page<T> {
502 pub values: Vec<T>,
504 pub has_more: bool,
506}
507
508impl<T> Page<T> {
509 pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
510 Self {
511 values: values.into(),
512 has_more,
513 }
514 }
515}
516
517#[derive(Debug, Clone, Copy, PartialEq, Eq)]
518pub enum StorageClass {
520 Standard,
522 Express,
524}
525
526impl From<api::config::StorageClass> for StorageClass {
527 fn from(value: api::config::StorageClass) -> Self {
528 match value {
529 api::config::StorageClass::Standard => StorageClass::Standard,
530 api::config::StorageClass::Express => StorageClass::Express,
531 }
532 }
533}
534
535impl From<StorageClass> for api::config::StorageClass {
536 fn from(value: StorageClass) -> Self {
537 match value {
538 StorageClass::Standard => api::config::StorageClass::Standard,
539 StorageClass::Express => api::config::StorageClass::Express,
540 }
541 }
542}
543
544#[derive(Debug, Clone, Copy, PartialEq, Eq)]
545pub enum RetentionPolicy {
547 Age(u64),
549 Infinite,
551}
552
553impl From<api::config::RetentionPolicy> for RetentionPolicy {
554 fn from(value: api::config::RetentionPolicy) -> Self {
555 match value {
556 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
557 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
558 }
559 }
560}
561
562impl From<RetentionPolicy> for api::config::RetentionPolicy {
563 fn from(value: RetentionPolicy) -> Self {
564 match value {
565 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
566 RetentionPolicy::Infinite => {
567 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
568 }
569 }
570 }
571}
572
573#[derive(Debug, Clone, Copy, PartialEq, Eq)]
574pub enum TimestampingMode {
576 ClientPrefer,
578 ClientRequire,
580 Arrival,
582}
583
584impl From<api::config::TimestampingMode> for TimestampingMode {
585 fn from(value: api::config::TimestampingMode) -> Self {
586 match value {
587 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
588 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
589 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
590 }
591 }
592}
593
594impl From<TimestampingMode> for api::config::TimestampingMode {
595 fn from(value: TimestampingMode) -> Self {
596 match value {
597 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
598 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
599 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
600 }
601 }
602}
603
604#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
605#[non_exhaustive]
606pub struct TimestampingConfig {
608 pub mode: Option<TimestampingMode>,
612 pub uncapped: Option<bool>,
616}
617
618impl TimestampingConfig {
619 pub fn new() -> Self {
621 Self::default()
622 }
623
624 pub fn with_mode(self, mode: TimestampingMode) -> Self {
626 Self {
627 mode: Some(mode),
628 ..self
629 }
630 }
631
632 pub fn with_uncapped(self, uncapped: bool) -> Self {
634 Self {
635 uncapped: Some(uncapped),
636 ..self
637 }
638 }
639}
640
641impl From<api::config::TimestampingConfig> for TimestampingConfig {
642 fn from(value: api::config::TimestampingConfig) -> Self {
643 Self {
644 mode: value.mode.map(Into::into),
645 uncapped: value.uncapped,
646 }
647 }
648}
649
650impl From<TimestampingConfig> for api::config::TimestampingConfig {
651 fn from(value: TimestampingConfig) -> Self {
652 Self {
653 mode: value.mode.map(Into::into),
654 uncapped: value.uncapped,
655 }
656 }
657}
658
659#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
660#[non_exhaustive]
661pub struct DeleteOnEmptyConfig {
663 pub min_age_secs: u64,
667}
668
669impl DeleteOnEmptyConfig {
670 pub fn new() -> Self {
672 Self::default()
673 }
674
675 pub fn with_min_age(self, min_age: Duration) -> Self {
677 Self {
678 min_age_secs: min_age.as_secs(),
679 }
680 }
681}
682
683impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
684 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
685 Self {
686 min_age_secs: value.min_age_secs,
687 }
688 }
689}
690
691impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
692 fn from(value: DeleteOnEmptyConfig) -> Self {
693 Self {
694 min_age_secs: value.min_age_secs,
695 }
696 }
697}
698
699#[derive(Debug, Clone, Default, PartialEq, Eq)]
700#[non_exhaustive]
701pub struct StreamConfig {
703 pub storage_class: Option<StorageClass>,
707 pub retention_policy: Option<RetentionPolicy>,
711 pub timestamping: Option<TimestampingConfig>,
715 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
719}
720
721impl StreamConfig {
722 pub fn new() -> Self {
724 Self::default()
725 }
726
727 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
729 Self {
730 storage_class: Some(storage_class),
731 ..self
732 }
733 }
734
735 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
737 Self {
738 retention_policy: Some(retention_policy),
739 ..self
740 }
741 }
742
743 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
745 Self {
746 timestamping: Some(timestamping),
747 ..self
748 }
749 }
750
751 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
753 Self {
754 delete_on_empty: Some(delete_on_empty),
755 ..self
756 }
757 }
758}
759
760impl From<api::config::StreamConfig> for StreamConfig {
761 fn from(value: api::config::StreamConfig) -> Self {
762 Self {
763 storage_class: value.storage_class.map(Into::into),
764 retention_policy: value.retention_policy.map(Into::into),
765 timestamping: value.timestamping.map(Into::into),
766 delete_on_empty: value.delete_on_empty.map(Into::into),
767 }
768 }
769}
770
771impl From<StreamConfig> for api::config::StreamConfig {
772 fn from(value: StreamConfig) -> Self {
773 Self {
774 storage_class: value.storage_class.map(Into::into),
775 retention_policy: value.retention_policy.map(Into::into),
776 timestamping: value.timestamping.map(Into::into),
777 delete_on_empty: value.delete_on_empty.map(Into::into),
778 }
779 }
780}
781
782#[derive(Debug, Clone, Default)]
783#[non_exhaustive]
784pub struct BasinConfig {
786 pub default_stream_config: Option<StreamConfig>,
790 pub stream_cipher: Option<EncryptionAlgorithm>,
792 pub create_stream_on_append: bool,
796 pub create_stream_on_read: bool,
800}
801
802impl BasinConfig {
803 pub fn new() -> Self {
805 Self::default()
806 }
807
808 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
810 Self {
811 default_stream_config: Some(config),
812 ..self
813 }
814 }
815
816 pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
818 Self {
819 stream_cipher: Some(stream_cipher),
820 ..self
821 }
822 }
823
824 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
827 Self {
828 create_stream_on_append,
829 ..self
830 }
831 }
832
833 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
835 Self {
836 create_stream_on_read,
837 ..self
838 }
839 }
840}
841
842impl From<api::config::BasinConfig> for BasinConfig {
843 fn from(value: api::config::BasinConfig) -> Self {
844 Self {
845 default_stream_config: value.default_stream_config.map(Into::into),
846 stream_cipher: value.stream_cipher.map(Into::into),
847 create_stream_on_append: value.create_stream_on_append,
848 create_stream_on_read: value.create_stream_on_read,
849 }
850 }
851}
852
853impl From<BasinConfig> for api::config::BasinConfig {
854 fn from(value: BasinConfig) -> Self {
855 Self {
856 default_stream_config: value.default_stream_config.map(Into::into),
857 stream_cipher: value.stream_cipher.map(Into::into),
858 create_stream_on_append: value.create_stream_on_append,
859 create_stream_on_read: value.create_stream_on_read,
860 }
861 }
862}
863
864#[derive(Debug, Clone, PartialEq, Eq)]
865#[non_exhaustive]
867pub enum BasinScope {
868 AwsUsEast1,
870 AwsUsWest2,
872 AwsEuNorth1,
874}
875
876impl From<api::basin::BasinScope> for BasinScope {
877 fn from(value: api::basin::BasinScope) -> Self {
878 match value {
879 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
880 api::basin::BasinScope::AwsUsWest2 => BasinScope::AwsUsWest2,
881 api::basin::BasinScope::AwsEuNorth1 => BasinScope::AwsEuNorth1,
882 }
883 }
884}
885
886impl From<BasinScope> for api::basin::BasinScope {
887 fn from(value: BasinScope) -> Self {
888 match value {
889 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
890 BasinScope::AwsUsWest2 => api::basin::BasinScope::AwsUsWest2,
891 BasinScope::AwsEuNorth1 => api::basin::BasinScope::AwsEuNorth1,
892 }
893 }
894}
895
896#[derive(Debug, Clone)]
897#[non_exhaustive]
898pub struct CreateBasinInput {
900 pub name: BasinName,
902 pub config: Option<BasinConfig>,
906 pub scope: Option<BasinScope>,
910 idempotency_token: String,
911}
912
913impl CreateBasinInput {
914 pub fn new(name: BasinName) -> Self {
916 Self {
917 name,
918 config: None,
919 scope: None,
920 idempotency_token: idempotency_token(),
921 }
922 }
923
924 pub fn with_config(self, config: BasinConfig) -> Self {
926 Self {
927 config: Some(config),
928 ..self
929 }
930 }
931
932 pub fn with_scope(self, scope: BasinScope) -> Self {
934 Self {
935 scope: Some(scope),
936 ..self
937 }
938 }
939}
940
941impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
942 fn from(value: CreateBasinInput) -> Self {
943 (
944 api::basin::CreateBasinRequest {
945 basin: value.name,
946 config: value.config.map(Into::into),
947 scope: value.scope.map(Into::into),
948 },
949 value.idempotency_token,
950 )
951 }
952}
953
954#[derive(Debug, Clone)]
955#[non_exhaustive]
956#[doc(hidden)]
958#[cfg(feature = "_hidden")]
959pub struct EnsureBasinInput {
960 pub name: BasinName,
962 config: Option<api::config::BasinConfig>,
966 pub scope: Option<BasinScope>,
970}
971
972#[cfg(feature = "_hidden")]
973impl EnsureBasinInput {
974 pub fn new(name: BasinName) -> Self {
976 Self {
977 name,
978 config: None,
979 scope: None,
980 }
981 }
982
983 pub fn with_config(self, config: impl Into<s2_api::v1::config::BasinConfig>) -> Self {
985 Self {
986 config: Some(config.into()),
987 ..self
988 }
989 }
990
991 pub fn with_scope(self, scope: BasinScope) -> Self {
993 Self {
994 scope: Some(scope),
995 ..self
996 }
997 }
998}
999
1000#[cfg(feature = "_hidden")]
1001impl From<EnsureBasinInput> for (BasinName, Option<api::basin::EnsureBasinRequest>) {
1002 fn from(value: EnsureBasinInput) -> Self {
1003 let config = value.config;
1004 let request = if config.is_some() || value.scope.is_some() {
1005 Some(api::basin::EnsureBasinRequest {
1006 config,
1007 scope: value.scope.map(Into::into),
1008 })
1009 } else {
1010 None
1011 };
1012 (value.name, request)
1013 }
1014}
1015
1016#[derive(Debug, Clone, Default)]
1017#[non_exhaustive]
1018pub struct ListBasinsInput {
1020 pub prefix: BasinNamePrefix,
1024 pub start_after: BasinNameStartAfter,
1030 pub limit: Option<usize>,
1034}
1035
1036impl ListBasinsInput {
1037 pub fn new() -> Self {
1039 Self::default()
1040 }
1041
1042 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1044 Self { prefix, ..self }
1045 }
1046
1047 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1050 Self {
1051 start_after,
1052 ..self
1053 }
1054 }
1055
1056 pub fn with_limit(self, limit: usize) -> Self {
1058 Self {
1059 limit: Some(limit),
1060 ..self
1061 }
1062 }
1063}
1064
1065impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1066 fn from(value: ListBasinsInput) -> Self {
1067 Self {
1068 prefix: Some(value.prefix),
1069 start_after: Some(value.start_after),
1070 limit: value.limit,
1071 }
1072 }
1073}
1074
1075#[derive(Debug, Clone, Default)]
1076pub struct ListAllBasinsInput {
1078 pub prefix: BasinNamePrefix,
1082 pub start_after: BasinNameStartAfter,
1088 pub include_deleted: bool,
1092}
1093
1094impl ListAllBasinsInput {
1095 pub fn new() -> Self {
1097 Self::default()
1098 }
1099
1100 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1102 Self { prefix, ..self }
1103 }
1104
1105 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1108 Self {
1109 start_after,
1110 ..self
1111 }
1112 }
1113
1114 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1116 Self {
1117 include_deleted,
1118 ..self
1119 }
1120 }
1121}
1122
1123#[derive(Debug, Clone, PartialEq, Eq)]
1124#[non_exhaustive]
1125pub struct BasinInfo {
1127 pub name: BasinName,
1129 pub scope: Option<BasinScope>,
1131 pub created_at: S2DateTime,
1133 pub deleted_at: Option<S2DateTime>,
1135}
1136
1137impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1138 type Error = ValidationError;
1139
1140 fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1141 Ok(Self {
1142 name: value.name,
1143 scope: value.scope.map(Into::into),
1144 created_at: value.created_at.try_into()?,
1145 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1146 })
1147 }
1148}
1149
1150#[derive(Debug, Clone)]
1151#[non_exhaustive]
1152pub struct DeleteBasinInput {
1154 pub name: BasinName,
1156 pub ignore_not_found: bool,
1158}
1159
1160impl DeleteBasinInput {
1161 pub fn new(name: BasinName) -> Self {
1163 Self {
1164 name,
1165 ignore_not_found: false,
1166 }
1167 }
1168
1169 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1171 Self {
1172 ignore_not_found,
1173 ..self
1174 }
1175 }
1176}
1177
1178#[derive(Debug, Clone, Default)]
1179#[non_exhaustive]
1180pub struct TimestampingReconfiguration {
1182 pub mode: Maybe<Option<TimestampingMode>>,
1184 pub uncapped: Maybe<Option<bool>>,
1186}
1187
1188impl TimestampingReconfiguration {
1189 pub fn new() -> Self {
1191 Self::default()
1192 }
1193
1194 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1196 Self {
1197 mode: Maybe::Specified(Some(mode)),
1198 ..self
1199 }
1200 }
1201
1202 pub fn with_uncapped(self, uncapped: bool) -> Self {
1204 Self {
1205 uncapped: Maybe::Specified(Some(uncapped)),
1206 ..self
1207 }
1208 }
1209}
1210
1211impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1212 fn from(value: TimestampingReconfiguration) -> Self {
1213 Self {
1214 mode: value.mode.map(|m| m.map(Into::into)),
1215 uncapped: value.uncapped,
1216 }
1217 }
1218}
1219
1220#[derive(Debug, Clone, Default)]
1221#[non_exhaustive]
1222pub struct DeleteOnEmptyReconfiguration {
1224 pub min_age_secs: Maybe<Option<u64>>,
1226}
1227
1228impl DeleteOnEmptyReconfiguration {
1229 pub fn new() -> Self {
1231 Self::default()
1232 }
1233
1234 pub fn with_min_age(self, min_age: Duration) -> Self {
1236 Self {
1237 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1238 }
1239 }
1240}
1241
1242impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1243 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1244 Self {
1245 min_age_secs: value.min_age_secs,
1246 }
1247 }
1248}
1249
1250#[derive(Debug, Clone, Default)]
1251#[non_exhaustive]
1252pub struct StreamReconfiguration {
1254 pub storage_class: Maybe<Option<StorageClass>>,
1256 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1258 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1260 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1262}
1263
1264impl StreamReconfiguration {
1265 pub fn new() -> Self {
1267 Self::default()
1268 }
1269
1270 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1272 Self {
1273 storage_class: Maybe::Specified(Some(storage_class)),
1274 ..self
1275 }
1276 }
1277
1278 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1280 Self {
1281 retention_policy: Maybe::Specified(Some(retention_policy)),
1282 ..self
1283 }
1284 }
1285
1286 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1288 Self {
1289 timestamping: Maybe::Specified(Some(timestamping)),
1290 ..self
1291 }
1292 }
1293
1294 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1296 Self {
1297 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1298 ..self
1299 }
1300 }
1301}
1302
1303impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1304 fn from(value: StreamReconfiguration) -> Self {
1305 Self {
1306 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1307 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1308 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1309 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1310 }
1311 }
1312}
1313
1314#[derive(Debug, Clone, Default)]
1315#[non_exhaustive]
1316pub struct BasinReconfiguration {
1318 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1320 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
1322 pub create_stream_on_append: Maybe<bool>,
1325 pub create_stream_on_read: Maybe<bool>,
1327}
1328
1329impl BasinReconfiguration {
1330 pub fn new() -> Self {
1332 Self::default()
1333 }
1334
1335 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1338 Self {
1339 default_stream_config: Maybe::Specified(Some(config)),
1340 ..self
1341 }
1342 }
1343
1344 pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
1346 Self {
1347 stream_cipher: Maybe::Specified(Some(stream_cipher)),
1348 ..self
1349 }
1350 }
1351
1352 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1355 Self {
1356 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1357 ..self
1358 }
1359 }
1360
1361 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1364 Self {
1365 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1366 ..self
1367 }
1368 }
1369}
1370
1371impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1372 fn from(value: BasinReconfiguration) -> Self {
1373 Self {
1374 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1375 stream_cipher: value.stream_cipher.map(|m| m.map(Into::into)),
1376 create_stream_on_append: value.create_stream_on_append,
1377 create_stream_on_read: value.create_stream_on_read,
1378 }
1379 }
1380}
1381
1382#[derive(Debug, Clone)]
1383#[non_exhaustive]
1384pub struct ReconfigureBasinInput {
1386 pub name: BasinName,
1388 pub config: BasinReconfiguration,
1390}
1391
1392impl ReconfigureBasinInput {
1393 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1395 Self { name, config }
1396 }
1397}
1398
1399#[derive(Debug, Clone, Default)]
1400#[non_exhaustive]
1401pub struct ListAccessTokensInput {
1403 pub prefix: AccessTokenIdPrefix,
1407 pub start_after: AccessTokenIdStartAfter,
1413 pub limit: Option<usize>,
1417}
1418
1419impl ListAccessTokensInput {
1420 pub fn new() -> Self {
1422 Self::default()
1423 }
1424
1425 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1427 Self { prefix, ..self }
1428 }
1429
1430 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1433 Self {
1434 start_after,
1435 ..self
1436 }
1437 }
1438
1439 pub fn with_limit(self, limit: usize) -> Self {
1441 Self {
1442 limit: Some(limit),
1443 ..self
1444 }
1445 }
1446}
1447
1448impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1449 fn from(value: ListAccessTokensInput) -> Self {
1450 Self {
1451 prefix: Some(value.prefix),
1452 start_after: Some(value.start_after),
1453 limit: value.limit,
1454 }
1455 }
1456}
1457
1458#[derive(Debug, Clone, Default)]
1459pub struct ListAllAccessTokensInput {
1461 pub prefix: AccessTokenIdPrefix,
1465 pub start_after: AccessTokenIdStartAfter,
1471}
1472
1473impl ListAllAccessTokensInput {
1474 pub fn new() -> Self {
1476 Self::default()
1477 }
1478
1479 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1481 Self { prefix, ..self }
1482 }
1483
1484 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1487 Self {
1488 start_after,
1489 ..self
1490 }
1491 }
1492}
1493
1494#[derive(Debug, Clone)]
1495#[non_exhaustive]
1496pub struct AccessTokenInfo {
1498 pub id: AccessTokenId,
1500 pub expires_at: S2DateTime,
1502 pub auto_prefix_streams: bool,
1505 pub scope: AccessTokenScope,
1507}
1508
1509impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1510 type Error = ValidationError;
1511
1512 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1513 let expires_at = value
1514 .expires_at
1515 .map(S2DateTime::try_from)
1516 .transpose()?
1517 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1518 Ok(Self {
1519 id: value.id,
1520 expires_at,
1521 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1522 scope: value.scope.into(),
1523 })
1524 }
1525}
1526
1527#[derive(Debug, Clone)]
1528pub enum BasinMatcher {
1532 None,
1534 Exact(BasinName),
1536 Prefix(BasinNamePrefix),
1538}
1539
1540#[derive(Debug, Clone)]
1541pub enum StreamMatcher {
1545 None,
1547 Exact(StreamName),
1549 Prefix(StreamNamePrefix),
1551}
1552
1553#[derive(Debug, Clone)]
1554pub enum AccessTokenMatcher {
1558 None,
1560 Exact(AccessTokenId),
1562 Prefix(AccessTokenIdPrefix),
1564}
1565
1566#[derive(Debug, Clone, Default)]
1567#[non_exhaustive]
1568pub struct ReadWritePermissions {
1570 pub read: bool,
1574 pub write: bool,
1578}
1579
1580impl ReadWritePermissions {
1581 pub fn new() -> Self {
1583 Self::default()
1584 }
1585
1586 pub fn read_only() -> Self {
1588 Self {
1589 read: true,
1590 write: false,
1591 }
1592 }
1593
1594 pub fn write_only() -> Self {
1596 Self {
1597 read: false,
1598 write: true,
1599 }
1600 }
1601
1602 pub fn read_write() -> Self {
1604 Self {
1605 read: true,
1606 write: true,
1607 }
1608 }
1609}
1610
1611impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1612 fn from(value: ReadWritePermissions) -> Self {
1613 Self {
1614 read: Some(value.read),
1615 write: Some(value.write),
1616 }
1617 }
1618}
1619
1620impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1621 fn from(value: api::access::ReadWritePermissions) -> Self {
1622 Self {
1623 read: value.read.unwrap_or_default(),
1624 write: value.write.unwrap_or_default(),
1625 }
1626 }
1627}
1628
1629#[derive(Debug, Clone, Default)]
1630#[non_exhaustive]
1631pub struct OperationGroupPermissions {
1635 pub account: Option<ReadWritePermissions>,
1639 pub basin: Option<ReadWritePermissions>,
1643 pub stream: Option<ReadWritePermissions>,
1647}
1648
1649impl OperationGroupPermissions {
1650 pub fn new() -> Self {
1652 Self::default()
1653 }
1654
1655 pub fn read_only_all() -> Self {
1657 Self {
1658 account: Some(ReadWritePermissions::read_only()),
1659 basin: Some(ReadWritePermissions::read_only()),
1660 stream: Some(ReadWritePermissions::read_only()),
1661 }
1662 }
1663
1664 pub fn write_only_all() -> Self {
1666 Self {
1667 account: Some(ReadWritePermissions::write_only()),
1668 basin: Some(ReadWritePermissions::write_only()),
1669 stream: Some(ReadWritePermissions::write_only()),
1670 }
1671 }
1672
1673 pub fn read_write_all() -> Self {
1675 Self {
1676 account: Some(ReadWritePermissions::read_write()),
1677 basin: Some(ReadWritePermissions::read_write()),
1678 stream: Some(ReadWritePermissions::read_write()),
1679 }
1680 }
1681
1682 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1684 Self {
1685 account: Some(account),
1686 ..self
1687 }
1688 }
1689
1690 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1692 Self {
1693 basin: Some(basin),
1694 ..self
1695 }
1696 }
1697
1698 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1700 Self {
1701 stream: Some(stream),
1702 ..self
1703 }
1704 }
1705}
1706
1707impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1708 fn from(value: OperationGroupPermissions) -> Self {
1709 Self {
1710 account: value.account.map(Into::into),
1711 basin: value.basin.map(Into::into),
1712 stream: value.stream.map(Into::into),
1713 }
1714 }
1715}
1716
1717impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1718 fn from(value: api::access::PermittedOperationGroups) -> Self {
1719 Self {
1720 account: value.account.map(Into::into),
1721 basin: value.basin.map(Into::into),
1722 stream: value.stream.map(Into::into),
1723 }
1724 }
1725}
1726
1727#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1728pub enum Operation {
1732 ListBasins,
1734 CreateBasin,
1736 GetBasinConfig,
1738 DeleteBasin,
1740 ReconfigureBasin,
1742 ListAccessTokens,
1744 IssueAccessToken,
1746 RevokeAccessToken,
1748 GetAccountMetrics,
1750 GetBasinMetrics,
1752 GetStreamMetrics,
1754 ListStreams,
1756 CreateStream,
1758 GetStreamConfig,
1760 DeleteStream,
1762 ReconfigureStream,
1764 CheckTail,
1766 Append,
1768 Read,
1770 Trim,
1772 Fence,
1774}
1775
1776impl From<Operation> for api::access::Operation {
1777 fn from(value: Operation) -> Self {
1778 match value {
1779 Operation::ListBasins => api::access::Operation::ListBasins,
1780 Operation::CreateBasin => api::access::Operation::CreateBasin,
1781 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1782 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1783 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1784 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1785 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1786 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1787 Operation::ListStreams => api::access::Operation::ListStreams,
1788 Operation::CreateStream => api::access::Operation::CreateStream,
1789 Operation::DeleteStream => api::access::Operation::DeleteStream,
1790 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1791 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1792 Operation::CheckTail => api::access::Operation::CheckTail,
1793 Operation::Append => api::access::Operation::Append,
1794 Operation::Read => api::access::Operation::Read,
1795 Operation::Trim => api::access::Operation::Trim,
1796 Operation::Fence => api::access::Operation::Fence,
1797 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1798 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1799 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1800 }
1801 }
1802}
1803
1804impl From<api::access::Operation> for Operation {
1805 fn from(value: api::access::Operation) -> Self {
1806 match value {
1807 api::access::Operation::ListBasins => Operation::ListBasins,
1808 api::access::Operation::CreateBasin => Operation::CreateBasin,
1809 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1810 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1811 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1812 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1813 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1814 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1815 api::access::Operation::ListStreams => Operation::ListStreams,
1816 api::access::Operation::CreateStream => Operation::CreateStream,
1817 api::access::Operation::DeleteStream => Operation::DeleteStream,
1818 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1819 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1820 api::access::Operation::CheckTail => Operation::CheckTail,
1821 api::access::Operation::Append => Operation::Append,
1822 api::access::Operation::Read => Operation::Read,
1823 api::access::Operation::Trim => Operation::Trim,
1824 api::access::Operation::Fence => Operation::Fence,
1825 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1826 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1827 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1828 }
1829 }
1830}
1831
1832#[derive(Debug, Clone)]
1833#[non_exhaustive]
1834pub struct AccessTokenScopeInput {
1842 basins: Option<BasinMatcher>,
1843 streams: Option<StreamMatcher>,
1844 access_tokens: Option<AccessTokenMatcher>,
1845 op_group_perms: Option<OperationGroupPermissions>,
1846 ops: HashSet<Operation>,
1847}
1848
1849impl AccessTokenScopeInput {
1850 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1852 Self {
1853 basins: None,
1854 streams: None,
1855 access_tokens: None,
1856 op_group_perms: None,
1857 ops: ops.into_iter().collect(),
1858 }
1859 }
1860
1861 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1863 Self {
1864 basins: None,
1865 streams: None,
1866 access_tokens: None,
1867 op_group_perms: Some(op_group_perms),
1868 ops: HashSet::default(),
1869 }
1870 }
1871
1872 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1874 Self {
1875 ops: ops.into_iter().collect(),
1876 ..self
1877 }
1878 }
1879
1880 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1882 Self {
1883 op_group_perms: Some(op_group_perms),
1884 ..self
1885 }
1886 }
1887
1888 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1892 Self {
1893 basins: Some(basins),
1894 ..self
1895 }
1896 }
1897
1898 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1902 Self {
1903 streams: Some(streams),
1904 ..self
1905 }
1906 }
1907
1908 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1912 Self {
1913 access_tokens: Some(access_tokens),
1914 ..self
1915 }
1916 }
1917}
1918
1919#[derive(Debug, Clone)]
1920#[non_exhaustive]
1921pub struct AccessTokenScope {
1923 pub basins: Option<BasinMatcher>,
1925 pub streams: Option<StreamMatcher>,
1927 pub access_tokens: Option<AccessTokenMatcher>,
1929 pub op_group_perms: Option<OperationGroupPermissions>,
1931 pub ops: HashSet<Operation>,
1933}
1934
1935impl From<api::access::AccessTokenScope> for AccessTokenScope {
1936 fn from(value: api::access::AccessTokenScope) -> Self {
1937 Self {
1938 basins: value.basins.map(|rs| match rs {
1939 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1940 BasinMatcher::Exact(e)
1941 }
1942 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1943 BasinMatcher::None
1944 }
1945 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1946 }),
1947 streams: value.streams.map(|rs| match rs {
1948 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1949 StreamMatcher::Exact(e)
1950 }
1951 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1952 StreamMatcher::None
1953 }
1954 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1955 }),
1956 access_tokens: value.access_tokens.map(|rs| match rs {
1957 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1958 AccessTokenMatcher::Exact(e)
1959 }
1960 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1961 AccessTokenMatcher::None
1962 }
1963 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1964 }),
1965 op_group_perms: value.op_groups.map(Into::into),
1966 ops: value
1967 .ops
1968 .map(|ops| ops.into_iter().map(Into::into).collect())
1969 .unwrap_or_default(),
1970 }
1971 }
1972}
1973
1974impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1975 fn from(value: AccessTokenScopeInput) -> Self {
1976 Self {
1977 basins: value.basins.map(|rs| match rs {
1978 BasinMatcher::None => {
1979 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1980 }
1981 BasinMatcher::Exact(e) => {
1982 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1983 }
1984 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1985 }),
1986 streams: value.streams.map(|rs| match rs {
1987 StreamMatcher::None => {
1988 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1989 }
1990 StreamMatcher::Exact(e) => {
1991 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1992 }
1993 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1994 }),
1995 access_tokens: value.access_tokens.map(|rs| match rs {
1996 AccessTokenMatcher::None => {
1997 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1998 }
1999 AccessTokenMatcher::Exact(e) => {
2000 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2001 }
2002 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2003 }),
2004 op_groups: value.op_group_perms.map(Into::into),
2005 ops: if value.ops.is_empty() {
2006 None
2007 } else {
2008 Some(value.ops.into_iter().map(Into::into).collect())
2009 },
2010 }
2011 }
2012}
2013
2014#[derive(Debug, Clone)]
2015#[non_exhaustive]
2016pub struct IssueAccessTokenInput {
2018 pub id: AccessTokenId,
2020 pub expires_at: Option<S2DateTime>,
2025 pub auto_prefix_streams: bool,
2033 pub scope: AccessTokenScopeInput,
2035}
2036
2037impl IssueAccessTokenInput {
2038 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2040 Self {
2041 id,
2042 expires_at: None,
2043 auto_prefix_streams: false,
2044 scope,
2045 }
2046 }
2047
2048 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2050 Self {
2051 expires_at: Some(expires_at),
2052 ..self
2053 }
2054 }
2055
2056 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2059 Self {
2060 auto_prefix_streams,
2061 ..self
2062 }
2063 }
2064}
2065
2066impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2067 fn from(value: IssueAccessTokenInput) -> Self {
2068 Self {
2069 id: value.id,
2070 expires_at: value.expires_at.map(Into::into),
2071 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2072 scope: value.scope.into(),
2073 }
2074 }
2075}
2076
2077#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2078pub enum TimeseriesInterval {
2080 Minute,
2082 Hour,
2084 Day,
2086}
2087
2088impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2089 fn from(value: TimeseriesInterval) -> Self {
2090 match value {
2091 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2092 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2093 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2094 }
2095 }
2096}
2097
2098impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2099 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2100 match value {
2101 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2102 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2103 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2104 }
2105 }
2106}
2107
2108#[derive(Debug, Clone, Copy)]
2109#[non_exhaustive]
2110pub struct TimeRange {
2112 pub start: u32,
2114 pub end: u32,
2116}
2117
2118impl TimeRange {
2119 pub fn new(start: u32, end: u32) -> Self {
2121 Self { start, end }
2122 }
2123}
2124
2125#[derive(Debug, Clone, Copy)]
2126#[non_exhaustive]
2127pub struct TimeRangeAndInterval {
2129 pub start: u32,
2131 pub end: u32,
2133 pub interval: Option<TimeseriesInterval>,
2137}
2138
2139impl TimeRangeAndInterval {
2140 pub fn new(start: u32, end: u32) -> Self {
2142 Self {
2143 start,
2144 end,
2145 interval: None,
2146 }
2147 }
2148
2149 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2151 Self {
2152 interval: Some(interval),
2153 ..self
2154 }
2155 }
2156}
2157
2158#[derive(Debug, Clone, Copy)]
2159pub enum AccountMetricSet {
2161 ActiveBasins(TimeRange),
2164 AccountOps(TimeRangeAndInterval),
2171}
2172
2173#[derive(Debug, Clone)]
2174#[non_exhaustive]
2175pub struct GetAccountMetricsInput {
2177 pub set: AccountMetricSet,
2179}
2180
2181impl GetAccountMetricsInput {
2182 pub fn new(set: AccountMetricSet) -> Self {
2184 Self { set }
2185 }
2186}
2187
2188impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2189 fn from(value: GetAccountMetricsInput) -> Self {
2190 let (set, start, end, interval) = match value.set {
2191 AccountMetricSet::ActiveBasins(args) => (
2192 api::metrics::AccountMetricSet::ActiveBasins,
2193 args.start,
2194 args.end,
2195 None,
2196 ),
2197 AccountMetricSet::AccountOps(args) => (
2198 api::metrics::AccountMetricSet::AccountOps,
2199 args.start,
2200 args.end,
2201 args.interval,
2202 ),
2203 };
2204 Self {
2205 set,
2206 start: Some(start),
2207 end: Some(end),
2208 interval: interval.map(Into::into),
2209 }
2210 }
2211}
2212
2213#[derive(Debug, Clone, Copy)]
2214pub enum BasinMetricSet {
2216 Storage(TimeRange),
2219 AppendOps(TimeRangeAndInterval),
2227 ReadOps(TimeRangeAndInterval),
2235 ReadThroughput(TimeRangeAndInterval),
2242 AppendThroughput(TimeRangeAndInterval),
2249 BasinOps(TimeRangeAndInterval),
2256}
2257
2258#[derive(Debug, Clone)]
2259#[non_exhaustive]
2260pub struct GetBasinMetricsInput {
2262 pub name: BasinName,
2264 pub set: BasinMetricSet,
2266}
2267
2268impl GetBasinMetricsInput {
2269 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2271 Self { name, set }
2272 }
2273}
2274
2275impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2276 fn from(value: GetBasinMetricsInput) -> Self {
2277 let (set, start, end, interval) = match value.set {
2278 BasinMetricSet::Storage(args) => (
2279 api::metrics::BasinMetricSet::Storage,
2280 args.start,
2281 args.end,
2282 None,
2283 ),
2284 BasinMetricSet::AppendOps(args) => (
2285 api::metrics::BasinMetricSet::AppendOps,
2286 args.start,
2287 args.end,
2288 args.interval,
2289 ),
2290 BasinMetricSet::ReadOps(args) => (
2291 api::metrics::BasinMetricSet::ReadOps,
2292 args.start,
2293 args.end,
2294 args.interval,
2295 ),
2296 BasinMetricSet::ReadThroughput(args) => (
2297 api::metrics::BasinMetricSet::ReadThroughput,
2298 args.start,
2299 args.end,
2300 args.interval,
2301 ),
2302 BasinMetricSet::AppendThroughput(args) => (
2303 api::metrics::BasinMetricSet::AppendThroughput,
2304 args.start,
2305 args.end,
2306 args.interval,
2307 ),
2308 BasinMetricSet::BasinOps(args) => (
2309 api::metrics::BasinMetricSet::BasinOps,
2310 args.start,
2311 args.end,
2312 args.interval,
2313 ),
2314 };
2315 (
2316 value.name,
2317 api::metrics::BasinMetricSetRequest {
2318 set,
2319 start: Some(start),
2320 end: Some(end),
2321 interval: interval.map(Into::into),
2322 },
2323 )
2324 }
2325}
2326
2327#[derive(Debug, Clone, Copy)]
2328pub enum StreamMetricSet {
2330 Storage(TimeRange),
2333}
2334
2335#[derive(Debug, Clone)]
2336#[non_exhaustive]
2337pub struct GetStreamMetricsInput {
2339 pub basin_name: BasinName,
2341 pub stream_name: StreamName,
2343 pub set: StreamMetricSet,
2345}
2346
2347impl GetStreamMetricsInput {
2348 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2351 Self {
2352 basin_name,
2353 stream_name,
2354 set,
2355 }
2356 }
2357}
2358
2359impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2360 fn from(value: GetStreamMetricsInput) -> Self {
2361 let (set, start, end, interval) = match value.set {
2362 StreamMetricSet::Storage(args) => (
2363 api::metrics::StreamMetricSet::Storage,
2364 args.start,
2365 args.end,
2366 None,
2367 ),
2368 };
2369 (
2370 value.basin_name,
2371 value.stream_name,
2372 api::metrics::StreamMetricSetRequest {
2373 set,
2374 start: Some(start),
2375 end: Some(end),
2376 interval,
2377 },
2378 )
2379 }
2380}
2381
2382#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2383pub enum MetricUnit {
2385 Bytes,
2387 Operations,
2389}
2390
2391impl From<api::metrics::MetricUnit> for MetricUnit {
2392 fn from(value: api::metrics::MetricUnit) -> Self {
2393 match value {
2394 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2395 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2396 }
2397 }
2398}
2399
2400#[derive(Debug, Clone)]
2401#[non_exhaustive]
2402pub struct ScalarMetric {
2404 pub name: String,
2406 pub unit: MetricUnit,
2408 pub value: f64,
2410}
2411
2412#[derive(Debug, Clone)]
2413#[non_exhaustive]
2414pub struct AccumulationMetric {
2417 pub name: String,
2419 pub unit: MetricUnit,
2421 pub interval: TimeseriesInterval,
2423 pub values: Vec<(u32, f64)>,
2427}
2428
2429#[derive(Debug, Clone)]
2430#[non_exhaustive]
2431pub struct GaugeMetric {
2433 pub name: String,
2435 pub unit: MetricUnit,
2437 pub values: Vec<(u32, f64)>,
2440}
2441
2442#[derive(Debug, Clone)]
2443#[non_exhaustive]
2444pub struct LabelMetric {
2446 pub name: String,
2448 pub values: Vec<String>,
2450}
2451
2452#[derive(Debug, Clone)]
2453pub enum Metric {
2455 Scalar(ScalarMetric),
2457 Accumulation(AccumulationMetric),
2460 Gauge(GaugeMetric),
2462 Label(LabelMetric),
2464}
2465
2466impl From<api::metrics::Metric> for Metric {
2467 fn from(value: api::metrics::Metric) -> Self {
2468 match value {
2469 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2470 name: sm.name.into(),
2471 unit: sm.unit.into(),
2472 value: sm.value,
2473 }),
2474 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2475 name: am.name.into(),
2476 unit: am.unit.into(),
2477 interval: am.interval.into(),
2478 values: am.values,
2479 }),
2480 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2481 name: gm.name.into(),
2482 unit: gm.unit.into(),
2483 values: gm.values,
2484 }),
2485 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2486 name: lm.name.into(),
2487 values: lm.values,
2488 }),
2489 }
2490 }
2491}
2492
2493#[derive(Debug, Clone, Default)]
2494#[non_exhaustive]
2495pub struct ListStreamsInput {
2497 pub prefix: StreamNamePrefix,
2501 pub start_after: StreamNameStartAfter,
2507 pub limit: Option<usize>,
2511}
2512
2513impl ListStreamsInput {
2514 pub fn new() -> Self {
2516 Self::default()
2517 }
2518
2519 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2521 Self { prefix, ..self }
2522 }
2523
2524 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2527 Self {
2528 start_after,
2529 ..self
2530 }
2531 }
2532
2533 pub fn with_limit(self, limit: usize) -> Self {
2535 Self {
2536 limit: Some(limit),
2537 ..self
2538 }
2539 }
2540}
2541
2542impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2543 fn from(value: ListStreamsInput) -> Self {
2544 Self {
2545 prefix: Some(value.prefix),
2546 start_after: Some(value.start_after),
2547 limit: value.limit,
2548 }
2549 }
2550}
2551
2552#[derive(Debug, Clone, Default)]
2553pub struct ListAllStreamsInput {
2555 pub prefix: StreamNamePrefix,
2559 pub start_after: StreamNameStartAfter,
2565 pub include_deleted: bool,
2569}
2570
2571impl ListAllStreamsInput {
2572 pub fn new() -> Self {
2574 Self::default()
2575 }
2576
2577 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2579 Self { prefix, ..self }
2580 }
2581
2582 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2585 Self {
2586 start_after,
2587 ..self
2588 }
2589 }
2590
2591 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2593 Self {
2594 include_deleted,
2595 ..self
2596 }
2597 }
2598}
2599
2600#[derive(Debug, Clone, PartialEq, Eq)]
2601#[non_exhaustive]
2602pub struct StreamInfo {
2604 pub name: StreamName,
2606 pub created_at: S2DateTime,
2608 pub deleted_at: Option<S2DateTime>,
2610 pub cipher: Option<EncryptionAlgorithm>,
2612}
2613
2614impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2615 type Error = ValidationError;
2616
2617 fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2618 Ok(Self {
2619 name: value.name,
2620 created_at: value.created_at.try_into()?,
2621 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2622 cipher: value.cipher.map(Into::into),
2623 })
2624 }
2625}
2626
2627#[derive(Debug, Clone)]
2628#[non_exhaustive]
2629pub struct CreateStreamInput {
2631 pub name: StreamName,
2633 pub config: Option<StreamConfig>,
2637 idempotency_token: String,
2638}
2639
2640impl CreateStreamInput {
2641 pub fn new(name: StreamName) -> Self {
2643 Self {
2644 name,
2645 config: None,
2646 idempotency_token: idempotency_token(),
2647 }
2648 }
2649
2650 pub fn with_config(self, config: StreamConfig) -> Self {
2652 Self {
2653 config: Some(config),
2654 ..self
2655 }
2656 }
2657}
2658
2659impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2660 fn from(value: CreateStreamInput) -> Self {
2661 (
2662 api::stream::CreateStreamRequest {
2663 stream: value.name,
2664 config: value.config.map(Into::into),
2665 },
2666 value.idempotency_token,
2667 )
2668 }
2669}
2670
2671#[derive(Debug, Clone)]
2672#[non_exhaustive]
2673#[doc(hidden)]
2676#[cfg(feature = "_hidden")]
2677pub struct EnsureStreamInput {
2678 pub name: StreamName,
2680 config: Option<api::config::StreamConfig>,
2686}
2687
2688#[cfg(feature = "_hidden")]
2689impl EnsureStreamInput {
2690 pub fn new(name: StreamName) -> Self {
2692 Self { name, config: None }
2693 }
2694
2695 pub fn with_config(self, config: impl Into<s2_api::v1::config::StreamConfig>) -> Self {
2697 Self {
2698 config: Some(config.into()),
2699 ..self
2700 }
2701 }
2702}
2703
2704#[cfg(feature = "_hidden")]
2705impl From<EnsureStreamInput> for (StreamName, Option<api::config::StreamConfig>) {
2706 fn from(value: EnsureStreamInput) -> Self {
2707 (value.name, value.config)
2708 }
2709}
2710
2711#[derive(Debug, Clone)]
2712#[non_exhaustive]
2713pub struct DeleteStreamInput {
2715 pub name: StreamName,
2717 pub ignore_not_found: bool,
2719}
2720
2721impl DeleteStreamInput {
2722 pub fn new(name: StreamName) -> Self {
2724 Self {
2725 name,
2726 ignore_not_found: false,
2727 }
2728 }
2729
2730 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2732 Self {
2733 ignore_not_found,
2734 ..self
2735 }
2736 }
2737}
2738
2739#[derive(Debug, Clone)]
2740#[non_exhaustive]
2741pub struct ReconfigureStreamInput {
2743 pub name: StreamName,
2745 pub config: StreamReconfiguration,
2747}
2748
2749impl ReconfigureStreamInput {
2750 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2752 Self { name, config }
2753 }
2754}
2755
2756#[derive(Debug, Clone, PartialEq, Eq)]
2757pub struct FencingToken(String);
2763
2764impl FencingToken {
2765 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2767 rand::rng()
2768 .sample_iter(&rand::distr::Alphanumeric)
2769 .take(n)
2770 .map(char::from)
2771 .collect::<String>()
2772 .parse()
2773 }
2774}
2775
2776impl FromStr for FencingToken {
2777 type Err = ValidationError;
2778
2779 fn from_str(s: &str) -> Result<Self, Self::Err> {
2780 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2781 return Err(ValidationError(format!(
2782 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2783 )));
2784 }
2785 Ok(FencingToken(s.to_string()))
2786 }
2787}
2788
2789impl std::fmt::Display for FencingToken {
2790 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2791 write!(f, "{}", self.0)
2792 }
2793}
2794
2795impl Deref for FencingToken {
2796 type Target = str;
2797
2798 fn deref(&self) -> &Self::Target {
2799 &self.0
2800 }
2801}
2802
2803#[derive(Debug, Clone, Copy, PartialEq)]
2804#[non_exhaustive]
2805pub struct StreamPosition {
2807 pub seq_num: u64,
2809 pub timestamp: u64,
2812}
2813
2814impl std::fmt::Display for StreamPosition {
2815 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2816 write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2817 }
2818}
2819
2820impl From<api::stream::proto::StreamPosition> for StreamPosition {
2821 fn from(value: api::stream::proto::StreamPosition) -> Self {
2822 Self {
2823 seq_num: value.seq_num,
2824 timestamp: value.timestamp,
2825 }
2826 }
2827}
2828
2829impl From<api::stream::StreamPosition> for StreamPosition {
2830 fn from(value: api::stream::StreamPosition) -> Self {
2831 Self {
2832 seq_num: value.seq_num,
2833 timestamp: value.timestamp,
2834 }
2835 }
2836}
2837
2838#[derive(Debug, Clone, PartialEq)]
2839#[non_exhaustive]
2840pub struct Header {
2842 pub name: Bytes,
2844 pub value: Bytes,
2846}
2847
2848impl Header {
2849 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2851 Self {
2852 name: name.into(),
2853 value: value.into(),
2854 }
2855 }
2856}
2857
2858impl From<Header> for api::stream::proto::Header {
2859 fn from(value: Header) -> Self {
2860 Self {
2861 name: value.name,
2862 value: value.value,
2863 }
2864 }
2865}
2866
2867impl From<api::stream::proto::Header> for Header {
2868 fn from(value: api::stream::proto::Header) -> Self {
2869 Self {
2870 name: value.name,
2871 value: value.value,
2872 }
2873 }
2874}
2875
2876#[derive(Debug, Clone, PartialEq)]
2877pub struct AppendRecord {
2879 body: Bytes,
2880 headers: Vec<Header>,
2881 timestamp: Option<u64>,
2882}
2883
2884impl AppendRecord {
2885 fn validate(self) -> Result<Self, ValidationError> {
2886 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2887 Err(ValidationError(format!(
2888 "metered_bytes: {} exceeds {}",
2889 self.metered_bytes(),
2890 RECORD_BATCH_MAX.bytes
2891 )))
2892 } else {
2893 Ok(self)
2894 }
2895 }
2896
2897 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2899 let record = Self {
2900 body: body.into(),
2901 headers: Vec::default(),
2902 timestamp: None,
2903 };
2904 record.validate()
2905 }
2906
2907 pub fn with_headers(
2909 self,
2910 headers: impl IntoIterator<Item = Header>,
2911 ) -> Result<Self, ValidationError> {
2912 let record = Self {
2913 headers: headers.into_iter().collect(),
2914 ..self
2915 };
2916 record.validate()
2917 }
2918
2919 pub fn with_timestamp(self, timestamp: u64) -> Self {
2923 Self {
2924 timestamp: Some(timestamp),
2925 ..self
2926 }
2927 }
2928
2929 pub fn body(&self) -> &[u8] {
2931 &self.body
2932 }
2933
2934 pub fn headers(&self) -> &[Header] {
2936 &self.headers
2937 }
2938
2939 pub fn timestamp(&self) -> Option<u64> {
2941 self.timestamp
2942 }
2943}
2944
2945impl From<AppendRecord> for api::stream::proto::AppendRecord {
2946 fn from(value: AppendRecord) -> Self {
2947 Self {
2948 timestamp: value.timestamp,
2949 headers: value.headers.into_iter().map(Into::into).collect(),
2950 body: value.body,
2951 }
2952 }
2953}
2954
2955pub trait MeteredBytes {
2962 fn metered_bytes(&self) -> usize;
2964}
2965
2966macro_rules! metered_bytes_impl {
2967 ($ty:ty) => {
2968 impl MeteredBytes for $ty {
2969 fn metered_bytes(&self) -> usize {
2970 8 + (2 * self.headers.len())
2971 + self
2972 .headers
2973 .iter()
2974 .map(|h| h.name.len() + h.value.len())
2975 .sum::<usize>()
2976 + self.body.len()
2977 }
2978 }
2979 };
2980}
2981
2982metered_bytes_impl!(AppendRecord);
2983
2984#[derive(Debug, Clone)]
2985pub struct AppendRecordBatch {
2994 records: Vec<AppendRecord>,
2995 metered_bytes: usize,
2996}
2997
2998impl AppendRecordBatch {
2999 pub(crate) fn with_capacity(capacity: usize) -> Self {
3000 Self {
3001 records: Vec::with_capacity(capacity),
3002 metered_bytes: 0,
3003 }
3004 }
3005
3006 pub(crate) fn push(&mut self, record: AppendRecord) {
3007 self.metered_bytes += record.metered_bytes();
3008 self.records.push(record);
3009 }
3010
3011 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3013 where
3014 I: IntoIterator<Item = AppendRecord>,
3015 {
3016 let mut records = Vec::new();
3017 let mut metered_bytes = 0;
3018
3019 for record in iter {
3020 metered_bytes += record.metered_bytes();
3021 records.push(record);
3022
3023 if metered_bytes > RECORD_BATCH_MAX.bytes {
3024 return Err(ValidationError(format!(
3025 "batch size in metered bytes ({metered_bytes}) exceeds {}",
3026 RECORD_BATCH_MAX.bytes
3027 )));
3028 }
3029
3030 if records.len() > RECORD_BATCH_MAX.count {
3031 return Err(ValidationError(format!(
3032 "number of records in the batch exceeds {}",
3033 RECORD_BATCH_MAX.count
3034 )));
3035 }
3036 }
3037
3038 if records.is_empty() {
3039 return Err(ValidationError("batch is empty".into()));
3040 }
3041
3042 Ok(Self {
3043 records,
3044 metered_bytes,
3045 })
3046 }
3047}
3048
3049impl Deref for AppendRecordBatch {
3050 type Target = [AppendRecord];
3051
3052 fn deref(&self) -> &Self::Target {
3053 &self.records
3054 }
3055}
3056
3057impl MeteredBytes for AppendRecordBatch {
3058 fn metered_bytes(&self) -> usize {
3059 self.metered_bytes
3060 }
3061}
3062
3063#[derive(Debug, Clone)]
3064pub enum Command {
3066 Fence {
3068 fencing_token: FencingToken,
3070 },
3071 Trim {
3073 trim_point: u64,
3075 },
3076}
3077
3078#[derive(Debug, Clone)]
3079#[non_exhaustive]
3080pub struct CommandRecord {
3084 pub command: Command,
3086 pub timestamp: Option<u64>,
3088}
3089
3090impl CommandRecord {
3091 const FENCE: &[u8] = b"fence";
3092 const TRIM: &[u8] = b"trim";
3093
3094 pub fn fence(fencing_token: FencingToken) -> Self {
3099 Self {
3100 command: Command::Fence { fencing_token },
3101 timestamp: None,
3102 }
3103 }
3104
3105 pub fn trim(trim_point: u64) -> Self {
3112 Self {
3113 command: Command::Trim { trim_point },
3114 timestamp: None,
3115 }
3116 }
3117
3118 pub fn with_timestamp(self, timestamp: u64) -> Self {
3120 Self {
3121 timestamp: Some(timestamp),
3122 ..self
3123 }
3124 }
3125}
3126
3127impl From<CommandRecord> for AppendRecord {
3128 fn from(value: CommandRecord) -> Self {
3129 let (header_value, body) = match value.command {
3130 Command::Fence { fencing_token } => (
3131 CommandRecord::FENCE,
3132 Bytes::copy_from_slice(fencing_token.as_bytes()),
3133 ),
3134 Command::Trim { trim_point } => (
3135 CommandRecord::TRIM,
3136 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3137 ),
3138 };
3139 Self {
3140 body,
3141 headers: vec![Header::new("", header_value)],
3142 timestamp: value.timestamp,
3143 }
3144 }
3145}
3146
3147#[derive(Debug, Clone)]
3148#[non_exhaustive]
3149pub struct AppendInput {
3152 pub records: AppendRecordBatch,
3154 pub match_seq_num: Option<u64>,
3158 pub fencing_token: Option<FencingToken>,
3163}
3164
3165impl AppendInput {
3166 pub fn new(records: AppendRecordBatch) -> Self {
3168 Self {
3169 records,
3170 match_seq_num: None,
3171 fencing_token: None,
3172 }
3173 }
3174
3175 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3177 Self {
3178 match_seq_num: Some(match_seq_num),
3179 ..self
3180 }
3181 }
3182
3183 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3185 Self {
3186 fencing_token: Some(fencing_token),
3187 ..self
3188 }
3189 }
3190}
3191
3192impl From<AppendInput> for api::stream::proto::AppendInput {
3193 fn from(value: AppendInput) -> Self {
3194 Self {
3195 records: value.records.iter().cloned().map(Into::into).collect(),
3196 match_seq_num: value.match_seq_num,
3197 fencing_token: value.fencing_token.map(|t| t.to_string()),
3198 }
3199 }
3200}
3201
3202#[derive(Debug, Clone, PartialEq)]
3203#[non_exhaustive]
3204pub struct AppendAck {
3206 pub start: StreamPosition,
3208 pub end: StreamPosition,
3214 pub tail: StreamPosition,
3219}
3220
3221impl From<api::stream::proto::AppendAck> for AppendAck {
3222 fn from(value: api::stream::proto::AppendAck) -> Self {
3223 Self {
3224 start: value.start.unwrap_or_default().into(),
3225 end: value.end.unwrap_or_default().into(),
3226 tail: value.tail.unwrap_or_default().into(),
3227 }
3228 }
3229}
3230
3231#[derive(Debug, Clone, Copy)]
3232pub enum ReadFrom {
3234 SeqNum(u64),
3236 Timestamp(u64),
3238 TailOffset(u64),
3240}
3241
3242impl Default for ReadFrom {
3243 fn default() -> Self {
3244 Self::SeqNum(0)
3245 }
3246}
3247
3248#[derive(Debug, Default, Clone)]
3249#[non_exhaustive]
3250pub struct ReadStart {
3252 pub from: ReadFrom,
3256 pub clamp_to_tail: bool,
3260}
3261
3262impl ReadStart {
3263 pub fn new() -> Self {
3265 Self::default()
3266 }
3267
3268 pub fn with_from(self, from: ReadFrom) -> Self {
3270 Self { from, ..self }
3271 }
3272
3273 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3275 Self {
3276 clamp_to_tail,
3277 ..self
3278 }
3279 }
3280}
3281
3282impl From<ReadStart> for api::stream::ReadStart {
3283 fn from(value: ReadStart) -> Self {
3284 let (seq_num, timestamp, tail_offset) = match value.from {
3285 ReadFrom::SeqNum(n) => (Some(n), None, None),
3286 ReadFrom::Timestamp(t) => (None, Some(t), None),
3287 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3288 };
3289 Self {
3290 seq_num,
3291 timestamp,
3292 tail_offset,
3293 clamp: if value.clamp_to_tail {
3294 Some(true)
3295 } else {
3296 None
3297 },
3298 }
3299 }
3300}
3301
3302#[derive(Debug, Clone, Default)]
3303#[non_exhaustive]
3304pub struct ReadLimits {
3306 pub count: Option<usize>,
3310 pub bytes: Option<usize>,
3314}
3315
3316impl ReadLimits {
3317 pub fn new() -> Self {
3319 Self::default()
3320 }
3321
3322 pub fn with_count(self, count: usize) -> Self {
3324 Self {
3325 count: Some(count),
3326 ..self
3327 }
3328 }
3329
3330 pub fn with_bytes(self, bytes: usize) -> Self {
3332 Self {
3333 bytes: Some(bytes),
3334 ..self
3335 }
3336 }
3337}
3338
3339#[derive(Debug, Clone, Default)]
3340#[non_exhaustive]
3341pub struct ReadStop {
3343 pub limits: ReadLimits,
3347 pub until: Option<RangeTo<u64>>,
3351 pub wait: Option<u32>,
3361}
3362
3363impl ReadStop {
3364 pub fn new() -> Self {
3366 Self::default()
3367 }
3368
3369 pub fn with_limits(self, limits: ReadLimits) -> Self {
3371 Self { limits, ..self }
3372 }
3373
3374 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3376 Self {
3377 until: Some(until),
3378 ..self
3379 }
3380 }
3381
3382 pub fn with_wait(self, wait: u32) -> Self {
3384 Self {
3385 wait: Some(wait),
3386 ..self
3387 }
3388 }
3389}
3390
3391impl From<ReadStop> for api::stream::ReadEnd {
3392 fn from(value: ReadStop) -> Self {
3393 Self {
3394 count: value.limits.count,
3395 bytes: value.limits.bytes,
3396 until: value.until.map(|r| r.end),
3397 wait: value.wait,
3398 }
3399 }
3400}
3401
3402#[derive(Debug, Clone, Default)]
3403#[non_exhaustive]
3404pub struct ReadInput {
3407 pub start: ReadStart,
3411 pub stop: ReadStop,
3415 pub ignore_command_records: bool,
3419}
3420
3421impl ReadInput {
3422 pub fn new() -> Self {
3424 Self::default()
3425 }
3426
3427 pub fn with_start(self, start: ReadStart) -> Self {
3429 Self { start, ..self }
3430 }
3431
3432 pub fn with_stop(self, stop: ReadStop) -> Self {
3434 Self { stop, ..self }
3435 }
3436
3437 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3439 Self {
3440 ignore_command_records,
3441 ..self
3442 }
3443 }
3444}
3445
3446#[derive(Debug, Clone)]
3447#[non_exhaustive]
3448pub struct SequencedRecord {
3450 pub seq_num: u64,
3452 pub body: Bytes,
3454 pub headers: Vec<Header>,
3456 pub timestamp: u64,
3458}
3459
3460impl SequencedRecord {
3461 pub fn is_command_record(&self) -> bool {
3463 self.headers.len() == 1 && *self.headers[0].name == *b""
3464 }
3465}
3466
3467impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3468 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3469 Self {
3470 seq_num: value.seq_num,
3471 body: value.body,
3472 headers: value.headers.into_iter().map(Into::into).collect(),
3473 timestamp: value.timestamp,
3474 }
3475 }
3476}
3477
3478metered_bytes_impl!(SequencedRecord);
3479
3480#[derive(Debug, Clone)]
3481#[non_exhaustive]
3482pub struct ReadBatch {
3485 pub records: Vec<SequencedRecord>,
3492 pub tail: Option<StreamPosition>,
3497}
3498
3499impl ReadBatch {
3500 pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3501 Self {
3502 records: batch.records.into_iter().map(Into::into).collect(),
3503 tail: batch.tail.map(Into::into),
3504 }
3505 }
3506}
3507
3508pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3510
3511#[derive(Debug, Clone, thiserror::Error)]
3512pub enum AppendConditionFailed {
3514 #[error("fencing token mismatch, expected: {0}")]
3515 FencingTokenMismatch(FencingToken),
3517 #[error("sequence number mismatch, expected: {0}")]
3518 SeqNumMismatch(u64),
3520}
3521
3522impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3523 fn from(value: api::stream::AppendConditionFailed) -> Self {
3524 match value {
3525 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3526 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3527 }
3528 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3529 AppendConditionFailed::SeqNumMismatch(seq)
3530 }
3531 }
3532 }
3533}
3534
3535#[derive(Debug, Clone, thiserror::Error)]
3536pub enum S2Error {
3538 #[error("{0}")]
3539 Client(String),
3541 #[error(transparent)]
3542 Validation(#[from] ValidationError),
3544 #[error("{0}")]
3545 AppendConditionFailed(AppendConditionFailed),
3547 #[error("read from an unwritten position. current tail: {0}")]
3548 ReadUnwritten(StreamPosition),
3550 #[error("{0}")]
3551 Server(ErrorResponse),
3553}
3554
3555impl From<ApiError> for S2Error {
3556 fn from(err: ApiError) -> Self {
3557 match err {
3558 ApiError::ReadUnwritten(tail_response) => {
3559 Self::ReadUnwritten(tail_response.tail.into())
3560 }
3561 ApiError::AppendConditionFailed(condition_failed) => {
3562 Self::AppendConditionFailed(condition_failed.into())
3563 }
3564 ApiError::Server(_, response) => Self::Server(response.into()),
3565 other => Self::Client(other.to_string()),
3566 }
3567 }
3568}
3569
3570#[derive(Debug, Clone, thiserror::Error)]
3571#[error("{code}: {message}")]
3572#[non_exhaustive]
3573pub struct ErrorResponse {
3575 pub code: String,
3577 pub message: String,
3579}
3580
3581impl From<ApiErrorResponse> for ErrorResponse {
3582 fn from(response: ApiErrorResponse) -> Self {
3583 Self {
3584 code: response.code,
3585 message: response.message,
3586 }
3587 }
3588}
3589
3590fn idempotency_token() -> String {
3591 uuid::Uuid::new_v4().simple().to_string()
3592}