1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 sync::Arc,
12 time::Duration,
13};
14
15use bytes::Bytes;
16use http::{
17 header::HeaderValue,
18 uri::{Authority, Scheme},
19};
20use rand::RngExt;
21use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
22pub use s2_common::types::ValidationError;
24pub use s2_common::types::access::AccessTokenId;
28pub use s2_common::types::access::AccessTokenIdPrefix;
30pub use s2_common::types::access::AccessTokenIdStartAfter;
32pub use s2_common::types::basin::BasinName;
37pub use s2_common::types::basin::BasinNamePrefix;
39pub use s2_common::types::basin::BasinNameStartAfter;
41pub use s2_common::types::location::LocationName;
46pub use s2_common::types::stream::StreamName;
50pub use s2_common::types::stream::StreamNamePrefix;
52pub use s2_common::types::stream::StreamNameStartAfter;
54pub use s2_common::{
55 caps::RECORD_BATCH_MAX,
56 encryption::{EncryptionAlgorithm, EncryptionKey},
57};
58
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const ONE_MIB: u32 = 1024 * 1024;
60
61use s2_common::{
62 maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH, types::resources::ProvisionResult,
63};
64use secrecy::SecretString;
65
66use crate::api::{ApiError, ApiErrorResponse};
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub struct S2DateTime(time::OffsetDateTime);
75
76impl TryFrom<time::OffsetDateTime> for S2DateTime {
77 type Error = ValidationError;
78
79 fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
80 dt.format(&time::format_description::well_known::Rfc3339)
81 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
82 Ok(Self(dt))
83 }
84}
85
86impl From<S2DateTime> for time::OffsetDateTime {
87 fn from(dt: S2DateTime) -> Self {
88 dt.0
89 }
90}
91
92impl FromStr for S2DateTime {
93 type Err = ValidationError;
94
95 fn from_str(s: &str) -> Result<Self, Self::Err> {
96 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
97 .map(Self)
98 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
99 }
100}
101
102impl fmt::Display for S2DateTime {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 write!(
105 f,
106 "{}",
107 self.0
108 .format(&time::format_description::well_known::Rfc3339)
109 .expect("RFC3339 formatting should not fail for S2DateTime")
110 )
111 }
112}
113
/// Authority portion of a basin endpoint.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// Authority that followed a `{basin}.` placeholder prefix in the endpoint string.
    // NOTE(review): presumably the basin name is prepended as a subdomain when
    // requests are made; confirm against the request-building code.
    ParentZone(Authority),
    /// Authority taken as-is from an endpoint string without the `{basin}.` prefix.
    Direct(Authority),
}
122
123#[derive(Debug, Clone)]
125pub struct AccountEndpoint {
126 scheme: Scheme,
127 authority: Authority,
128}
129
130impl AccountEndpoint {
131 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
133 endpoint.parse()
134 }
135}
136
137impl FromStr for AccountEndpoint {
138 type Err = ValidationError;
139
140 fn from_str(s: &str) -> Result<Self, Self::Err> {
141 let (scheme, authority) = match s.find("://") {
142 Some(idx) => {
143 let scheme: Scheme = s[..idx]
144 .parse()
145 .map_err(|_| "invalid account endpoint scheme".to_string())?;
146 (scheme, &s[idx + 3..])
147 }
148 None => (Scheme::HTTPS, s),
149 };
150 Ok(Self {
151 scheme,
152 authority: authority
153 .parse()
154 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
155 })
156 }
157}
158
159#[derive(Debug, Clone)]
161pub struct BasinEndpoint {
162 scheme: Scheme,
163 authority: BasinAuthority,
164}
165
166impl BasinEndpoint {
167 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
169 endpoint.parse()
170 }
171}
172
173impl FromStr for BasinEndpoint {
174 type Err = ValidationError;
175
176 fn from_str(s: &str) -> Result<Self, Self::Err> {
177 let (scheme, authority) = match s.find("://") {
178 Some(idx) => {
179 let scheme: Scheme = s[..idx]
180 .parse()
181 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
182 (scheme, &s[idx + 3..])
183 }
184 None => (Scheme::HTTPS, s),
185 };
186 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
187 BasinAuthority::ParentZone(
188 authority
189 .parse()
190 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
191 )
192 } else {
193 BasinAuthority::Direct(
194 authority
195 .parse()
196 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
197 )
198 };
199 Ok(Self { scheme, authority })
200 }
201}
202
203#[derive(Debug, Clone)]
204#[non_exhaustive]
205pub struct S2Endpoints {
207 pub(crate) scheme: Scheme,
208 pub(crate) account_authority: Authority,
209 pub(crate) basin_authority: BasinAuthority,
210}
211
212impl S2Endpoints {
213 pub fn new(
215 account_endpoint: AccountEndpoint,
216 basin_endpoint: BasinEndpoint,
217 ) -> Result<Self, ValidationError> {
218 if account_endpoint.scheme != basin_endpoint.scheme {
219 return Err("account and basin endpoints must have the same scheme".into());
220 }
221 Ok(Self {
222 scheme: account_endpoint.scheme,
223 account_authority: account_endpoint.authority,
224 basin_authority: basin_endpoint.authority,
225 })
226 }
227
228 pub fn from_env() -> Result<Self, ValidationError> {
234 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
235 Ok(endpoint) => endpoint.parse()?,
236 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
237 Err(VarError::NotUnicode(_)) => {
238 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
239 }
240 };
241
242 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
243 Ok(endpoint) => endpoint.parse()?,
244 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
245 Err(VarError::NotUnicode(_)) => {
246 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
247 }
248 };
249
250 if account_endpoint.scheme != basin_endpoint.scheme {
251 return Err(
252 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
253 );
254 }
255
256 Ok(Self {
257 scheme: account_endpoint.scheme,
258 account_authority: account_endpoint.authority,
259 basin_authority: basin_endpoint.authority,
260 })
261 }
262
263 pub(crate) fn for_aws() -> Self {
264 Self {
265 scheme: Scheme::HTTPS,
266 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
267 basin_authority: BasinAuthority::ParentZone(
268 "b.s2.dev".try_into().expect("valid authority"),
269 ),
270 }
271 }
272}
273
/// Compression algorithm selection.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly, consistent
/// with the other plain public enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstandard compression.
    Zstd,
}
284
285impl From<Compression> for CompressionAlgorithm {
286 fn from(value: Compression) -> Self {
287 match value {
288 Compression::None => CompressionAlgorithm::None,
289 Compression::Gzip => CompressionAlgorithm::Gzip,
290 Compression::Zstd => CompressionAlgorithm::Zstd,
291 }
292 }
293}
294
/// Policy selecting which append requests may be retried.
///
/// Derives `Eq` alongside `PartialEq`: the enum has only unit variants, so
/// equality is total.
// NOTE(review): the variant docs below are inferred from the variant names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AppendRetryPolicy {
    /// Retry all appends.
    All,
    /// Retry only appends that cannot have had side effects.
    NoSideEffects,
}
313
314#[derive(Debug, Clone)]
315#[non_exhaustive]
316pub struct RetryConfig {
325 pub max_attempts: NonZeroU32,
329 pub min_base_delay: Duration,
333 pub max_base_delay: Duration,
337 pub append_retry_policy: AppendRetryPolicy,
342}
343
344impl Default for RetryConfig {
345 fn default() -> Self {
346 Self {
347 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
348 min_base_delay: Duration::from_millis(100),
349 max_base_delay: Duration::from_secs(1),
350 append_retry_policy: AppendRetryPolicy::All,
351 }
352 }
353}
354
355impl RetryConfig {
356 pub fn new() -> Self {
358 Self::default()
359 }
360
361 pub(crate) fn max_retries(&self) -> u32 {
362 self.max_attempts.get() - 1
363 }
364
365 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
367 Self {
368 max_attempts,
369 ..self
370 }
371 }
372
373 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
375 Self {
376 min_base_delay,
377 ..self
378 }
379 }
380
381 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
383 Self {
384 max_base_delay,
385 ..self
386 }
387 }
388
389 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
392 Self {
393 append_retry_policy,
394 ..self
395 }
396 }
397}
398
399#[derive(Debug, Clone)]
400#[non_exhaustive]
401pub struct S2Config {
403 pub(crate) access_token: SecretString,
404 pub(crate) endpoints: S2Endpoints,
405 pub(crate) connection_timeout: Duration,
406 pub(crate) request_timeout: Duration,
407 pub(crate) retry: RetryConfig,
408 pub(crate) compression: Compression,
409 pub(crate) user_agent: HeaderValue,
410 pub(crate) insecure_skip_cert_verification: bool,
411 pub(crate) rustls_crypto_provider: Option<Arc<rustls::crypto::CryptoProvider>>,
412}
413
414impl S2Config {
415 pub fn new(access_token: impl Into<String>) -> Self {
417 Self {
418 access_token: access_token.into().into(),
419 endpoints: S2Endpoints::for_aws(),
420 connection_timeout: Duration::from_secs(3),
421 request_timeout: Duration::from_secs(5),
422 retry: RetryConfig::new(),
423 compression: Compression::None,
424 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
425 .parse()
426 .expect("valid user agent"),
427 insecure_skip_cert_verification: false,
428 rustls_crypto_provider: default_rustls_crypto_provider(),
429 }
430 }
431
432 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
434 Self { endpoints, ..self }
435 }
436
437 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
441 Self {
442 connection_timeout,
443 ..self
444 }
445 }
446
447 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
451 Self {
452 request_timeout,
453 ..self
454 }
455 }
456
457 pub fn with_retry(self, retry: RetryConfig) -> Self {
461 Self { retry, ..self }
462 }
463
464 pub fn with_compression(self, compression: Compression) -> Self {
468 Self {
469 compression,
470 ..self
471 }
472 }
473
474 pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
486 Self {
487 insecure_skip_cert_verification: skip,
488 ..self
489 }
490 }
491
492 pub fn with_rustls_crypto_provider(
502 self,
503 provider: impl Into<Arc<rustls::crypto::CryptoProvider>>,
504 ) -> Self {
505 Self {
506 rustls_crypto_provider: Some(provider.into()),
507 ..self
508 }
509 }
510
511 #[cfg(feature = "rustls-aws-lc-rs")]
515 pub fn with_rustls_aws_lc_rs_crypto_provider(self) -> Self {
516 self.with_rustls_crypto_provider(rustls::crypto::aws_lc_rs::default_provider())
517 }
518
519 #[cfg(feature = "rustls-ring")]
523 pub fn with_rustls_ring_crypto_provider(self) -> Self {
524 self.with_rustls_crypto_provider(rustls::crypto::ring::default_provider())
525 }
526
527 #[doc(hidden)]
528 #[cfg(feature = "_hidden")]
529 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
530 let user_agent = user_agent
531 .into()
532 .parse()
533 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
534 Ok(Self { user_agent, ..self })
535 }
536}
537
538#[cfg(feature = "rustls-aws-lc-rs")]
539fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
540 Some(Arc::new(rustls::crypto::aws_lc_rs::default_provider()))
541}
542
543#[cfg(all(not(feature = "rustls-aws-lc-rs"), feature = "rustls-ring"))]
544fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
545 Some(Arc::new(rustls::crypto::ring::default_provider()))
546}
547
548#[cfg(all(not(feature = "rustls-aws-lc-rs"), not(feature = "rustls-ring")))]
549fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
550 None
551}
552
/// One page of listed values.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values contained in this page.
    pub values: Vec<T>,
    /// Whether more values are available beyond this page.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Builds a page from anything convertible into a `Vec<T>`.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values: Vec<T> = values.into();
        Self { values, has_more }
    }
}
571
572#[derive(Debug, Clone, Copy, PartialEq, Eq)]
573pub enum StorageClass {
575 Standard,
577 Express,
579}
580
581impl From<api::config::StorageClass> for StorageClass {
582 fn from(value: api::config::StorageClass) -> Self {
583 match value {
584 api::config::StorageClass::Standard => StorageClass::Standard,
585 api::config::StorageClass::Express => StorageClass::Express,
586 }
587 }
588}
589
590impl From<StorageClass> for api::config::StorageClass {
591 fn from(value: StorageClass) -> Self {
592 match value {
593 StorageClass::Standard => api::config::StorageClass::Standard,
594 StorageClass::Express => api::config::StorageClass::Express,
595 }
596 }
597}
598
599#[derive(Debug, Clone, Copy, PartialEq, Eq)]
600pub enum RetentionPolicy {
602 Age(u64),
604 Infinite,
606}
607
608impl From<api::config::RetentionPolicy> for RetentionPolicy {
609 fn from(value: api::config::RetentionPolicy) -> Self {
610 match value {
611 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
612 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
613 }
614 }
615}
616
617impl From<RetentionPolicy> for api::config::RetentionPolicy {
618 fn from(value: RetentionPolicy) -> Self {
619 match value {
620 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
621 RetentionPolicy::Infinite => {
622 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
623 }
624 }
625 }
626}
627
628#[derive(Debug, Clone, Copy, PartialEq, Eq)]
629pub enum TimestampingMode {
631 ClientPrefer,
633 ClientRequire,
635 Arrival,
637}
638
639impl From<api::config::TimestampingMode> for TimestampingMode {
640 fn from(value: api::config::TimestampingMode) -> Self {
641 match value {
642 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
643 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
644 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
645 }
646 }
647}
648
649impl From<TimestampingMode> for api::config::TimestampingMode {
650 fn from(value: TimestampingMode) -> Self {
651 match value {
652 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
653 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
654 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
655 }
656 }
657}
658
659#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
660#[non_exhaustive]
661pub struct TimestampingConfig {
663 pub mode: Option<TimestampingMode>,
667 pub uncapped: Option<bool>,
671}
672
673impl TimestampingConfig {
674 pub fn new() -> Self {
676 Self::default()
677 }
678
679 pub fn with_mode(self, mode: TimestampingMode) -> Self {
681 Self {
682 mode: Some(mode),
683 ..self
684 }
685 }
686
687 pub fn with_uncapped(self, uncapped: bool) -> Self {
689 Self {
690 uncapped: Some(uncapped),
691 ..self
692 }
693 }
694}
695
696impl From<api::config::TimestampingConfig> for TimestampingConfig {
697 fn from(value: api::config::TimestampingConfig) -> Self {
698 Self {
699 mode: value.mode.map(Into::into),
700 uncapped: value.uncapped,
701 }
702 }
703}
704
705impl From<TimestampingConfig> for api::config::TimestampingConfig {
706 fn from(value: TimestampingConfig) -> Self {
707 Self {
708 mode: value.mode.map(Into::into),
709 uncapped: value.uncapped,
710 }
711 }
712}
713
/// Delete-on-empty settings; `min_age_secs` defaults to 0.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in whole seconds.
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Creates a configuration with default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the minimum age; sub-second precision is truncated.
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
737
738impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
739 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
740 Self {
741 min_age_secs: value.min_age_secs,
742 }
743 }
744}
745
746impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
747 fn from(value: DeleteOnEmptyConfig) -> Self {
748 Self {
749 min_age_secs: value.min_age_secs,
750 }
751 }
752}
753
754#[derive(Debug, Clone, Default, PartialEq, Eq)]
755#[non_exhaustive]
756pub struct StreamConfig {
758 pub storage_class: Option<StorageClass>,
762 pub retention_policy: Option<RetentionPolicy>,
766 pub timestamping: Option<TimestampingConfig>,
770 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
774}
775
776impl StreamConfig {
777 pub fn new() -> Self {
779 Self::default()
780 }
781
782 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
784 Self {
785 storage_class: Some(storage_class),
786 ..self
787 }
788 }
789
790 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
792 Self {
793 retention_policy: Some(retention_policy),
794 ..self
795 }
796 }
797
798 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
800 Self {
801 timestamping: Some(timestamping),
802 ..self
803 }
804 }
805
806 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
808 Self {
809 delete_on_empty: Some(delete_on_empty),
810 ..self
811 }
812 }
813}
814
815impl From<api::config::StreamConfig> for StreamConfig {
816 fn from(value: api::config::StreamConfig) -> Self {
817 Self {
818 storage_class: value.storage_class.map(Into::into),
819 retention_policy: value.retention_policy.map(Into::into),
820 timestamping: value.timestamping.map(Into::into),
821 delete_on_empty: value.delete_on_empty.map(Into::into),
822 }
823 }
824}
825
826impl From<StreamConfig> for api::config::StreamConfig {
827 fn from(value: StreamConfig) -> Self {
828 Self {
829 storage_class: value.storage_class.map(Into::into),
830 retention_policy: value.retention_policy.map(Into::into),
831 timestamping: value.timestamping.map(Into::into),
832 delete_on_empty: value.delete_on_empty.map(Into::into),
833 }
834 }
835}
836
837#[derive(Debug, Clone, Default, PartialEq, Eq)]
838#[non_exhaustive]
839pub struct BasinConfig {
841 pub default_stream_config: Option<StreamConfig>,
845 pub stream_cipher: Option<EncryptionAlgorithm>,
847 pub create_stream_on_append: bool,
851 pub create_stream_on_read: bool,
855}
856
857impl BasinConfig {
858 pub fn new() -> Self {
860 Self::default()
861 }
862
863 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
865 Self {
866 default_stream_config: Some(config),
867 ..self
868 }
869 }
870
871 pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
873 Self {
874 stream_cipher: Some(stream_cipher),
875 ..self
876 }
877 }
878
879 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
882 Self {
883 create_stream_on_append,
884 ..self
885 }
886 }
887
888 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
890 Self {
891 create_stream_on_read,
892 ..self
893 }
894 }
895}
896
897impl From<api::config::BasinConfig> for BasinConfig {
898 fn from(value: api::config::BasinConfig) -> Self {
899 Self {
900 default_stream_config: value.default_stream_config.map(Into::into),
901 stream_cipher: value.stream_cipher.map(Into::into),
902 create_stream_on_append: value.create_stream_on_append,
903 create_stream_on_read: value.create_stream_on_read,
904 }
905 }
906}
907
908impl From<BasinConfig> for api::config::BasinConfig {
909 fn from(value: BasinConfig) -> Self {
910 Self {
911 default_stream_config: value.default_stream_config.map(Into::into),
912 stream_cipher: value.stream_cipher.map(Into::into),
913 create_stream_on_append: value.create_stream_on_append,
914 create_stream_on_read: value.create_stream_on_read,
915 }
916 }
917}
918
919#[derive(Debug, Clone)]
920#[non_exhaustive]
921pub struct CreateBasinInput {
923 pub name: BasinName,
925 pub config: Option<BasinConfig>,
929 pub location: Option<LocationName>,
933 idempotency_token: String,
934}
935
936impl CreateBasinInput {
937 pub fn new(name: BasinName) -> Self {
939 Self {
940 name,
941 config: None,
942 location: None,
943 idempotency_token: idempotency_token(),
944 }
945 }
946
947 pub fn with_config(self, config: BasinConfig) -> Self {
949 Self {
950 config: Some(config),
951 ..self
952 }
953 }
954
955 pub fn with_location<S>(self, location: S) -> Result<Self, ValidationError>
957 where
958 S: TryInto<LocationName>,
959 S::Error: fmt::Display,
960 {
961 let location = location
962 .try_into()
963 .map_err(|e| ValidationError(e.to_string()))?;
964 Ok(Self {
965 location: Some(location),
966 ..self
967 })
968 }
969}
970
971impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
972 fn from(value: CreateBasinInput) -> Self {
973 (
974 api::basin::CreateBasinRequest {
975 basin: value.name,
976 config: value.config.map(Into::into),
977 location: value.location,
978 },
979 value.idempotency_token,
980 )
981 }
982}
983
984#[derive(Debug, Clone)]
985#[non_exhaustive]
986pub struct EnsureBasinInput {
988 pub name: BasinName,
990 pub config: Option<BasinConfig>,
994 pub location: Option<LocationName>,
999}
1000
1001impl EnsureBasinInput {
1002 pub fn new(name: BasinName) -> Self {
1004 Self {
1005 name,
1006 config: None,
1007 location: None,
1008 }
1009 }
1010
1011 pub fn with_config(self, config: BasinConfig) -> Self {
1013 Self {
1014 config: Some(config),
1015 ..self
1016 }
1017 }
1018
1019 pub fn with_location<S>(self, location: S) -> Result<Self, ValidationError>
1021 where
1022 S: TryInto<LocationName>,
1023 S::Error: fmt::Display,
1024 {
1025 let location = location
1026 .try_into()
1027 .map_err(|e| ValidationError(e.to_string()))?;
1028 Ok(Self {
1029 location: Some(location),
1030 ..self
1031 })
1032 }
1033}
1034
1035impl From<EnsureBasinInput> for (BasinName, Option<api::basin::EnsureBasinRequest>) {
1036 fn from(value: EnsureBasinInput) -> Self {
1037 let config = value.config;
1038 let request = if config.is_some() || value.location.is_some() {
1039 Some(api::basin::EnsureBasinRequest {
1040 config: config.map(Into::into),
1041 location: value.location,
1042 })
1043 } else {
1044 None
1045 };
1046 (value.name, request)
1047 }
1048}
1049
1050#[derive(Debug, Clone)]
1051pub enum EnsureOutput<T> {
1054 Created(T),
1056 ConfigUpdated(T),
1058 ConfigUnchanged(T),
1060}
1061
1062impl<T> From<ProvisionResult<T>> for EnsureOutput<T> {
1063 fn from(result: ProvisionResult<T>) -> Self {
1064 match result {
1065 ProvisionResult::Created(info) => EnsureOutput::Created(info),
1066 ProvisionResult::Updated(info) => EnsureOutput::ConfigUpdated(info),
1067 ProvisionResult::Noop(info) => EnsureOutput::ConfigUnchanged(info),
1068 }
1069 }
1070}
1071
1072#[derive(Debug, Clone, Default)]
1073#[non_exhaustive]
1074pub struct ListBasinsInput {
1076 pub prefix: BasinNamePrefix,
1080 pub start_after: BasinNameStartAfter,
1086 pub limit: Option<usize>,
1090}
1091
1092impl ListBasinsInput {
1093 pub fn new() -> Self {
1095 Self::default()
1096 }
1097
1098 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1100 Self { prefix, ..self }
1101 }
1102
1103 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1106 Self {
1107 start_after,
1108 ..self
1109 }
1110 }
1111
1112 pub fn with_limit(self, limit: usize) -> Self {
1114 Self {
1115 limit: Some(limit),
1116 ..self
1117 }
1118 }
1119}
1120
1121impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1122 fn from(value: ListBasinsInput) -> Self {
1123 Self {
1124 prefix: Some(value.prefix),
1125 start_after: Some(value.start_after),
1126 limit: value.limit,
1127 }
1128 }
1129}
1130
1131#[derive(Debug, Clone, Default)]
1132pub struct ListAllBasinsInput {
1134 pub prefix: BasinNamePrefix,
1138 pub start_after: BasinNameStartAfter,
1144 pub include_deleted: bool,
1148}
1149
1150impl ListAllBasinsInput {
1151 pub fn new() -> Self {
1153 Self::default()
1154 }
1155
1156 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1158 Self { prefix, ..self }
1159 }
1160
1161 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1164 Self {
1165 start_after,
1166 ..self
1167 }
1168 }
1169
1170 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1172 Self {
1173 include_deleted,
1174 ..self
1175 }
1176 }
1177}
1178
1179#[derive(Debug, Clone, PartialEq, Eq)]
1180#[non_exhaustive]
1181pub struct BasinInfo {
1183 pub name: BasinName,
1185 pub location: Option<LocationName>,
1187 pub created_at: S2DateTime,
1189 pub deleted_at: Option<S2DateTime>,
1191}
1192
1193impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1194 type Error = ValidationError;
1195
1196 fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1197 Ok(Self {
1198 name: value.name,
1199 location: value.location,
1200 created_at: value.created_at.try_into()?,
1201 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1202 })
1203 }
1204}
1205
1206#[derive(Debug, Clone)]
1207#[non_exhaustive]
1208pub struct DeleteBasinInput {
1210 pub name: BasinName,
1212 pub ignore_not_found: bool,
1214}
1215
1216impl DeleteBasinInput {
1217 pub fn new(name: BasinName) -> Self {
1219 Self {
1220 name,
1221 ignore_not_found: false,
1222 }
1223 }
1224
1225 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1227 Self {
1228 ignore_not_found,
1229 ..self
1230 }
1231 }
1232}
1233
1234#[derive(Debug, Clone, Default)]
1235#[non_exhaustive]
1236pub struct TimestampingReconfiguration {
1238 pub mode: Maybe<Option<TimestampingMode>>,
1240 pub uncapped: Maybe<Option<bool>>,
1242}
1243
1244impl TimestampingReconfiguration {
1245 pub fn new() -> Self {
1247 Self::default()
1248 }
1249
1250 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1252 Self {
1253 mode: Maybe::Specified(Some(mode)),
1254 ..self
1255 }
1256 }
1257
1258 pub fn with_uncapped(self, uncapped: bool) -> Self {
1260 Self {
1261 uncapped: Maybe::Specified(Some(uncapped)),
1262 ..self
1263 }
1264 }
1265}
1266
1267impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1268 fn from(value: TimestampingReconfiguration) -> Self {
1269 Self {
1270 mode: value.mode.map(|m| m.map(Into::into)),
1271 uncapped: value.uncapped,
1272 }
1273 }
1274}
1275
1276#[derive(Debug, Clone, Default)]
1277#[non_exhaustive]
1278pub struct DeleteOnEmptyReconfiguration {
1280 pub min_age_secs: Maybe<Option<u64>>,
1282}
1283
1284impl DeleteOnEmptyReconfiguration {
1285 pub fn new() -> Self {
1287 Self::default()
1288 }
1289
1290 pub fn with_min_age(self, min_age: Duration) -> Self {
1292 Self {
1293 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1294 }
1295 }
1296}
1297
1298impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1299 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1300 Self {
1301 min_age_secs: value.min_age_secs,
1302 }
1303 }
1304}
1305
1306#[derive(Debug, Clone, Default)]
1307#[non_exhaustive]
1308pub struct StreamReconfiguration {
1310 pub storage_class: Maybe<Option<StorageClass>>,
1312 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1314 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1316 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1318}
1319
1320impl StreamReconfiguration {
1321 pub fn new() -> Self {
1323 Self::default()
1324 }
1325
1326 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1328 Self {
1329 storage_class: Maybe::Specified(Some(storage_class)),
1330 ..self
1331 }
1332 }
1333
1334 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1336 Self {
1337 retention_policy: Maybe::Specified(Some(retention_policy)),
1338 ..self
1339 }
1340 }
1341
1342 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1344 Self {
1345 timestamping: Maybe::Specified(Some(timestamping)),
1346 ..self
1347 }
1348 }
1349
1350 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1352 Self {
1353 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1354 ..self
1355 }
1356 }
1357}
1358
1359impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1360 fn from(value: StreamReconfiguration) -> Self {
1361 Self {
1362 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1363 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1364 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1365 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1366 }
1367 }
1368}
1369
1370#[derive(Debug, Clone, Default)]
1371#[non_exhaustive]
1372pub struct BasinReconfiguration {
1374 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1376 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
1378 pub create_stream_on_append: Maybe<bool>,
1381 pub create_stream_on_read: Maybe<bool>,
1383}
1384
1385impl BasinReconfiguration {
1386 pub fn new() -> Self {
1388 Self::default()
1389 }
1390
1391 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1394 Self {
1395 default_stream_config: Maybe::Specified(Some(config)),
1396 ..self
1397 }
1398 }
1399
1400 pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
1402 Self {
1403 stream_cipher: Maybe::Specified(Some(stream_cipher)),
1404 ..self
1405 }
1406 }
1407
1408 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1411 Self {
1412 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1413 ..self
1414 }
1415 }
1416
1417 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1420 Self {
1421 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1422 ..self
1423 }
1424 }
1425}
1426
1427impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1428 fn from(value: BasinReconfiguration) -> Self {
1429 Self {
1430 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1431 stream_cipher: value.stream_cipher.map(|m| m.map(Into::into)),
1432 create_stream_on_append: value.create_stream_on_append,
1433 create_stream_on_read: value.create_stream_on_read,
1434 }
1435 }
1436}
1437
1438#[derive(Debug, Clone)]
1439#[non_exhaustive]
1440pub struct ReconfigureBasinInput {
1442 pub name: BasinName,
1444 pub config: BasinReconfiguration,
1446}
1447
1448impl ReconfigureBasinInput {
1449 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1451 Self { name, config }
1452 }
1453}
1454
1455#[derive(Debug, Clone, Default)]
1456#[non_exhaustive]
1457pub struct ListAccessTokensInput {
1459 pub prefix: AccessTokenIdPrefix,
1463 pub start_after: AccessTokenIdStartAfter,
1469 pub limit: Option<usize>,
1473}
1474
1475impl ListAccessTokensInput {
1476 pub fn new() -> Self {
1478 Self::default()
1479 }
1480
1481 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1483 Self { prefix, ..self }
1484 }
1485
1486 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1489 Self {
1490 start_after,
1491 ..self
1492 }
1493 }
1494
1495 pub fn with_limit(self, limit: usize) -> Self {
1497 Self {
1498 limit: Some(limit),
1499 ..self
1500 }
1501 }
1502}
1503
1504impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1505 fn from(value: ListAccessTokensInput) -> Self {
1506 Self {
1507 prefix: Some(value.prefix),
1508 start_after: Some(value.start_after),
1509 limit: value.limit,
1510 }
1511 }
1512}
1513
1514#[derive(Debug, Clone, Default)]
1515pub struct ListAllAccessTokensInput {
1517 pub prefix: AccessTokenIdPrefix,
1521 pub start_after: AccessTokenIdStartAfter,
1527}
1528
1529impl ListAllAccessTokensInput {
1530 pub fn new() -> Self {
1532 Self::default()
1533 }
1534
1535 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1537 Self { prefix, ..self }
1538 }
1539
1540 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1543 Self {
1544 start_after,
1545 ..self
1546 }
1547 }
1548}
1549
1550#[derive(Debug, Clone, PartialEq, Eq)]
1551#[non_exhaustive]
1552pub struct LocationInfo {
1554 pub name: LocationName,
1556 pub is_private: bool,
1558}
1559
1560impl From<api::location::LocationInfo> for LocationInfo {
1561 fn from(value: api::location::LocationInfo) -> Self {
1562 Self {
1563 name: value.name,
1564 is_private: value.is_private,
1565 }
1566 }
1567}
1568
1569#[derive(Debug, Clone)]
1570#[non_exhaustive]
1571pub struct AccessTokenInfo {
1573 pub id: AccessTokenId,
1575 pub expires_at: S2DateTime,
1577 pub auto_prefix_streams: bool,
1580 pub scope: AccessTokenScope,
1582}
1583
1584impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1585 type Error = ValidationError;
1586
1587 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1588 let expires_at = value
1589 .expires_at
1590 .map(S2DateTime::try_from)
1591 .transpose()?
1592 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1593 Ok(Self {
1594 id: value.id,
1595 expires_at,
1596 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1597 scope: value.scope.into(),
1598 })
1599 }
1600}
1601
1602#[derive(Debug, Clone)]
1603pub enum BasinMatcher {
1607 None,
1609 Exact(BasinName),
1611 Prefix(BasinNamePrefix),
1613}
1614
1615#[derive(Debug, Clone)]
1616pub enum StreamMatcher {
1620 None,
1622 Exact(StreamName),
1624 Prefix(StreamNamePrefix),
1626}
1627
1628#[derive(Debug, Clone)]
1629pub enum AccessTokenMatcher {
1633 None,
1635 Exact(AccessTokenId),
1637 Prefix(AccessTokenIdPrefix),
1639}
1640
/// A pair of read / write permission flags.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Whether read permission is granted.
    pub read: bool,
    /// Whether write permission is granted.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Creates permissions with both flags cleared.
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates permissions with only `read` set.
    pub fn read_only() -> Self {
        let mut perms = Self::new();
        perms.read = true;
        perms
    }

    /// Creates permissions with only `write` set.
    pub fn write_only() -> Self {
        let mut perms = Self::new();
        perms.write = true;
        perms
    }

    /// Creates permissions with both `read` and `write` set.
    pub fn read_write() -> Self {
        let mut perms = Self::new();
        perms.read = true;
        perms.write = true;
        perms
    }
}
1685
1686impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1687 fn from(value: ReadWritePermissions) -> Self {
1688 Self {
1689 read: Some(value.read),
1690 write: Some(value.write),
1691 }
1692 }
1693}
1694
1695impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1696 fn from(value: api::access::ReadWritePermissions) -> Self {
1697 Self {
1698 read: value.read.unwrap_or_default(),
1699 write: value.write.unwrap_or_default(),
1700 }
1701 }
1702}
1703
1704#[derive(Debug, Clone, Default)]
1705#[non_exhaustive]
1706pub struct OperationGroupPermissions {
1710 pub account: Option<ReadWritePermissions>,
1714 pub basin: Option<ReadWritePermissions>,
1718 pub stream: Option<ReadWritePermissions>,
1722}
1723
1724impl OperationGroupPermissions {
1725 pub fn new() -> Self {
1727 Self::default()
1728 }
1729
1730 pub fn read_only_all() -> Self {
1732 Self {
1733 account: Some(ReadWritePermissions::read_only()),
1734 basin: Some(ReadWritePermissions::read_only()),
1735 stream: Some(ReadWritePermissions::read_only()),
1736 }
1737 }
1738
1739 pub fn write_only_all() -> Self {
1741 Self {
1742 account: Some(ReadWritePermissions::write_only()),
1743 basin: Some(ReadWritePermissions::write_only()),
1744 stream: Some(ReadWritePermissions::write_only()),
1745 }
1746 }
1747
1748 pub fn read_write_all() -> Self {
1750 Self {
1751 account: Some(ReadWritePermissions::read_write()),
1752 basin: Some(ReadWritePermissions::read_write()),
1753 stream: Some(ReadWritePermissions::read_write()),
1754 }
1755 }
1756
1757 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1759 Self {
1760 account: Some(account),
1761 ..self
1762 }
1763 }
1764
1765 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1767 Self {
1768 basin: Some(basin),
1769 ..self
1770 }
1771 }
1772
1773 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1775 Self {
1776 stream: Some(stream),
1777 ..self
1778 }
1779 }
1780}
1781
1782impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1783 fn from(value: OperationGroupPermissions) -> Self {
1784 Self {
1785 account: value.account.map(Into::into),
1786 basin: value.basin.map(Into::into),
1787 stream: value.stream.map(Into::into),
1788 }
1789 }
1790}
1791
1792impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1793 fn from(value: api::access::PermittedOperationGroups) -> Self {
1794 Self {
1795 account: value.account.map(Into::into),
1796 basin: value.basin.map(Into::into),
1797 stream: value.stream.map(Into::into),
1798 }
1799 }
1800}
1801
/// Individual operations that an access token scope can name.
///
/// Each variant maps one-to-one onto an `api::access::Operation` variant; the three
/// `Get*Metrics` variants correspond to the wire variants `AccountMetrics`, `BasinMetrics`
/// and `StreamMetrics`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    /// The list-basins operation.
    ListBasins,
    /// The create-basin operation.
    CreateBasin,
    /// The get-basin-config operation.
    GetBasinConfig,
    /// The delete-basin operation.
    DeleteBasin,
    /// The reconfigure-basin operation.
    ReconfigureBasin,
    /// The list-access-tokens operation.
    ListAccessTokens,
    /// The issue-access-token operation.
    IssueAccessToken,
    /// The revoke-access-token operation.
    RevokeAccessToken,
    /// The account-metrics operation.
    GetAccountMetrics,
    /// The basin-metrics operation.
    GetBasinMetrics,
    /// The stream-metrics operation.
    GetStreamMetrics,
    /// The list-streams operation.
    ListStreams,
    /// The create-stream operation.
    CreateStream,
    /// The get-stream-config operation.
    GetStreamConfig,
    /// The delete-stream operation.
    DeleteStream,
    /// The reconfigure-stream operation.
    ReconfigureStream,
    /// The check-tail operation.
    CheckTail,
    /// The append operation.
    Append,
    /// The read operation.
    Read,
    /// The trim operation.
    Trim,
    /// The fence operation.
    Fence,
    /// The list-locations operation.
    ListLocations,
    /// The get-default-location operation.
    GetDefaultLocation,
    /// The set-default-location operation.
    SetDefaultLocation,
}
1856
1857impl From<Operation> for api::access::Operation {
1858 fn from(value: Operation) -> Self {
1859 match value {
1860 Operation::ListBasins => api::access::Operation::ListBasins,
1861 Operation::CreateBasin => api::access::Operation::CreateBasin,
1862 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1863 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1864 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1865 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1866 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1867 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1868 Operation::ListStreams => api::access::Operation::ListStreams,
1869 Operation::CreateStream => api::access::Operation::CreateStream,
1870 Operation::DeleteStream => api::access::Operation::DeleteStream,
1871 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1872 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1873 Operation::CheckTail => api::access::Operation::CheckTail,
1874 Operation::Append => api::access::Operation::Append,
1875 Operation::Read => api::access::Operation::Read,
1876 Operation::Trim => api::access::Operation::Trim,
1877 Operation::Fence => api::access::Operation::Fence,
1878 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1879 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1880 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1881 Operation::ListLocations => api::access::Operation::ListLocations,
1882 Operation::GetDefaultLocation => api::access::Operation::GetDefaultLocation,
1883 Operation::SetDefaultLocation => api::access::Operation::SetDefaultLocation,
1884 }
1885 }
1886}
1887
1888impl From<api::access::Operation> for Operation {
1889 fn from(value: api::access::Operation) -> Self {
1890 match value {
1891 api::access::Operation::ListBasins => Operation::ListBasins,
1892 api::access::Operation::CreateBasin => Operation::CreateBasin,
1893 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1894 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1895 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1896 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1897 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1898 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1899 api::access::Operation::ListStreams => Operation::ListStreams,
1900 api::access::Operation::CreateStream => Operation::CreateStream,
1901 api::access::Operation::DeleteStream => Operation::DeleteStream,
1902 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1903 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1904 api::access::Operation::CheckTail => Operation::CheckTail,
1905 api::access::Operation::Append => Operation::Append,
1906 api::access::Operation::Read => Operation::Read,
1907 api::access::Operation::Trim => Operation::Trim,
1908 api::access::Operation::Fence => Operation::Fence,
1909 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1910 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1911 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1912 api::access::Operation::ListLocations => Operation::ListLocations,
1913 api::access::Operation::GetDefaultLocation => Operation::GetDefaultLocation,
1914 api::access::Operation::SetDefaultLocation => Operation::SetDefaultLocation,
1915 }
1916 }
1917}
1918
1919#[derive(Debug, Clone)]
1920#[non_exhaustive]
1921pub struct AccessTokenScopeInput {
1929 basins: Option<BasinMatcher>,
1930 streams: Option<StreamMatcher>,
1931 access_tokens: Option<AccessTokenMatcher>,
1932 op_group_perms: Option<OperationGroupPermissions>,
1933 ops: HashSet<Operation>,
1934}
1935
1936impl AccessTokenScopeInput {
1937 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1939 Self {
1940 basins: None,
1941 streams: None,
1942 access_tokens: None,
1943 op_group_perms: None,
1944 ops: ops.into_iter().collect(),
1945 }
1946 }
1947
1948 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1950 Self {
1951 basins: None,
1952 streams: None,
1953 access_tokens: None,
1954 op_group_perms: Some(op_group_perms),
1955 ops: HashSet::default(),
1956 }
1957 }
1958
1959 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1961 Self {
1962 ops: ops.into_iter().collect(),
1963 ..self
1964 }
1965 }
1966
1967 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1969 Self {
1970 op_group_perms: Some(op_group_perms),
1971 ..self
1972 }
1973 }
1974
1975 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1979 Self {
1980 basins: Some(basins),
1981 ..self
1982 }
1983 }
1984
1985 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1989 Self {
1990 streams: Some(streams),
1991 ..self
1992 }
1993 }
1994
1995 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1999 Self {
2000 access_tokens: Some(access_tokens),
2001 ..self
2002 }
2003 }
2004}
2005
2006#[derive(Debug, Clone)]
2007#[non_exhaustive]
2008pub struct AccessTokenScope {
2010 pub basins: Option<BasinMatcher>,
2012 pub streams: Option<StreamMatcher>,
2014 pub access_tokens: Option<AccessTokenMatcher>,
2016 pub op_group_perms: Option<OperationGroupPermissions>,
2018 pub ops: HashSet<Operation>,
2020}
2021
2022impl From<api::access::AccessTokenScope> for AccessTokenScope {
2023 fn from(value: api::access::AccessTokenScope) -> Self {
2024 Self {
2025 basins: value.basins.map(|rs| match rs {
2026 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2027 BasinMatcher::Exact(e)
2028 }
2029 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2030 BasinMatcher::None
2031 }
2032 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
2033 }),
2034 streams: value.streams.map(|rs| match rs {
2035 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2036 StreamMatcher::Exact(e)
2037 }
2038 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2039 StreamMatcher::None
2040 }
2041 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
2042 }),
2043 access_tokens: value.access_tokens.map(|rs| match rs {
2044 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2045 AccessTokenMatcher::Exact(e)
2046 }
2047 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2048 AccessTokenMatcher::None
2049 }
2050 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
2051 }),
2052 op_group_perms: value.op_groups.map(Into::into),
2053 ops: value
2054 .ops
2055 .map(|ops| ops.into_iter().map(Into::into).collect())
2056 .unwrap_or_default(),
2057 }
2058 }
2059}
2060
2061impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
2062 fn from(value: AccessTokenScopeInput) -> Self {
2063 Self {
2064 basins: value.basins.map(|rs| match rs {
2065 BasinMatcher::None => {
2066 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2067 }
2068 BasinMatcher::Exact(e) => {
2069 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2070 }
2071 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2072 }),
2073 streams: value.streams.map(|rs| match rs {
2074 StreamMatcher::None => {
2075 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2076 }
2077 StreamMatcher::Exact(e) => {
2078 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2079 }
2080 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2081 }),
2082 access_tokens: value.access_tokens.map(|rs| match rs {
2083 AccessTokenMatcher::None => {
2084 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2085 }
2086 AccessTokenMatcher::Exact(e) => {
2087 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2088 }
2089 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2090 }),
2091 op_groups: value.op_group_perms.map(Into::into),
2092 ops: if value.ops.is_empty() {
2093 None
2094 } else {
2095 Some(value.ops.into_iter().map(Into::into).collect())
2096 },
2097 }
2098 }
2099}
2100
2101#[derive(Debug, Clone)]
2102#[non_exhaustive]
2103pub struct IssueAccessTokenInput {
2105 pub id: AccessTokenId,
2107 pub expires_at: Option<S2DateTime>,
2112 pub auto_prefix_streams: bool,
2120 pub scope: AccessTokenScopeInput,
2122}
2123
2124impl IssueAccessTokenInput {
2125 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2127 Self {
2128 id,
2129 expires_at: None,
2130 auto_prefix_streams: false,
2131 scope,
2132 }
2133 }
2134
2135 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2137 Self {
2138 expires_at: Some(expires_at),
2139 ..self
2140 }
2141 }
2142
2143 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2146 Self {
2147 auto_prefix_streams,
2148 ..self
2149 }
2150 }
2151}
2152
2153impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2154 fn from(value: IssueAccessTokenInput) -> Self {
2155 Self {
2156 id: value.id,
2157 expires_at: value.expires_at.map(Into::into),
2158 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2159 scope: value.scope.into(),
2160 }
2161 }
2162}
2163
/// Interval of a metrics timeseries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeseriesInterval {
    /// Minute interval.
    Minute,
    /// Hour interval.
    Hour,
    /// Day interval.
    Day,
}
2174
2175impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2176 fn from(value: TimeseriesInterval) -> Self {
2177 match value {
2178 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2179 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2180 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2181 }
2182 }
2183}
2184
2185impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2186 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2187 match value {
2188 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2189 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2190 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2191 }
2192 }
2193}
2194
/// A time range for a metrics query.
///
/// NOTE(review): `start` / `end` are presumably Unix timestamps in seconds — confirm against
/// the API documentation. No ordering between the two is enforced here.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start of the range, forwarded as the request's `start`.
    pub start: u32,
    /// End of the range, forwarded as the request's `end`.
    pub end: u32,
}

