1use std::{
4 collections::HashSet,
5 env::VarError,
6 fmt,
7 num::NonZeroU32,
8 ops::{Deref, RangeTo},
9 pin::Pin,
10 str::FromStr,
11 sync::Arc,
12 time::Duration,
13};
14
15use bytes::Bytes;
16use http::{
17 header::HeaderValue,
18 uri::{Authority, Scheme},
19};
20use rand::RngExt;
21use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
22pub use s2_common::types::ValidationError;
24pub use s2_common::types::access::AccessTokenId;
28pub use s2_common::types::access::AccessTokenIdPrefix;
30pub use s2_common::types::access::AccessTokenIdStartAfter;
32pub use s2_common::types::basin::BasinName;
37pub use s2_common::types::basin::BasinNamePrefix;
39pub use s2_common::types::basin::BasinNameStartAfter;
41pub use s2_common::types::location::LocationName;
46pub use s2_common::types::stream::StreamName;
50pub use s2_common::types::stream::StreamNamePrefix;
52pub use s2_common::types::stream::StreamNameStartAfter;
54pub use s2_common::{
55 caps::RECORD_BATCH_MAX,
56 encryption::{EncryptionAlgorithm, EncryptionKey},
57};
58
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const ONE_MIB: u32 = 1024 * 1024;
60
61use s2_common::{
62 maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH, types::resources::ProvisionResult,
63};
64use secrecy::SecretString;
65
66use crate::api::{ApiError, ApiErrorResponse};
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub struct S2DateTime(time::OffsetDateTime);
75
76impl TryFrom<time::OffsetDateTime> for S2DateTime {
77 type Error = ValidationError;
78
79 fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
80 dt.format(&time::format_description::well_known::Rfc3339)
81 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
82 Ok(Self(dt))
83 }
84}
85
86impl From<S2DateTime> for time::OffsetDateTime {
87 fn from(dt: S2DateTime) -> Self {
88 dt.0
89 }
90}
91
92impl FromStr for S2DateTime {
93 type Err = ValidationError;
94
95 fn from_str(s: &str) -> Result<Self, Self::Err> {
96 time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
97 .map(Self)
98 .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
99 }
100}
101
102impl fmt::Display for S2DateTime {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 write!(
105 f,
106 "{}",
107 self.0
108 .format(&time::format_description::well_known::Rfc3339)
109 .expect("RFC3339 formatting should not fail for S2DateTime")
110 )
111 }
112}
113
114#[derive(Debug, Clone, PartialEq)]
116pub(crate) enum BasinAuthority {
117 ParentZone(Authority),
119 Direct(Authority),
121}
122
123#[derive(Debug, Clone)]
125pub struct AccountEndpoint {
126 scheme: Scheme,
127 authority: Authority,
128}
129
130impl AccountEndpoint {
131 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
133 endpoint.parse()
134 }
135}
136
137impl FromStr for AccountEndpoint {
138 type Err = ValidationError;
139
140 fn from_str(s: &str) -> Result<Self, Self::Err> {
141 let (scheme, authority) = match s.find("://") {
142 Some(idx) => {
143 let scheme: Scheme = s[..idx]
144 .parse()
145 .map_err(|_| "invalid account endpoint scheme".to_string())?;
146 (scheme, &s[idx + 3..])
147 }
148 None => (Scheme::HTTPS, s),
149 };
150 Ok(Self {
151 scheme,
152 authority: authority
153 .parse()
154 .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
155 })
156 }
157}
158
159#[derive(Debug, Clone)]
161pub struct BasinEndpoint {
162 scheme: Scheme,
163 authority: BasinAuthority,
164}
165
166impl BasinEndpoint {
167 pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
169 endpoint.parse()
170 }
171}
172
173impl FromStr for BasinEndpoint {
174 type Err = ValidationError;
175
176 fn from_str(s: &str) -> Result<Self, Self::Err> {
177 let (scheme, authority) = match s.find("://") {
178 Some(idx) => {
179 let scheme: Scheme = s[..idx]
180 .parse()
181 .map_err(|_| "invalid basin endpoint scheme".to_string())?;
182 (scheme, &s[idx + 3..])
183 }
184 None => (Scheme::HTTPS, s),
185 };
186 let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
187 BasinAuthority::ParentZone(
188 authority
189 .parse()
190 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
191 )
192 } else {
193 BasinAuthority::Direct(
194 authority
195 .parse()
196 .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
197 )
198 };
199 Ok(Self { scheme, authority })
200 }
201}
202
203#[derive(Debug, Clone)]
204#[non_exhaustive]
205pub struct S2Endpoints {
207 pub(crate) scheme: Scheme,
208 pub(crate) account_authority: Authority,
209 pub(crate) basin_authority: BasinAuthority,
210}
211
212impl S2Endpoints {
213 pub fn new(
215 account_endpoint: AccountEndpoint,
216 basin_endpoint: BasinEndpoint,
217 ) -> Result<Self, ValidationError> {
218 if account_endpoint.scheme != basin_endpoint.scheme {
219 return Err("account and basin endpoints must have the same scheme".into());
220 }
221 Ok(Self {
222 scheme: account_endpoint.scheme,
223 account_authority: account_endpoint.authority,
224 basin_authority: basin_endpoint.authority,
225 })
226 }
227
228 pub fn from_env() -> Result<Self, ValidationError> {
234 let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
235 Ok(endpoint) => endpoint.parse()?,
236 Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
237 Err(VarError::NotUnicode(_)) => {
238 return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
239 }
240 };
241
242 let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
243 Ok(endpoint) => endpoint.parse()?,
244 Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
245 Err(VarError::NotUnicode(_)) => {
246 return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
247 }
248 };
249
250 if account_endpoint.scheme != basin_endpoint.scheme {
251 return Err(
252 "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
253 );
254 }
255
256 Ok(Self {
257 scheme: account_endpoint.scheme,
258 account_authority: account_endpoint.authority,
259 basin_authority: basin_endpoint.authority,
260 })
261 }
262
263 pub(crate) fn for_aws() -> Self {
264 Self {
265 scheme: Scheme::HTTPS,
266 account_authority: "aws.s2.dev".try_into().expect("valid authority"),
267 basin_authority: BasinAuthority::ParentZone(
268 "b.s2.dev".try_into().expect("valid authority"),
269 ),
270 }
271 }
272}
273
/// Compression algorithm selection.
///
/// `PartialEq` and `Eq` are derived so values can be compared directly, like
/// the other field-less enums in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
284
285impl From<Compression> for CompressionAlgorithm {
286 fn from(value: Compression) -> Self {
287 match value {
288 Compression::None => CompressionAlgorithm::None,
289 Compression::Gzip => CompressionAlgorithm::Gzip,
290 Compression::Zstd => CompressionAlgorithm::Zstd,
291 }
292 }
293}
294
/// Retry policy for append operations.
///
/// `Eq` is derived alongside `PartialEq`: the enum has no payload, so its
/// equality is total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AppendRetryPolicy {
    /// NOTE(review): by its name, retries every failed append; the retry
    /// logic that interprets this variant is outside this chunk.
    All,
    /// NOTE(review): by its name, retries only when a retry cannot cause side
    /// effects; confirm the exact rule against the retry logic.
    NoSideEffects,
}
313
314#[derive(Debug, Clone)]
315#[non_exhaustive]
316pub struct RetryConfig {
325 pub max_attempts: NonZeroU32,
329 pub min_base_delay: Duration,
333 pub max_base_delay: Duration,
337 pub append_retry_policy: AppendRetryPolicy,
342}
343
344impl Default for RetryConfig {
345 fn default() -> Self {
346 Self {
347 max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
348 min_base_delay: Duration::from_millis(100),
349 max_base_delay: Duration::from_secs(1),
350 append_retry_policy: AppendRetryPolicy::All,
351 }
352 }
353}
354
355impl RetryConfig {
356 pub fn new() -> Self {
358 Self::default()
359 }
360
361 pub(crate) fn max_retries(&self) -> u32 {
362 self.max_attempts.get() - 1
363 }
364
365 pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
367 Self {
368 max_attempts,
369 ..self
370 }
371 }
372
373 pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
375 Self {
376 min_base_delay,
377 ..self
378 }
379 }
380
381 pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
383 Self {
384 max_base_delay,
385 ..self
386 }
387 }
388
389 pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
392 Self {
393 append_retry_policy,
394 ..self
395 }
396 }
397}
398
399#[derive(Debug, Clone)]
400#[non_exhaustive]
401pub struct S2Config {
403 pub(crate) access_token: SecretString,
404 pub(crate) endpoints: S2Endpoints,
405 pub(crate) connection_timeout: Duration,
406 pub(crate) request_timeout: Duration,
407 pub(crate) retry: RetryConfig,
408 pub(crate) compression: Compression,
409 pub(crate) user_agent: HeaderValue,
410 pub(crate) insecure_skip_cert_verification: bool,
411 pub(crate) rustls_crypto_provider: Option<Arc<rustls::crypto::CryptoProvider>>,
412}
413
414impl S2Config {
415 pub fn new(access_token: impl Into<String>) -> Self {
417 Self {
418 access_token: access_token.into().into(),
419 endpoints: S2Endpoints::for_aws(),
420 connection_timeout: Duration::from_secs(3),
421 request_timeout: Duration::from_secs(5),
422 retry: RetryConfig::new(),
423 compression: Compression::None,
424 user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
425 .parse()
426 .expect("valid user agent"),
427 insecure_skip_cert_verification: false,
428 rustls_crypto_provider: default_rustls_crypto_provider(),
429 }
430 }
431
432 pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
434 Self { endpoints, ..self }
435 }
436
437 pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
441 Self {
442 connection_timeout,
443 ..self
444 }
445 }
446
447 pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
451 Self {
452 request_timeout,
453 ..self
454 }
455 }
456
457 pub fn with_retry(self, retry: RetryConfig) -> Self {
461 Self { retry, ..self }
462 }
463
464 pub fn with_compression(self, compression: Compression) -> Self {
468 Self {
469 compression,
470 ..self
471 }
472 }
473
474 pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
486 Self {
487 insecure_skip_cert_verification: skip,
488 ..self
489 }
490 }
491
492 pub fn with_rustls_crypto_provider(
502 self,
503 provider: impl Into<Arc<rustls::crypto::CryptoProvider>>,
504 ) -> Self {
505 Self {
506 rustls_crypto_provider: Some(provider.into()),
507 ..self
508 }
509 }
510
511 #[cfg(feature = "rustls-aws-lc-rs")]
515 pub fn with_rustls_aws_lc_rs_crypto_provider(self) -> Self {
516 self.with_rustls_crypto_provider(rustls::crypto::aws_lc_rs::default_provider())
517 }
518
519 #[cfg(feature = "rustls-ring")]
523 pub fn with_rustls_ring_crypto_provider(self) -> Self {
524 self.with_rustls_crypto_provider(rustls::crypto::ring::default_provider())
525 }
526
527 #[doc(hidden)]
528 #[cfg(feature = "_hidden")]
529 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
530 let user_agent = user_agent
531 .into()
532 .parse()
533 .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
534 Ok(Self { user_agent, ..self })
535 }
536}
537
538#[cfg(feature = "rustls-aws-lc-rs")]
539fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
540 Some(Arc::new(rustls::crypto::aws_lc_rs::default_provider()))
541}
542
543#[cfg(all(not(feature = "rustls-aws-lc-rs"), feature = "rustls-ring"))]
544fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
545 Some(Arc::new(rustls::crypto::ring::default_provider()))
546}
547
548#[cfg(all(not(feature = "rustls-aws-lc-rs"), not(feature = "rustls-ring")))]
549fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
550 None
551}
552
/// One page of values.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values contained in this page.
    pub values: Vec<T>,
    /// Flag reported alongside the values.
    /// NOTE(review): by its name, signals that further values exist beyond this page.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Builds a page from anything convertible into a `Vec<T>`.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values: Vec<T> = values.into();
        Self { values, has_more }
    }
}
571
572#[derive(Debug, Clone, Copy, PartialEq, Eq)]
573pub enum StorageClass {
575 Standard,
577 Express,
579}
580
581impl From<api::config::StorageClass> for StorageClass {
582 fn from(value: api::config::StorageClass) -> Self {
583 match value {
584 api::config::StorageClass::Standard => StorageClass::Standard,
585 api::config::StorageClass::Express => StorageClass::Express,
586 }
587 }
588}
589
590impl From<StorageClass> for api::config::StorageClass {
591 fn from(value: StorageClass) -> Self {
592 match value {
593 StorageClass::Standard => api::config::StorageClass::Standard,
594 StorageClass::Express => api::config::StorageClass::Express,
595 }
596 }
597}
598
599#[derive(Debug, Clone, Copy, PartialEq, Eq)]
600pub enum RetentionPolicy {
602 Age(u64),
604 Infinite,
606}
607
608impl From<api::config::RetentionPolicy> for RetentionPolicy {
609 fn from(value: api::config::RetentionPolicy) -> Self {
610 match value {
611 api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
612 api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
613 }
614 }
615}
616
617impl From<RetentionPolicy> for api::config::RetentionPolicy {
618 fn from(value: RetentionPolicy) -> Self {
619 match value {
620 RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
621 RetentionPolicy::Infinite => {
622 api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
623 }
624 }
625 }
626}
627
628#[derive(Debug, Clone, Copy, PartialEq, Eq)]
629pub enum TimestampingMode {
631 ClientPrefer,
633 ClientRequire,
635 Arrival,
637}
638
639impl From<api::config::TimestampingMode> for TimestampingMode {
640 fn from(value: api::config::TimestampingMode) -> Self {
641 match value {
642 api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
643 api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
644 api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
645 }
646 }
647}
648
649impl From<TimestampingMode> for api::config::TimestampingMode {
650 fn from(value: TimestampingMode) -> Self {
651 match value {
652 TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
653 TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
654 TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
655 }
656 }
657}
658
659#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
660#[non_exhaustive]
661pub struct TimestampingConfig {
663 pub mode: Option<TimestampingMode>,
667 pub uncapped: Option<bool>,
671}
672
673impl TimestampingConfig {
674 pub fn new() -> Self {
676 Self::default()
677 }
678
679 pub fn with_mode(self, mode: TimestampingMode) -> Self {
681 Self {
682 mode: Some(mode),
683 ..self
684 }
685 }
686
687 pub fn with_uncapped(self, uncapped: bool) -> Self {
689 Self {
690 uncapped: Some(uncapped),
691 ..self
692 }
693 }
694}
695
696impl From<api::config::TimestampingConfig> for TimestampingConfig {
697 fn from(value: api::config::TimestampingConfig) -> Self {
698 Self {
699 mode: value.mode.map(Into::into),
700 uncapped: value.uncapped,
701 }
702 }
703}
704
705impl From<TimestampingConfig> for api::config::TimestampingConfig {
706 fn from(value: TimestampingConfig) -> Self {
707 Self {
708 mode: value.mode.map(Into::into),
709 uncapped: value.uncapped,
710 }
711 }
712}
713
/// Delete-on-empty settings.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in whole seconds. Defaults to 0.
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Creates a configuration with `min_age_secs` set to 0.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the minimum age from `min_age`.
    ///
    /// Only whole seconds are kept; any sub-second part is discarded.
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
737
738impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
739 fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
740 Self {
741 min_age_secs: value.min_age_secs,
742 }
743 }
744}
745
746impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
747 fn from(value: DeleteOnEmptyConfig) -> Self {
748 Self {
749 min_age_secs: value.min_age_secs,
750 }
751 }
752}
753
754#[derive(Debug, Clone, Default, PartialEq, Eq)]
755#[non_exhaustive]
756pub struct StreamConfig {
758 pub storage_class: Option<StorageClass>,
762 pub retention_policy: Option<RetentionPolicy>,
766 pub timestamping: Option<TimestampingConfig>,
770 pub delete_on_empty: Option<DeleteOnEmptyConfig>,
774}
775
776impl StreamConfig {
777 pub fn new() -> Self {
779 Self::default()
780 }
781
782 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
784 Self {
785 storage_class: Some(storage_class),
786 ..self
787 }
788 }
789
790 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
792 Self {
793 retention_policy: Some(retention_policy),
794 ..self
795 }
796 }
797
798 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
800 Self {
801 timestamping: Some(timestamping),
802 ..self
803 }
804 }
805
806 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
808 Self {
809 delete_on_empty: Some(delete_on_empty),
810 ..self
811 }
812 }
813}
814
815impl From<api::config::StreamConfig> for StreamConfig {
816 fn from(value: api::config::StreamConfig) -> Self {
817 Self {
818 storage_class: value.storage_class.map(Into::into),
819 retention_policy: value.retention_policy.map(Into::into),
820 timestamping: value.timestamping.map(Into::into),
821 delete_on_empty: value.delete_on_empty.map(Into::into),
822 }
823 }
824}
825
826impl From<StreamConfig> for api::config::StreamConfig {
827 fn from(value: StreamConfig) -> Self {
828 Self {
829 storage_class: value.storage_class.map(Into::into),
830 retention_policy: value.retention_policy.map(Into::into),
831 timestamping: value.timestamping.map(Into::into),
832 delete_on_empty: value.delete_on_empty.map(Into::into),
833 }
834 }
835}
836
837#[derive(Debug, Clone, Default, PartialEq, Eq)]
838#[non_exhaustive]
839pub struct BasinConfig {
841 pub default_stream_config: Option<StreamConfig>,
845 pub stream_cipher: Option<EncryptionAlgorithm>,
847 pub create_stream_on_append: bool,
851 pub create_stream_on_read: bool,
855}
856
857impl BasinConfig {
858 pub fn new() -> Self {
860 Self::default()
861 }
862
863 pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
865 Self {
866 default_stream_config: Some(config),
867 ..self
868 }
869 }
870
871 pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
873 Self {
874 stream_cipher: Some(stream_cipher),
875 ..self
876 }
877 }
878
879 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
882 Self {
883 create_stream_on_append,
884 ..self
885 }
886 }
887
888 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
890 Self {
891 create_stream_on_read,
892 ..self
893 }
894 }
895}
896
897impl From<api::config::BasinConfig> for BasinConfig {
898 fn from(value: api::config::BasinConfig) -> Self {
899 Self {
900 default_stream_config: value.default_stream_config.map(Into::into),
901 stream_cipher: value.stream_cipher.map(Into::into),
902 create_stream_on_append: value.create_stream_on_append,
903 create_stream_on_read: value.create_stream_on_read,
904 }
905 }
906}
907
908impl From<BasinConfig> for api::config::BasinConfig {
909 fn from(value: BasinConfig) -> Self {
910 Self {
911 default_stream_config: value.default_stream_config.map(Into::into),
912 stream_cipher: value.stream_cipher.map(Into::into),
913 create_stream_on_append: value.create_stream_on_append,
914 create_stream_on_read: value.create_stream_on_read,
915 }
916 }
917}
918
919#[derive(Debug, Clone)]
920#[non_exhaustive]
921pub struct CreateBasinInput {
923 pub name: BasinName,
925 pub config: Option<BasinConfig>,
929 pub location: Option<LocationName>,
933 idempotency_token: String,
934}
935
936impl CreateBasinInput {
937 pub fn new(name: BasinName) -> Self {
939 Self {
940 name,
941 config: None,
942 location: None,
943 idempotency_token: idempotency_token(),
944 }
945 }
946
947 pub fn with_config(self, config: BasinConfig) -> Self {
949 Self {
950 config: Some(config),
951 ..self
952 }
953 }
954
955 pub fn with_location<S>(self, location: S) -> Result<Self, ValidationError>
957 where
958 S: TryInto<LocationName>,
959 S::Error: fmt::Display,
960 {
961 let location = location
962 .try_into()
963 .map_err(|e| ValidationError(e.to_string()))?;
964 Ok(Self {
965 location: Some(location),
966 ..self
967 })
968 }
969}
970
971impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
972 fn from(value: CreateBasinInput) -> Self {
973 (
974 api::basin::CreateBasinRequest {
975 basin: value.name,
976 config: value.config.map(Into::into),
977 location: value.location,
978 },
979 value.idempotency_token,
980 )
981 }
982}
983
984#[derive(Debug, Clone)]
985#[non_exhaustive]
986pub struct EnsureBasinInput {
988 pub name: BasinName,
990 pub config: Option<BasinConfig>,
994 pub location: Option<LocationName>,
999}
1000
1001impl EnsureBasinInput {
1002 pub fn new(name: BasinName) -> Self {
1004 Self {
1005 name,
1006 config: None,
1007 location: None,
1008 }
1009 }
1010
1011 pub fn with_config(self, config: BasinConfig) -> Self {
1013 Self {
1014 config: Some(config),
1015 ..self
1016 }
1017 }
1018
1019 pub fn with_location<S>(self, location: S) -> Result<Self, ValidationError>
1021 where
1022 S: TryInto<LocationName>,
1023 S::Error: fmt::Display,
1024 {
1025 let location = location
1026 .try_into()
1027 .map_err(|e| ValidationError(e.to_string()))?;
1028 Ok(Self {
1029 location: Some(location),
1030 ..self
1031 })
1032 }
1033}
1034
1035impl From<EnsureBasinInput> for (BasinName, Option<api::basin::EnsureBasinRequest>) {
1036 fn from(value: EnsureBasinInput) -> Self {
1037 let config = value.config;
1038 let request = if config.is_some() || value.location.is_some() {
1039 Some(api::basin::EnsureBasinRequest {
1040 config: config.map(Into::into),
1041 location: value.location,
1042 })
1043 } else {
1044 None
1045 };
1046 (value.name, request)
1047 }
1048}
1049
1050#[derive(Debug, Clone)]
1051pub enum EnsureOutput<T> {
1054 Created(T),
1056 ConfigUpdated(T),
1058 ConfigUnchanged(T),
1060}
1061
1062impl<T> From<ProvisionResult<T>> for EnsureOutput<T> {
1063 fn from(result: ProvisionResult<T>) -> Self {
1064 match result {
1065 ProvisionResult::Created(info) => EnsureOutput::Created(info),
1066 ProvisionResult::Updated(info) => EnsureOutput::ConfigUpdated(info),
1067 ProvisionResult::Noop(info) => EnsureOutput::ConfigUnchanged(info),
1068 }
1069 }
1070}
1071
1072#[derive(Debug, Clone, Default)]
1073#[non_exhaustive]
1074pub struct ListBasinsInput {
1076 pub prefix: BasinNamePrefix,
1080 pub start_after: BasinNameStartAfter,
1084 pub limit: Option<usize>,
1088}
1089
1090impl ListBasinsInput {
1091 pub fn new() -> Self {
1093 Self::default()
1094 }
1095
1096 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1098 Self { prefix, ..self }
1099 }
1100
1101 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1104 Self {
1105 start_after,
1106 ..self
1107 }
1108 }
1109
1110 pub fn with_limit(self, limit: usize) -> Self {
1112 Self {
1113 limit: Some(limit),
1114 ..self
1115 }
1116 }
1117}
1118
1119impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1120 fn from(value: ListBasinsInput) -> Self {
1121 Self {
1122 prefix: Some(value.prefix),
1123 start_after: Some(value.start_after),
1124 limit: value.limit,
1125 }
1126 }
1127}
1128
1129#[derive(Debug, Clone, Default)]
1130pub struct ListAllBasinsInput {
1132 pub prefix: BasinNamePrefix,
1136 pub start_after: BasinNameStartAfter,
1140 pub include_deleted: bool,
1144}
1145
1146impl ListAllBasinsInput {
1147 pub fn new() -> Self {
1149 Self::default()
1150 }
1151
1152 pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1154 Self { prefix, ..self }
1155 }
1156
1157 pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1160 Self {
1161 start_after,
1162 ..self
1163 }
1164 }
1165
1166 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1168 Self {
1169 include_deleted,
1170 ..self
1171 }
1172 }
1173}
1174
1175#[derive(Debug, Clone, PartialEq, Eq)]
1176#[non_exhaustive]
1177pub struct BasinInfo {
1179 pub name: BasinName,
1181 pub location: Option<LocationName>,
1183 pub created_at: S2DateTime,
1185 pub deleted_at: Option<S2DateTime>,
1187}
1188
1189impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1190 type Error = ValidationError;
1191
1192 fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1193 Ok(Self {
1194 name: value.name,
1195 location: value.location,
1196 created_at: value.created_at.try_into()?,
1197 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1198 })
1199 }
1200}
1201
1202#[derive(Debug, Clone)]
1203#[non_exhaustive]
1204pub struct DeleteBasinInput {
1206 pub name: BasinName,
1208 pub ignore_not_found: bool,
1210}
1211
1212impl DeleteBasinInput {
1213 pub fn new(name: BasinName) -> Self {
1215 Self {
1216 name,
1217 ignore_not_found: false,
1218 }
1219 }
1220
1221 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1223 Self {
1224 ignore_not_found,
1225 ..self
1226 }
1227 }
1228}
1229
1230#[derive(Debug, Clone, Default)]
1231#[non_exhaustive]
1232pub struct TimestampingReconfiguration {
1234 pub mode: Maybe<Option<TimestampingMode>>,
1236 pub uncapped: Maybe<Option<bool>>,
1238}
1239
1240impl TimestampingReconfiguration {
1241 pub fn new() -> Self {
1243 Self::default()
1244 }
1245
1246 pub fn with_mode(self, mode: TimestampingMode) -> Self {
1248 Self {
1249 mode: Maybe::Specified(Some(mode)),
1250 ..self
1251 }
1252 }
1253
1254 pub fn with_uncapped(self, uncapped: bool) -> Self {
1256 Self {
1257 uncapped: Maybe::Specified(Some(uncapped)),
1258 ..self
1259 }
1260 }
1261}
1262
1263impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1264 fn from(value: TimestampingReconfiguration) -> Self {
1265 Self {
1266 mode: value.mode.map(|m| m.map(Into::into)),
1267 uncapped: value.uncapped,
1268 }
1269 }
1270}
1271
1272#[derive(Debug, Clone, Default)]
1273#[non_exhaustive]
1274pub struct DeleteOnEmptyReconfiguration {
1276 pub min_age_secs: Maybe<Option<u64>>,
1278}
1279
1280impl DeleteOnEmptyReconfiguration {
1281 pub fn new() -> Self {
1283 Self::default()
1284 }
1285
1286 pub fn with_min_age(self, min_age: Duration) -> Self {
1288 Self {
1289 min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1290 }
1291 }
1292}
1293
1294impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1295 fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1296 Self {
1297 min_age_secs: value.min_age_secs,
1298 }
1299 }
1300}
1301
1302#[derive(Debug, Clone, Default)]
1303#[non_exhaustive]
1304pub struct StreamReconfiguration {
1306 pub storage_class: Maybe<Option<StorageClass>>,
1308 pub retention_policy: Maybe<Option<RetentionPolicy>>,
1310 pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1312 pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1314}
1315
1316impl StreamReconfiguration {
1317 pub fn new() -> Self {
1319 Self::default()
1320 }
1321
1322 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1324 Self {
1325 storage_class: Maybe::Specified(Some(storage_class)),
1326 ..self
1327 }
1328 }
1329
1330 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1332 Self {
1333 retention_policy: Maybe::Specified(Some(retention_policy)),
1334 ..self
1335 }
1336 }
1337
1338 pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1340 Self {
1341 timestamping: Maybe::Specified(Some(timestamping)),
1342 ..self
1343 }
1344 }
1345
1346 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1348 Self {
1349 delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1350 ..self
1351 }
1352 }
1353}
1354
1355impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1356 fn from(value: StreamReconfiguration) -> Self {
1357 Self {
1358 storage_class: value.storage_class.map(|m| m.map(Into::into)),
1359 retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1360 timestamping: value.timestamping.map(|m| m.map(Into::into)),
1361 delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1362 }
1363 }
1364}
1365
1366#[derive(Debug, Clone, Default)]
1367#[non_exhaustive]
1368pub struct BasinReconfiguration {
1370 pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1372 pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
1374 pub create_stream_on_append: Maybe<bool>,
1377 pub create_stream_on_read: Maybe<bool>,
1379}
1380
1381impl BasinReconfiguration {
1382 pub fn new() -> Self {
1384 Self::default()
1385 }
1386
1387 pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1390 Self {
1391 default_stream_config: Maybe::Specified(Some(config)),
1392 ..self
1393 }
1394 }
1395
1396 pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
1398 Self {
1399 stream_cipher: Maybe::Specified(Some(stream_cipher)),
1400 ..self
1401 }
1402 }
1403
1404 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1407 Self {
1408 create_stream_on_append: Maybe::Specified(create_stream_on_append),
1409 ..self
1410 }
1411 }
1412
1413 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1416 Self {
1417 create_stream_on_read: Maybe::Specified(create_stream_on_read),
1418 ..self
1419 }
1420 }
1421}
1422
1423impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1424 fn from(value: BasinReconfiguration) -> Self {
1425 Self {
1426 default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1427 stream_cipher: value.stream_cipher.map(|m| m.map(Into::into)),
1428 create_stream_on_append: value.create_stream_on_append,
1429 create_stream_on_read: value.create_stream_on_read,
1430 }
1431 }
1432}
1433
1434#[derive(Debug, Clone)]
1435#[non_exhaustive]
1436pub struct ReconfigureBasinInput {
1438 pub name: BasinName,
1440 pub config: BasinReconfiguration,
1442}
1443
1444impl ReconfigureBasinInput {
1445 pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1447 Self { name, config }
1448 }
1449}
1450
1451#[derive(Debug, Clone, Default)]
1452#[non_exhaustive]
1453pub struct ListAccessTokensInput {
1455 pub prefix: AccessTokenIdPrefix,
1459 pub start_after: AccessTokenIdStartAfter,
1463 pub limit: Option<usize>,
1467}
1468
1469impl ListAccessTokensInput {
1470 pub fn new() -> Self {
1472 Self::default()
1473 }
1474
1475 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1477 Self { prefix, ..self }
1478 }
1479
1480 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1483 Self {
1484 start_after,
1485 ..self
1486 }
1487 }
1488
1489 pub fn with_limit(self, limit: usize) -> Self {
1491 Self {
1492 limit: Some(limit),
1493 ..self
1494 }
1495 }
1496}
1497
1498impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1499 fn from(value: ListAccessTokensInput) -> Self {
1500 Self {
1501 prefix: Some(value.prefix),
1502 start_after: Some(value.start_after),
1503 limit: value.limit,
1504 }
1505 }
1506}
1507
1508#[derive(Debug, Clone, Default)]
1509pub struct ListAllAccessTokensInput {
1511 pub prefix: AccessTokenIdPrefix,
1515 pub start_after: AccessTokenIdStartAfter,
1519}
1520
1521impl ListAllAccessTokensInput {
1522 pub fn new() -> Self {
1524 Self::default()
1525 }
1526
1527 pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1529 Self { prefix, ..self }
1530 }
1531
1532 pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1535 Self {
1536 start_after,
1537 ..self
1538 }
1539 }
1540}
1541
1542#[derive(Debug, Clone, PartialEq, Eq)]
1543#[non_exhaustive]
1544pub struct LocationInfo {
1546 pub name: LocationName,
1548 pub is_private: bool,
1550}
1551
1552impl From<api::location::LocationInfo> for LocationInfo {
1553 fn from(value: api::location::LocationInfo) -> Self {
1554 Self {
1555 name: value.name,
1556 is_private: value.is_private,
1557 }
1558 }
1559}
1560
/// Information about an access token, built from
/// `api::access::AccessTokenInfo`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenInfo {
    /// ID of the access token.
    pub id: AccessTokenId,
    /// Expiration time; conversion from the API type fails if it is missing.
    pub expires_at: S2DateTime,
    /// `auto_prefix_streams` flag; `false` when the API response omits it.
    pub auto_prefix_streams: bool,
    /// Scope of the access token.
    pub scope: AccessTokenScope,
}
1575
1576impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1577 type Error = ValidationError;
1578
1579 fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1580 let expires_at = value
1581 .expires_at
1582 .map(S2DateTime::try_from)
1583 .transpose()?
1584 .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1585 Ok(Self {
1586 id: value.id,
1587 expires_at,
1588 auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1589 scope: value.scope.into(),
1590 })
1591 }
1592}
1593
/// Selects the basins an access token scope applies to.
#[derive(Debug, Clone)]
pub enum BasinMatcher {
    /// Sent to the API as an exact match on the empty value.
    // NOTE(review): presumably this matches no basins at all — confirm against the API docs.
    None,
    /// Exact match on a single basin name.
    Exact(BasinName),
    /// Match on a basin name prefix.
    Prefix(BasinNamePrefix),
}
1606
/// Selects the streams an access token scope applies to.
#[derive(Debug, Clone)]
pub enum StreamMatcher {
    /// Sent to the API as an exact match on the empty value.
    // NOTE(review): presumably this matches no streams at all — confirm against the API docs.
    None,
    /// Exact match on a single stream name.
    Exact(StreamName),
    /// Match on a stream name prefix.
    Prefix(StreamNamePrefix),
}
1619
/// Selects the access tokens an access token scope applies to.
#[derive(Debug, Clone)]
pub enum AccessTokenMatcher {
    /// Sent to the API as an exact match on the empty value.
    // NOTE(review): presumably this matches no access tokens at all — confirm against the API docs.
    None,
    /// Exact match on a single access token ID.
    Exact(AccessTokenId),
    /// Match on an access token ID prefix.
    Prefix(AccessTokenIdPrefix),
}
1632
/// A pair of read and write permission flags.
///
/// Both flags default to `false`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Read permission flag.
    pub read: bool,
    /// Write permission flag.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Creates permissions with neither flag set.
    pub fn new() -> Self {
        Self {
            read: false,
            write: false,
        }
    }

    /// Creates permissions with only `read` set.
    pub fn read_only() -> Self {
        let mut perms = Self::new();
        perms.read = true;
        perms
    }

    /// Creates permissions with only `write` set.
    pub fn write_only() -> Self {
        let mut perms = Self::new();
        perms.write = true;
        perms
    }

    /// Creates permissions with both `read` and `write` set.
    pub fn read_write() -> Self {
        let mut perms = Self::new();
        perms.read = true;
        perms.write = true;
        perms
    }
}
1677
1678impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1679 fn from(value: ReadWritePermissions) -> Self {
1680 Self {
1681 read: Some(value.read),
1682 write: Some(value.write),
1683 }
1684 }
1685}
1686
1687impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1688 fn from(value: api::access::ReadWritePermissions) -> Self {
1689 Self {
1690 read: value.read.unwrap_or_default(),
1691 write: value.write.unwrap_or_default(),
1692 }
1693 }
1694}
1695
/// Read/write permissions for the account, basin and stream operation groups.
///
/// Every group defaults to `None`, and `None` is passed through to the API
/// unchanged.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct OperationGroupPermissions {
    /// Permissions for the `account` operation group.
    pub account: Option<ReadWritePermissions>,
    /// Permissions for the `basin` operation group.
    pub basin: Option<ReadWritePermissions>,
    /// Permissions for the `stream` operation group.
    pub stream: Option<ReadWritePermissions>,
}
1715
1716impl OperationGroupPermissions {
1717 pub fn new() -> Self {
1719 Self::default()
1720 }
1721
1722 pub fn read_only_all() -> Self {
1724 Self {
1725 account: Some(ReadWritePermissions::read_only()),
1726 basin: Some(ReadWritePermissions::read_only()),
1727 stream: Some(ReadWritePermissions::read_only()),
1728 }
1729 }
1730
1731 pub fn write_only_all() -> Self {
1733 Self {
1734 account: Some(ReadWritePermissions::write_only()),
1735 basin: Some(ReadWritePermissions::write_only()),
1736 stream: Some(ReadWritePermissions::write_only()),
1737 }
1738 }
1739
1740 pub fn read_write_all() -> Self {
1742 Self {
1743 account: Some(ReadWritePermissions::read_write()),
1744 basin: Some(ReadWritePermissions::read_write()),
1745 stream: Some(ReadWritePermissions::read_write()),
1746 }
1747 }
1748
1749 pub fn with_account(self, account: ReadWritePermissions) -> Self {
1751 Self {
1752 account: Some(account),
1753 ..self
1754 }
1755 }
1756
1757 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1759 Self {
1760 basin: Some(basin),
1761 ..self
1762 }
1763 }
1764
1765 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1767 Self {
1768 stream: Some(stream),
1769 ..self
1770 }
1771 }
1772}
1773
1774impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1775 fn from(value: OperationGroupPermissions) -> Self {
1776 Self {
1777 account: value.account.map(Into::into),
1778 basin: value.basin.map(Into::into),
1779 stream: value.stream.map(Into::into),
1780 }
1781 }
1782}
1783
1784impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1785 fn from(value: api::access::PermittedOperationGroups) -> Self {
1786 Self {
1787 account: value.account.map(Into::into),
1788 basin: value.basin.map(Into::into),
1789 stream: value.stream.map(Into::into),
1790 }
1791 }
1792}
1793
/// An individual operation that an access token scope can list.
///
/// Each variant maps one-to-one onto an `api::access::Operation` variant. The
/// three `Get*Metrics` variants map onto the API's `AccountMetrics`,
/// `BasinMetrics` and `StreamMetrics`; all others keep their name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get a basin's configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics (API variant `AccountMetrics`).
    GetAccountMetrics,
    /// Get basin metrics (API variant `BasinMetrics`).
    GetBasinMetrics,
    /// Get stream metrics (API variant `StreamMetrics`).
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get a stream's configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append to a stream.
    Append,
    /// Read from a stream.
    Read,
    /// Trim a stream.
    Trim,
    /// Fence a stream.
    Fence,
    /// List locations.
    ListLocations,
    /// Get the default location.
    GetDefaultLocation,
    /// Set the default location.
    SetDefaultLocation,
}
1848
1849impl From<Operation> for api::access::Operation {
1850 fn from(value: Operation) -> Self {
1851 match value {
1852 Operation::ListBasins => api::access::Operation::ListBasins,
1853 Operation::CreateBasin => api::access::Operation::CreateBasin,
1854 Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1855 Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1856 Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1857 Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1858 Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1859 Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1860 Operation::ListStreams => api::access::Operation::ListStreams,
1861 Operation::CreateStream => api::access::Operation::CreateStream,
1862 Operation::DeleteStream => api::access::Operation::DeleteStream,
1863 Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1864 Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1865 Operation::CheckTail => api::access::Operation::CheckTail,
1866 Operation::Append => api::access::Operation::Append,
1867 Operation::Read => api::access::Operation::Read,
1868 Operation::Trim => api::access::Operation::Trim,
1869 Operation::Fence => api::access::Operation::Fence,
1870 Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1871 Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1872 Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1873 Operation::ListLocations => api::access::Operation::ListLocations,
1874 Operation::GetDefaultLocation => api::access::Operation::GetDefaultLocation,
1875 Operation::SetDefaultLocation => api::access::Operation::SetDefaultLocation,
1876 }
1877 }
1878}
1879
1880impl From<api::access::Operation> for Operation {
1881 fn from(value: api::access::Operation) -> Self {
1882 match value {
1883 api::access::Operation::ListBasins => Operation::ListBasins,
1884 api::access::Operation::CreateBasin => Operation::CreateBasin,
1885 api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1886 api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1887 api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1888 api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1889 api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1890 api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1891 api::access::Operation::ListStreams => Operation::ListStreams,
1892 api::access::Operation::CreateStream => Operation::CreateStream,
1893 api::access::Operation::DeleteStream => Operation::DeleteStream,
1894 api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1895 api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1896 api::access::Operation::CheckTail => Operation::CheckTail,
1897 api::access::Operation::Append => Operation::Append,
1898 api::access::Operation::Read => Operation::Read,
1899 api::access::Operation::Trim => Operation::Trim,
1900 api::access::Operation::Fence => Operation::Fence,
1901 api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1902 api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1903 api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1904 api::access::Operation::ListLocations => Operation::ListLocations,
1905 api::access::Operation::GetDefaultLocation => Operation::GetDefaultLocation,
1906 api::access::Operation::SetDefaultLocation => Operation::SetDefaultLocation,
1907 }
1908 }
1909}
1910
/// Scope to request when issuing an access token.
///
/// All fields are private. Build a value with
/// [`AccessTokenScopeInput::from_ops`] or
/// [`AccessTokenScopeInput::from_op_group_perms`] and refine it with the
/// `with_*` methods.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScopeInput {
    // Resource matchers; `None` is passed to the API as an unset field.
    basins: Option<BasinMatcher>,
    streams: Option<StreamMatcher>,
    access_tokens: Option<AccessTokenMatcher>,
    // Group-level permissions; `None` is passed to the API as an unset field.
    op_group_perms: Option<OperationGroupPermissions>,
    // Individual operations; an empty set is sent to the API as `None`.
    ops: HashSet<Operation>,
}
1927
1928impl AccessTokenScopeInput {
1929 pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1931 Self {
1932 basins: None,
1933 streams: None,
1934 access_tokens: None,
1935 op_group_perms: None,
1936 ops: ops.into_iter().collect(),
1937 }
1938 }
1939
1940 pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1942 Self {
1943 basins: None,
1944 streams: None,
1945 access_tokens: None,
1946 op_group_perms: Some(op_group_perms),
1947 ops: HashSet::default(),
1948 }
1949 }
1950
1951 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1953 Self {
1954 ops: ops.into_iter().collect(),
1955 ..self
1956 }
1957 }
1958
1959 pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1961 Self {
1962 op_group_perms: Some(op_group_perms),
1963 ..self
1964 }
1965 }
1966
1967 pub fn with_basins(self, basins: BasinMatcher) -> Self {
1971 Self {
1972 basins: Some(basins),
1973 ..self
1974 }
1975 }
1976
1977 pub fn with_streams(self, streams: StreamMatcher) -> Self {
1981 Self {
1982 streams: Some(streams),
1983 ..self
1984 }
1985 }
1986
1987 pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1991 Self {
1992 access_tokens: Some(access_tokens),
1993 ..self
1994 }
1995 }
1996}
1997
/// Scope of an access token as reported by the API.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccessTokenScope {
    /// Basin matcher; `None` when the API left it unset.
    pub basins: Option<BasinMatcher>,
    /// Stream matcher; `None` when the API left it unset.
    pub streams: Option<StreamMatcher>,
    /// Access token matcher; `None` when the API left it unset.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Operation group permissions; `None` when the API left them unset.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Individual operations; empty when the API returned none.
    pub ops: HashSet<Operation>,
}
2013
2014impl From<api::access::AccessTokenScope> for AccessTokenScope {
2015 fn from(value: api::access::AccessTokenScope) -> Self {
2016 Self {
2017 basins: value.basins.map(|rs| match rs {
2018 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2019 BasinMatcher::Exact(e)
2020 }
2021 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2022 BasinMatcher::None
2023 }
2024 api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
2025 }),
2026 streams: value.streams.map(|rs| match rs {
2027 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2028 StreamMatcher::Exact(e)
2029 }
2030 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2031 StreamMatcher::None
2032 }
2033 api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
2034 }),
2035 access_tokens: value.access_tokens.map(|rs| match rs {
2036 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2037 AccessTokenMatcher::Exact(e)
2038 }
2039 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2040 AccessTokenMatcher::None
2041 }
2042 api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
2043 }),
2044 op_group_perms: value.op_groups.map(Into::into),
2045 ops: value
2046 .ops
2047 .map(|ops| ops.into_iter().map(Into::into).collect())
2048 .unwrap_or_default(),
2049 }
2050 }
2051}
2052
2053impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
2054 fn from(value: AccessTokenScopeInput) -> Self {
2055 Self {
2056 basins: value.basins.map(|rs| match rs {
2057 BasinMatcher::None => {
2058 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2059 }
2060 BasinMatcher::Exact(e) => {
2061 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2062 }
2063 BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2064 }),
2065 streams: value.streams.map(|rs| match rs {
2066 StreamMatcher::None => {
2067 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2068 }
2069 StreamMatcher::Exact(e) => {
2070 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2071 }
2072 StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2073 }),
2074 access_tokens: value.access_tokens.map(|rs| match rs {
2075 AccessTokenMatcher::None => {
2076 api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2077 }
2078 AccessTokenMatcher::Exact(e) => {
2079 api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2080 }
2081 AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2082 }),
2083 op_groups: value.op_group_perms.map(Into::into),
2084 ops: if value.ops.is_empty() {
2085 None
2086 } else {
2087 Some(value.ops.into_iter().map(Into::into).collect())
2088 },
2089 }
2090 }
2091}
2092
/// Input for issuing an access token.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct IssueAccessTokenInput {
    /// ID of the access token to issue.
    pub id: AccessTokenId,
    /// Optional expiration time; `None` is sent to the API as unset.
    pub expires_at: Option<S2DateTime>,
    /// `auto_prefix_streams` flag; sent as `Some(true)` when set and omitted
    /// otherwise.
    pub auto_prefix_streams: bool,
    /// Scope requested for the access token.
    pub scope: AccessTokenScopeInput,
}
2115
2116impl IssueAccessTokenInput {
2117 pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2119 Self {
2120 id,
2121 expires_at: None,
2122 auto_prefix_streams: false,
2123 scope,
2124 }
2125 }
2126
2127 pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2129 Self {
2130 expires_at: Some(expires_at),
2131 ..self
2132 }
2133 }
2134
2135 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2138 Self {
2139 auto_prefix_streams,
2140 ..self
2141 }
2142 }
2143}
2144
2145impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2146 fn from(value: IssueAccessTokenInput) -> Self {
2147 Self {
2148 id: value.id,
2149 expires_at: value.expires_at.map(Into::into),
2150 auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2151 scope: value.scope.into(),
2152 }
2153 }
2154}
2155
/// Interval of a timeseries metric.
///
/// Maps one-to-one onto `api::metrics::TimeseriesInterval`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeseriesInterval {
    /// One-minute interval.
    Minute,
    /// One-hour interval.
    Hour,
    /// One-day interval.
    Day,
}
2166
2167impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2168 fn from(value: TimeseriesInterval) -> Self {
2169 match value {
2170 TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2171 TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2172 TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2173 }
2174 }
2175}
2176
2177impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2178 fn from(value: api::metrics::TimeseriesInterval) -> Self {
2179 match value {
2180 api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2181 api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2182 api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2183 }
2184 }
2185}
2186
/// A time range for a metrics query, given as `start` and `end` values.
///
/// Both values are forwarded as-is in metric set requests.
// NOTE(review): presumably Unix epoch seconds — confirm against the API docs.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start of the range.
    pub start: u32,
    /// End of the range.
    pub end: u32,
}

