1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 sync::Arc,
12 time::Duration,
13};
14
15use bytes::Bytes;
16use http::{
17 header::HeaderValue,
18 uri::{Authority, Scheme},
19};
20use rand::RngExt;
21use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
22pub use s2_common::caps::RECORD_BATCH_MAX;
23pub use s2_common::encryption::EncryptionAlgorithm;
25pub use s2_common::encryption::EncryptionKey;
27pub use s2_common::types::ValidationError;
29pub use s2_common::types::access::AccessTokenId;
33pub use s2_common::types::access::AccessTokenIdPrefix;
35pub use s2_common::types::access::AccessTokenIdStartAfter;
37pub use s2_common::types::basin::BasinName;
42pub use s2_common::types::basin::BasinNamePrefix;
44pub use s2_common::types::basin::BasinNameStartAfter;
46pub use s2_common::types::stream::StreamName;
50pub use s2_common::types::stream::StreamNamePrefix;
52pub use s2_common::types::stream::StreamNameStartAfter;
54
/// Number of bytes in one mebibyte (2^20 = 1,048,576).
pub(crate) const ONE_MIB: u32 = 1 << 20;
56
57use s2_common::{
58 maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH, types::resources::ProvisionResult,
59};
60use secrecy::SecretString;
61
62use crate::api::{ApiError, ApiErrorResponse};
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub struct S2DateTime(time::OffsetDateTime);
71
72impl TryFrom<time::OffsetDateTime> for S2DateTime {
73 type Error = ValidationError;
74
75 fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
76 dt.format(&time::format_description::well_known::Rfc3339)
77 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
78 Ok(Self(dt))
79 }
80}
81
82impl From<S2DateTime> for time::OffsetDateTime {
83 fn from(dt: S2DateTime) -> Self {
84 dt.0
85 }
86}
87
88impl FromStr for S2DateTime {
89 type Err = ValidationError;
90
91 fn from_str(s: &str) -> Result<Self, Self::Err> {
92 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
93 .map(Self)
94 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
95 }
96}
97
98impl fmt::Display for S2DateTime {
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 write!(
101 f,
102 "{}",
103 self.0
104 .format(&time::format_description::well_known::Rfc3339)
105 .expect("RFC3339 formatting should not fail for S2DateTime")
106 )
107 }
108}
109
/// Authority part of a basin endpoint, tagged by how the endpoint was written.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// Authority that followed a leading `{basin}.` placeholder in the endpoint string.
    // NOTE(review): presumably the basin name is prepended as a subdomain when
    // requests are built; that logic is not in this part of the file — confirm.
    ParentZone(Authority),
    /// Authority taken verbatim from an endpoint string without the placeholder.
    Direct(Authority),
}
118
119#[derive(Debug, Clone)]
121pub struct AccountEndpoint {
122 scheme: Scheme,
123 authority: Authority,
124}
125
126impl AccountEndpoint {
127 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
129 endpoint.parse()
130 }
131}
132
133impl FromStr for AccountEndpoint {
134 type Err = ValidationError;
135
136 fn from_str(s: &str) -> Result<Self, Self::Err> {
137 let (scheme, authority) = match s.find("://") {
138 Some(idx) => {
139 let scheme: Scheme = s[..idx]
140 .parse()
141 .map_err(|_| "invalid account endpoint scheme".to_string())?;
142 (scheme, &s[idx + 3..])
143 }
144 None => (Scheme::HTTPS, s),
145 };
146 Ok(Self {
147 scheme,
148 authority: authority
149 .parse()
150 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
151 })
152 }
153}
154
155#[derive(Debug, Clone)]
157pub struct BasinEndpoint {
158 scheme: Scheme,
159 authority: BasinAuthority,
160}
161
162impl BasinEndpoint {
163 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
165 endpoint.parse()
166 }
167}
168
169impl FromStr for BasinEndpoint {
170 type Err = ValidationError;
171
172 fn from_str(s: &str) -> Result<Self, Self::Err> {
173 let (scheme, authority) = match s.find("://") {
174 Some(idx) => {
175 let scheme: Scheme = s[..idx]
176 .parse()
177 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
178 (scheme, &s[idx + 3..])
179 }
180 None => (Scheme::HTTPS, s),
181 };
182 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
183 BasinAuthority::ParentZone(
184 authority
185 .parse()
186 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
187 )
188 } else {
189 BasinAuthority::Direct(
190 authority
191 .parse()
192 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
193 )
194 };
195 Ok(Self { scheme, authority })
196 }
197}
198
199#[derive(Debug, Clone)]
200#[non_exhaustive]
201pub struct S2Endpoints {
203 pub(crate) scheme: Scheme,
204 pub(crate) account_authority: Authority,
205 pub(crate) basin_authority: BasinAuthority,
206}
207
208impl S2Endpoints {
209 pub fn new(
211 account_endpoint: AccountEndpoint,
212 basin_endpoint: BasinEndpoint,
213 ) -> Result<Self, ValidationError> {
214 if account_endpoint.scheme != basin_endpoint.scheme {
215 return Err("account and basin endpoints must have the same scheme".into());
216 }
217 Ok(Self {
218 scheme: account_endpoint.scheme,
219 account_authority: account_endpoint.authority,
220 basin_authority: basin_endpoint.authority,
221 })
222 }
223
224 pub fn from_env() -> Result<Self, ValidationError> {
230 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
231 Ok(endpoint) => endpoint.parse()?,
232 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
233 Err(VarError::NotUnicode(_)) => {
234 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
235 }
236 };
237
238 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
239 Ok(endpoint) => endpoint.parse()?,
240 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
241 Err(VarError::NotUnicode(_)) => {
242 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
243 }
244 };
245
246 if account_endpoint.scheme != basin_endpoint.scheme {
247 return Err(
248 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
249 );
250 }
251
252 Ok(Self {
253 scheme: account_endpoint.scheme,
254 account_authority: account_endpoint.authority,
255 basin_authority: basin_endpoint.authority,
256 })
257 }
258
259 pub(crate) fn for_aws() -> Self {
260 Self {
261 scheme: Scheme::HTTPS,
262 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
263 basin_authority: BasinAuthority::ParentZone(
264 "b.s2.dev".try_into().expect("valid authority"),
265 ),
266 }
267 }
268}
269
270#[derive(Debug, Clone, Copy)]
271pub enum Compression {
273 None,
275 Gzip,
277 Zstd,
279}
280
281impl From<Compression> for CompressionAlgorithm {
282 fn from(value: Compression) -> Self {
283 match value {
284 Compression::None => CompressionAlgorithm::None,
285 Compression::Gzip => CompressionAlgorithm::Gzip,
286 Compression::Zstd => CompressionAlgorithm::Zstd,
287 }
288 }
289}
290
/// Selects which append requests may be retried.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AppendRetryPolicy {
    /// The default policy (see [`RetryConfig::default`]).
    // NOTE(review): presumably every failed append is eligible for retry; the
    // code that consumes this policy is not in this part of the file — confirm.
    All,
    // NOTE(review): presumably restricts retries to failures known not to have
    // had an effect — confirm against the retry implementation.
    /// Alternative to [`AppendRetryPolicy::All`].
    NoSideEffects,
}

/// Retry settings: attempt budget, base-delay bounds and the append policy.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct RetryConfig {
    /// Total number of attempts; the retry count is this value minus one.
    pub max_attempts: NonZeroU32,
    /// Lower bound for the base delay.
    // NOTE(review): the backoff computation itself lives elsewhere — confirm
    // how the two bounds are combined.
    pub min_base_delay: Duration,
    /// Upper bound for the base delay.
    pub max_base_delay: Duration,
    /// Policy applied to append requests.
    pub append_retry_policy: AppendRetryPolicy,
}

impl Default for RetryConfig {
    /// 3 attempts, base delay between 100 ms and 1 s, [`AppendRetryPolicy::All`].
    fn default() -> Self {
        Self {
            max_attempts: Self::DEFAULT_MAX_ATTEMPTS,
            min_base_delay: Duration::from_millis(100),
            max_base_delay: Duration::from_secs(1),
            append_retry_policy: AppendRetryPolicy::All,
        }
    }
}

impl RetryConfig {
    /// Default attempt budget, built in const context so that a zero value
    /// would be rejected at compile time instead of panicking at runtime.
    const DEFAULT_MAX_ATTEMPTS: NonZeroU32 = match NonZeroU32::new(3) {
        Some(attempts) => attempts,
        None => panic!("default max attempts must be non-zero"),
    };

    /// Creates a configuration holding the default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of retries after the first attempt (`max_attempts - 1`).
    ///
    /// Cannot underflow because `max_attempts` is non-zero.
    pub(crate) fn max_retries(&self) -> u32 {
        self.max_attempts.get() - 1
    }

    /// Replaces [`RetryConfig::max_attempts`].
    pub fn with_max_attempts(mut self, max_attempts: NonZeroU32) -> Self {
        self.max_attempts = max_attempts;
        self
    }

    /// Replaces [`RetryConfig::min_base_delay`].
    pub fn with_min_base_delay(mut self, min_base_delay: Duration) -> Self {
        self.min_base_delay = min_base_delay;
        self
    }

    /// Replaces [`RetryConfig::max_base_delay`].
    pub fn with_max_base_delay(mut self, max_base_delay: Duration) -> Self {
        self.max_base_delay = max_base_delay;
        self
    }