impl TimeRange {
    /// Creates a range from `start` and `end` without validating them.
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
2211
2212#[derive(Debug, Clone, Copy)]
2213#[non_exhaustive]
2214pub struct TimeRangeAndInterval {
2216 pub start: u32,
2218 pub end: u32,
2220 pub interval: Option<TimeseriesInterval>,
2224}
2225
2226impl TimeRangeAndInterval {
2227 pub fn new(start: u32, end: u32) -> Self {
2229 Self {
2230 start,
2231 end,
2232 interval: None,
2233 }
2234 }
2235
2236 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2238 Self {
2239 interval: Some(interval),
2240 ..self
2241 }
2242 }
2243}
2244
2245#[derive(Debug, Clone, Copy)]
2246pub enum AccountMetricSet {
2248 ActiveBasins(TimeRange),
2251 AccountOps(TimeRangeAndInterval),
2258}
2259
2260#[derive(Debug, Clone)]
2261#[non_exhaustive]
2262pub struct GetAccountMetricsInput {
2264 pub set: AccountMetricSet,
2266}
2267
2268impl GetAccountMetricsInput {
2269 pub fn new(set: AccountMetricSet) -> Self {
2271 Self { set }
2272 }
2273}
2274
2275impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2276 fn from(value: GetAccountMetricsInput) -> Self {
2277 let (set, start, end, interval) = match value.set {
2278 AccountMetricSet::ActiveBasins(args) => (
2279 api::metrics::AccountMetricSet::ActiveBasins,
2280 args.start,
2281 args.end,
2282 None,
2283 ),
2284 AccountMetricSet::AccountOps(args) => (
2285 api::metrics::AccountMetricSet::AccountOps,
2286 args.start,
2287 args.end,
2288 args.interval,
2289 ),
2290 };
2291 Self {
2292 set,
2293 start: Some(start),
2294 end: Some(end),
2295 interval: interval.map(Into::into),
2296 }
2297 }
2298}
2299
2300#[derive(Debug, Clone, Copy)]
2301pub enum BasinMetricSet {
2303 Storage(TimeRange),
2306 AppendOps(TimeRangeAndInterval),
2314 ReadOps(TimeRangeAndInterval),
2322 ReadThroughput(TimeRangeAndInterval),
2329 AppendThroughput(TimeRangeAndInterval),
2336 BasinOps(TimeRangeAndInterval),
2343}
2344
2345#[derive(Debug, Clone)]
2346#[non_exhaustive]
2347pub struct GetBasinMetricsInput {
2349 pub name: BasinName,
2351 pub set: BasinMetricSet,
2353}
2354
2355impl GetBasinMetricsInput {
2356 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2358 Self { name, set }
2359 }
2360}
2361
2362impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2363 fn from(value: GetBasinMetricsInput) -> Self {
2364 let (set, start, end, interval) = match value.set {
2365 BasinMetricSet::Storage(args) => (
2366 api::metrics::BasinMetricSet::Storage,
2367 args.start,
2368 args.end,
2369 None,
2370 ),
2371 BasinMetricSet::AppendOps(args) => (
2372 api::metrics::BasinMetricSet::AppendOps,
2373 args.start,
2374 args.end,
2375 args.interval,
2376 ),
2377 BasinMetricSet::ReadOps(args) => (
2378 api::metrics::BasinMetricSet::ReadOps,
2379 args.start,
2380 args.end,
2381 args.interval,
2382 ),
2383 BasinMetricSet::ReadThroughput(args) => (
2384 api::metrics::BasinMetricSet::ReadThroughput,
2385 args.start,
2386 args.end,
2387 args.interval,
2388 ),
2389 BasinMetricSet::AppendThroughput(args) => (
2390 api::metrics::BasinMetricSet::AppendThroughput,
2391 args.start,
2392 args.end,
2393 args.interval,
2394 ),
2395 BasinMetricSet::BasinOps(args) => (
2396 api::metrics::BasinMetricSet::BasinOps,
2397 args.start,
2398 args.end,
2399 args.interval,
2400 ),
2401 };
2402 (
2403 value.name,
2404 api::metrics::BasinMetricSetRequest {
2405 set,
2406 start: Some(start),
2407 end: Some(end),
2408 interval: interval.map(Into::into),
2409 },
2410 )
2411 }
2412}
2413
2414#[derive(Debug, Clone, Copy)]
2415pub enum StreamMetricSet {
2417 Storage(TimeRange),
2420}
2421
2422#[derive(Debug, Clone)]
2423#[non_exhaustive]
2424pub struct GetStreamMetricsInput {
2426 pub basin_name: BasinName,
2428 pub stream_name: StreamName,
2430 pub set: StreamMetricSet,
2432}
2433
2434impl GetStreamMetricsInput {
2435 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2438 Self {
2439 basin_name,
2440 stream_name,
2441 set,
2442 }
2443 }
2444}
2445
2446impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2447 fn from(value: GetStreamMetricsInput) -> Self {
2448 let (set, start, end, interval) = match value.set {
2449 StreamMetricSet::Storage(args) => (
2450 api::metrics::StreamMetricSet::Storage,
2451 args.start,
2452 args.end,
2453 None,
2454 ),
2455 };
2456 (
2457 value.basin_name,
2458 value.stream_name,
2459 api::metrics::StreamMetricSetRequest {
2460 set,
2461 start: Some(start),
2462 end: Some(end),
2463 interval,
2464 },
2465 )
2466 }
2467}
2468
/// Unit in which a metric value is expressed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricUnit {
    /// The value is a number of bytes.
    Bytes,
    /// The value is a number of operations.
    Operations,
}
2477
2478impl From<api::metrics::MetricUnit> for MetricUnit {
2479 fn from(value: api::metrics::MetricUnit) -> Self {
2480 match value {
2481 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2482 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2483 }
2484 }
2485}
2486
2487#[derive(Debug, Clone)]
2488#[non_exhaustive]
2489pub struct ScalarMetric {
2491 pub name: String,
2493 pub unit: MetricUnit,
2495 pub value: f64,
2497}
2498
2499#[derive(Debug, Clone)]
2500#[non_exhaustive]
2501pub struct AccumulationMetric {
2504 pub name: String,
2506 pub unit: MetricUnit,
2508 pub interval: TimeseriesInterval,
2510 pub values: Vec<(u32, f64)>,
2514}
2515
2516#[derive(Debug, Clone)]
2517#[non_exhaustive]
2518pub struct GaugeMetric {
2520 pub name: String,
2522 pub unit: MetricUnit,
2524 pub values: Vec<(u32, f64)>,
2527}
2528
/// A metric whose values are strings rather than numbers.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Name of the metric.
    pub name: String,
    /// String values as returned by the API.
    pub values: Vec<String>,
}
2538
2539#[derive(Debug, Clone)]
2540pub enum Metric {
2542 Scalar(ScalarMetric),
2544 Accumulation(AccumulationMetric),
2547 Gauge(GaugeMetric),
2549 Label(LabelMetric),
2551}
2552
2553impl From<api::metrics::Metric> for Metric {
2554 fn from(value: api::metrics::Metric) -> Self {
2555 match value {
2556 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2557 name: sm.name.into(),
2558 unit: sm.unit.into(),
2559 value: sm.value,
2560 }),
2561 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2562 name: am.name.into(),
2563 unit: am.unit.into(),
2564 interval: am.interval.into(),
2565 values: am.values,
2566 }),
2567 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2568 name: gm.name.into(),
2569 unit: gm.unit.into(),
2570 values: gm.values,
2571 }),
2572 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2573 name: lm.name.into(),
2574 values: lm.values,
2575 }),
2576 }
2577 }
2578}
2579
2580#[derive(Debug, Clone, Default)]
2581#[non_exhaustive]
2582pub struct ListStreamsInput {
2584 pub prefix: StreamNamePrefix,
2588 pub start_after: StreamNameStartAfter,
2594 pub limit: Option<usize>,
2598}
2599
2600impl ListStreamsInput {
2601 pub fn new() -> Self {
2603 Self::default()
2604 }
2605
2606 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2608 Self { prefix, ..self }
2609 }
2610
2611 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2614 Self {
2615 start_after,
2616 ..self
2617 }
2618 }
2619
2620 pub fn with_limit(self, limit: usize) -> Self {
2622 Self {
2623 limit: Some(limit),
2624 ..self
2625 }
2626 }
2627}
2628
2629impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2630 fn from(value: ListStreamsInput) -> Self {
2631 Self {
2632 prefix: Some(value.prefix),
2633 start_after: Some(value.start_after),
2634 limit: value.limit,
2635 }
2636 }
2637}
2638
2639#[derive(Debug, Clone, Default)]
2640pub struct ListAllStreamsInput {
2642 pub prefix: StreamNamePrefix,
2646 pub start_after: StreamNameStartAfter,
2652 pub include_deleted: bool,
2656}
2657
2658impl ListAllStreamsInput {
2659 pub fn new() -> Self {
2661 Self::default()
2662 }
2663
2664 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2666 Self { prefix, ..self }
2667 }
2668
2669 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2672 Self {
2673 start_after,
2674 ..self
2675 }
2676 }
2677
2678 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2680 Self {
2681 include_deleted,
2682 ..self
2683 }
2684 }
2685}
2686
2687#[derive(Debug, Clone, PartialEq, Eq)]
2688#[non_exhaustive]
2689pub struct StreamInfo {
2691 pub name: StreamName,
2693 pub created_at: S2DateTime,
2695 pub deleted_at: Option<S2DateTime>,
2697 pub cipher: Option<EncryptionAlgorithm>,
2699}
2700
2701impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2702 type Error = ValidationError;
2703
2704 fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2705 Ok(Self {
2706 name: value.name,
2707 created_at: value.created_at.try_into()?,
2708 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2709 cipher: value.cipher.map(Into::into),
2710 })
2711 }
2712}
2713
2714#[derive(Debug, Clone)]
2715#[non_exhaustive]
2716pub struct CreateStreamInput {
2718 pub name: StreamName,
2720 pub config: Option<StreamConfig>,
2724 idempotency_token: String,
2725}
2726
2727impl CreateStreamInput {
2728 pub fn new(name: StreamName) -> Self {
2730 Self {
2731 name,
2732 config: None,
2733 idempotency_token: idempotency_token(),
2734 }
2735 }
2736
2737 pub fn with_config(self, config: StreamConfig) -> Self {
2739 Self {
2740 config: Some(config),
2741 ..self
2742 }
2743 }
2744}
2745
2746impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2747 fn from(value: CreateStreamInput) -> Self {
2748 (
2749 api::stream::CreateStreamRequest {
2750 stream: value.name,
2751 config: value.config.map(Into::into),
2752 },
2753 value.idempotency_token,
2754 )
2755 }
2756}
2757
2758#[derive(Debug, Clone)]
2759#[non_exhaustive]
2760pub struct EnsureStreamInput {
2763 pub name: StreamName,
2765 pub config: Option<StreamConfig>,
2769}
2770
2771impl EnsureStreamInput {
2772 pub fn new(name: StreamName) -> Self {
2774 Self { name, config: None }
2775 }
2776
2777 pub fn with_config(self, config: StreamConfig) -> Self {
2779 Self {
2780 config: Some(config),
2781 ..self
2782 }
2783 }
2784}
2785
2786impl From<EnsureStreamInput> for (StreamName, Option<api::config::StreamConfig>) {
2787 fn from(value: EnsureStreamInput) -> Self {
2788 (value.name, value.config.map(Into::into))
2789 }
2790}
2791
2792#[derive(Debug, Clone)]
2793#[non_exhaustive]
2794pub struct DeleteStreamInput {
2796 pub name: StreamName,
2798 pub ignore_not_found: bool,
2800}
2801
2802impl DeleteStreamInput {
2803 pub fn new(name: StreamName) -> Self {
2805 Self {
2806 name,
2807 ignore_not_found: false,
2808 }
2809 }
2810
2811 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2813 Self {
2814 ignore_not_found,
2815 ..self
2816 }
2817 }
2818}
2819
2820#[derive(Debug, Clone)]
2821#[non_exhaustive]
2822pub struct ReconfigureStreamInput {
2824 pub name: StreamName,
2826 pub config: StreamReconfiguration,
2828}
2829
2830impl ReconfigureStreamInput {
2831 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2833 Self { name, config }
2834 }
2835}
2836
2837#[derive(Debug, Clone, PartialEq, Eq)]
2838pub struct FencingToken(String);
2844
2845impl FencingToken {
2846 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2848 rand::rng()
2849 .sample_iter(&rand::distr::Alphanumeric)
2850 .take(n)
2851 .map(char::from)
2852 .collect::<String>()
2853 .parse()
2854 }
2855}
2856
2857impl FromStr for FencingToken {
2858 type Err = ValidationError;
2859
2860 fn from_str(s: &str) -> Result<Self, Self::Err> {
2861 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2862 return Err(ValidationError(format!(
2863 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2864 )));
2865 }
2866 Ok(FencingToken(s.to_string()))
2867 }
2868}
2869
2870impl std::fmt::Display for FencingToken {
2871 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2872 write!(f, "{}", self.0)
2873 }
2874}
2875
2876impl Deref for FencingToken {
2877 type Target = str;
2878
2879 fn deref(&self) -> &Self::Target {
2880 &self.0
2881 }
2882}
2883
/// A position in a stream: a sequence number together with a timestamp.
///
/// Both fields are plain integers, so equality is total and the type implements [`Eq`] in
/// addition to [`PartialEq`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number of the position.
    pub seq_num: u64,
    /// Timestamp of the position.
    ///
    /// NOTE(review): unit is not visible here (presumably milliseconds since the Unix
    /// epoch) — confirm against the API docs.
    pub timestamp: u64,
}