impl TimeRange {
    /// Creates a range from `start` and `end`; the values are not validated.
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
2203
/// A time range for a metrics query together with an optional interval.
// NOTE(review): `start`/`end` are presumably Unix epoch seconds — confirm against the API docs.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRangeAndInterval {
    /// Start of the range.
    pub start: u32,
    /// End of the range.
    pub end: u32,
    /// Optional interval; `None` is sent to the API as unset.
    pub interval: Option<TimeseriesInterval>,
}
2217
2218impl TimeRangeAndInterval {
2219 pub fn new(start: u32, end: u32) -> Self {
2221 Self {
2222 start,
2223 end,
2224 interval: None,
2225 }
2226 }
2227
2228 pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2230 Self {
2231 interval: Some(interval),
2232 ..self
2233 }
2234 }
2235}
2236
/// Account-level metric set to query, with its time arguments.
#[derive(Debug, Clone, Copy)]
pub enum AccountMetricSet {
    /// The API's `ActiveBasins` metric set over a time range (no interval is
    /// sent).
    ActiveBasins(TimeRange),
    /// The API's `AccountOps` metric set over a time range with an optional
    /// interval.
    AccountOps(TimeRangeAndInterval),
}
2251
/// Input for fetching account metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetAccountMetricsInput {
    /// Metric set to query.
    pub set: AccountMetricSet,
}
2259
2260impl GetAccountMetricsInput {
2261 pub fn new(set: AccountMetricSet) -> Self {
2263 Self { set }
2264 }
2265}
2266
2267impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2268 fn from(value: GetAccountMetricsInput) -> Self {
2269 let (set, start, end, interval) = match value.set {
2270 AccountMetricSet::ActiveBasins(args) => (
2271 api::metrics::AccountMetricSet::ActiveBasins,
2272 args.start,
2273 args.end,
2274 None,
2275 ),
2276 AccountMetricSet::AccountOps(args) => (
2277 api::metrics::AccountMetricSet::AccountOps,
2278 args.start,
2279 args.end,
2280 args.interval,
2281 ),
2282 };
2283 Self {
2284 set,
2285 start: Some(start),
2286 end: Some(end),
2287 interval: interval.map(Into::into),
2288 }
2289 }
2290}
2291
/// Basin-level metric set to query, with its time arguments.
///
/// Each variant maps onto the `api::metrics::BasinMetricSet` variant of the
/// same name.
#[derive(Debug, Clone, Copy)]
pub enum BasinMetricSet {
    /// `Storage` metric set over a time range (no interval is sent).
    Storage(TimeRange),
    /// `AppendOps` metric set over a time range with an optional interval.
    AppendOps(TimeRangeAndInterval),
    /// `ReadOps` metric set over a time range with an optional interval.
    ReadOps(TimeRangeAndInterval),
    /// `ReadThroughput` metric set over a time range with an optional
    /// interval.
    ReadThroughput(TimeRangeAndInterval),
    /// `AppendThroughput` metric set over a time range with an optional
    /// interval.
    AppendThroughput(TimeRangeAndInterval),
    /// `BasinOps` metric set over a time range with an optional interval.
    BasinOps(TimeRangeAndInterval),
}
2336
/// Input for fetching basin metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetBasinMetricsInput {
    /// Name of the basin.
    pub name: BasinName,
    /// Metric set to query.
    pub set: BasinMetricSet,
}
2346
2347impl GetBasinMetricsInput {
2348 pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2350 Self { name, set }
2351 }
2352}
2353
2354impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2355 fn from(value: GetBasinMetricsInput) -> Self {
2356 let (set, start, end, interval) = match value.set {
2357 BasinMetricSet::Storage(args) => (
2358 api::metrics::BasinMetricSet::Storage,
2359 args.start,
2360 args.end,
2361 None,
2362 ),
2363 BasinMetricSet::AppendOps(args) => (
2364 api::metrics::BasinMetricSet::AppendOps,
2365 args.start,
2366 args.end,
2367 args.interval,
2368 ),
2369 BasinMetricSet::ReadOps(args) => (
2370 api::metrics::BasinMetricSet::ReadOps,
2371 args.start,
2372 args.end,
2373 args.interval,
2374 ),
2375 BasinMetricSet::ReadThroughput(args) => (
2376 api::metrics::BasinMetricSet::ReadThroughput,
2377 args.start,
2378 args.end,
2379 args.interval,
2380 ),
2381 BasinMetricSet::AppendThroughput(args) => (
2382 api::metrics::BasinMetricSet::AppendThroughput,
2383 args.start,
2384 args.end,
2385 args.interval,
2386 ),
2387 BasinMetricSet::BasinOps(args) => (
2388 api::metrics::BasinMetricSet::BasinOps,
2389 args.start,
2390 args.end,
2391 args.interval,
2392 ),
2393 };
2394 (
2395 value.name,
2396 api::metrics::BasinMetricSetRequest {
2397 set,
2398 start: Some(start),
2399 end: Some(end),
2400 interval: interval.map(Into::into),
2401 },
2402 )
2403 }
2404}
2405
/// Stream-level metric set to query, with its time arguments.
#[derive(Debug, Clone, Copy)]
pub enum StreamMetricSet {
    /// The API's `Storage` metric set over a time range (no interval is sent).
    Storage(TimeRange),
}
2413
/// Input for fetching stream metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GetStreamMetricsInput {
    /// Name of the basin.
    pub basin_name: BasinName,
    /// Name of the stream.
    pub stream_name: StreamName,
    /// Metric set to query.
    pub set: StreamMetricSet,
}
2425
2426impl GetStreamMetricsInput {
2427 pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2430 Self {
2431 basin_name,
2432 stream_name,
2433 set,
2434 }
2435 }
2436}
2437
2438impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2439 fn from(value: GetStreamMetricsInput) -> Self {
2440 let (set, start, end, interval) = match value.set {
2441 StreamMetricSet::Storage(args) => (
2442 api::metrics::StreamMetricSet::Storage,
2443 args.start,
2444 args.end,
2445 None,
2446 ),
2447 };
2448 (
2449 value.basin_name,
2450 value.stream_name,
2451 api::metrics::StreamMetricSetRequest {
2452 set,
2453 start: Some(start),
2454 end: Some(end),
2455 interval,
2456 },
2457 )
2458 }
2459}
2460
/// Unit of a metric value, mirroring `api::metrics::MetricUnit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricUnit {
    /// The metric is measured in bytes.
    Bytes,
    /// The metric is measured in operations.
    Operations,
}
2469
2470impl From<api::metrics::MetricUnit> for MetricUnit {
2471 fn from(value: api::metrics::MetricUnit) -> Self {
2472 match value {
2473 api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2474 api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2475 }
2476 }
2477}
2478
/// A metric consisting of a single named value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ScalarMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of `value`.
    pub unit: MetricUnit,
    /// The metric value.
    pub value: f64,
}
2490
/// A named timeseries metric reported together with an interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AccumulationMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Interval reported by the API for this timeseries.
    pub interval: TimeseriesInterval,
    /// Timeseries data, copied unchanged from the API response.
    // NOTE(review): presumably (timestamp, value) pairs — confirm against the API docs.
    pub values: Vec<(u32, f64)>,
}
2507
/// A named timeseries metric without an interval.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct GaugeMetric {
    /// Name of the metric.
    pub name: String,
    /// Unit of the values.
    pub unit: MetricUnit,
    /// Timeseries data, copied unchanged from the API response.
    // NOTE(review): presumably (timestamp, value) pairs — confirm against the API docs.
    pub values: Vec<(u32, f64)>,
}
2520
/// A named metric whose values are strings.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Name of the metric.
    pub name: String,
    /// String values, copied unchanged from the API response.
    pub values: Vec<String>,
}
2530
/// A metric returned by the API, in one of four shapes.
#[derive(Debug, Clone)]
pub enum Metric {
    /// A single named value.
    Scalar(ScalarMetric),
    /// A timeseries reported together with an interval.
    Accumulation(AccumulationMetric),
    /// A timeseries without an interval.
    Gauge(GaugeMetric),
    /// A set of string values.
    Label(LabelMetric),
}
2544
2545impl From<api::metrics::Metric> for Metric {
2546 fn from(value: api::metrics::Metric) -> Self {
2547 match value {
2548 api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2549 name: sm.name.into(),
2550 unit: sm.unit.into(),
2551 value: sm.value,
2552 }),
2553 api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2554 name: am.name.into(),
2555 unit: am.unit.into(),
2556 interval: am.interval.into(),
2557 values: am.values,
2558 }),
2559 api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2560 name: gm.name.into(),
2561 unit: gm.unit.into(),
2562 values: gm.values,
2563 }),
2564 api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2565 name: lm.name.into(),
2566 values: lm.values,
2567 }),
2568 }
2569 }
2570}
2571
/// Input for listing streams.
///
/// Converts into an `api::stream::ListStreamsRequest`; every field is
/// forwarded to the request field of the same name.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ListStreamsInput {
    /// Stream name prefix, sent as the request's `prefix`.
    pub prefix: StreamNamePrefix,
    /// Value sent as the request's `start_after`.
    // NOTE(review): presumably only names ordered after this value are listed — confirm against the API docs.
    pub start_after: StreamNameStartAfter,
    /// Optional `limit` for the request; `None` leaves it unset.
    pub limit: Option<usize>,
}
2589
2590impl ListStreamsInput {
2591 pub fn new() -> Self {
2593 Self::default()
2594 }
2595
2596 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2598 Self { prefix, ..self }
2599 }
2600
2601 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2604 Self {
2605 start_after,
2606 ..self
2607 }
2608 }
2609
2610 pub fn with_limit(self, limit: usize) -> Self {
2612 Self {
2613 limit: Some(limit),
2614 ..self
2615 }
2616 }
2617}
2618
2619impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2620 fn from(value: ListStreamsInput) -> Self {
2621 Self {
2622 prefix: Some(value.prefix),
2623 start_after: Some(value.start_after),
2624 limit: value.limit,
2625 }
2626 }
2627}
2628
/// Input for listing all streams.
///
/// Carries the same `prefix` and `start_after` fields as
/// [`ListStreamsInput`], has no `limit`, and adds `include_deleted`.
// NOTE(review): presumably the client paginates internally for this input — confirm at the call site.
#[derive(Debug, Clone, Default)]
pub struct ListAllStreamsInput {
    /// Stream name prefix.
    pub prefix: StreamNamePrefix,
    /// Stream name start-after value.
    pub start_after: StreamNameStartAfter,
    /// `include_deleted` flag; `false` by default.
    // NOTE(review): presumably controls whether streams with a `deleted_at` are returned — confirm at the call site.
    pub include_deleted: bool,
}
2645
2646impl ListAllStreamsInput {
2647 pub fn new() -> Self {
2649 Self::default()
2650 }
2651
2652 pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2654 Self { prefix, ..self }
2655 }
2656
2657 pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2660 Self {
2661 start_after,
2662 ..self
2663 }
2664 }
2665
2666 pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2668 Self {
2669 include_deleted,
2670 ..self
2671 }
2672 }
2673}
2674
/// Information about a stream, built from `api::stream::StreamInfo`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct StreamInfo {
    /// Name of the stream.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time; `None` when the API reports none.
    pub deleted_at: Option<S2DateTime>,
    /// Encryption algorithm reported by the API as `cipher`, if any.
    pub cipher: Option<EncryptionAlgorithm>,
}
2688
2689impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2690 type Error = ValidationError;
2691
2692 fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2693 Ok(Self {
2694 name: value.name,
2695 created_at: value.created_at.try_into()?,
2696 deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2697 cipher: value.cipher.map(Into::into),
2698 })
2699 }
2700}
2701
/// Input for creating a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct CreateStreamInput {
    /// Name of the stream to create.
    pub name: StreamName,
    /// Optional stream configuration; `None` is sent to the API as unset.
    pub config: Option<StreamConfig>,
    // Generated once in `new` and handed out next to the request on conversion.
    idempotency_token: String,
}
2714
2715impl CreateStreamInput {
2716 pub fn new(name: StreamName) -> Self {
2718 Self {
2719 name,
2720 config: None,
2721 idempotency_token: idempotency_token(),
2722 }
2723 }
2724
2725 pub fn with_config(self, config: StreamConfig) -> Self {
2727 Self {
2728 config: Some(config),
2729 ..self
2730 }
2731 }
2732}
2733
2734impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2735 fn from(value: CreateStreamInput) -> Self {
2736 (
2737 api::stream::CreateStreamRequest {
2738 stream: value.name,
2739 config: value.config.map(Into::into),
2740 },
2741 value.idempotency_token,
2742 )
2743 }
2744}
2745
/// Input for ensuring a stream.
// NOTE(review): presumably "ensure" means create-if-absent or update-to-config — confirm at the call site.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct EnsureStreamInput {
    /// Name of the stream.
    pub name: StreamName,
    /// Optional stream configuration; `None` is passed on as unset.
    pub config: Option<StreamConfig>,
}
2758
2759impl EnsureStreamInput {
2760 pub fn new(name: StreamName) -> Self {
2762 Self { name, config: None }
2763 }
2764
2765 pub fn with_config(self, config: StreamConfig) -> Self {
2767 Self {
2768 config: Some(config),
2769 ..self
2770 }
2771 }
2772}
2773
2774impl From<EnsureStreamInput> for (StreamName, Option<api::config::StreamConfig>) {
2775 fn from(value: EnsureStreamInput) -> Self {
2776 (value.name, value.config.map(Into::into))
2777 }
2778}
2779
/// Input for deleting a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct DeleteStreamInput {
    /// Name of the stream to delete.
    pub name: StreamName,
    /// `ignore_not_found` flag; `false` unless set explicitly.
    // NOTE(review): presumably a missing stream is then not reported as an error — confirm at the call site.
    pub ignore_not_found: bool,
}
2789
2790impl DeleteStreamInput {
2791 pub fn new(name: StreamName) -> Self {
2793 Self {
2794 name,
2795 ignore_not_found: false,
2796 }
2797 }
2798
2799 pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2801 Self {
2802 ignore_not_found,
2803 ..self
2804 }
2805 }
2806}
2807
/// Input for reconfiguring a stream.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReconfigureStreamInput {
    /// Name of the stream to reconfigure.
    pub name: StreamName,
    /// Reconfiguration to apply.
    pub config: StreamReconfiguration,
}
2817
2818impl ReconfigureStreamInput {
2819 pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2821 Self { name, config }
2822 }
2823}
2824
/// A fencing token: a string of at most `MAX_FENCING_TOKEN_LENGTH` bytes.
///
/// The inner string is private. Values are created by parsing (`FromStr`),
/// which enforces the length limit, or by [`FencingToken::generate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FencingToken(String);
2832
2833impl FencingToken {
2834 pub fn generate(n: usize) -> Result<Self, ValidationError> {
2836 rand::rng()
2837 .sample_iter(&rand::distr::Alphanumeric)
2838 .take(n)
2839 .map(char::from)
2840 .collect::<String>()
2841 .parse()
2842 }
2843}
2844
2845impl FromStr for FencingToken {
2846 type Err = ValidationError;
2847
2848 fn from_str(s: &str) -> Result<Self, Self::Err> {
2849 if s.len() > MAX_FENCING_TOKEN_LENGTH {
2850 return Err(ValidationError(format!(
2851 "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2852 )));
2853 }
2854 Ok(FencingToken(s.to_string()))
2855 }
2856}
2857
2858impl std::fmt::Display for FencingToken {
2859 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2860 write!(f, "{}", self.0)
2861 }
2862}
2863
2864impl Deref for FencingToken {
2865 type Target = str;
2866
2867 fn deref(&self) -> &Self::Target {
2868 &self.0
2869 }
2870}
2871
/// A position in a stream, identified by a sequence number and a timestamp.
///
/// Both fields are plain integers, so the type derives `Eq` in addition to
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number of the position.
    pub seq_num: u64,
    /// Timestamp of the position.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm against the API docs.
    pub timestamp: u64,
}

impl std::fmt::Display for StreamPosition {
    /// Formats the position as `seq_num=<n>, timestamp=<t>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
    }
}
2888
2889impl From<api::stream::proto::StreamPosition> for StreamPosition {
2890 fn from(value: api::stream::proto::StreamPosition) -> Self {
2891 Self {
2892 seq_num: value.seq_num,
2893 timestamp: value.timestamp,
2894 }
2895 }
2896}
2897
2898impl From<api::stream::StreamPosition> for StreamPosition {
2899 fn from(value: api::stream::StreamPosition) -> Self {
2900 Self {
2901 seq_num: value.seq_num,
2902 timestamp: value.timestamp,
2903 }
2904 }
2905}
2906
/// A record header: a name/value pair of raw bytes.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct Header {
    /// Header name.
    pub name: Bytes,
    /// Header value.
    pub value: Bytes,
}
2916
2917impl Header {
2918 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2920 Self {
2921 name: name.into(),
2922 value: value.into(),
2923 }
2924 }
2925}
2926
2927impl From<Header> for api::stream::proto::Header {
2928 fn from(value: Header) -> Self {
2929 Self {
2930 name: value.name,
2931 value: value.value,
2932 }
2933 }
2934}
2935
2936impl From<api::stream::proto::Header> for Header {
2937 fn from(value: api::stream::proto::Header) -> Self {
2938 Self {
2939 name: value.name,
2940 value: value.value,
2941 }
2942 }
2943}
2944
/// A record to append to a stream.
///
/// Fields are private. [`AppendRecord::new`] and
/// [`AppendRecord::with_headers`] check that the record's metered size does
/// not exceed `RECORD_BATCH_MAX.bytes`.
#[derive(Debug, Clone, PartialEq)]
pub struct AppendRecord {
    // Record payload.
    body: Bytes,
    // Record headers; empty unless set through `with_headers`.
    headers: Vec<Header>,
    // Optional timestamp; `None` unless set through `with_timestamp`.
    timestamp: Option<u64>,
}
2952
2953impl AppendRecord {
2954 fn validate(self) -> Result<Self, ValidationError> {
2955 if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2956 Err(ValidationError(format!(
2957 "metered_bytes: {} exceeds {}",
2958 self.metered_bytes(),
2959 RECORD_BATCH_MAX.bytes
2960 )))
2961 } else {
2962 Ok(self)
2963 }
2964 }
2965
2966 pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2968 let record = Self {
2969 body: body.into(),
2970 headers: Vec::default(),
2971 timestamp: None,
2972 };
2973 record.validate()
2974 }
2975
2976 pub fn with_headers(
2978 self,
2979 headers: impl IntoIterator<Item = Header>,
2980 ) -> Result<Self, ValidationError> {
2981 let record = Self {
2982 headers: headers.into_iter().collect(),
2983 ..self
2984 };
2985 record.validate()
2986 }
2987
2988 pub fn with_timestamp(self, timestamp: u64) -> Self {
2992 Self {
2993 timestamp: Some(timestamp),
2994 ..self
2995 }
2996 }
2997
2998 pub fn body(&self) -> &[u8] {
3000 &self.body
3001 }
3002
3003 pub fn headers(&self) -> &[Header] {
3005 &self.headers
3006 }
3007
3008 pub fn timestamp(&self) -> Option<u64> {
3010 self.timestamp
3011 }
3012}
3013
3014impl From<AppendRecord> for api::stream::proto::AppendRecord {
3015 fn from(value: AppendRecord) -> Self {
3016 Self {
3017 timestamp: value.timestamp,
3018 headers: value.headers.into_iter().map(Into::into).collect(),
3019 body: value.body,
3020 }
3021 }
3022}
3023
/// Types that can report their metered size in bytes.
///
/// This is the size checked against `RECORD_BATCH_MAX.bytes` when validating
/// records and batches.
pub trait MeteredBytes {
    /// Returns the metered size of `self` in bytes.
    fn metered_bytes(&self) -> usize;
}
3034
3035macro_rules! metered_bytes_impl {
3036 ($ty:ty) => {
3037 impl MeteredBytes for $ty {
3038 fn metered_bytes(&self) -> usize {
3039 8 + (2 * self.headers.len())
3040 + self
3041 .headers
3042 .iter()
3043 .map(|h| h.name.len() + h.value.len())
3044 .sum::<usize>()
3045 + self.body.len()
3046 }
3047 }
3048 };
3049}
3050
3051metered_bytes_impl!(AppendRecord);
3052
/// A batch of [`AppendRecord`]s together with its total metered size.
///
/// [`AppendRecordBatch::try_from_iter`] rejects an empty batch and a batch
/// that exceeds `RECORD_BATCH_MAX.count` records or `RECORD_BATCH_MAX.bytes`
/// metered bytes.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    // Records in the batch, in insertion order.
    records: Vec<AppendRecord>,
    // Running sum of `metered_bytes()` over `records`.
    metered_bytes: usize,
}
3066
3067impl AppendRecordBatch {
3068 pub(crate) fn with_capacity(capacity: usize) -> Self {
3069 Self {
3070 records: Vec::with_capacity(capacity),
3071 metered_bytes: 0,
3072 }
3073 }
3074
3075 pub(crate) fn push(&mut self, record: AppendRecord) {
3076 self.metered_bytes += record.metered_bytes();
3077 self.records.push(record);
3078 }
3079
3080 pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3082 where
3083 I: IntoIterator<Item = AppendRecord>,
3084 {
3085 let mut records = Vec::new();
3086 let mut metered_bytes = 0;
3087
3088 for record in iter {
3089 metered_bytes += record.metered_bytes();
3090 records.push(record);
3091
3092 if metered_bytes > RECORD_BATCH_MAX.bytes {
3093 return Err(ValidationError(format!(
3094 "batch size in metered bytes ({metered_bytes}) exceeds {}",
3095 RECORD_BATCH_MAX.bytes
3096 )));
3097 }
3098
3099 if records.len() > RECORD_BATCH_MAX.count {
3100 return Err(ValidationError(format!(
3101 "number of records in the batch exceeds {}",
3102 RECORD_BATCH_MAX.count
3103 )));
3104 }
3105 }
3106
3107 if records.is_empty() {
3108 return Err(ValidationError("batch is empty".into()));
3109 }
3110
3111 Ok(Self {
3112 records,
3113 metered_bytes,
3114 })
3115 }
3116}
3117
3118impl Deref for AppendRecordBatch {
3119 type Target = [AppendRecord];
3120
3121 fn deref(&self) -> &Self::Target {
3122 &self.records
3123 }
3124}
3125
3126impl MeteredBytes for AppendRecordBatch {
3127 fn metered_bytes(&self) -> usize {
3128 self.metered_bytes
3129 }
3130}
3131
3132#[derive(Debug, Clone)]
3133pub enum Command {
3135 Fence {
3137 fencing_token: FencingToken,
3139 },
3140 Trim {
3142 trim_point: u64,
3144 },
3145}
3146
3147#[derive(Debug, Clone)]
3148#[non_exhaustive]
3149pub struct CommandRecord {
3153 pub command: Command,
3155 pub timestamp: Option<u64>,
3157}
3158
3159impl CommandRecord {
3160 const FENCE: &[u8] = b"fence";
3161 const TRIM: &[u8] = b"trim";
3162
3163 pub fn fence(fencing_token: FencingToken) -> Self {
3168 Self {
3169 command: Command::Fence { fencing_token },
3170 timestamp: None,
3171 }
3172 }
3173
3174 pub fn trim(trim_point: u64) -> Self {
3181 Self {
3182 command: Command::Trim { trim_point },
3183 timestamp: None,
3184 }
3185 }
3186
3187 pub fn with_timestamp(self, timestamp: u64) -> Self {
3189 Self {
3190 timestamp: Some(timestamp),
3191 ..self
3192 }
3193 }
3194}
3195
3196impl From<CommandRecord> for AppendRecord {
3197 fn from(value: CommandRecord) -> Self {
3198 let (header_value, body) = match value.command {
3199 Command::Fence { fencing_token } => (
3200 CommandRecord::FENCE,
3201 Bytes::copy_from_slice(fencing_token.as_bytes()),
3202 ),
3203 Command::Trim { trim_point } => (
3204 CommandRecord::TRIM,
3205 Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3206 ),
3207 };
3208 Self {
3209 body,
3210 headers: vec![Header::new("", header_value)],
3211 timestamp: value.timestamp,
3212 }
3213 }
3214}
3215
3216#[derive(Debug, Clone)]
3217#[non_exhaustive]
3218pub struct AppendInput {
3221 pub records: AppendRecordBatch,
3223 pub match_seq_num: Option<u64>,
3227 pub fencing_token: Option<FencingToken>,
3232}
3233
3234impl AppendInput {
3235 pub fn new(records: AppendRecordBatch) -> Self {
3237 Self {
3238 records,
3239 match_seq_num: None,
3240 fencing_token: None,
3241 }
3242 }
3243
3244 pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3246 Self {
3247 match_seq_num: Some(match_seq_num),
3248 ..self
3249 }
3250 }
3251
3252 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3254 Self {
3255 fencing_token: Some(fencing_token),
3256 ..self
3257 }
3258 }
3259}
3260
3261impl From<AppendInput> for api::stream::proto::AppendInput {
3262 fn from(value: AppendInput) -> Self {
3263 Self {
3264 records: value.records.iter().cloned().map(Into::into).collect(),
3265 match_seq_num: value.match_seq_num,
3266 fencing_token: value.fencing_token.map(|t| t.to_string()),
3267 }
3268 }
3269}
3270
3271#[derive(Debug, Clone, PartialEq)]
3272#[non_exhaustive]
3273pub struct AppendAck {
3275 pub start: StreamPosition,
3277 pub end: StreamPosition,
3283 pub tail: StreamPosition,
3288}
3289
3290impl From<api::stream::proto::AppendAck> for AppendAck {
3291 fn from(value: api::stream::proto::AppendAck) -> Self {
3292 Self {
3293 start: value.start.unwrap_or_default().into(),
3294 end: value.end.unwrap_or_default().into(),
3295 tail: value.tail.unwrap_or_default().into(),
3296 }
3297 }
3298}
3299
/// Where a read should start from.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Start from a sequence number.
    SeqNum(u64),
    /// Start from a timestamp.
    Timestamp(u64),
    /// Start from an offset relative to the tail.
    TailOffset(u64),
}