    /// Replaces [`RetryConfig::append_retry_policy`].
    pub fn with_append_retry_policy(mut self, append_retry_policy: AppendRetryPolicy) -> Self {
        self.append_retry_policy = append_retry_policy;
        self
    }
}
394
395#[derive(Debug, Clone)]
396#[non_exhaustive]
397pub struct S2Config {
399 pub(crate) access_token: SecretString,
400 pub(crate) endpoints: S2Endpoints,
401 pub(crate) connection_timeout: Duration,
402 pub(crate) request_timeout: Duration,
403 pub(crate) retry: RetryConfig,
404 pub(crate) compression: Compression,
405 pub(crate) user_agent: HeaderValue,
406 pub(crate) insecure_skip_cert_verification: bool,
407 pub(crate) rustls_crypto_provider: Option<Arc<rustls::crypto::CryptoProvider>>,
408}
409
410impl S2Config {
411 pub fn new(access_token: impl Into<String>) -> Self {
413 Self {
414 access_token: access_token.into().into(),
415 endpoints: S2Endpoints::for_aws(),
416 connection_timeout: Duration::from_secs(3),
417 request_timeout: Duration::from_secs(5),
418 retry: RetryConfig::new(),
419 compression: Compression::None,
420 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
421 .parse()
422 .expect("valid user agent"),
423 insecure_skip_cert_verification: false,
424 rustls_crypto_provider: default_rustls_crypto_provider(),
425 }
426 }
427
428 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
430 Self { endpoints, ..self }
431 }
432
433 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
437 Self {
438 connection_timeout,
439 ..self
440 }
441 }
442
443 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
447 Self {
448 request_timeout,
449 ..self
450 }
451 }
452
453 pub fn with_retry(self, retry: RetryConfig) -> Self {
457 Self { retry, ..self }
458 }
459
460 pub fn with_compression(self, compression: Compression) -> Self {
464 Self {
465 compression,
466 ..self
467 }
468 }
469
470 pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
482 Self {
483 insecure_skip_cert_verification: skip,
484 ..self
485 }
486 }
487
488 pub fn with_rustls_crypto_provider(
498 self,
499 provider: impl Into<Arc<rustls::crypto::CryptoProvider>>,
500 ) -> Self {
501 Self {
502 rustls_crypto_provider: Some(provider.into()),
503 ..self
504 }
505 }
506
507 #[cfg(feature = "rustls-aws-lc-rs")]
511 pub fn with_rustls_aws_lc_rs_crypto_provider(self) -> Self {
512 self.with_rustls_crypto_provider(rustls::crypto::aws_lc_rs::default_provider())
513 }
514
515 #[cfg(feature = "rustls-ring")]
519 pub fn with_rustls_ring_crypto_provider(self) -> Self {
520 self.with_rustls_crypto_provider(rustls::crypto::ring::default_provider())
521 }
522
523 #[doc(hidden)]
524 #[cfg(feature = "_hidden")]
525 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
526 let user_agent = user_agent
527 .into()
528 .parse()
529 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
530 Ok(Self { user_agent, ..self })
531 }
532}
533
534#[cfg(feature = "rustls-aws-lc-rs")]
535fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
536 Some(Arc::new(rustls::crypto::aws_lc_rs::default_provider()))
537}
538
539#[cfg(all(not(feature = "rustls-aws-lc-rs"), feature = "rustls-ring"))]
540fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
541 Some(Arc::new(rustls::crypto::ring::default_provider()))
542}
543
544#[cfg(all(not(feature = "rustls-aws-lc-rs"), not(feature = "rustls-ring")))]
545fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
546 None
547}
548
/// One page of results from a paginated listing.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Items contained in this page.
    pub values: Vec<T>,
    /// Flag reported alongside the items.
    // NOTE(review): presumably `true` means further pages are available — confirm.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Builds a page from anything convertible into a `Vec<T>`.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values: Vec<T> = values.into();
        Self { values, has_more }
    }
}
567
568#[derive(Debug, Clone, Copy, PartialEq, Eq)]
569pub enum StorageClass {
571 Standard,
573 Express,
575}
576
577impl From<api::config::StorageClass> for StorageClass {
578 fn from(value: api::config::StorageClass) -> Self {
579 match value {
580 api::config::StorageClass::Standard => StorageClass::Standard,
581 api::config::StorageClass::Express => StorageClass::Express,
582 }
583 }
584}
585
586impl From<StorageClass> for api::config::StorageClass {
587 fn from(value: StorageClass) -> Self {
588 match value {
589 StorageClass::Standard => api::config::StorageClass::Standard,
590 StorageClass::Express => api::config::StorageClass::Express,
591 }
592 }
593}
594
595#[derive(Debug, Clone, Copy, PartialEq, Eq)]
596pub enum RetentionPolicy {
598 Age(u64),
600 Infinite,
602}
603
604impl From<api::config::RetentionPolicy> for RetentionPolicy {
605 fn from(value: api::config::RetentionPolicy) -> Self {
606 match value {
607 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
608 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
609 }
610 }
611}
612
613impl From<RetentionPolicy> for api::config::RetentionPolicy {
614 fn from(value: RetentionPolicy) -> Self {
615 match value {
616 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
617 RetentionPolicy::Infinite => {
618 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
619 }
620 }
621 }
622}
623
624#[derive(Debug, Clone, Copy, PartialEq, Eq)]
625pub enum TimestampingMode {
627 ClientPrefer,
629 ClientRequire,
631 Arrival,
633}
634
635impl From<api::config::TimestampingMode> for TimestampingMode {
636 fn from(value: api::config::TimestampingMode) -> Self {
637 match value {
638 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
639 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
640 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
641 }
642 }
643}
644
645impl From<TimestampingMode> for api::config::TimestampingMode {
646 fn from(value: TimestampingMode) -> Self {
647 match value {
648 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
649 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
650 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
651 }
652 }
653}
654
655#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
656#[non_exhaustive]
657pub struct TimestampingConfig {
659 pub mode: Option<TimestampingMode>,
663 pub uncapped: Option<bool>,
667}
668
669impl TimestampingConfig {
670 pub fn new() -> Self {
672 Self::default()
673 }
674
675 pub fn with_mode(self, mode: TimestampingMode) -> Self {
677 Self {
678 mode: Some(mode),
679 ..self
680 }
681 }
682
683 pub fn with_uncapped(self, uncapped: bool) -> Self {
685 Self {
686 uncapped: Some(uncapped),
687 ..self
688 }
689 }
690}
691
692impl From<api::config::TimestampingConfig> for TimestampingConfig {
693 fn from(value: api::config::TimestampingConfig) -> Self {
694 Self {
695 mode: value.mode.map(Into::into),
696 uncapped: value.uncapped,
697 }
698 }
699}
700
701impl From<TimestampingConfig> for api::config::TimestampingConfig {
702 fn from(value: TimestampingConfig) -> Self {
703 Self {
704 mode: value.mode.map(Into::into),
705 uncapped: value.uncapped,
706 }
707 }
708}
709
/// Delete-on-empty settings.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in whole seconds; defaults to `0`.
    // NOTE(review): presumably the age a stream must reach while empty before
    // it is deleted — confirm against the API documentation.
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Creates a configuration with `min_age_secs` set to `0`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the minimum age, truncating `min_age` to whole seconds.
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
733
734impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
735 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
736 Self {
737 min_age_secs: value.min_age_secs,
738 }
739 }
740}
741
742impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
743 fn from(value: DeleteOnEmptyConfig) -> Self {
744 Self {
745 min_age_secs: value.min_age_secs,
746 }
747 }
748}
749
750#[derive(Debug, Clone, Default, PartialEq, Eq)]
751#[non_exhaustive]
752pub struct StreamConfig {
754 pub storage_class: Option<StorageClass>,
758 pub retention_policy: Option<RetentionPolicy>,
762 pub timestamping: Option<TimestampingConfig>,
766 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
770}
771
772impl StreamConfig {
773 pub fn new() -> Self {
775 Self::default()
776 }
777
778 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
780 Self {
781 storage_class: Some(storage_class),
782 ..self
783 }
784 }
785
786 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
788 Self {
789 retention_policy: Some(retention_policy),
790 ..self
791 }
792 }
793
794 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
796 Self {
797 timestamping: Some(timestamping),
798 ..self
799 }
800 }
801
802 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
804 Self {
805 delete_on_empty: Some(delete_on_empty),
806 ..self
807 }
808 }
809}
810
811impl From<api::config::StreamConfig> for StreamConfig {
812 fn from(value: api::config::StreamConfig) -> Self {
813 Self {
814 storage_class: value.storage_class.map(Into::into),
815 retention_policy: value.retention_policy.map(Into::into),
816 timestamping: value.timestamping.map(Into::into),
817 delete_on_empty: value.delete_on_empty.map(Into::into),
818 }
819 }
820}
821
822impl From<StreamConfig> for api::config::StreamConfig {
823 fn from(value: StreamConfig) -> Self {
824 Self {
825 storage_class: value.storage_class.map(Into::into),
826 retention_policy: value.retention_policy.map(Into::into),
827 timestamping: value.timestamping.map(Into::into),
828 delete_on_empty: value.delete_on_empty.map(Into::into),
829 }
830 }
831}
832
833#[derive(Debug, Clone, Default, PartialEq, Eq)]
834#[non_exhaustive]
835pub struct BasinConfig {
837 pub default_stream_config: Option<StreamConfig>,
841 pub stream_cipher: Option<EncryptionAlgorithm>,
843 pub create_stream_on_append: bool,
847 pub create_stream_on_read: bool,
851}
852
853impl BasinConfig {
854 pub fn new() -> Self {
856 Self::default()
857 }
858
859 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
861 Self {
862 default_stream_config: Some(config),
863 ..self
864 }
865 }
866
867 pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
869 Self {
870 stream_cipher: Some(stream_cipher),
871 ..self
872 }
873 }
874
875 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
878 Self {
879 create_stream_on_append,
880 ..self
881 }
882 }
883
884 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
886 Self {
887 create_stream_on_read,
888 ..self
889 }
890 }
891}
892
893impl From<api::config::BasinConfig> for BasinConfig {
894 fn from(value: api::config::BasinConfig) -> Self {
895 Self {
896 default_stream_config: value.default_stream_config.map(Into::into),
897 stream_cipher: value.stream_cipher.map(Into::into),
898 create_stream_on_append: value.create_stream_on_append,
899 create_stream_on_read: value.create_stream_on_read,
900 }
901 }
902}
903
904impl From<BasinConfig> for api::config::BasinConfig {
905 fn from(value: BasinConfig) -> Self {
906 Self {
907 default_stream_config: value.default_stream_config.map(Into::into),
908 stream_cipher: value.stream_cipher.map(Into::into),
909 create_stream_on_append: value.create_stream_on_append,
910 create_stream_on_read: value.create_stream_on_read,
911 }
912 }
913}
914
915#[derive(Debug, Clone, PartialEq, Eq)]
916#[non_exhaustive]
918pub enum BasinScope {
919 AwsUsEast1,
921 AwsUsWest2,
923 AwsEuNorth1,
925}
926
927impl From<api::basin::BasinScope> for BasinScope {
928 fn from(value: api::basin::BasinScope) -> Self {
929 match value {
930 api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
931 api::basin::BasinScope::AwsUsWest2 => BasinScope::AwsUsWest2,
932 api::basin::BasinScope::AwsEuNorth1 => BasinScope::AwsEuNorth1,
933 }
934 }
935}
936
937impl From<BasinScope> for api::basin::BasinScope {
938 fn from(value: BasinScope) -> Self {
939 match value {
940 BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
941 BasinScope::AwsUsWest2 => api::basin::BasinScope::AwsUsWest2,
942 BasinScope::AwsEuNorth1 => api::basin::BasinScope::AwsEuNorth1,
943 }
944 }
945}
946
947#[derive(Debug, Clone)]
948#[non_exhaustive]
949pub struct CreateBasinInput {
951 pub name: BasinName,
953 pub config: Option<BasinConfig>,
957 pub scope: Option<BasinScope>,
961 idempotency_token: String,
962}
963
964impl CreateBasinInput {
965 pub fn new(name: BasinName) -> Self {
967 Self {
968 name,
969 config: None,
970 scope: None,
971 idempotency_token: idempotency_token(),
972 }
973 }
974
975 pub fn with_config(self, config: BasinConfig) -> Self {
977 Self {
978 config: Some(config),
979 ..self
980 }
981 }
982
983 pub fn with_scope(self, scope: BasinScope) -> Self {
985 Self {
986 scope: Some(scope),
987 ..self
988 }
989 }
990}
991
992impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
993 fn from(value: CreateBasinInput) -> Self {
994 (
995 api::basin::CreateBasinRequest {
996 basin: value.name,
997 config: value.config.map(Into::into),
998 scope: value.scope.map(Into::into),
999 },
1000 value.idempotency_token,
1001 )
1002 }
1003}
1004
1005#[derive(Debug, Clone)]
1006#[non_exhaustive]
1007pub struct EnsureBasinInput {
1009 pub name: BasinName,
1011 pub config: Option<BasinConfig>,
1015 pub scope: Option<BasinScope>,
1019}
1020
1021impl EnsureBasinInput {
1022 pub fn new(name: BasinName) -> Self {
1024 Self {
1025 name,
1026 config: None,
1027 scope: None,
1028 }
1029 }
1030
1031 pub fn with_config(self, config: BasinConfig) -> Self {
1033 Self {
1034 config: Some(config),
1035 ..self
1036 }
1037 }
1038
1039 pub fn with_scope(self, scope: BasinScope) -> Self {
1041 Self {
1042 scope: Some(scope),
1043 ..self
1044 }
1045 }
1046}
1047
1048impl From<EnsureBasinInput> for (BasinName, Option<api::basin::EnsureBasinRequest>) {
1049 fn from(value: EnsureBasinInput) -> Self {
1050 let config = value.config;
1051 let request = if config.is_some() || value.scope.is_some() {
1052 Some(api::basin::EnsureBasinRequest {
1053 config: config.map(Into::into),
1054 scope: value.scope.map(Into::into),
1055 })
1056 } else {
1057 None
1058 };
1059 (value.name, request)
1060 }
1061}
1062
1063#[derive(Debug, Clone)]
1064pub enum EnsureOutput<T> {
1067 Created(T),
1069 ConfigUpdated(T),
1071 ConfigUnchanged(T),
1073}
1074
1075impl<T> From<ProvisionResult<T>> for EnsureOutput<T> {
1076 fn from(result: ProvisionResult<T>) -> Self {
1077 match result {
1078 ProvisionResult::Created(info) => EnsureOutput::Created(info),
1079 ProvisionResult::Updated(info) => EnsureOutput::ConfigUpdated(info),
1080 ProvisionResult::Noop(info) => EnsureOutput::ConfigUnchanged(info),
1081 }
1082 }
1083}
1084
1085#[derive(Debug, Clone, Default)]
1086#[non_exhaustive]
1087pub struct ListBasinsInput {
1089 pub prefix: BasinNamePrefix,
1093 pub start_after: BasinNameStartAfter,
1099 pub limit: Option<usize>,
1103}
1104
1105impl ListBasinsInput {
1106 pub fn new() -> Self {
1108 Self::default()
1109 }
1110
1111 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1113 Self { prefix, ..self }
1114 }
1115
1116 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1119 Self {
1120 start_after,
1121 ..self
1122 }
1123 }
1124
1125 pub fn with_limit(self, limit: usize) -> Self {
1127 Self {
1128 limit: Some(limit),
1129 ..self
1130 }
1131 }
1132}
1133
1134impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1135 fn from(value: ListBasinsInput) -> Self {
1136 Self {
1137 prefix: Some(value.prefix),
1138 start_after: Some(value.start_after),
1139 limit: value.limit,
1140 }
1141 }
1142}
1143
1144#[derive(Debug, Clone, Default)]
1145pub struct ListAllBasinsInput {
1147 pub prefix: BasinNamePrefix,
1151 pub start_after: BasinNameStartAfter,
1157 pub include_deleted: bool,
1161}
1162
1163impl ListAllBasinsInput {
1164 pub fn new() -> Self {
1166 Self::default()
1167 }
1168
1169 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1171 Self { prefix, ..self }
1172 }
1173
1174 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1177 Self {
1178 start_after,
1179 ..self
1180 }
1181 }
1182
1183 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1185 Self {
1186 include_deleted,
1187 ..self
1188 }
1189 }
1190}
1191
1192#[derive(Debug, Clone, PartialEq, Eq)]
1193#[non_exhaustive]
1194pub struct BasinInfo {
1196 pub name: BasinName,
1198 pub scope: Option<BasinScope>,
1200 pub created_at: S2DateTime,
1202 pub deleted_at: Option<S2DateTime>,
1204}
1205
1206impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1207 type Error = ValidationError;
1208
1209 fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1210 Ok(Self {
1211 name: value.name,
1212 scope: value.scope.map(Into::into),
1213 created_at: value.created_at.try_into()?,
1214 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1215 })
1216 }
1217}
1218
1219#[derive(Debug, Clone)]
1220#[non_exhaustive]
1221pub struct DeleteBasinInput {
1223 pub name: BasinName,
1225 pub ignore_not_found: bool,
1227}
1228
1229impl DeleteBasinInput {
1230 pub fn new(name: BasinName) -> Self {
1232 Self {
1233 name,
1234 ignore_not_found: false,
1235 }
1236 }
1237
1238 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1240 Self {
1241 ignore_not_found,
1242 ..self
1243 }
1244 }
1245}
1246
1247#[derive(Debug, Clone, Default)]
1248#[non_exhaustive]
1249pub struct TimestampingReconfiguration {
1251 pub mode: Maybe<Option<TimestampingMode>>,
1253 pub uncapped: Maybe<Option<bool>>,
1255}
1256
1257impl TimestampingReconfiguration {
1258 pub fn new() -> Self {
1260 Self::default()
1261 }
1262
1263 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1265 Self {
1266 mode: Maybe::Specified(Some(mode)),
1267 ..self
1268 }
1269 }
1270
1271 pub fn with_uncapped(self, uncapped: bool) -> Self {
1273 Self {
1274 uncapped: Maybe::Specified(Some(uncapped)),
1275 ..self
1276 }
1277 }
1278}
1279
1280impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1281 fn from(value: TimestampingReconfiguration) -> Self {
1282 Self {
1283 mode: value.mode.map(|m| m.map(Into::into)),
1284 uncapped: value.uncapped,
1285 }
1286 }
1287}
1288
1289#[derive(Debug, Clone, Default)]
1290#[non_exhaustive]
1291pub struct DeleteOnEmptyReconfiguration {
1293 pub min_age_secs: Maybe<Option<u64>>,
1295}
1296
1297impl DeleteOnEmptyReconfiguration {
1298 pub fn new() -> Self {
1300 Self::default()
1301 }
1302
1303 pub fn with_min_age(self, min_age: Duration) -> Self {
1305 Self {
1306 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1307 }
1308 }
1309}
1310
1311impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1312 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1313 Self {
1314 min_age_secs: value.min_age_secs,
1315 }
1316 }
1317}
1318
1319#[derive(Debug, Clone, Default)]
1320#[non_exhaustive]
1321pub struct StreamReconfiguration {
1323 pub storage_class: Maybe<Option<StorageClass>>,
1325 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1327 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1329 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1331}
1332
1333impl StreamReconfiguration {
1334 pub fn new() -> Self {
1336 Self::default()
1337 }
1338
1339 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1341 Self {
1342 storage_class: Maybe::Specified(Some(storage_class)),
1343 ..self
1344 }
1345 }
1346
1347 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1349 Self {
1350 retention_policy: Maybe::Specified(Some(retention_policy)),
1351 ..self
1352 }
1353 }
1354
1355 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1357 Self {
1358 timestamping: Maybe::Specified(Some(timestamping)),
1359 ..self
1360 }
1361 }
1362
1363 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1365 Self {
1366 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1367 ..self
1368 }
1369 }
1370}
1371
1372impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1373 fn from(value: StreamReconfiguration) -> Self {
1374 Self {
1375 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1376 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1377 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1378 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1379 }
1380 }
1381}
1382
1383#[derive(Debug, Clone, Default)]
1384#[non_exhaustive]
1385pub struct BasinReconfiguration {
1387 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1389 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
1391 pub create_stream_on_append: Maybe<bool>,
1394 pub create_stream_on_read: Maybe<bool>,
1396}
1397
1398impl BasinReconfiguration {
1399 pub fn new() -> Self {
1401 Self::default()
1402 }
1403
1404 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1407 Self {
1408 default_stream_config: Maybe::Specified(Some(config)),
1409 ..self
1410 }
1411 }
1412
1413 pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
1415 Self {
1416 stream_cipher: Maybe::Specified(Some(stream_cipher)),
1417 ..self
1418 }
1419 }
1420
1421 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1424 Self {
1425 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1426 ..self
1427 }
1428 }
1429
1430 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1433 Self {
1434 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1435 ..self
1436 }
1437 }
1438}
1439
1440impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1441 fn from(value: BasinReconfiguration) -> Self {
1442 Self {
1443 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1444 stream_cipher: value.stream_cipher.map(|m| m.map(Into::into)),
1445 create_stream_on_append: value.create_stream_on_append,
1446 create_stream_on_read: value.create_stream_on_read,
1447 }
1448 }
1449}
1450
/// Input for reconfiguring a basin: the basin name plus the changes to apply.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReconfigureBasinInput {
    /// Name of the basin to reconfigure.
    pub name: BasinName,
    /// Reconfiguration to apply.
    pub config: BasinReconfiguration,
}