impl std::fmt::Display for StreamPosition {
    /// Formats the position as `seq_num=<seq_num>, timestamp=<timestamp>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
    }
}
2900
2901impl From<api::stream::proto::StreamPosition> for StreamPosition {
2902 fn from(value: api::stream::proto::StreamPosition) -> Self {
2903 Self {
2904 seq_num: value.seq_num,
2905 timestamp: value.timestamp,
2906 }
2907 }
2908}
2909
2910impl From<api::stream::StreamPosition> for StreamPosition {
2911 fn from(value: api::stream::StreamPosition) -> Self {
2912 Self {
2913 seq_num: value.seq_num,
2914 timestamp: value.timestamp,
2915 }
2916 }
2917}
2918
2919#[derive(Debug, Clone, PartialEq)]
2920#[non_exhaustive]
2921pub struct Header {
2923 pub name: Bytes,
2925 pub value: Bytes,
2927}
2928
2929impl Header {
2930 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2932 Self {
2933 name: name.into(),
2934 value: value.into(),
2935 }
2936 }
2937}
2938
2939impl From<Header> for api::stream::proto::Header {
2940 fn from(value: Header) -> Self {
2941 Self {
2942 name: value.name,
2943 value: value.value,
2944 }
2945 }
2946}
2947
2948impl From<api::stream::proto::Header> for Header {
2949 fn from(value: api::stream::proto::Header) -> Self {
2950 Self {
2951 name: value.name,
2952 value: value.value,
2953 }
2954 }
2955}
2956
2957#[derive(Debug, Clone, PartialEq)]
2958pub struct AppendRecord {
2960 body: Bytes,
2961 headers: Vec<Header>,
2962 timestamp: Option<u64>,
2963}
2964
2965impl AppendRecord {
2966 fn validate(self) -> Result<Self, ValidationError> {
2967 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2968 Err(ValidationError(format!(
2969 "metered_bytes: {} exceeds {}",
2970 self.metered_bytes(),
2971 RECORD_BATCH_MAX.bytes
2972 )))
2973 } else {
2974 Ok(self)
2975 }
2976 }
2977
2978 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2980 let record = Self {
2981 body: body.into(),
2982 headers: Vec::default(),
2983 timestamp: None,
2984 };
2985 record.validate()
2986 }
2987
2988 pub fn with_headers(
2990 self,
2991 headers: impl IntoIterator<Item = Header>,
2992 ) -> Result<Self, ValidationError> {
2993 let record = Self {
2994 headers: headers.into_iter().collect(),
2995 ..self
2996 };
2997 record.validate()
2998 }
2999
3000 pub fn with_timestamp(self, timestamp: u64) -> Self {
3004 Self {
3005 timestamp: Some(timestamp),
3006 ..self
3007 }
3008 }
3009
3010 pub fn body(&self) -> &[u8] {
3012 &self.body
3013 }
3014
3015 pub fn headers(&self) -> &[Header] {
3017 &self.headers
3018 }
3019
3020 pub fn timestamp(&self) -> Option<u64> {
3022 self.timestamp
3023 }
3024}
3025
3026impl From<AppendRecord> for api::stream::proto::AppendRecord {
3027 fn from(value: AppendRecord) -> Self {
3028 Self {
3029 timestamp: value.timestamp,
3030 headers: value.headers.into_iter().map(Into::into).collect(),
3031 body: value.body,
3032 }
3033 }
3034}
3035
/// Types that can report their size in metered bytes.
pub trait MeteredBytes {
    /// Returns the size of `self` in metered bytes.
    fn metered_bytes(&self) -> usize;
}
3046
/// Implements `MeteredBytes` for a record-like type that has `headers` (items with `name`
/// and `value`) and a `body`.
///
/// The metered size is 8 bytes of fixed overhead, plus 2 bytes per header, plus the lengths
/// of all header names and values, plus the body length.
macro_rules! metered_bytes_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> usize {
                let header_count = self.headers.len();
                let header_payload: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                8 + (2 * header_count) + header_payload + self.body.len()
            }
        }
    };
}
3062
3063metered_bytes_impl!(AppendRecord);
3064
3065#[derive(Debug, Clone)]
3066pub struct AppendRecordBatch {
3075 records: Vec<AppendRecord>,
3076 metered_bytes: usize,
3077}
3078
3079impl AppendRecordBatch {
3080 pub(crate) fn with_capacity(capacity: usize) -> Self {
3081 Self {
3082 records: Vec::with_capacity(capacity),
3083 metered_bytes: 0,
3084 }
3085 }
3086
3087 pub(crate) fn push(&mut self, record: AppendRecord) {
3088 self.metered_bytes += record.metered_bytes();
3089 self.records.push(record);
3090 }
3091
3092 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3094 where
3095 I: IntoIterator<Item = AppendRecord>,
3096 {
3097 let mut records = Vec::new();
3098 let mut metered_bytes = 0;
3099
3100 for record in iter {
3101 metered_bytes += record.metered_bytes();
3102 records.push(record);
3103
3104 if metered_bytes > RECORD_BATCH_MAX.bytes {
3105 return Err(ValidationError(format!(
3106 "batch size in metered bytes ({metered_bytes}) exceeds {}",
3107 RECORD_BATCH_MAX.bytes
3108 )));
3109 }
3110
3111 if records.len() > RECORD_BATCH_MAX.count {
3112 return Err(ValidationError(format!(
3113 "number of records in the batch exceeds {}",
3114 RECORD_BATCH_MAX.count
3115 )));
3116 }
3117 }
3118
3119 if records.is_empty() {
3120 return Err(ValidationError("batch is empty".into()));
3121 }
3122
3123 Ok(Self {
3124 records,
3125 metered_bytes,
3126 })
3127 }
3128}
3129
3130impl Deref for AppendRecordBatch {
3131 type Target = [AppendRecord];
3132
3133 fn deref(&self) -> &Self::Target {
3134 &self.records
3135 }
3136}
3137
/// Reports the batch's metered size from the cached `metered_bytes` field
/// instead of re-summing the individual records.
impl MeteredBytes for AppendRecordBatch {
    fn metered_bytes(&self) -> usize {
        self.metered_bytes
    }
}
3143
/// A command that can be carried by a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command; encoded with the fencing token's bytes as the record body.
    Fence {
        /// The fencing token carried by the command.
        fencing_token: FencingToken,
    },
    /// Trim command; encoded with the big-endian bytes of `trim_point` as the
    /// record body.
    Trim {
        /// The trim point carried by the command.
        // NOTE(review): presumably a sequence number — confirm.
        trim_point: u64,
    },
}
3158
3159#[derive(Debug, Clone)]
3160#[non_exhaustive]
3161pub struct CommandRecord {
3165 pub command: Command,
3167 pub timestamp: Option<u64>,
3169}
3170
3171impl CommandRecord {
3172 const FENCE: &[u8] = b"fence";
3173 const TRIM: &[u8] = b"trim";
3174
3175 pub fn fence(fencing_token: FencingToken) -> Self {
3180 Self {
3181 command: Command::Fence { fencing_token },
3182 timestamp: None,
3183 }
3184 }
3185
3186 pub fn trim(trim_point: u64) -> Self {
3193 Self {
3194 command: Command::Trim { trim_point },
3195 timestamp: None,
3196 }
3197 }
3198
3199 pub fn with_timestamp(self, timestamp: u64) -> Self {
3201 Self {
3202 timestamp: Some(timestamp),
3203 ..self
3204 }
3205 }
3206}
3207
3208impl From<CommandRecord> for AppendRecord {
3209 fn from(value: CommandRecord) -> Self {
3210 let (header_value, body) = match value.command {
3211 Command::Fence { fencing_token } => (
3212 CommandRecord::FENCE,
3213 Bytes::copy_from_slice(fencing_token.as_bytes()),
3214 ),
3215 Command::Trim { trim_point } => (
3216 CommandRecord::TRIM,
3217 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3218 ),
3219 };
3220 Self {
3221 body,
3222 headers: vec![Header::new("", header_value)],
3223 timestamp: value.timestamp,
3224 }
3225 }
3226}
3227
3228#[derive(Debug, Clone)]
3229#[non_exhaustive]
3230pub struct AppendInput {
3233 pub records: AppendRecordBatch,
3235 pub match_seq_num: Option<u64>,
3239 pub fencing_token: Option<FencingToken>,
3244}
3245
3246impl AppendInput {
3247 pub fn new(records: AppendRecordBatch) -> Self {
3249 Self {
3250 records,
3251 match_seq_num: None,
3252 fencing_token: None,
3253 }
3254 }
3255
3256 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3258 Self {
3259 match_seq_num: Some(match_seq_num),
3260 ..self
3261 }
3262 }
3263
3264 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3266 Self {
3267 fencing_token: Some(fencing_token),
3268 ..self
3269 }
3270 }
3271}
3272
3273impl From<AppendInput> for api::stream::proto::AppendInput {
3274 fn from(value: AppendInput) -> Self {
3275 Self {
3276 records: value.records.iter().cloned().map(Into::into).collect(),
3277 match_seq_num: value.match_seq_num,
3278 fencing_token: value.fencing_token.map(|t| t.to_string()),
3279 }
3280 }
3281}
3282
/// Acknowledgement of an append, holding three stream positions.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct AppendAck {
    /// The `start` position from the acknowledgement.
    // NOTE(review): presumably the position of the first appended record — confirm.
    pub start: StreamPosition,
    /// The `end` position from the acknowledgement.
    // NOTE(review): presumably the position just past the last appended record — confirm.
    pub end: StreamPosition,
    /// The `tail` position from the acknowledgement.
    // NOTE(review): presumably the stream's tail after the append — confirm.
    pub tail: StreamPosition,
}
3301
3302impl From<api::stream::proto::AppendAck> for AppendAck {
3303 fn from(value: api::stream::proto::AppendAck) -> Self {
3304 Self {
3305 start: value.start.unwrap_or_default().into(),
3306 end: value.end.unwrap_or_default().into(),
3307 tail: value.tail.unwrap_or_default().into(),
3308 }
3309 }
3310}
3311
/// Where a read starts from.
///
/// Exactly one of these is sent to the API as `seq_num`, `timestamp`, or
/// `tail_offset` respectively.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start from the given sequence number.
    SeqNum(u64),
    /// Start from the given timestamp.
    Timestamp(u64),
    /// Start from the given offset relative to the tail.
    // NOTE(review): presumably a number of records before the tail — confirm.
    TailOffset(u64),
}