/// The default starting point is sequence number `0`.
impl Default for ReadFrom {
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3316
3317#[derive(Debug, Default, Clone)]
3318#[non_exhaustive]
3319pub struct ReadStart {
3321 pub from: ReadFrom,
3325 pub clamp_to_tail: bool,
3329}
3330
3331impl ReadStart {
3332 pub fn new() -> Self {
3334 Self::default()
3335 }
3336
3337 pub fn with_from(self, from: ReadFrom) -> Self {
3339 Self { from, ..self }
3340 }
3341
3342 pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3344 Self {
3345 clamp_to_tail,
3346 ..self
3347 }
3348 }
3349}
3350
3351impl From<ReadStart> for api::stream::ReadStart {
3352 fn from(value: ReadStart) -> Self {
3353 let (seq_num, timestamp, tail_offset) = match value.from {
3354 ReadFrom::SeqNum(n) => (Some(n), None, None),
3355 ReadFrom::Timestamp(t) => (None, Some(t), None),
3356 ReadFrom::TailOffset(o) => (None, None, Some(o)),
3357 };
3358 Self {
3359 seq_num,
3360 timestamp,
3361 tail_offset,
3362 clamp: if value.clamp_to_tail {
3363 Some(true)
3364 } else {
3365 None
3366 },
3367 }
3368 }
3369}
3370
/// Optional limits on how much a read may return.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Optional limit on the number of records; `None` by default.
    pub count: Option<usize>,
    /// Optional limit on the number of bytes; `None` by default.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Creates limits with nothing set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns these limits with the record count limit set to `count`.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Returns these limits with the byte limit set to `bytes`.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3407