impl ReconfigureBasinInput {
    /// Creates an input from a basin name and a reconfiguration.
    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
        Self { name, config }
    }
}
1467
1468#[derive(Debug, Clone, Default)]
1469#[non_exhaustive]
1470pub struct ListAccessTokensInput {
1472 pub prefix: AccessTokenIdPrefix,
1476 pub start_after: AccessTokenIdStartAfter,
1482 pub limit: Option<usize>,
1486}
1487
1488impl ListAccessTokensInput {
1489 pub fn new() -> Self {
1491 Self::default()
1492 }
1493
1494 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1496 Self { prefix, ..self }
1497 }
1498
1499 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1502 Self {
1503 start_after,
1504 ..self
1505 }
1506 }
1507
1508 pub fn with_limit(self, limit: usize) -> Self {
1510 Self {
1511 limit: Some(limit),
1512 ..self
1513 }
1514 }
1515}
1516
1517impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1518 fn from(value: ListAccessTokensInput) -> Self {
1519 Self {
1520 prefix: Some(value.prefix),
1521 start_after: Some(value.start_after),
1522 limit: value.limit,
1523 }
1524 }
1525}
1526
1527#[derive(Debug, Clone, Default)]
1528pub struct ListAllAccessTokensInput {
1530 pub prefix: AccessTokenIdPrefix,
1534 pub start_after: AccessTokenIdStartAfter,
1540}
1541
1542impl ListAllAccessTokensInput {
1543 pub fn new() -> Self {
1545 Self::default()
1546 }
1547
1548 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1550 Self { prefix, ..self }
1551 }
1552
1553 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1556 Self {
1557 start_after,
1558 ..self
1559 }
1560 }
1561}
1562
/// Information about an access token, as decoded from an API response.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenInfo {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiry timestamp; decoding fails when the API response omits it.
    pub expires_at: S2DateTime,
    /// Auto-prefix flag; decoded as `false` when the API response omits it.
    pub auto_prefix_streams: bool,
    /// Scope attached to the token.
    pub scope: AccessTokenScope,
}
1577
1578impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1579 type Error = ValidationError;
1580
1581 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1582 let expires_at = value
1583 .expires_at
1584 .map(S2DateTime::try_from)
1585 .transpose()?
1586 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1587 Ok(Self {
1588 id: value.id,
1589 expires_at,
1590 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1591 scope: value.scope.into(),
1592 })
1593 }
1594}
1595
/// Selects which basins an access token scope refers to.
#[derive(Debug, Clone)]
pub enum BasinMatcher {
    /// Encoded in the API as an exact match on the empty value.
    // NOTE(review): presumably matches no basins; confirm against the API docs.
    None,
    /// A single basin, identified by its exact name.
    Exact(BasinName),
    /// Basins identified by a name prefix.
    Prefix(BasinNamePrefix),
}
1608
/// Selects which streams an access token scope refers to.
#[derive(Debug, Clone)]
pub enum StreamMatcher {
    /// Encoded in the API as an exact match on the empty value.
    // NOTE(review): presumably matches no streams; confirm against the API docs.
    None,
    /// A single stream, identified by its exact name.
    Exact(StreamName),
    /// Streams identified by a name prefix.
    Prefix(StreamNamePrefix),
}
1621
/// Selects which access tokens an access token scope refers to.
#[derive(Debug, Clone)]
pub enum AccessTokenMatcher {
    /// Encoded in the API as an exact match on the empty value.
    // NOTE(review): presumably matches no access tokens; confirm against the API docs.
    None,
    /// A single access token, identified by its exact ID.
    Exact(AccessTokenId),
    /// Access tokens identified by an ID prefix.
    Prefix(AccessTokenIdPrefix),
}
1634
/// A pair of read/write permission flags.
///
/// The `Default` value has both flags cleared.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Whether read access is granted.
    pub read: bool,
    /// Whether write access is granted.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Creates permissions with both flags cleared (same as `Default`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Read granted, write denied.
    pub fn read_only() -> Self {
        let mut perms = Self::new();
        perms.read = true;
        perms
    }

    /// Write granted, read denied.
    pub fn write_only() -> Self {
        let mut perms = Self::new();
        perms.write = true;
        perms
    }

    /// Both read and write granted.
    pub fn read_write() -> Self {
        let (read, write) = (true, true);
        Self { read, write }
    }
}
1679
1680impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1681 fn from(value: ReadWritePermissions) -> Self {
1682 Self {
1683 read: Some(value.read),
1684 write: Some(value.write),
1685 }
1686 }
1687}
1688
1689impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1690 fn from(value: api::access::ReadWritePermissions) -> Self {
1691 Self {
1692 read: value.read.unwrap_or_default(),
1693 write: value.write.unwrap_or_default(),
1694 }
1695 }
1696}
1697
/// Read/write permissions for each of the three operation groups.
///
/// A `None` group is left unset when converted to the API representation.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct OperationGroupPermissions {
    /// Permissions for the account operation group, if specified.
    pub account: Option<ReadWritePermissions>,
    /// Permissions for the basin operation group, if specified.
    pub basin: Option<ReadWritePermissions>,
    /// Permissions for the stream operation group, if specified.
    pub stream: Option<ReadWritePermissions>,
}
1717
1718impl OperationGroupPermissions {
1719 pub fn new() -> Self {
1721 Self::default()
1722 }
1723
1724 pub fn read_only_all() -> Self {
1726 Self {
1727 account: Some(ReadWritePermissions::read_only()),
1728 basin: Some(ReadWritePermissions::read_only()),
1729 stream: Some(ReadWritePermissions::read_only()),
1730 }
1731 }
1732
1733 pub fn write_only_all() -> Self {
1735 Self {
1736 account: Some(ReadWritePermissions::write_only()),
1737 basin: Some(ReadWritePermissions::write_only()),
1738 stream: Some(ReadWritePermissions::write_only()),
1739 }
1740 }
1741
1742 pub fn read_write_all() -> Self {
1744 Self {
1745 account: Some(ReadWritePermissions::read_write()),
1746 basin: Some(ReadWritePermissions::read_write()),
1747 stream: Some(ReadWritePermissions::read_write()),
1748 }
1749 }
1750
1751 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1753 Self {
1754 account: Some(account),
1755 ..self
1756 }
1757 }
1758
1759 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1761 Self {
1762 basin: Some(basin),
1763 ..self
1764 }
1765 }
1766
1767 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1769 Self {
1770 stream: Some(stream),
1771 ..self
1772 }
1773 }
1774}
1775
1776impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1777 fn from(value: OperationGroupPermissions) -> Self {
1778 Self {
1779 account: value.account.map(Into::into),
1780 basin: value.basin.map(Into::into),
1781 stream: value.stream.map(Into::into),
1782 }
1783 }
1784}
1785
1786impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1787 fn from(value: api::access::PermittedOperationGroups) -> Self {
1788 Self {
1789 account: value.account.map(Into::into),
1790 basin: value.basin.map(Into::into),
1791 stream: value.stream.map(Into::into),
1792 }
1793 }
1794}
1795
/// An individual operation that an access token scope can list.
///
/// Each variant maps one-to-one to an `api::access::Operation` variant; the
/// three `Get*Metrics` variants map to the API's `*Metrics` variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get a basin's configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account-level metrics.
    GetAccountMetrics,
    /// Get basin-level metrics.
    GetBasinMetrics,
    /// Get stream-level metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get a stream's configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append to a stream.
    Append,
    /// Read from a stream.
    Read,
    /// Trim a stream.
    Trim,
    /// Fence a stream.
    Fence,
}
1844
1845impl From<Operation> for api::access::Operation {
1846 fn from(value: Operation) -> Self {
1847 match value {
1848 Operation::ListBasins => api::access::Operation::ListBasins,
1849 Operation::CreateBasin => api::access::Operation::CreateBasin,
1850 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1851 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1852 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1853 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1854 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1855 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1856 Operation::ListStreams => api::access::Operation::ListStreams,
1857 Operation::CreateStream => api::access::Operation::CreateStream,
1858 Operation::DeleteStream => api::access::Operation::DeleteStream,
1859 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1860 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1861 Operation::CheckTail => api::access::Operation::CheckTail,
1862 Operation::Append => api::access::Operation::Append,
1863 Operation::Read => api::access::Operation::Read,
1864 Operation::Trim => api::access::Operation::Trim,
1865 Operation::Fence => api::access::Operation::Fence,
1866 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1867 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1868 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1869 }
1870 }
1871}
1872
1873impl From<api::access::Operation> for Operation {
1874 fn from(value: api::access::Operation) -> Self {
1875 match value {
1876 api::access::Operation::ListBasins => Operation::ListBasins,
1877 api::access::Operation::CreateBasin => Operation::CreateBasin,
1878 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1879 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1880 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1881 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1882 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1883 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1884 api::access::Operation::ListStreams => Operation::ListStreams,
1885 api::access::Operation::CreateStream => Operation::CreateStream,
1886 api::access::Operation::DeleteStream => Operation::DeleteStream,
1887 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1888 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1889 api::access::Operation::CheckTail => Operation::CheckTail,
1890 api::access::Operation::Append => Operation::Append,
1891 api::access::Operation::Read => Operation::Read,
1892 api::access::Operation::Trim => Operation::Trim,
1893 api::access::Operation::Fence => Operation::Fence,
1894 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1895 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1896 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1897 }
1898 }
1899}
1900
/// Scope to request when issuing an access token.
///
/// Fields are private; build a value with [`AccessTokenScopeInput::from_ops`]
/// or [`AccessTokenScopeInput::from_op_group_perms`] and refine it with the
/// `with_*` methods.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScopeInput {
    // Optional resource matchers; `None` leaves the API field unset.
    basins: Option<BasinMatcher>,
    streams: Option<StreamMatcher>,
    access_tokens: Option<AccessTokenMatcher>,
    // Optional per-group permissions; `None` leaves the API field unset.
    op_group_perms: Option<OperationGroupPermissions>,
    // Individual operations; an empty set is sent to the API as `None`.
    ops: HashSet<Operation>,
}
1917
1918impl AccessTokenScopeInput {
1919 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1921 Self {
1922 basins: None,
1923 streams: None,
1924 access_tokens: None,
1925 op_group_perms: None,
1926 ops: ops.into_iter().collect(),
1927 }
1928 }
1929
1930 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1932 Self {
1933 basins: None,
1934 streams: None,
1935 access_tokens: None,
1936 op_group_perms: Some(op_group_perms),
1937 ops: HashSet::default(),
1938 }
1939 }
1940
1941 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1943 Self {
1944 ops: ops.into_iter().collect(),
1945 ..self
1946 }
1947 }
1948
1949 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1951 Self {
1952 op_group_perms: Some(op_group_perms),
1953 ..self
1954 }
1955 }
1956
1957 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1961 Self {
1962 basins: Some(basins),
1963 ..self
1964 }
1965 }
1966
1967 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1971 Self {
1972 streams: Some(streams),
1973 ..self
1974 }
1975 }
1976
1977 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1981 Self {
1982 access_tokens: Some(access_tokens),
1983 ..self
1984 }
1985 }
1986}
1987
/// Scope of an access token, as decoded from an API response.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScope {
    /// Basin matcher, if the API response carried one.
    pub basins: Option<BasinMatcher>,
    /// Stream matcher, if the API response carried one.
    pub streams: Option<StreamMatcher>,
    /// Access token matcher, if the API response carried one.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Operation-group permissions, if the API response carried them.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Individual operations; empty when the API response carried none.
    pub ops: HashSet<Operation>,
}
2003
2004impl From<api::access::AccessTokenScope> for AccessTokenScope {
2005 fn from(value: api::access::AccessTokenScope) -> Self {
2006 Self {
2007 basins: value.basins.map(|rs| match rs {
2008 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2009 BasinMatcher::Exact(e)
2010 }
2011 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2012 BasinMatcher::None
2013 }
2014 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
2015 }),
2016 streams: value.streams.map(|rs| match rs {
2017 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2018 StreamMatcher::Exact(e)
2019 }
2020 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2021 StreamMatcher::None
2022 }
2023 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
2024 }),
2025 access_tokens: value.access_tokens.map(|rs| match rs {
2026 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2027 AccessTokenMatcher::Exact(e)
2028 }
2029 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2030 AccessTokenMatcher::None
2031 }
2032 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
2033 }),
2034 op_group_perms: value.op_groups.map(Into::into),
2035 ops: value
2036 .ops
2037 .map(|ops| ops.into_iter().map(Into::into).collect())
2038 .unwrap_or_default(),
2039 }
2040 }
2041}
2042
2043impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
2044 fn from(value: AccessTokenScopeInput) -> Self {
2045 Self {
2046 basins: value.basins.map(|rs| match rs {
2047 BasinMatcher::None => {
2048 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2049 }
2050 BasinMatcher::Exact(e) => {
2051 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2052 }
2053 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2054 }),
2055 streams: value.streams.map(|rs| match rs {
2056 StreamMatcher::None => {
2057 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2058 }
2059 StreamMatcher::Exact(e) => {
2060 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2061 }
2062 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2063 }),
2064 access_tokens: value.access_tokens.map(|rs| match rs {
2065 AccessTokenMatcher::None => {
2066 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2067 }
2068 AccessTokenMatcher::Exact(e) => {
2069 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2070 }
2071 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2072 }),
2073 op_groups: value.op_group_perms.map(Into::into),
2074 ops: if value.ops.is_empty() {
2075 None
2076 } else {
2077 Some(value.ops.into_iter().map(Into::into).collect())
2078 },
2079 }
2080 }
2081}
2082
/// Input for issuing an access token.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct IssueAccessTokenInput {
    /// ID of the access token to issue.
    pub id: AccessTokenId,
    /// Optional expiry; `None` leaves the API field unset.
    pub expires_at: Option<S2DateTime>,
    /// Auto-prefix flag; sent to the API as `Some(true)` when set and left
    /// unset when `false`.
    pub auto_prefix_streams: bool,
    /// Scope requested for the token.
    pub scope: AccessTokenScopeInput,
}
2105
2106impl IssueAccessTokenInput {
2107 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2109 Self {
2110 id,
2111 expires_at: None,
2112 auto_prefix_streams: false,
2113 scope,
2114 }
2115 }
2116
2117 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2119 Self {
2120 expires_at: Some(expires_at),
2121 ..self
2122 }
2123 }
2124
2125 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2128 Self {
2129 auto_prefix_streams,
2130 ..self
2131 }
2132 }
2133}
2134
2135impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2136 fn from(value: IssueAccessTokenInput) -> Self {
2137 Self {
2138 id: value.id,
2139 expires_at: value.expires_at.map(Into::into),
2140 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2141 scope: value.scope.into(),
2142 }
2143 }
2144}
2145
/// Interval of a metrics timeseries; mirrors `api::metrics::TimeseriesInterval`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeseriesInterval {
    /// One-minute interval.
    Minute,
    /// One-hour interval.
    Hour,
    /// One-day interval.
    Day,
}
2156
2157impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2158 fn from(value: TimeseriesInterval) -> Self {
2159 match value {
2160 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2161 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2162 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2163 }
2164 }
2165}
2166
2167impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2168 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2169 match value {
2170 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2171 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2172 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2173 }
2174 }
2175}
2176
/// A time range described by a start and an end value.
// NOTE(review): both bounds are presumably Unix epoch seconds, and the
// inclusive/exclusive semantics are not visible here; confirm against the API docs.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start of the range.
    pub start: u32,
    /// End of the range.
    pub end: u32,
}