/// The default starting point is sequence number `0`.
impl Default for ReadFrom {
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3328
3329#[derive(Debug, Default, Clone)]
3330#[non_exhaustive]
3331pub struct ReadStart {
3333 pub from: ReadFrom,
3337 pub clamp_to_tail: bool,
3341}
3342
3343impl ReadStart {
3344 pub fn new() -> Self {
3346 Self::default()
3347 }
3348
3349 pub fn with_from(self, from: ReadFrom) -> Self {
3351 Self { from, ..self }
3352 }
3353
3354 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3356 Self {
3357 clamp_to_tail,
3358 ..self
3359 }
3360 }
3361}
3362
3363impl From<ReadStart> for api::stream::ReadStart {
3364 fn from(value: ReadStart) -> Self {
3365 let (seq_num, timestamp, tail_offset) = match value.from {
3366 ReadFrom::SeqNum(n) => (Some(n), None, None),
3367 ReadFrom::Timestamp(t) => (None, Some(t), None),
3368 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3369 };
3370 Self {
3371 seq_num,
3372 timestamp,
3373 tail_offset,
3374 clamp: if value.clamp_to_tail {
3375 Some(true)
3376 } else {
3377 None
3378 },
3379 }
3380 }
3381}
3382
/// Optional limits on a read. Both limits are unset by default.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Optional limit on the number of records.
    pub count: Option<usize>,
    /// Optional limit on the number of bytes.
    // NOTE(review): presumably measured in metered bytes — confirm.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Creates a `ReadLimits` with no limits set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns these limits with the record count limit set to `count`.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Returns these limits with the byte limit set to `bytes`.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3419