3408#[derive(Debug, Clone, Default)]
3409#[non_exhaustive]
3410pub struct ReadStop {
3412 pub limits: ReadLimits,
3416 pub until: Option<RangeTo<u64>>,
3420 pub wait: Option<u32>,
3430}
3431
3432impl ReadStop {
3433 pub fn new() -> Self {
3435 Self::default()
3436 }
3437
3438 pub fn with_limits(self, limits: ReadLimits) -> Self {
3440 Self { limits, ..self }
3441 }
3442
3443 pub fn with_until(self, until: RangeTo<u64>) -> Self {
3445 Self {
3446 until: Some(until),
3447 ..self
3448 }
3449 }
3450
3451 pub fn with_wait(self, wait: u32) -> Self {
3453 Self {
3454 wait: Some(wait),
3455 ..self
3456 }
3457 }
3458}
3459
3460impl From<ReadStop> for api::stream::ReadEnd {
3461 fn from(value: ReadStop) -> Self {
3462 Self {
3463 count: value.limits.count,
3464 bytes: value.limits.bytes,
3465 until: value.until.map(|r| r.end),
3466 wait: value.wait,
3467 }
3468 }
3469}
3470
3471#[derive(Debug, Clone, Default)]
3472#[non_exhaustive]
3473pub struct ReadInput {
3476 pub start: ReadStart,
3480 pub stop: ReadStop,
3484 pub ignore_command_records: bool,
3488}
3489
3490impl ReadInput {
3491 pub fn new() -> Self {
3493 Self::default()
3494 }
3495
3496 pub fn with_start(self, start: ReadStart) -> Self {
3498 Self { start, ..self }
3499 }
3500
3501 pub fn with_stop(self, stop: ReadStop) -> Self {
3503 Self { stop, ..self }
3504 }
3505
3506 pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3508 Self {
3509 ignore_command_records,
3510 ..self
3511 }
3512 }
3513}
3514
3515#[derive(Debug, Clone)]
3516#[non_exhaustive]
3517pub struct SequencedRecord {
3519 pub seq_num: u64,
3521 pub body: Bytes,
3523 pub headers: Vec<Header>,
3525 pub timestamp: u64,
3527}
3528
3529impl SequencedRecord {
3530 pub fn is_command_record(&self) -> bool {
3532 self.headers.len() == 1 && *self.headers[0].name == *b""
3533 }
3534}
3535
3536impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3537 fn from(value: api::stream::proto::SequencedRecord) -> Self {
3538 Self {
3539 seq_num: value.seq_num,
3540 body: value.body,
3541 headers: value.headers.into_iter().map(Into::into).collect(),
3542 timestamp: value.timestamp,
3543 }
3544 }
3545}
3546
3547metered_bytes_impl!(SequencedRecord);
3548
3549#[derive(Debug, Clone)]
3550#[non_exhaustive]
3551pub struct ReadBatch {
3554 pub records: Vec<SequencedRecord>,
3561 pub tail: Option<StreamPosition>,
3566}
3567
3568impl ReadBatch {
3569 pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3570 Self {
3571 records: batch.records.into_iter().map(Into::into).collect(),
3572 tail: batch.tail.map(Into::into),
3573 }
3574 }
3575}
3576
3577pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3579
3580#[derive(Debug, Clone, thiserror::Error)]
3581pub enum AppendConditionFailed {
3583 #[error("fencing token mismatch, expected: {0}")]
3584 FencingTokenMismatch(FencingToken),
3586 #[error("sequence number mismatch, expected: {0}")]
3587 SeqNumMismatch(u64),
3589}
3590
3591impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3592 fn from(value: api::stream::AppendConditionFailed) -> Self {
3593 match value {
3594 api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3595 AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3596 }
3597 api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3598 AppendConditionFailed::SeqNumMismatch(seq)
3599 }
3600 }
3601 }
3602}
3603
3604#[derive(Debug, Clone, thiserror::Error)]
3605pub enum S2Error {
3607 #[error("{0}")]
3608 Client(String),
3610 #[error(transparent)]
3611 Validation(#[from] ValidationError),
3613 #[error("{0}")]
3614 AppendConditionFailed(AppendConditionFailed),
3616 #[error("read from an unwritten position. current tail: {0}")]
3617 ReadUnwritten(StreamPosition),
3619 #[error("{0}")]
3620 Server(ErrorResponse),
3622}
3623
3624impl From<ApiError> for S2Error {
3625 fn from(err: ApiError) -> Self {
3626 match err {
3627 ApiError::ReadUnwritten(tail_response) => {
3628 Self::ReadUnwritten(tail_response.tail.into())
3629 }
3630 ApiError::AppendConditionFailed(condition_failed) => {
3631 Self::AppendConditionFailed(condition_failed.into())
3632 }
3633 ApiError::Server(_, response) => Self::Server(response.into()),
3634 other => Self::Client(other.to_string()),
3635 }
3636 }
3637}
3638
3639#[derive(Debug, Clone, thiserror::Error)]
3640#[error("{code}: {message}")]
3641#[non_exhaustive]
3642pub struct ErrorResponse {
3644 pub code: String,
3646 pub message: String,
3648}
3649
3650impl From<ApiErrorResponse> for ErrorResponse {
3651 fn from(response: ApiErrorResponse) -> Self {
3652 Self {
3653 code: response.code,
3654 message: response.message,
3655 }
3656 }
3657}
3658
3659fn idempotency_token() -> String {
3660 uuid::Uuid::new_v4().simple().to_string()
3661}