impl TimeRange {
    /// Creates a range from `start` and `end`; the values are stored as given,
    /// with no ordering check.
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
2193
/// A time range plus an optional timeseries interval.
// NOTE(review): bounds are presumably Unix epoch seconds; confirm against the API docs.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRangeAndInterval {
    /// Start of the range.
    pub start: u32,
    /// End of the range.
    pub end: u32,
    /// Optional interval; `None` leaves the request's `interval` unset.
    pub interval: Option<TimeseriesInterval>,
}
2207
2208impl TimeRangeAndInterval {
2209 pub fn new(start: u32, end: u32) -> Self {
2211 Self {
2212 start,
2213 end,
2214 interval: None,
2215 }
2216 }
2217
2218 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2220 Self {
2221 interval: Some(interval),
2222 ..self
2223 }
2224 }
2225}
2226
/// Account-level metric set to request, with its time arguments.
#[derive(Debug, Clone, Copy)]
pub enum AccountMetricSet {
    /// The `ActiveBasins` metric set over a time range (no interval is sent).
    ActiveBasins(TimeRange),
    /// The `AccountOps` metric set over a time range with an optional interval.
    AccountOps(TimeRangeAndInterval),
}
2241
/// Input for an account metrics request.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetAccountMetricsInput {
    /// Metric set to request.
    pub set: AccountMetricSet,
}
2249
2250impl GetAccountMetricsInput {
2251 pub fn new(set: AccountMetricSet) -> Self {
2253 Self { set }
2254 }
2255}
2256
2257impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2258 fn from(value: GetAccountMetricsInput) -> Self {
2259 let (set, start, end, interval) = match value.set {
2260 AccountMetricSet::ActiveBasins(args) => (
2261 api::metrics::AccountMetricSet::ActiveBasins,
2262 args.start,
2263 args.end,
2264 None,
2265 ),
2266 AccountMetricSet::AccountOps(args) => (
2267 api::metrics::AccountMetricSet::AccountOps,
2268 args.start,
2269 args.end,
2270 args.interval,
2271 ),
2272 };
2273 Self {
2274 set,
2275 start: Some(start),
2276 end: Some(end),
2277 interval: interval.map(Into::into),
2278 }
2279 }
2280}
2281
/// Basin-level metric set to request, with its time arguments.
///
/// Each variant maps to the `api::metrics::BasinMetricSet` variant of the same name.
#[derive(Debug, Clone, Copy)]
pub enum BasinMetricSet {
    /// The `Storage` metric set over a time range (no interval is sent).
    Storage(TimeRange),
    /// The `AppendOps` metric set over a time range with an optional interval.
    AppendOps(TimeRangeAndInterval),
    /// The `ReadOps` metric set over a time range with an optional interval.
    ReadOps(TimeRangeAndInterval),
    /// The `ReadThroughput` metric set over a time range with an optional interval.
    ReadThroughput(TimeRangeAndInterval),
    /// The `AppendThroughput` metric set over a time range with an optional interval.
    AppendThroughput(TimeRangeAndInterval),
    /// The `BasinOps` metric set over a time range with an optional interval.
    BasinOps(TimeRangeAndInterval),
}
2326
/// Input for a basin metrics request.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetBasinMetricsInput {
    /// Name of the basin the request is for.
    pub name: BasinName,
    /// Metric set to request.
    pub set: BasinMetricSet,
}
2336
2337impl GetBasinMetricsInput {
2338 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2340 Self { name, set }
2341 }
2342}
2343
2344impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2345 fn from(value: GetBasinMetricsInput) -> Self {
2346 let (set, start, end, interval) = match value.set {
2347 BasinMetricSet::Storage(args) => (
2348 api::metrics::BasinMetricSet::Storage,
2349 args.start,
2350 args.end,
2351 None,
2352 ),
2353 BasinMetricSet::AppendOps(args) => (
2354 api::metrics::BasinMetricSet::AppendOps,
2355 args.start,
2356 args.end,
2357 args.interval,
2358 ),
2359 BasinMetricSet::ReadOps(args) => (
2360 api::metrics::BasinMetricSet::ReadOps,
2361 args.start,
2362 args.end,
2363 args.interval,
2364 ),
2365 BasinMetricSet::ReadThroughput(args) => (
2366 api::metrics::BasinMetricSet::ReadThroughput,
2367 args.start,
2368 args.end,
2369 args.interval,
2370 ),
2371 BasinMetricSet::AppendThroughput(args) => (
2372 api::metrics::BasinMetricSet::AppendThroughput,
2373 args.start,
2374 args.end,
2375 args.interval,
2376 ),
2377 BasinMetricSet::BasinOps(args) => (
2378 api::metrics::BasinMetricSet::BasinOps,
2379 args.start,
2380 args.end,
2381 args.interval,
2382 ),
2383 };
2384 (
2385 value.name,
2386 api::metrics::BasinMetricSetRequest {
2387 set,
2388 start: Some(start),
2389 end: Some(end),
2390 interval: interval.map(Into::into),
2391 },
2392 )
2393 }
2394}
2395
/// Stream-level metric set to request, with its time arguments.
#[derive(Debug, Clone, Copy)]
pub enum StreamMetricSet {
    /// The `Storage` metric set over a time range (no interval is sent).
    Storage(TimeRange),
}
2403
/// Input for a stream metrics request.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetStreamMetricsInput {
    /// Name of the basin the request is for.
    pub basin_name: BasinName,
    /// Name of the stream the request is for.
    pub stream_name: StreamName,
    /// Metric set to request.
    pub set: StreamMetricSet,
}
2415
2416impl GetStreamMetricsInput {
2417 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2420 Self {
2421 basin_name,
2422 stream_name,
2423 set,
2424 }
2425 }
2426}
2427
2428impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2429 fn from(value: GetStreamMetricsInput) -> Self {
2430 let (set, start, end, interval) = match value.set {
2431 StreamMetricSet::Storage(args) => (
2432 api::metrics::StreamMetricSet::Storage,
2433 args.start,
2434 args.end,
2435 None,
2436 ),
2437 };
2438 (
2439 value.basin_name,
2440 value.stream_name,
2441 api::metrics::StreamMetricSetRequest {
2442 set,
2443 start: Some(start),
2444 end: Some(end),
2445 interval,
2446 },
2447 )
2448 }
2449}
2450
/// Unit of a metric value; mirrors `api::metrics::MetricUnit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricUnit {
    /// The value is measured in bytes.
    Bytes,
    /// The value is a number of operations.
    Operations,
}
2459
2460impl From<api::metrics::MetricUnit> for MetricUnit {
2461 fn from(value: api::metrics::MetricUnit) -> Self {
2462 match value {
2463 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2464 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2465 }
2466 }
2467}
2468
/// A metric consisting of a single value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ScalarMetric {
    /// Metric name.
    pub name: String,
    /// Unit of `value`.
    pub unit: MetricUnit,
    /// The metric value.
    pub value: f64,
}
2480
/// A timeseries metric that carries an interval alongside its values.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccumulationMetric {
    /// Metric name.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Interval reported by the API for this series.
    pub interval: TimeseriesInterval,
    /// Data points, copied as-is from the API response.
    // NOTE(review): each pair is presumably (timestamp, value accumulated over
    // the interval); confirm against the API docs.
    pub values: Vec<(u32, f64)>,
}
2497
/// A timeseries metric without an interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GaugeMetric {
    /// Metric name.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Data points, copied as-is from the API response.
    // NOTE(review): each pair is presumably (timestamp, value at that instant);
    // confirm against the API docs.
    pub values: Vec<(u32, f64)>,
}
2510
/// A metric whose values are strings rather than numbers.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Metric name.
    pub name: String,
    /// Label values, copied as-is from the API response.
    pub values: Vec<String>,
}
2520
/// A metric returned by a metrics request; one variant per metric shape.
#[derive(Debug, Clone)]
pub enum Metric {
    /// A single value.
    Scalar(ScalarMetric),
    /// A timeseries with an interval.
    Accumulation(AccumulationMetric),
    /// A timeseries without an interval.
    Gauge(GaugeMetric),
    /// A set of string labels.
    Label(LabelMetric),
}
2534
2535impl From<api::metrics::Metric> for Metric {
2536 fn from(value: api::metrics::Metric) -> Self {
2537 match value {
2538 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2539 name: sm.name.into(),
2540 unit: sm.unit.into(),
2541 value: sm.value,
2542 }),
2543 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2544 name: am.name.into(),
2545 unit: am.unit.into(),
2546 interval: am.interval.into(),
2547 values: am.values,
2548 }),
2549 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2550 name: gm.name.into(),
2551 unit: gm.unit.into(),
2552 values: gm.values,
2553 }),
2554 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2555 name: lm.name.into(),
2556 values: lm.values,
2557 }),
2558 }
2559 }
2560}
2561
/// Input for a single stream listing request.
///
/// All fields start at their `Default` values.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ListStreamsInput {
    /// Sent as the request's `prefix`.
    pub prefix: StreamNamePrefix,
    /// Sent as the request's `start_after`.
    // NOTE(review): presumably an exclusive pagination cursor; confirm against the API docs.
    pub start_after: StreamNameStartAfter,
    /// Sent as the request's `limit`; `None` leaves it unset.
    pub limit: Option<usize>,
}
2581
2582impl ListStreamsInput {
2583 pub fn new() -> Self {
2585 Self::default()
2586 }
2587
2588 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2590 Self { prefix, ..self }
2591 }
2592
2593 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2596 Self {
2597 start_after,
2598 ..self
2599 }
2600 }
2601
2602 pub fn with_limit(self, limit: usize) -> Self {
2604 Self {
2605 limit: Some(limit),
2606 ..self
2607 }
2608 }
2609}
2610
2611impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2612 fn from(value: ListStreamsInput) -> Self {
2613 Self {
2614 prefix: Some(value.prefix),
2615 start_after: Some(value.start_after),
2616 limit: value.limit,
2617 }
2618 }
2619}
2620
/// Input for listing all streams.
///
/// Unlike [`ListStreamsInput`] it has no `limit`, and it adds an
/// `include_deleted` flag. All fields start at their `Default` values.
#[derive(Debug, Clone, Default)]
pub struct ListAllStreamsInput {
    /// Stream name prefix for the listing.
    pub prefix: StreamNamePrefix,
    /// Start-after value for the listing.
    // NOTE(review): presumably an exclusive starting point; confirm where this input is consumed.
    pub start_after: StreamNameStartAfter,
    /// Defaults to `false`.
    // NOTE(review): presumably controls whether deleted streams are included
    // in the listing; confirm where this input is consumed.
    pub include_deleted: bool,
}
2639
2640impl ListAllStreamsInput {
2641 pub fn new() -> Self {
2643 Self::default()
2644 }
2645
2646 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2648 Self { prefix, ..self }
2649 }
2650
2651 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2654 Self {
2655 start_after,
2656 ..self
2657 }
2658 }
2659
2660 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2662 Self {
2663 include_deleted,
2664 ..self
2665 }
2666 }
2667}
2668
/// Information about a stream, as decoded from an API response.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation timestamp.
    pub created_at: S2DateTime,
    /// Deletion timestamp, if the API response carried one.
    pub deleted_at: Option<S2DateTime>,
    /// Encryption algorithm, if the API response carried one.
    pub cipher: Option<EncryptionAlgorithm>,
}
2682
2683impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2684 type Error = ValidationError;
2685
2686 fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2687 Ok(Self {
2688 name: value.name,
2689 created_at: value.created_at.try_into()?,
2690 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2691 cipher: value.cipher.map(Into::into),
2692 })
2693 }
2694}
2695
/// Input for a stream creation request.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CreateStreamInput {
    /// Name of the stream to create.
    pub name: StreamName,
    /// Optional stream configuration; `None` leaves the request's `config` unset.
    pub config: Option<StreamConfig>,
    // Generated once in `new` and handed out next to the request on conversion.
    idempotency_token: String,
}
2708
2709impl CreateStreamInput {
2710 pub fn new(name: StreamName) -> Self {
2712 Self {
2713 name,
2714 config: None,
2715 idempotency_token: idempotency_token(),
2716 }
2717 }
2718
2719 pub fn with_config(self, config: StreamConfig) -> Self {
2721 Self {
2722 config: Some(config),
2723 ..self
2724 }
2725 }
2726}
2727
2728impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2729 fn from(value: CreateStreamInput) -> Self {
2730 (
2731 api::stream::CreateStreamRequest {
2732 stream: value.name,
2733 config: value.config.map(Into::into),
2734 },
2735 value.idempotency_token,
2736 )
2737 }
2738}
2739
/// Input for an "ensure stream" request.
// NOTE(review): presumably create-if-absent semantics; confirm against the
// client method that consumes this input.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct EnsureStreamInput {
    /// Name of the stream.
    pub name: StreamName,
    /// Optional stream configuration; converted to the API config when present.
    pub config: Option<StreamConfig>,
}
2752
2753impl EnsureStreamInput {
2754 pub fn new(name: StreamName) -> Self {
2756 Self { name, config: None }
2757 }
2758
2759 pub fn with_config(self, config: StreamConfig) -> Self {
2761 Self {
2762 config: Some(config),
2763 ..self
2764 }
2765 }
2766}
2767
2768impl From<EnsureStreamInput> for (StreamName, Option<api::config::StreamConfig>) {
2769 fn from(value: EnsureStreamInput) -> Self {
2770 (value.name, value.config.map(Into::into))
2771 }
2772}
2773
/// Input for a stream deletion request.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct DeleteStreamInput {
    /// Name of the stream to delete.
    pub name: StreamName,
    /// Defaults to `false` in [`DeleteStreamInput::new`].
    // NOTE(review): presumably suppresses the error when the stream does not
    // exist; confirm against the client method that consumes this input.
    pub ignore_not_found: bool,
}
2783
2784impl DeleteStreamInput {
2785 pub fn new(name: StreamName) -> Self {
2787 Self {
2788 name,
2789 ignore_not_found: false,
2790 }
2791 }
2792
2793 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2795 Self {
2796 ignore_not_found,
2797 ..self
2798 }
2799 }
2800}
2801
/// Input for a stream reconfiguration request.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReconfigureStreamInput {
    /// Name of the stream this input refers to.
    pub name: StreamName,
    /// Reconfiguration carried alongside the stream name.
    pub config: StreamReconfiguration,
}
2811
2812impl ReconfigureStreamInput {
2813 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2815 Self { name, config }
2816 }
2817}
2818
/// A fencing token: a string of at most `MAX_FENCING_TOKEN_LENGTH` bytes.
///
/// The inner field is private; values are obtained by parsing
/// (`str::parse` / `FromStr`), which enforces the length limit, or via
/// [`FencingToken::generate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FencingToken(String);
2826
2827impl FencingToken {
2828 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2830 rand::rng()
2831 .sample_iter(&rand::distr::Alphanumeric)
2832 .take(n)
2833 .map(char::from)
2834 .collect::<String>()
2835 .parse()
2836 }
2837}
2838
2839impl FromStr for FencingToken {
2840 type Err = ValidationError;
2841
2842 fn from_str(s: &str) -> Result<Self, Self::Err> {
2843 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2844 return Err(ValidationError(format!(
2845 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2846 )));
2847 }
2848 Ok(FencingToken(s.to_string()))
2849 }
2850}
2851
2852impl std::fmt::Display for FencingToken {
2853 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2854 write!(f, "{}", self.0)
2855 }
2856}
2857
2858impl Deref for FencingToken {
2859 type Target = str;
2860
2861 fn deref(&self) -> &Self::Target {
2862 &self.0
2863 }
2864}
2865
/// A position in a stream: a sequence number together with a timestamp.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: u64,
    /// Timestamp.
    // NOTE(review): unit is not visible here (presumably milliseconds since
    // the Unix epoch); confirm against the API docs.
    pub timestamp: u64,
}