3420#[derive(Debug, Clone, Default)]
3421#[non_exhaustive]
3422pub struct ReadStop {
3424 pub limits: ReadLimits,
3428 pub until: Option<RangeTo<u64>>,
3432 pub wait: Option<u32>,
3442}
3443
3444impl ReadStop {
3445 pub fn new() -> Self {
3447 Self::default()
3448 }
3449
3450 pub fn with_limits(self, limits: ReadLimits) -> Self {
3452 Self { limits, ..self }
3453 }
3454
3455 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3457 Self {
3458 until: Some(until),
3459 ..self
3460 }
3461 }
3462
3463 pub fn with_wait(self, wait: u32) -> Self {
3465 Self {
3466 wait: Some(wait),
3467 ..self
3468 }
3469 }
3470}
3471
3472impl From<ReadStop> for api::stream::ReadEnd {
3473 fn from(value: ReadStop) -> Self {
3474 Self {
3475 count: value.limits.count,
3476 bytes: value.limits.bytes,
3477 until: value.until.map(|r| r.end),
3478 wait: value.wait,
3479 }
3480 }
3481}
3482
3483#[derive(Debug, Clone, Default)]
3484#[non_exhaustive]
3485pub struct ReadInput {
3488 pub start: ReadStart,
3492 pub stop: ReadStop,
3496 pub ignore_command_records: bool,
3500}
3501
3502impl ReadInput {
3503 pub fn new() -> Self {
3505 Self::default()
3506 }
3507
3508 pub fn with_start(self, start: ReadStart) -> Self {
3510 Self { start, ..self }
3511 }
3512
3513 pub fn with_stop(self, stop: ReadStop) -> Self {
3515 Self { stop, ..self }
3516 }
3517
3518 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3520 Self {
3521 ignore_command_records,
3522 ..self
3523 }
3524 }
3525}
3526
3527#[derive(Debug, Clone)]
3528#[non_exhaustive]
3529pub struct SequencedRecord {
3531 pub seq_num: u64,
3533 pub body: Bytes,
3535 pub headers: Vec<Header>,
3537 pub timestamp: u64,
3539}
3540
3541impl SequencedRecord {
3542 pub fn is_command_record(&self) -> bool {
3544 self.headers.len() == 1 && *self.headers[0].name == *b""
3545 }
3546}
3547
3548impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3549 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3550 Self {
3551 seq_num: value.seq_num,
3552 body: value.body,
3553 headers: value.headers.into_iter().map(Into::into).collect(),
3554 timestamp: value.timestamp,
3555 }
3556 }
3557}
3558
// Implements `MeteredBytes` for `SequencedRecord` via `metered_bytes_impl!`,
// which computes the size from the record's `headers` and `body` fields.
metered_bytes_impl!(SequencedRecord);
3560
3561#[derive(Debug, Clone)]
3562#[non_exhaustive]
3563pub struct ReadBatch {
3566 pub records: Vec<SequencedRecord>,
3573 pub tail: Option<StreamPosition>,
3578}
3579
3580impl ReadBatch {
3581 pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3582 Self {
3583 records: batch.records.into_iter().map(Into::into).collect(),
3584 tail: batch.tail.map(Into::into),
3585 }
3586 }
3587}
3588
/// A pinned, boxed, `Send` stream that yields `Result<T, S2Error>` items.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3591
/// Reason an append condition was not met.
///
/// Each variant carries the value that was expected, as reported in its
/// error message.
#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendConditionFailed {
    /// The fencing token did not match; holds the expected token.
    #[error("fencing token mismatch, expected: {0}")]
    FencingTokenMismatch(FencingToken),
    /// The sequence number did not match; holds the expected sequence number.
    #[error("sequence number mismatch, expected: {0}")]
    SeqNumMismatch(u64),
}
3602
3603impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3604 fn from(value: api::stream::AppendConditionFailed) -> Self {
3605 match value {
3606 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3607 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3608 }
3609 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3610 AppendConditionFailed::SeqNumMismatch(seq)
3611 }
3612 }
3613 }
3614}
3615
/// Errors surfaced by this crate.
#[derive(Debug, Clone, thiserror::Error)]
pub enum S2Error {
    /// An error described only by a message. When converting from `ApiError`,
    /// every variant without a dedicated mapping ends up here, stringified.
    #[error("{0}")]
    Client(String),
    /// A validation failure; displayed exactly as the wrapped
    /// [`ValidationError`].
    #[error(transparent)]
    Validation(#[from] ValidationError),
    /// An append condition was not met.
    #[error("{0}")]
    AppendConditionFailed(AppendConditionFailed),
    /// A read targeted an unwritten position; holds the current tail.
    #[error("read from an unwritten position. current tail: {0}")]
    ReadUnwritten(StreamPosition),
    /// An error response with a code and message.
    #[error("{0}")]
    Server(ErrorResponse),
}
3635
3636impl From<ApiError> for S2Error {
3637 fn from(err: ApiError) -> Self {
3638 match err {
3639 ApiError::ReadUnwritten(tail_response) => {
3640 Self::ReadUnwritten(tail_response.tail.into())
3641 }
3642 ApiError::AppendConditionFailed(condition_failed) => {
3643 Self::AppendConditionFailed(condition_failed.into())
3644 }
3645 ApiError::Server(_, response) => Self::Server(response.into()),
3646 other => Self::Client(other.to_string()),
3647 }
3648 }
3649}
3650
/// An error response consisting of a code and a message.
///
/// Displayed as `"{code}: {message}"`.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
pub struct ErrorResponse {
    /// Error code, as a string.
    pub code: String,
    /// Error message.
    pub message: String,
}
3661
3662impl From<ApiErrorResponse> for ErrorResponse {
3663 fn from(response: ApiErrorResponse) -> Self {
3664 Self {
3665 code: response.code,
3666 message: response.message,
3667 }
3668 }
3669}
3670
3671fn idempotency_token() -> String {
3672 uuid::Uuid::new_v4().simple().to_string()
3673}