/// Formats as `seq_num=<n>, timestamp=<t>`.
impl std::fmt::Display for StreamPosition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = self;
        write!(f, "seq_num={seq_num}, timestamp={timestamp}")
    }
}
2882
2883impl From<api::stream::proto::StreamPosition> for StreamPosition {
2884 fn from(value: api::stream::proto::StreamPosition) -> Self {
2885 Self {
2886 seq_num: value.seq_num,
2887 timestamp: value.timestamp,
2888 }
2889 }
2890}
2891
2892impl From<api::stream::StreamPosition> for StreamPosition {
2893 fn from(value: api::stream::StreamPosition) -> Self {
2894 Self {
2895 seq_num: value.seq_num,
2896 timestamp: value.timestamp,
2897 }
2898 }
2899}
2900
/// A record header: a name/value pair of raw bytes.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct Header {
    /// Header name bytes.
    pub name: Bytes,
    /// Header value bytes.
    pub value: Bytes,
}
2910
2911impl Header {
2912 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2914 Self {
2915 name: name.into(),
2916 value: value.into(),
2917 }
2918 }
2919}
2920
2921impl From<Header> for api::stream::proto::Header {
2922 fn from(value: Header) -> Self {
2923 Self {
2924 name: value.name,
2925 value: value.value,
2926 }
2927 }
2928}
2929
2930impl From<api::stream::proto::Header> for Header {
2931 fn from(value: api::stream::proto::Header) -> Self {
2932 Self {
2933 name: value.name,
2934 value: value.value,
2935 }
2936 }
2937}
2938
/// A record to append to a stream.
///
/// Fields are private. [`AppendRecord::new`] and
/// [`AppendRecord::with_headers`] reject a record whose metered size exceeds
/// `RECORD_BATCH_MAX.bytes`.
#[derive(Debug, Clone, PartialEq)]
pub struct AppendRecord {
    // Record payload.
    body: Bytes,
    // Headers attached to the record.
    headers: Vec<Header>,
    // Optional timestamp supplied by the caller.
    timestamp: Option<u64>,
}
2946
2947impl AppendRecord {
2948 fn validate(self) -> Result<Self, ValidationError> {
2949 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2950 Err(ValidationError(format!(
2951 "metered_bytes: {} exceeds {}",
2952 self.metered_bytes(),
2953 RECORD_BATCH_MAX.bytes
2954 )))
2955 } else {
2956 Ok(self)
2957 }
2958 }
2959
2960 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2962 let record = Self {
2963 body: body.into(),
2964 headers: Vec::default(),
2965 timestamp: None,
2966 };
2967 record.validate()
2968 }
2969
2970 pub fn with_headers(
2972 self,
2973 headers: impl IntoIterator<Item = Header>,
2974 ) -> Result<Self, ValidationError> {
2975 let record = Self {
2976 headers: headers.into_iter().collect(),
2977 ..self
2978 };
2979 record.validate()
2980 }
2981
2982 pub fn with_timestamp(self, timestamp: u64) -> Self {
2986 Self {
2987 timestamp: Some(timestamp),
2988 ..self
2989 }
2990 }
2991
2992 pub fn body(&self) -> &[u8] {
2994 &self.body
2995 }
2996
2997 pub fn headers(&self) -> &[Header] {
2999 &self.headers
3000 }
3001
3002 pub fn timestamp(&self) -> Option<u64> {
3004 self.timestamp
3005 }
3006}
3007
3008impl From<AppendRecord> for api::stream::proto::AppendRecord {
3009 fn from(value: AppendRecord) -> Self {
3010 Self {
3011 timestamp: value.timestamp,
3012 headers: value.headers.into_iter().map(Into::into).collect(),
3013 body: value.body,
3014 }
3015 }
3016}
3017
/// Types that can report their size in metered bytes.
pub trait MeteredBytes {
    /// Returns the size of this value in metered bytes.
    fn metered_bytes(&self) -> usize;
}
3028
3029macro_rules! metered_bytes_impl {
3030 ($ty:ty) => {
3031 impl MeteredBytes for $ty {
3032 fn metered_bytes(&self) -> usize {
3033 8 + (2 * self.headers.len())
3034 + self
3035 .headers
3036 .iter()
3037 .map(|h| h.name.len() + h.value.len())
3038 .sum::<usize>()
3039 + self.body.len()
3040 }
3041 }
3042 };
3043}
3044
3045metered_bytes_impl!(AppendRecord);
3046
/// A batch of [`AppendRecord`]s together with its total metered size.
///
/// [`AppendRecordBatch::try_from_iter`] rejects empty batches and batches
/// that exceed `RECORD_BATCH_MAX` in record count or metered bytes.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    // Records in the batch, in insertion order.
    records: Vec<AppendRecord>,
    // Running sum of `metered_bytes()` over `records`.
    metered_bytes: usize,
}
3060
3061impl AppendRecordBatch {
3062 pub(crate) fn with_capacity(capacity: usize) -> Self {
3063 Self {
3064 records: Vec::with_capacity(capacity),
3065 metered_bytes: 0,
3066 }
3067 }
3068
3069 pub(crate) fn push(&mut self, record: AppendRecord) {
3070 self.metered_bytes += record.metered_bytes();
3071 self.records.push(record);
3072 }
3073
3074 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3076 where
3077 I: IntoIterator<Item = AppendRecord>,
3078 {
3079 let mut records = Vec::new();
3080 let mut metered_bytes = 0;
3081
3082 for record in iter {
3083 metered_bytes += record.metered_bytes();
3084 records.push(record);
3085
3086 if metered_bytes > RECORD_BATCH_MAX.bytes {
3087 return Err(ValidationError(format!(
3088 "batch size in metered bytes ({metered_bytes}) exceeds {}",
3089 RECORD_BATCH_MAX.bytes
3090 )));
3091 }
3092
3093 if records.len() > RECORD_BATCH_MAX.count {
3094 return Err(ValidationError(format!(
3095 "number of records in the batch exceeds {}",
3096 RECORD_BATCH_MAX.count
3097 )));
3098 }
3099 }
3100
3101 if records.is_empty() {
3102 return Err(ValidationError("batch is empty".into()));
3103 }
3104
3105 Ok(Self {
3106 records,
3107 metered_bytes,
3108 })
3109 }
3110}
3111
3112impl Deref for AppendRecordBatch {
3113 type Target = [AppendRecord];
3114
3115 fn deref(&self) -> &Self::Target {
3116 &self.records
3117 }
3118}
3119
3120impl MeteredBytes for AppendRecordBatch {
3121 fn metered_bytes(&self) -> usize {
3122 self.metered_bytes
3123 }
3124}
3125
3126#[derive(Debug, Clone)]
3127pub enum Command {
3129 Fence {
3131 fencing_token: FencingToken,
3133 },
3134 Trim {
3136 trim_point: u64,
3138 },
3139}
3140
3141#[derive(Debug, Clone)]
3142#[non_exhaustive]
3143pub struct CommandRecord {
3147 pub command: Command,
3149 pub timestamp: Option<u64>,
3151}
3152
3153impl CommandRecord {
3154 const FENCE: &[u8] = b"fence";
3155 const TRIM: &[u8] = b"trim";
3156
3157 pub fn fence(fencing_token: FencingToken) -> Self {
3162 Self {
3163 command: Command::Fence { fencing_token },
3164 timestamp: None,
3165 }
3166 }
3167
3168 pub fn trim(trim_point: u64) -> Self {
3175 Self {
3176 command: Command::Trim { trim_point },
3177 timestamp: None,
3178 }
3179 }
3180
3181 pub fn with_timestamp(self, timestamp: u64) -> Self {
3183 Self {
3184 timestamp: Some(timestamp),
3185 ..self
3186 }
3187 }
3188}
3189
3190impl From<CommandRecord> for AppendRecord {
3191 fn from(value: CommandRecord) -> Self {
3192 let (header_value, body) = match value.command {
3193 Command::Fence { fencing_token } => (
3194 CommandRecord::FENCE,
3195 Bytes::copy_from_slice(fencing_token.as_bytes()),
3196 ),
3197 Command::Trim { trim_point } => (
3198 CommandRecord::TRIM,
3199 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3200 ),
3201 };
3202 Self {
3203 body,
3204 headers: vec![Header::new("", header_value)],
3205 timestamp: value.timestamp,
3206 }
3207 }
3208}
3209
3210#[derive(Debug, Clone)]
3211#[non_exhaustive]
3212pub struct AppendInput {
3215 pub records: AppendRecordBatch,
3217 pub match_seq_num: Option<u64>,
3221 pub fencing_token: Option<FencingToken>,
3226}
3227
3228impl AppendInput {
3229 pub fn new(records: AppendRecordBatch) -> Self {
3231 Self {
3232 records,
3233 match_seq_num: None,
3234 fencing_token: None,
3235 }
3236 }
3237
3238 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3240 Self {
3241 match_seq_num: Some(match_seq_num),
3242 ..self
3243 }
3244 }
3245
3246 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3248 Self {
3249 fencing_token: Some(fencing_token),
3250 ..self
3251 }
3252 }
3253}
3254
3255impl From<AppendInput> for api::stream::proto::AppendInput {
3256 fn from(value: AppendInput) -> Self {
3257 Self {
3258 records: value.records.iter().cloned().map(Into::into).collect(),
3259 match_seq_num: value.match_seq_num,
3260 fencing_token: value.fencing_token.map(|t| t.to_string()),
3261 }
3262 }
3263}
3264
3265#[derive(Debug, Clone, PartialEq)]
3266#[non_exhaustive]
3267pub struct AppendAck {
3269 pub start: StreamPosition,
3271 pub end: StreamPosition,
3277 pub tail: StreamPosition,
3282}
3283
3284impl From<api::stream::proto::AppendAck> for AppendAck {
3285 fn from(value: api::stream::proto::AppendAck) -> Self {
3286 Self {
3287 start: value.start.unwrap_or_default().into(),
3288 end: value.end.unwrap_or_default().into(),
3289 tail: value.tail.unwrap_or_default().into(),
3290 }
3291 }
3292}
3293
/// Where a read starts from.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start at the given sequence number.
    SeqNum(u64),
    /// Start at the given timestamp.
    Timestamp(u64),
    /// Start at the given offset relative to the tail.
    TailOffset(u64),
}

/// The default starting point is sequence number `0`.
impl Default for ReadFrom {
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3310
3311#[derive(Debug, Default, Clone)]
3312#[non_exhaustive]
3313pub struct ReadStart {
3315 pub from: ReadFrom,
3319 pub clamp_to_tail: bool,
3323}
3324
3325impl ReadStart {
3326 pub fn new() -> Self {
3328 Self::default()
3329 }
3330
3331 pub fn with_from(self, from: ReadFrom) -> Self {
3333 Self { from, ..self }
3334 }
3335
3336 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3338 Self {
3339 clamp_to_tail,
3340 ..self
3341 }
3342 }
3343}
3344
3345impl From<ReadStart> for api::stream::ReadStart {
3346 fn from(value: ReadStart) -> Self {
3347 let (seq_num, timestamp, tail_offset) = match value.from {
3348 ReadFrom::SeqNum(n) => (Some(n), None, None),
3349 ReadFrom::Timestamp(t) => (None, Some(t), None),
3350 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3351 };
3352 Self {
3353 seq_num,
3354 timestamp,
3355 tail_offset,
3356 clamp: if value.clamp_to_tail {
3357 Some(true)
3358 } else {
3359 None
3360 },
3361 }
3362 }
3363}
3364
/// Optional limits on how much a read returns.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Optional limit on the number of records; `None` by default.
    pub count: Option<usize>,
    /// Optional limit on the number of bytes; `None` by default.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Creates limits with neither `count` nor `bytes` set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the limits with `count` set.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Returns the limits with `bytes` set.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3401
3402#[derive(Debug, Clone, Default)]
3403#[non_exhaustive]
3404pub struct ReadStop {
3406 pub limits: ReadLimits,
3410 pub until: Option<RangeTo<u64>>,
3414 pub wait: Option<u32>,
3424}
3425
3426impl ReadStop {
3427 pub fn new() -> Self {
3429 Self::default()
3430 }
3431
3432 pub fn with_limits(self, limits: ReadLimits) -> Self {
3434 Self { limits, ..self }
3435 }
3436
3437 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3439 Self {
3440 until: Some(until),
3441 ..self
3442 }
3443 }
3444
3445 pub fn with_wait(self, wait: u32) -> Self {
3447 Self {
3448 wait: Some(wait),
3449 ..self
3450 }
3451 }
3452}
3453
3454impl From<ReadStop> for api::stream::ReadEnd {
3455 fn from(value: ReadStop) -> Self {
3456 Self {
3457 count: value.limits.count,
3458 bytes: value.limits.bytes,
3459 until: value.until.map(|r| r.end),
3460 wait: value.wait,
3461 }
3462 }
3463}
3464
3465#[derive(Debug, Clone, Default)]
3466#[non_exhaustive]
3467pub struct ReadInput {
3470 pub start: ReadStart,
3474 pub stop: ReadStop,
3478 pub ignore_command_records: bool,
3482}
3483
3484impl ReadInput {
3485 pub fn new() -> Self {
3487 Self::default()
3488 }
3489
3490 pub fn with_start(self, start: ReadStart) -> Self {
3492 Self { start, ..self }
3493 }
3494
3495 pub fn with_stop(self, stop: ReadStop) -> Self {
3497 Self { stop, ..self }
3498 }
3499
3500 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3502 Self {
3503 ignore_command_records,
3504 ..self
3505 }
3506 }
3507}
3508
3509#[derive(Debug, Clone)]
3510#[non_exhaustive]
3511pub struct SequencedRecord {
3513 pub seq_num: u64,
3515 pub body: Bytes,
3517 pub headers: Vec<Header>,
3519 pub timestamp: u64,
3521}
3522
3523impl SequencedRecord {
3524 pub fn is_command_record(&self) -> bool {
3526 self.headers.len() == 1 && *self.headers[0].name == *b""
3527 }
3528}
3529
3530impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3531 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3532 Self {
3533 seq_num: value.seq_num,
3534 body: value.body,
3535 headers: value.headers.into_iter().map(Into::into).collect(),
3536 timestamp: value.timestamp,
3537 }
3538 }
3539}
3540
3541metered_bytes_impl!(SequencedRecord);
3542
3543#[derive(Debug, Clone)]
3544#[non_exhaustive]
3545pub struct ReadBatch {
3548 pub records: Vec<SequencedRecord>,
3555 pub tail: Option<StreamPosition>,
3560}
3561
3562impl ReadBatch {
3563 pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3564 Self {
3565 records: batch.records.into_iter().map(Into::into).collect(),
3566 tail: batch.tail.map(Into::into),
3567 }
3568 }
3569}
3570
3571pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3573
3574#[derive(Debug, Clone, thiserror::Error)]
3575pub enum AppendConditionFailed {
3577 #[error("fencing token mismatch, expected: {0}")]
3578 FencingTokenMismatch(FencingToken),
3580 #[error("sequence number mismatch, expected: {0}")]
3581 SeqNumMismatch(u64),
3583}
3584
3585impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3586 fn from(value: api::stream::AppendConditionFailed) -> Self {
3587 match value {
3588 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3589 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3590 }
3591 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3592 AppendConditionFailed::SeqNumMismatch(seq)
3593 }
3594 }
3595 }
3596}
3597
3598#[derive(Debug, Clone, thiserror::Error)]
3599pub enum S2Error {
3601 #[error("{0}")]
3602 Client(String),
3604 #[error(transparent)]
3605 Validation(#[from] ValidationError),
3607 #[error("{0}")]
3608 AppendConditionFailed(AppendConditionFailed),
3610 #[error("read from an unwritten position. current tail: {0}")]
3611 ReadUnwritten(StreamPosition),
3613 #[error("{0}")]
3614 Server(ErrorResponse),
3616}
3617
3618impl From<ApiError> for S2Error {
3619 fn from(err: ApiError) -> Self {
3620 match err {
3621 ApiError::ReadUnwritten(tail_response) => {
3622 Self::ReadUnwritten(tail_response.tail.into())
3623 }
3624 ApiError::AppendConditionFailed(condition_failed) => {
3625 Self::AppendConditionFailed(condition_failed.into())
3626 }
3627 ApiError::Server(_, response) => Self::Server(response.into()),
3628 other => Self::Client(other.to_string()),
3629 }
3630 }
3631}
3632
3633#[derive(Debug, Clone, thiserror::Error)]
3634#[error("{code}: {message}")]
3635#[non_exhaustive]
3636pub struct ErrorResponse {
3638 pub code: String,
3640 pub message: String,
3642}
3643
3644impl From<ApiErrorResponse> for ErrorResponse {
3645 fn from(response: ApiErrorResponse) -> Self {
3646 Self {
3647 code: response.code,
3648 message: response.message,
3649 }
3650 }
3651}
3652
3653fn idempotency_token() -> String {
3654 uuid::Uuid::new_v4().simple().to_string()
3655}