Skip to main content

s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Encryption algorithm.
23pub use s2_common::encryption::EncryptionAlgorithm;
24/// Encryption key for stream operations.
25pub use s2_common::encryption::EncryptionKey;
26/// Validation error.
27pub use s2_common::types::ValidationError;
28/// Access token ID.
29///
30/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
31pub use s2_common::types::access::AccessTokenId;
32/// See [`ListAccessTokensInput::prefix`].
33pub use s2_common::types::access::AccessTokenIdPrefix;
34/// See [`ListAccessTokensInput::start_after`].
35pub use s2_common::types::access::AccessTokenIdStartAfter;
36/// Basin name.
37///
38/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
39/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
40pub use s2_common::types::basin::BasinName;
41/// See [`ListBasinsInput::prefix`].
42pub use s2_common::types::basin::BasinNamePrefix;
43/// See [`ListBasinsInput::start_after`].
44pub use s2_common::types::basin::BasinNameStartAfter;
45/// Stream name.
46///
47/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
48pub use s2_common::types::stream::StreamName;
49/// See [`ListStreamsInput::prefix`].
50pub use s2_common::types::stream::StreamNamePrefix;
51/// See [`ListStreamsInput::start_after`].
52pub use s2_common::types::stream::StreamNameStartAfter;
53
54pub(crate) const ONE_MIB: u32 = 1024 * 1024;
55
56use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
57use secrecy::SecretString;
58
59use crate::api::{ApiError, ApiErrorResponse};
60
61/// An RFC 3339 datetime.
62///
63/// It can be created in either of the following ways:
64/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
65/// - Convert from [`time::OffsetDateTime`] using [`TryFrom`]/[`TryInto`].
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub struct S2DateTime(time::OffsetDateTime);
68
69impl TryFrom<time::OffsetDateTime> for S2DateTime {
70    type Error = ValidationError;
71
72    fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
73        dt.format(&time::format_description::well_known::Rfc3339)
74            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
75        Ok(Self(dt))
76    }
77}
78
79impl From<S2DateTime> for time::OffsetDateTime {
80    fn from(dt: S2DateTime) -> Self {
81        dt.0
82    }
83}
84
85impl FromStr for S2DateTime {
86    type Err = ValidationError;
87
88    fn from_str(s: &str) -> Result<Self, Self::Err> {
89        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
90            .map(Self)
91            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
92    }
93}
94
95impl fmt::Display for S2DateTime {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        write!(
98            f,
99            "{}",
100            self.0
101                .format(&time::format_description::well_known::Rfc3339)
102                .expect("RFC3339 formatting should not fail for S2DateTime")
103        )
104    }
105}
106
107/// Authority for connecting to an S2 basin.
108#[derive(Debug, Clone, PartialEq)]
109pub(crate) enum BasinAuthority {
110    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
111    ParentZone(Authority),
112    /// Direct cell authority. Basin is expected to be hosted by this cell.
113    Direct(Authority),
114}
115
116/// Account endpoint.
117#[derive(Debug, Clone)]
118pub struct AccountEndpoint {
119    scheme: Scheme,
120    authority: Authority,
121}
122
123impl AccountEndpoint {
124    /// Create a new [`AccountEndpoint`] with the given endpoint.
125    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
126        endpoint.parse()
127    }
128}
129
130impl FromStr for AccountEndpoint {
131    type Err = ValidationError;
132
133    fn from_str(s: &str) -> Result<Self, Self::Err> {
134        let (scheme, authority) = match s.find("://") {
135            Some(idx) => {
136                let scheme: Scheme = s[..idx]
137                    .parse()
138                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
139                (scheme, &s[idx + 3..])
140            }
141            None => (Scheme::HTTPS, s),
142        };
143        Ok(Self {
144            scheme,
145            authority: authority
146                .parse()
147                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
148        })
149    }
150}
151
152/// Basin endpoint.
153#[derive(Debug, Clone)]
154pub struct BasinEndpoint {
155    scheme: Scheme,
156    authority: BasinAuthority,
157}
158
159impl BasinEndpoint {
160    /// Create a new [`BasinEndpoint`] with the given endpoint.
161    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
162        endpoint.parse()
163    }
164}
165
166impl FromStr for BasinEndpoint {
167    type Err = ValidationError;
168
169    fn from_str(s: &str) -> Result<Self, Self::Err> {
170        let (scheme, authority) = match s.find("://") {
171            Some(idx) => {
172                let scheme: Scheme = s[..idx]
173                    .parse()
174                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
175                (scheme, &s[idx + 3..])
176            }
177            None => (Scheme::HTTPS, s),
178        };
179        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
180            BasinAuthority::ParentZone(
181                authority
182                    .parse()
183                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
184            )
185        } else {
186            BasinAuthority::Direct(
187                authority
188                    .parse()
189                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
190            )
191        };
192        Ok(Self { scheme, authority })
193    }
194}
195
196#[derive(Debug, Clone)]
197#[non_exhaustive]
198/// Endpoints for the S2 environment.
199pub struct S2Endpoints {
200    pub(crate) scheme: Scheme,
201    pub(crate) account_authority: Authority,
202    pub(crate) basin_authority: BasinAuthority,
203}
204
205impl S2Endpoints {
206    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
207    pub fn new(
208        account_endpoint: AccountEndpoint,
209        basin_endpoint: BasinEndpoint,
210    ) -> Result<Self, ValidationError> {
211        if account_endpoint.scheme != basin_endpoint.scheme {
212            return Err("account and basin endpoints must have the same scheme".into());
213        }
214        Ok(Self {
215            scheme: account_endpoint.scheme,
216            account_authority: account_endpoint.authority,
217            basin_authority: basin_endpoint.authority,
218        })
219    }
220
221    /// Create a new [`S2Endpoints`] from environment variables.
222    ///
223    /// The following environment variables are expected to be set:
224    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
225    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
226    pub fn from_env() -> Result<Self, ValidationError> {
227        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
228            Ok(endpoint) => endpoint.parse()?,
229            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
230            Err(VarError::NotUnicode(_)) => {
231                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
232            }
233        };
234
235        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
236            Ok(endpoint) => endpoint.parse()?,
237            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
238            Err(VarError::NotUnicode(_)) => {
239                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
240            }
241        };
242
243        if account_endpoint.scheme != basin_endpoint.scheme {
244            return Err(
245                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
246            );
247        }
248
249        Ok(Self {
250            scheme: account_endpoint.scheme,
251            account_authority: account_endpoint.authority,
252            basin_authority: basin_endpoint.authority,
253        })
254    }
255
256    pub(crate) fn for_aws() -> Self {
257        Self {
258            scheme: Scheme::HTTPS,
259            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
260            basin_authority: BasinAuthority::ParentZone(
261                "b.s2.dev".try_into().expect("valid authority"),
262            ),
263        }
264    }
265}
266
267#[derive(Debug, Clone, Copy)]
268/// Compression algorithm for request and response bodies.
269pub enum Compression {
270    /// No compression.
271    None,
272    /// Gzip compression.
273    Gzip,
274    /// Zstd compression.
275    Zstd,
276}
277
278impl From<Compression> for CompressionAlgorithm {
279    fn from(value: Compression) -> Self {
280        match value {
281            Compression::None => CompressionAlgorithm::None,
282            Compression::Gzip => CompressionAlgorithm::Gzip,
283            Compression::Zstd => CompressionAlgorithm::Zstd,
284        }
285    }
286}
287
288#[derive(Debug, Clone, Copy, PartialEq)]
289#[non_exhaustive]
290/// Retry policy for [`append`](crate::S2Stream::append) and
291/// [`append_session`](crate::S2Stream::append_session) operations.
292pub enum AppendRetryPolicy {
293    /// Retry all appends. Use when duplicate records on the stream are acceptable.
294    All,
295    /// Retry when it can be determined that the request had no side effects.
296    ///
297    /// Uses a frame-level signal to detect whether any body frames were consumed
298    /// by the HTTP transport. If no frames were sent, the server never saw the
299    /// request, so retry is safe and will not cause duplicate records.
300    ///
301    /// Certain server errors (`rate_limited`, `hot_server`) are also safe to
302    /// retry regardless of frame signal state, since they guarantee no mutation
303    /// occurred.
304    NoSideEffects,
305}
306
307#[derive(Debug, Clone)]
308#[non_exhaustive]
309/// Configuration for retrying requests in case of transient failures.
310///
311/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
312/// ```text
313/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
314///     jitter = rand[0, base_delay]
315///     delay  = base_delay + jitter
316/// ````
317pub struct RetryConfig {
318    /// Total number of attempts including the initial try. A value of `1` means no retries.
319    ///
320    /// Defaults to `3`.
321    pub max_attempts: NonZeroU32,
322    /// Minimum base delay for retries.
323    ///
324    /// Defaults to `100ms`.
325    pub min_base_delay: Duration,
326    /// Maximum base delay for retries.
327    ///
328    /// Defaults to `1s`.
329    pub max_base_delay: Duration,
330    /// Retry policy for [`append`](crate::S2Stream::append) and
331    /// [`append_session`](crate::S2Stream::append_session) operations.
332    ///
333    /// Defaults to `All`.
334    pub append_retry_policy: AppendRetryPolicy,
335}
336
337impl Default for RetryConfig {
338    fn default() -> Self {
339        Self {
340            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
341            min_base_delay: Duration::from_millis(100),
342            max_base_delay: Duration::from_secs(1),
343            append_retry_policy: AppendRetryPolicy::All,
344        }
345    }
346}
347
348impl RetryConfig {
349    /// Create a new [`RetryConfig`] with default settings.
350    pub fn new() -> Self {
351        Self::default()
352    }
353
354    pub(crate) fn max_retries(&self) -> u32 {
355        self.max_attempts.get() - 1
356    }
357
358    /// Set the total number of attempts including the initial try.
359    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
360        Self {
361            max_attempts,
362            ..self
363        }
364    }
365
366    /// Set the minimum base delay for retries.
367    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
368        Self {
369            min_base_delay,
370            ..self
371        }
372    }
373
374    /// Set the maximum base delay for retries.
375    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
376        Self {
377            max_base_delay,
378            ..self
379        }
380    }
381
382    /// Set the retry policy for [`append`](crate::S2Stream::append) and
383    /// [`append_session`](crate::S2Stream::append_session) operations.
384    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
385        Self {
386            append_retry_policy,
387            ..self
388        }
389    }
390}
391
392#[derive(Debug, Clone)]
393#[non_exhaustive]
394/// Configuration for [`S2`](crate::S2).
395pub struct S2Config {
396    pub(crate) access_token: SecretString,
397    pub(crate) endpoints: S2Endpoints,
398    pub(crate) connection_timeout: Duration,
399    pub(crate) request_timeout: Duration,
400    pub(crate) retry: RetryConfig,
401    pub(crate) compression: Compression,
402    pub(crate) user_agent: HeaderValue,
403    pub(crate) insecure_skip_cert_verification: bool,
404}
405
406impl S2Config {
407    /// Create a new [`S2Config`] with the given access token and default settings.
408    pub fn new(access_token: impl Into<String>) -> Self {
409        Self {
410            access_token: access_token.into().into(),
411            endpoints: S2Endpoints::for_aws(),
412            connection_timeout: Duration::from_secs(3),
413            request_timeout: Duration::from_secs(5),
414            retry: RetryConfig::new(),
415            compression: Compression::None,
416            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
417                .parse()
418                .expect("valid user agent"),
419            insecure_skip_cert_verification: false,
420        }
421    }
422
423    /// Set the S2 endpoints to connect to.
424    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
425        Self { endpoints, ..self }
426    }
427
428    /// Set the timeout for establishing a connection to the server.
429    ///
430    /// Defaults to `3s`.
431    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
432        Self {
433            connection_timeout,
434            ..self
435        }
436    }
437
438    /// Set the timeout for requests.
439    ///
440    /// Defaults to `5s`.
441    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
442        Self {
443            request_timeout,
444            ..self
445        }
446    }
447
448    /// Set the retry configuration for requests.
449    ///
450    /// See [`RetryConfig`] for defaults.
451    pub fn with_retry(self, retry: RetryConfig) -> Self {
452        Self { retry, ..self }
453    }
454
455    /// Set the compression algorithm for requests and responses.
456    ///
457    /// Defaults to no compression.
458    pub fn with_compression(self, compression: Compression) -> Self {
459        Self {
460            compression,
461            ..self
462        }
463    }
464
465    /// Skip TLS certificate verification (insecure).
466    ///
467    /// This is useful for connecting to endpoints with self-signed certificates
468    /// or certificates that don't match the hostname (similar to `curl -k`).
469    ///
470    /// # Warning
471    ///
472    /// This disables certificate verification and should only be used for
473    /// testing or development purposes. **Never use this in production.**
474    ///
475    /// Defaults to `false`.
476    pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
477        Self {
478            insecure_skip_cert_verification: skip,
479            ..self
480        }
481    }
482
483    #[doc(hidden)]
484    #[cfg(feature = "_hidden")]
485    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
486        let user_agent = user_agent
487            .into()
488            .parse()
489            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
490        Ok(Self { user_agent, ..self })
491    }
492}
493
494#[derive(Debug, Default, Clone, PartialEq, Eq)]
495#[non_exhaustive]
496/// A page of values.
497pub struct Page<T> {
498    /// Values in this page.
499    pub values: Vec<T>,
500    /// Whether there are more pages.
501    pub has_more: bool,
502}
503
504impl<T> Page<T> {
505    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
506        Self {
507            values: values.into(),
508            has_more,
509        }
510    }
511}
512
513#[derive(Debug, Clone, Copy, PartialEq, Eq)]
514/// Storage class for recent appends.
515pub enum StorageClass {
516    /// Standard storage class that offers append latencies under `500ms`.
517    Standard,
518    /// Express storage class that offers append latencies under `50ms`.
519    Express,
520}
521
522impl From<api::config::StorageClass> for StorageClass {
523    fn from(value: api::config::StorageClass) -> Self {
524        match value {
525            api::config::StorageClass::Standard => StorageClass::Standard,
526            api::config::StorageClass::Express => StorageClass::Express,
527        }
528    }
529}
530
531impl From<StorageClass> for api::config::StorageClass {
532    fn from(value: StorageClass) -> Self {
533        match value {
534            StorageClass::Standard => api::config::StorageClass::Standard,
535            StorageClass::Express => api::config::StorageClass::Express,
536        }
537    }
538}
539
540#[derive(Debug, Clone, Copy, PartialEq, Eq)]
541/// Retention policy for records in a stream.
542pub enum RetentionPolicy {
543    /// Age in seconds. Records older than this age are automatically trimmed.
544    Age(u64),
545    /// Records are retained indefinitely unless explicitly trimmed.
546    Infinite,
547}
548
549impl From<api::config::RetentionPolicy> for RetentionPolicy {
550    fn from(value: api::config::RetentionPolicy) -> Self {
551        match value {
552            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
553            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
554        }
555    }
556}
557
558impl From<RetentionPolicy> for api::config::RetentionPolicy {
559    fn from(value: RetentionPolicy) -> Self {
560        match value {
561            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
562            RetentionPolicy::Infinite => {
563                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
564            }
565        }
566    }
567}
568
569#[derive(Debug, Clone, Copy, PartialEq, Eq)]
570/// Timestamping mode for appends that influences how timestamps are handled.
571pub enum TimestampingMode {
572    /// Prefer client-specified timestamp if present otherwise use arrival time.
573    ClientPrefer,
574    /// Require a client-specified timestamp and reject the append if it is missing.
575    ClientRequire,
576    /// Use the arrival time and ignore any client-specified timestamp.
577    Arrival,
578}
579
580impl From<api::config::TimestampingMode> for TimestampingMode {
581    fn from(value: api::config::TimestampingMode) -> Self {
582        match value {
583            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
584            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
585            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
586        }
587    }
588}
589
590impl From<TimestampingMode> for api::config::TimestampingMode {
591    fn from(value: TimestampingMode) -> Self {
592        match value {
593            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
594            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
595            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
596        }
597    }
598}
599
600#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
601#[non_exhaustive]
602/// Configuration for timestamping behavior.
603pub struct TimestampingConfig {
604    /// Timestamping mode for appends that influences how timestamps are handled.
605    ///
606    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
607    pub mode: Option<TimestampingMode>,
608    /// Whether client-specified timestamps are allowed to exceed the arrival time.
609    ///
610    /// Defaults to `false` (client timestamps are capped at the arrival time).
611    pub uncapped: bool,
612}
613
614impl TimestampingConfig {
615    /// Create a new [`TimestampingConfig`] with default settings.
616    pub fn new() -> Self {
617        Self::default()
618    }
619
620    /// Set the timestamping mode for appends that influences how timestamps are handled.
621    pub fn with_mode(self, mode: TimestampingMode) -> Self {
622        Self {
623            mode: Some(mode),
624            ..self
625        }
626    }
627
628    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
629    pub fn with_uncapped(self, uncapped: bool) -> Self {
630        Self { uncapped, ..self }
631    }
632}
633
634impl From<api::config::TimestampingConfig> for TimestampingConfig {
635    fn from(value: api::config::TimestampingConfig) -> Self {
636        Self {
637            mode: value.mode.map(Into::into),
638            uncapped: value.uncapped.unwrap_or_default(),
639        }
640    }
641}
642
643impl From<TimestampingConfig> for api::config::TimestampingConfig {
644    fn from(value: TimestampingConfig) -> Self {
645        Self {
646            mode: value.mode.map(Into::into),
647            uncapped: Some(value.uncapped),
648        }
649    }
650}
651
652#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
653#[non_exhaustive]
654/// Configuration for automatically deleting a stream when it becomes empty.
655pub struct DeleteOnEmptyConfig {
656    /// Minimum age in seconds before an empty stream can be deleted.
657    ///
658    /// Defaults to `0` (disables automatic deletion).
659    pub min_age_secs: u64,
660}
661
662impl DeleteOnEmptyConfig {
663    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
664    pub fn new() -> Self {
665        Self::default()
666    }
667
668    /// Set the minimum age in seconds before an empty stream can be deleted.
669    pub fn with_min_age(self, min_age: Duration) -> Self {
670        Self {
671            min_age_secs: min_age.as_secs(),
672        }
673    }
674}
675
676impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
677    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
678        Self {
679            min_age_secs: value.min_age_secs,
680        }
681    }
682}
683
684impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
685    fn from(value: DeleteOnEmptyConfig) -> Self {
686        Self {
687            min_age_secs: value.min_age_secs,
688        }
689    }
690}
691
692#[derive(Debug, Clone, Default, PartialEq, Eq)]
693#[non_exhaustive]
694/// Configuration for a stream.
695pub struct StreamConfig {
696    /// Storage class for the stream.
697    ///
698    /// Defaults to [`Express`](StorageClass::Express).
699    pub storage_class: Option<StorageClass>,
700    /// Retention policy for records in the stream.
701    ///
702    /// Defaults to `7 days` of retention.
703    pub retention_policy: Option<RetentionPolicy>,
704    /// Configuration for timestamping behavior.
705    ///
706    /// See [`TimestampingConfig`] for defaults.
707    pub timestamping: Option<TimestampingConfig>,
708    /// Configuration for automatically deleting the stream when it becomes empty.
709    ///
710    /// See [`DeleteOnEmptyConfig`] for defaults.
711    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
712}
713
714impl StreamConfig {
715    /// Create a new [`StreamConfig`] with default settings.
716    pub fn new() -> Self {
717        Self::default()
718    }
719
720    /// Set the storage class for the stream.
721    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
722        Self {
723            storage_class: Some(storage_class),
724            ..self
725        }
726    }
727
728    /// Set the retention policy for records in the stream.
729    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
730        Self {
731            retention_policy: Some(retention_policy),
732            ..self
733        }
734    }
735
736    /// Set the configuration for timestamping behavior.
737    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
738        Self {
739            timestamping: Some(timestamping),
740            ..self
741        }
742    }
743
744    /// Set the configuration for automatically deleting the stream when it becomes empty.
745    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
746        Self {
747            delete_on_empty: Some(delete_on_empty),
748            ..self
749        }
750    }
751}
752
753impl From<api::config::StreamConfig> for StreamConfig {
754    fn from(value: api::config::StreamConfig) -> Self {
755        Self {
756            storage_class: value.storage_class.map(Into::into),
757            retention_policy: value.retention_policy.map(Into::into),
758            timestamping: value.timestamping.map(Into::into),
759            delete_on_empty: value.delete_on_empty.map(Into::into),
760        }
761    }
762}
763
764impl From<StreamConfig> for api::config::StreamConfig {
765    fn from(value: StreamConfig) -> Self {
766        Self {
767            storage_class: value.storage_class.map(Into::into),
768            retention_policy: value.retention_policy.map(Into::into),
769            timestamping: value.timestamping.map(Into::into),
770            delete_on_empty: value.delete_on_empty.map(Into::into),
771        }
772    }
773}
774
775#[derive(Debug, Clone, Default)]
776#[non_exhaustive]
777/// Configuration for a basin.
778pub struct BasinConfig {
779    /// Default configuration for all streams in the basin.
780    ///
781    /// See [`StreamConfig`] for defaults.
782    pub default_stream_config: Option<StreamConfig>,
783    /// Encryption algorithm to apply to newly created streams in the basin.
784    pub stream_cipher: Option<EncryptionAlgorithm>,
785    /// Whether to create stream on append if it doesn't exist using default stream configuration.
786    ///
787    /// Defaults to `false`.
788    pub create_stream_on_append: bool,
789    /// Whether to create stream on read if it doesn't exist using default stream configuration.
790    ///
791    /// Defaults to `false`.
792    pub create_stream_on_read: bool,
793}
794
795impl BasinConfig {
796    /// Create a new [`BasinConfig`] with default settings.
797    pub fn new() -> Self {
798        Self::default()
799    }
800
801    /// Set the default configuration for all streams in the basin.
802    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
803        Self {
804            default_stream_config: Some(config),
805            ..self
806        }
807    }
808
809    /// Set the encryption algorithm to apply to newly created streams in the basin.
810    pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
811        Self {
812            stream_cipher: Some(stream_cipher),
813            ..self
814        }
815    }
816
817    /// Set whether to create stream on append if it doesn't exist using default stream
818    /// configuration.
819    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
820        Self {
821            create_stream_on_append,
822            ..self
823        }
824    }
825
826    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
827    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
828        Self {
829            create_stream_on_read,
830            ..self
831        }
832    }
833}
834
835impl From<api::config::BasinConfig> for BasinConfig {
836    fn from(value: api::config::BasinConfig) -> Self {
837        Self {
838            default_stream_config: value.default_stream_config.map(Into::into),
839            stream_cipher: value.stream_cipher.map(Into::into),
840            create_stream_on_append: value.create_stream_on_append,
841            create_stream_on_read: value.create_stream_on_read,
842        }
843    }
844}
845
846impl From<BasinConfig> for api::config::BasinConfig {
847    fn from(value: BasinConfig) -> Self {
848        Self {
849            default_stream_config: value.default_stream_config.map(Into::into),
850            stream_cipher: value.stream_cipher.map(Into::into),
851            create_stream_on_append: value.create_stream_on_append,
852            create_stream_on_read: value.create_stream_on_read,
853        }
854    }
855}
856
857#[derive(Debug, Clone, PartialEq, Eq)]
858/// Scope of a basin.
859pub enum BasinScope {
860    /// AWS `us-east-1` region.
861    AwsUsEast1,
862    /// AWS `us-west-2` region.
863    AwsUsWest2,
864    /// AWS `eu-north-1` region.
865    AwsEuNorth1,
866}
867
868impl From<api::basin::BasinScope> for BasinScope {
869    fn from(value: api::basin::BasinScope) -> Self {
870        match value {
871            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
872            api::basin::BasinScope::AwsUsWest2 => BasinScope::AwsUsWest2,
873            api::basin::BasinScope::AwsEuNorth1 => BasinScope::AwsEuNorth1,
874        }
875    }
876}
877
878impl From<BasinScope> for api::basin::BasinScope {
879    fn from(value: BasinScope) -> Self {
880        match value {
881            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
882            BasinScope::AwsUsWest2 => api::basin::BasinScope::AwsUsWest2,
883            BasinScope::AwsEuNorth1 => api::basin::BasinScope::AwsEuNorth1,
884        }
885    }
886}
887
888/// Result of a create-or-reconfigure operation.
889///
890/// Indicates whether the resource was newly created or already existed and was
891/// reconfigured. Both variants hold the resource's current state.
892#[doc(hidden)]
893#[cfg(feature = "_hidden")]
894#[derive(Debug, Clone, PartialEq, Eq)]
895pub enum CreateOrReconfigured<T> {
896    /// Resource was newly created.
897    Created(T),
898    /// Resource already existed and was reconfigured to match the spec.
899    Reconfigured(T),
900}
901
902#[cfg(feature = "_hidden")]
903impl<T> CreateOrReconfigured<T> {
904    /// Returns `true` if the resource was newly created.
905    pub fn is_created(&self) -> bool {
906        matches!(self, Self::Created(_))
907    }
908
909    /// Unwrap the inner value regardless of variant.
910    pub fn into_inner(self) -> T {
911        match self {
912            Self::Created(t) | Self::Reconfigured(t) => t,
913        }
914    }
915}
916
917#[derive(Debug, Clone)]
918#[non_exhaustive]
919/// Input for [`create_basin`](crate::S2::create_basin) operation.
920pub struct CreateBasinInput {
921    /// Basin name.
922    pub name: BasinName,
923    /// Configuration for the basin.
924    ///
925    /// See [`BasinConfig`] for defaults.
926    pub config: Option<BasinConfig>,
927    /// Scope of the basin.
928    ///
929    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
930    pub scope: Option<BasinScope>,
931    idempotency_token: String,
932}
933
934impl CreateBasinInput {
935    /// Create a new [`CreateBasinInput`] with the given basin name.
936    pub fn new(name: BasinName) -> Self {
937        Self {
938            name,
939            config: None,
940            scope: None,
941            idempotency_token: idempotency_token(),
942        }
943    }
944
945    /// Set the configuration for the basin.
946    pub fn with_config(self, config: BasinConfig) -> Self {
947        Self {
948            config: Some(config),
949            ..self
950        }
951    }
952
953    /// Set the scope of the basin.
954    pub fn with_scope(self, scope: BasinScope) -> Self {
955        Self {
956            scope: Some(scope),
957            ..self
958        }
959    }
960}
961
962impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
963    fn from(value: CreateBasinInput) -> Self {
964        (
965            api::basin::CreateBasinRequest {
966                basin: value.name,
967                config: value.config.map(Into::into),
968                scope: value.scope.map(Into::into),
969            },
970            value.idempotency_token,
971        )
972    }
973}
974
975#[derive(Debug, Clone)]
976#[non_exhaustive]
977/// Input for [`create_or_reconfigure_basin`](crate::S2::create_or_reconfigure_basin) operation.
978#[doc(hidden)]
979#[cfg(feature = "_hidden")]
980pub struct CreateOrReconfigureBasinInput {
981    /// Basin name.
982    pub name: BasinName,
983    /// Reconfiguration for the basin.
984    ///
985    /// If `None`, the basin is created with default configuration or left unchanged if it exists.
986    pub config: Option<BasinReconfiguration>,
987    /// Scope of the basin.
988    ///
989    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1). Cannot be changed once set.
990    pub scope: Option<BasinScope>,
991}
992
993#[cfg(feature = "_hidden")]
994impl CreateOrReconfigureBasinInput {
995    /// Create a new [`CreateOrReconfigureBasinInput`] with the given basin name.
996    pub fn new(name: BasinName) -> Self {
997        Self {
998            name,
999            config: None,
1000            scope: None,
1001        }
1002    }
1003
1004    /// Set the reconfiguration for the basin.
1005    pub fn with_config(self, config: BasinReconfiguration) -> Self {
1006        Self {
1007            config: Some(config),
1008            ..self
1009        }
1010    }
1011
1012    /// Set the scope of the basin.
1013    pub fn with_scope(self, scope: BasinScope) -> Self {
1014        Self {
1015            scope: Some(scope),
1016            ..self
1017        }
1018    }
1019}
1020
1021#[cfg(feature = "_hidden")]
1022impl From<CreateOrReconfigureBasinInput>
1023    for (
1024        BasinName,
1025        Option<api::basin::CreateOrReconfigureBasinRequest>,
1026    )
1027{
1028    fn from(value: CreateOrReconfigureBasinInput) -> Self {
1029        let request = if value.config.is_some() || value.scope.is_some() {
1030            Some(api::basin::CreateOrReconfigureBasinRequest {
1031                config: value.config.map(Into::into),
1032                scope: value.scope.map(Into::into),
1033            })
1034        } else {
1035            None
1036        };
1037        (value.name, request)
1038    }
1039}
1040
1041#[derive(Debug, Clone, Default)]
1042#[non_exhaustive]
1043/// Input for [`list_basins`](crate::S2::list_basins) operation.
1044pub struct ListBasinsInput {
1045    /// Filter basins whose names begin with this value.
1046    ///
1047    /// Defaults to `""`.
1048    pub prefix: BasinNamePrefix,
1049    /// Filter basins whose names are lexicographically greater than this value.
1050    ///
1051    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
1052    ///
1053    /// Defaults to `""`.
1054    pub start_after: BasinNameStartAfter,
1055    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
1056    ///
1057    /// Defaults to `1000`.
1058    pub limit: Option<usize>,
1059}
1060
1061impl ListBasinsInput {
1062    /// Create a new [`ListBasinsInput`] with default values.
1063    pub fn new() -> Self {
1064        Self::default()
1065    }
1066
1067    /// Set the prefix used to filter basins whose names begin with this value.
1068    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1069        Self { prefix, ..self }
1070    }
1071
1072    /// Set the value used to filter basins whose names are lexicographically greater than this
1073    /// value.
1074    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1075        Self {
1076            start_after,
1077            ..self
1078        }
1079    }
1080
1081    /// Set the limit on number of basins to return in a page.
1082    pub fn with_limit(self, limit: usize) -> Self {
1083        Self {
1084            limit: Some(limit),
1085            ..self
1086        }
1087    }
1088}
1089
1090impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1091    fn from(value: ListBasinsInput) -> Self {
1092        Self {
1093            prefix: Some(value.prefix),
1094            start_after: Some(value.start_after),
1095            limit: value.limit,
1096        }
1097    }
1098}
1099
1100#[derive(Debug, Clone, Default)]
1101/// Input for [`S2::list_all_basins`](crate::S2::list_all_basins).
1102pub struct ListAllBasinsInput {
1103    /// Filter basins whose names begin with this value.
1104    ///
1105    /// Defaults to `""`.
1106    pub prefix: BasinNamePrefix,
1107    /// Filter basins whose names are lexicographically greater than this value.
1108    ///
1109    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
1110    ///
1111    /// Defaults to `""`.
1112    pub start_after: BasinNameStartAfter,
1113    /// Whether to include basins that are being deleted.
1114    ///
1115    /// Defaults to `false`.
1116    pub include_deleted: bool,
1117}
1118
1119impl ListAllBasinsInput {
1120    /// Create a new [`ListAllBasinsInput`] with default values.
1121    pub fn new() -> Self {
1122        Self::default()
1123    }
1124
1125    /// Set the prefix used to filter basins whose names begin with this value.
1126    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1127        Self { prefix, ..self }
1128    }
1129
1130    /// Set the value used to filter basins whose names are lexicographically greater than this
1131    /// value.
1132    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1133        Self {
1134            start_after,
1135            ..self
1136        }
1137    }
1138
1139    /// Set whether to include basins that are being deleted.
1140    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1141        Self {
1142            include_deleted,
1143            ..self
1144        }
1145    }
1146}
1147
1148#[derive(Debug, Clone, PartialEq, Eq)]
1149#[non_exhaustive]
1150/// Basin information.
1151pub struct BasinInfo {
1152    /// Basin name.
1153    pub name: BasinName,
1154    /// Scope of the basin.
1155    pub scope: Option<BasinScope>,
1156    /// Creation time.
1157    pub created_at: S2DateTime,
1158    /// Deletion time if the basin is being deleted.
1159    pub deleted_at: Option<S2DateTime>,
1160}
1161
1162impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1163    type Error = ValidationError;
1164
1165    fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1166        Ok(Self {
1167            name: value.name,
1168            scope: value.scope.map(Into::into),
1169            created_at: value.created_at.try_into()?,
1170            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1171        })
1172    }
1173}
1174
1175#[derive(Debug, Clone)]
1176#[non_exhaustive]
1177/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
1178pub struct DeleteBasinInput {
1179    /// Basin name.
1180    pub name: BasinName,
1181    /// Whether to ignore `Not Found` error if the basin doesn't exist.
1182    pub ignore_not_found: bool,
1183}
1184
1185impl DeleteBasinInput {
1186    /// Create a new [`DeleteBasinInput`] with the given basin name.
1187    pub fn new(name: BasinName) -> Self {
1188        Self {
1189            name,
1190            ignore_not_found: false,
1191        }
1192    }
1193
1194    /// Set whether to ignore `Not Found` error if the basin is not existing.
1195    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1196        Self {
1197            ignore_not_found,
1198            ..self
1199        }
1200    }
1201}
1202
1203#[derive(Debug, Clone, Default)]
1204#[non_exhaustive]
1205/// Reconfiguration for [`TimestampingConfig`].
1206pub struct TimestampingReconfiguration {
1207    /// Override for the existing [`mode`](TimestampingConfig::mode).
1208    pub mode: Maybe<Option<TimestampingMode>>,
1209    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
1210    pub uncapped: Maybe<Option<bool>>,
1211}
1212
1213impl TimestampingReconfiguration {
1214    /// Create a new [`TimestampingReconfiguration`].
1215    pub fn new() -> Self {
1216        Self::default()
1217    }
1218
1219    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1220    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1221        Self {
1222            mode: Maybe::Specified(Some(mode)),
1223            ..self
1224        }
1225    }
1226
1227    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1228    pub fn with_uncapped(self, uncapped: bool) -> Self {
1229        Self {
1230            uncapped: Maybe::Specified(Some(uncapped)),
1231            ..self
1232        }
1233    }
1234}
1235
1236impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1237    fn from(value: TimestampingReconfiguration) -> Self {
1238        Self {
1239            mode: value.mode.map(|m| m.map(Into::into)),
1240            uncapped: value.uncapped,
1241        }
1242    }
1243}
1244
1245#[derive(Debug, Clone, Default)]
1246#[non_exhaustive]
1247/// Reconfiguration for [`DeleteOnEmptyConfig`].
1248pub struct DeleteOnEmptyReconfiguration {
1249    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1250    pub min_age_secs: Maybe<Option<u64>>,
1251}
1252
1253impl DeleteOnEmptyReconfiguration {
1254    /// Create a new [`DeleteOnEmptyReconfiguration`].
1255    pub fn new() -> Self {
1256        Self::default()
1257    }
1258
1259    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1260    pub fn with_min_age(self, min_age: Duration) -> Self {
1261        Self {
1262            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1263        }
1264    }
1265}
1266
1267impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1268    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1269        Self {
1270            min_age_secs: value.min_age_secs,
1271        }
1272    }
1273}
1274
1275#[derive(Debug, Clone, Default)]
1276#[non_exhaustive]
1277/// Reconfiguration for [`StreamConfig`].
1278pub struct StreamReconfiguration {
1279    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
1280    pub storage_class: Maybe<Option<StorageClass>>,
1281    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
1282    pub retention_policy: Maybe<Option<RetentionPolicy>>,
1283    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
1284    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1285    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1286    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1287}
1288
1289impl StreamReconfiguration {
1290    /// Create a new [`StreamReconfiguration`].
1291    pub fn new() -> Self {
1292        Self::default()
1293    }
1294
1295    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1296    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1297        Self {
1298            storage_class: Maybe::Specified(Some(storage_class)),
1299            ..self
1300        }
1301    }
1302
1303    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1304    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1305        Self {
1306            retention_policy: Maybe::Specified(Some(retention_policy)),
1307            ..self
1308        }
1309    }
1310
1311    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1312    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1313        Self {
1314            timestamping: Maybe::Specified(Some(timestamping)),
1315            ..self
1316        }
1317    }
1318
1319    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1320    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1321        Self {
1322            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1323            ..self
1324        }
1325    }
1326}
1327
1328impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1329    fn from(value: StreamReconfiguration) -> Self {
1330        Self {
1331            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1332            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1333            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1334            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1335        }
1336    }
1337}
1338
1339#[derive(Debug, Clone, Default)]
1340#[non_exhaustive]
1341/// Reconfiguration for [`BasinConfig`].
1342pub struct BasinReconfiguration {
1343    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
1344    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1345    /// Override for the existing [`stream_cipher`](BasinConfig::stream_cipher).
1346    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
1347    /// Override for the existing
1348    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1349    pub create_stream_on_append: Maybe<bool>,
1350    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1351    pub create_stream_on_read: Maybe<bool>,
1352}
1353
1354impl BasinReconfiguration {
1355    /// Create a new [`BasinReconfiguration`].
1356    pub fn new() -> Self {
1357        Self::default()
1358    }
1359
1360    /// Set the override for the existing
1361    /// [`default_stream_config`](BasinConfig::default_stream_config).
1362    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1363        Self {
1364            default_stream_config: Maybe::Specified(Some(config)),
1365            ..self
1366        }
1367    }
1368
1369    /// Set the override for the existing [`stream_cipher`](BasinConfig::stream_cipher).
1370    pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
1371        Self {
1372            stream_cipher: Maybe::Specified(Some(stream_cipher)),
1373            ..self
1374        }
1375    }
1376
1377    /// Set the override for the existing
1378    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1379    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1380        Self {
1381            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1382            ..self
1383        }
1384    }
1385
1386    /// Set the override for the existing
1387    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1388    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1389        Self {
1390            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1391            ..self
1392        }
1393    }
1394}
1395
1396impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1397    fn from(value: BasinReconfiguration) -> Self {
1398        Self {
1399            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1400            stream_cipher: value.stream_cipher.map(|m| m.map(Into::into)),
1401            create_stream_on_append: value.create_stream_on_append,
1402            create_stream_on_read: value.create_stream_on_read,
1403        }
1404    }
1405}
1406
1407#[derive(Debug, Clone)]
1408#[non_exhaustive]
1409/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
1410pub struct ReconfigureBasinInput {
1411    /// Basin name.
1412    pub name: BasinName,
1413    /// Reconfiguration for [`BasinConfig`].
1414    pub config: BasinReconfiguration,
1415}
1416
1417impl ReconfigureBasinInput {
1418    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1419    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1420        Self { name, config }
1421    }
1422}
1423
1424#[derive(Debug, Clone, Default)]
1425#[non_exhaustive]
1426/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
1427pub struct ListAccessTokensInput {
1428    /// Filter access tokens whose IDs begin with this value.
1429    ///
1430    /// Defaults to `""`.
1431    pub prefix: AccessTokenIdPrefix,
1432    /// Filter access tokens whose IDs are lexicographically greater than this value.
1433    ///
1434    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
1435    ///
1436    /// Defaults to `""`.
1437    pub start_after: AccessTokenIdStartAfter,
1438    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
1439    ///
1440    /// Defaults to `1000`.
1441    pub limit: Option<usize>,
1442}
1443
1444impl ListAccessTokensInput {
1445    /// Create a new [`ListAccessTokensInput`] with default values.
1446    pub fn new() -> Self {
1447        Self::default()
1448    }
1449
1450    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1451    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1452        Self { prefix, ..self }
1453    }
1454
1455    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1456    /// value.
1457    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1458        Self {
1459            start_after,
1460            ..self
1461        }
1462    }
1463
1464    /// Set the limit on number of access tokens to return in a page.
1465    pub fn with_limit(self, limit: usize) -> Self {
1466        Self {
1467            limit: Some(limit),
1468            ..self
1469        }
1470    }
1471}
1472
1473impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1474    fn from(value: ListAccessTokensInput) -> Self {
1475        Self {
1476            prefix: Some(value.prefix),
1477            start_after: Some(value.start_after),
1478            limit: value.limit,
1479        }
1480    }
1481}
1482
1483#[derive(Debug, Clone, Default)]
1484/// Input for [`S2::list_all_access_tokens`](crate::S2::list_all_access_tokens).
1485pub struct ListAllAccessTokensInput {
1486    /// Filter access tokens whose IDs begin with this value.
1487    ///
1488    /// Defaults to `""`.
1489    pub prefix: AccessTokenIdPrefix,
1490    /// Filter access tokens whose IDs are lexicographically greater than this value.
1491    ///
1492    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
1493    ///
1494    /// Defaults to `""`.
1495    pub start_after: AccessTokenIdStartAfter,
1496}
1497
1498impl ListAllAccessTokensInput {
1499    /// Create a new [`ListAllAccessTokensInput`] with default values.
1500    pub fn new() -> Self {
1501        Self::default()
1502    }
1503
1504    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1505    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1506        Self { prefix, ..self }
1507    }
1508
1509    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1510    /// this value.
1511    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1512        Self {
1513            start_after,
1514            ..self
1515        }
1516    }
1517}
1518
1519#[derive(Debug, Clone)]
1520#[non_exhaustive]
1521/// Access token information.
1522pub struct AccessTokenInfo {
1523    /// Access token ID.
1524    pub id: AccessTokenId,
1525    /// Expiration time.
1526    pub expires_at: S2DateTime,
1527    /// Whether to automatically prefix stream names during creation and strip the prefix during
1528    /// listing.
1529    pub auto_prefix_streams: bool,
1530    /// Scope of the access token.
1531    pub scope: AccessTokenScope,
1532}
1533
1534impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1535    type Error = ValidationError;
1536
1537    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1538        let expires_at = value
1539            .expires_at
1540            .map(S2DateTime::try_from)
1541            .transpose()?
1542            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1543        Ok(Self {
1544            id: value.id,
1545            expires_at,
1546            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1547            scope: value.scope.into(),
1548        })
1549    }
1550}
1551
1552#[derive(Debug, Clone)]
1553/// Pattern for matching basins.
1554///
1555/// See [`AccessTokenScope::basins`].
1556pub enum BasinMatcher {
1557    /// Match no basins.
1558    None,
1559    /// Match exactly this basin.
1560    Exact(BasinName),
1561    /// Match all basins with this prefix.
1562    Prefix(BasinNamePrefix),
1563}
1564
1565#[derive(Debug, Clone)]
1566/// Pattern for matching streams.
1567///
1568/// See [`AccessTokenScope::streams`].
1569pub enum StreamMatcher {
1570    /// Match no streams.
1571    None,
1572    /// Match exactly this stream.
1573    Exact(StreamName),
1574    /// Match all streams with this prefix.
1575    Prefix(StreamNamePrefix),
1576}
1577
1578#[derive(Debug, Clone)]
1579/// Pattern for matching access tokens.
1580///
1581/// See [`AccessTokenScope::access_tokens`].
1582pub enum AccessTokenMatcher {
1583    /// Match no access tokens.
1584    None,
1585    /// Match exactly this access token.
1586    Exact(AccessTokenId),
1587    /// Match all access tokens with this prefix.
1588    Prefix(AccessTokenIdPrefix),
1589}
1590
1591#[derive(Debug, Clone, Default)]
1592#[non_exhaustive]
1593/// Permissions indicating allowed operations.
1594pub struct ReadWritePermissions {
1595    /// Read permission.
1596    ///
1597    /// Defaults to `false`.
1598    pub read: bool,
1599    /// Write permission.
1600    ///
1601    /// Defaults to `false`.
1602    pub write: bool,
1603}
1604
1605impl ReadWritePermissions {
1606    /// Create a new [`ReadWritePermissions`] with default values.
1607    pub fn new() -> Self {
1608        Self::default()
1609    }
1610
1611    /// Create read-only permissions.
1612    pub fn read_only() -> Self {
1613        Self {
1614            read: true,
1615            write: false,
1616        }
1617    }
1618
1619    /// Create write-only permissions.
1620    pub fn write_only() -> Self {
1621        Self {
1622            read: false,
1623            write: true,
1624        }
1625    }
1626
1627    /// Create read-write permissions.
1628    pub fn read_write() -> Self {
1629        Self {
1630            read: true,
1631            write: true,
1632        }
1633    }
1634}
1635
1636impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1637    fn from(value: ReadWritePermissions) -> Self {
1638        Self {
1639            read: Some(value.read),
1640            write: Some(value.write),
1641        }
1642    }
1643}
1644
1645impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1646    fn from(value: api::access::ReadWritePermissions) -> Self {
1647        Self {
1648            read: value.read.unwrap_or_default(),
1649            write: value.write.unwrap_or_default(),
1650        }
1651    }
1652}
1653
1654#[derive(Debug, Clone, Default)]
1655#[non_exhaustive]
1656/// Permissions at the operation group level.
1657///
1658/// See [`AccessTokenScope::op_group_perms`].
1659pub struct OperationGroupPermissions {
1660    /// Account-level access permissions.
1661    ///
1662    /// Defaults to `None`.
1663    pub account: Option<ReadWritePermissions>,
1664    /// Basin-level access permissions.
1665    ///
1666    /// Defaults to `None`.
1667    pub basin: Option<ReadWritePermissions>,
1668    /// Stream-level access permissions.
1669    ///
1670    /// Defaults to `None`.
1671    pub stream: Option<ReadWritePermissions>,
1672}
1673
1674impl OperationGroupPermissions {
1675    /// Create a new [`OperationGroupPermissions`] with default values.
1676    pub fn new() -> Self {
1677        Self::default()
1678    }
1679
1680    /// Create read-only permissions for all groups.
1681    pub fn read_only_all() -> Self {
1682        Self {
1683            account: Some(ReadWritePermissions::read_only()),
1684            basin: Some(ReadWritePermissions::read_only()),
1685            stream: Some(ReadWritePermissions::read_only()),
1686        }
1687    }
1688
1689    /// Create write-only permissions for all groups.
1690    pub fn write_only_all() -> Self {
1691        Self {
1692            account: Some(ReadWritePermissions::write_only()),
1693            basin: Some(ReadWritePermissions::write_only()),
1694            stream: Some(ReadWritePermissions::write_only()),
1695        }
1696    }
1697
1698    /// Create read-write permissions for all groups.
1699    pub fn read_write_all() -> Self {
1700        Self {
1701            account: Some(ReadWritePermissions::read_write()),
1702            basin: Some(ReadWritePermissions::read_write()),
1703            stream: Some(ReadWritePermissions::read_write()),
1704        }
1705    }
1706
1707    /// Set account-level access permissions.
1708    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1709        Self {
1710            account: Some(account),
1711            ..self
1712        }
1713    }
1714
1715    /// Set basin-level access permissions.
1716    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1717        Self {
1718            basin: Some(basin),
1719            ..self
1720        }
1721    }
1722
1723    /// Set stream-level access permissions.
1724    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1725        Self {
1726            stream: Some(stream),
1727            ..self
1728        }
1729    }
1730}
1731
1732impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1733    fn from(value: OperationGroupPermissions) -> Self {
1734        Self {
1735            account: value.account.map(Into::into),
1736            basin: value.basin.map(Into::into),
1737            stream: value.stream.map(Into::into),
1738        }
1739    }
1740}
1741
1742impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1743    fn from(value: api::access::PermittedOperationGroups) -> Self {
1744        Self {
1745            account: value.account.map(Into::into),
1746            basin: value.basin.map(Into::into),
1747            stream: value.stream.map(Into::into),
1748        }
1749    }
1750}
1751
1752#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1753/// Individual operation that can be permitted.
1754///
1755/// See [`AccessTokenScope::ops`].
1756pub enum Operation {
1757    /// List basins.
1758    ListBasins,
1759    /// Create a basin.
1760    CreateBasin,
1761    /// Get basin configuration.
1762    GetBasinConfig,
1763    /// Delete a basin.
1764    DeleteBasin,
1765    /// Reconfigure a basin.
1766    ReconfigureBasin,
1767    /// List access tokens.
1768    ListAccessTokens,
1769    /// Issue an access token.
1770    IssueAccessToken,
1771    /// Revoke an access token.
1772    RevokeAccessToken,
1773    /// Get account metrics.
1774    GetAccountMetrics,
1775    /// Get basin metrics.
1776    GetBasinMetrics,
1777    /// Get stream metrics.
1778    GetStreamMetrics,
1779    /// List streams.
1780    ListStreams,
1781    /// Create a stream.
1782    CreateStream,
1783    /// Get stream configuration.
1784    GetStreamConfig,
1785    /// Delete a stream.
1786    DeleteStream,
1787    /// Reconfigure a stream.
1788    ReconfigureStream,
1789    /// Check the tail of a stream.
1790    CheckTail,
1791    /// Append records to a stream.
1792    Append,
1793    /// Read records from a stream.
1794    Read,
1795    /// Trim records on a stream.
1796    Trim,
1797    /// Set the fencing token on a stream.
1798    Fence,
1799}
1800
1801impl From<Operation> for api::access::Operation {
1802    fn from(value: Operation) -> Self {
1803        match value {
1804            Operation::ListBasins => api::access::Operation::ListBasins,
1805            Operation::CreateBasin => api::access::Operation::CreateBasin,
1806            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1807            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1808            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1809            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1810            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1811            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1812            Operation::ListStreams => api::access::Operation::ListStreams,
1813            Operation::CreateStream => api::access::Operation::CreateStream,
1814            Operation::DeleteStream => api::access::Operation::DeleteStream,
1815            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1816            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1817            Operation::CheckTail => api::access::Operation::CheckTail,
1818            Operation::Append => api::access::Operation::Append,
1819            Operation::Read => api::access::Operation::Read,
1820            Operation::Trim => api::access::Operation::Trim,
1821            Operation::Fence => api::access::Operation::Fence,
1822            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1823            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1824            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1825        }
1826    }
1827}
1828
1829impl From<api::access::Operation> for Operation {
1830    fn from(value: api::access::Operation) -> Self {
1831        match value {
1832            api::access::Operation::ListBasins => Operation::ListBasins,
1833            api::access::Operation::CreateBasin => Operation::CreateBasin,
1834            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1835            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1836            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1837            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1838            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1839            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1840            api::access::Operation::ListStreams => Operation::ListStreams,
1841            api::access::Operation::CreateStream => Operation::CreateStream,
1842            api::access::Operation::DeleteStream => Operation::DeleteStream,
1843            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1844            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1845            api::access::Operation::CheckTail => Operation::CheckTail,
1846            api::access::Operation::Append => Operation::Append,
1847            api::access::Operation::Read => Operation::Read,
1848            api::access::Operation::Trim => Operation::Trim,
1849            api::access::Operation::Fence => Operation::Fence,
1850            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1851            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1852            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1853        }
1854    }
1855}
1856
1857#[derive(Debug, Clone)]
1858#[non_exhaustive]
1859/// Scope of an access token.
1860///
1861/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
1862/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
1863/// final set must not be empty.
1864///
1865/// See [`IssueAccessTokenInput::scope`].
1866pub struct AccessTokenScopeInput {
1867    basins: Option<BasinMatcher>,
1868    streams: Option<StreamMatcher>,
1869    access_tokens: Option<AccessTokenMatcher>,
1870    op_group_perms: Option<OperationGroupPermissions>,
1871    ops: HashSet<Operation>,
1872}
1873
1874impl AccessTokenScopeInput {
1875    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1876    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1877        Self {
1878            basins: None,
1879            streams: None,
1880            access_tokens: None,
1881            op_group_perms: None,
1882            ops: ops.into_iter().collect(),
1883        }
1884    }
1885
1886    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1887    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1888        Self {
1889            basins: None,
1890            streams: None,
1891            access_tokens: None,
1892            op_group_perms: Some(op_group_perms),
1893            ops: HashSet::default(),
1894        }
1895    }
1896
1897    /// Set the permitted operations.
1898    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1899        Self {
1900            ops: ops.into_iter().collect(),
1901            ..self
1902        }
1903    }
1904
1905    /// Set the access permissions at the operation group level.
1906    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1907        Self {
1908            op_group_perms: Some(op_group_perms),
1909            ..self
1910        }
1911    }
1912
1913    /// Set the permitted basins.
1914    ///
1915    /// Defaults to no basins.
1916    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1917        Self {
1918            basins: Some(basins),
1919            ..self
1920        }
1921    }
1922
1923    /// Set the permitted streams.
1924    ///
1925    /// Defaults to no streams.
1926    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1927        Self {
1928            streams: Some(streams),
1929            ..self
1930        }
1931    }
1932
1933    /// Set the permitted access tokens.
1934    ///
1935    /// Defaults to no access tokens.
1936    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1937        Self {
1938            access_tokens: Some(access_tokens),
1939            ..self
1940        }
1941    }
1942}
1943
1944#[derive(Debug, Clone)]
1945#[non_exhaustive]
1946/// Scope of an access token.
1947pub struct AccessTokenScope {
1948    /// Permitted basins.
1949    pub basins: Option<BasinMatcher>,
1950    /// Permitted streams.
1951    pub streams: Option<StreamMatcher>,
1952    /// Permitted access tokens.
1953    pub access_tokens: Option<AccessTokenMatcher>,
1954    /// Permissions at the operation group level.
1955    pub op_group_perms: Option<OperationGroupPermissions>,
1956    /// Permitted operations.
1957    pub ops: HashSet<Operation>,
1958}
1959
1960impl From<api::access::AccessTokenScope> for AccessTokenScope {
1961    fn from(value: api::access::AccessTokenScope) -> Self {
1962        Self {
1963            basins: value.basins.map(|rs| match rs {
1964                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1965                    BasinMatcher::Exact(e)
1966                }
1967                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1968                    BasinMatcher::None
1969                }
1970                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1971            }),
1972            streams: value.streams.map(|rs| match rs {
1973                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1974                    StreamMatcher::Exact(e)
1975                }
1976                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1977                    StreamMatcher::None
1978                }
1979                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1980            }),
1981            access_tokens: value.access_tokens.map(|rs| match rs {
1982                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1983                    AccessTokenMatcher::Exact(e)
1984                }
1985                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1986                    AccessTokenMatcher::None
1987                }
1988                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1989            }),
1990            op_group_perms: value.op_groups.map(Into::into),
1991            ops: value
1992                .ops
1993                .map(|ops| ops.into_iter().map(Into::into).collect())
1994                .unwrap_or_default(),
1995        }
1996    }
1997}
1998
1999impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
2000    fn from(value: AccessTokenScopeInput) -> Self {
2001        Self {
2002            basins: value.basins.map(|rs| match rs {
2003                BasinMatcher::None => {
2004                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2005                }
2006                BasinMatcher::Exact(e) => {
2007                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2008                }
2009                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2010            }),
2011            streams: value.streams.map(|rs| match rs {
2012                StreamMatcher::None => {
2013                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2014                }
2015                StreamMatcher::Exact(e) => {
2016                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2017                }
2018                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2019            }),
2020            access_tokens: value.access_tokens.map(|rs| match rs {
2021                AccessTokenMatcher::None => {
2022                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2023                }
2024                AccessTokenMatcher::Exact(e) => {
2025                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2026                }
2027                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2028            }),
2029            op_groups: value.op_group_perms.map(Into::into),
2030            ops: if value.ops.is_empty() {
2031                None
2032            } else {
2033                Some(value.ops.into_iter().map(Into::into).collect())
2034            },
2035        }
2036    }
2037}
2038
2039#[derive(Debug, Clone)]
2040#[non_exhaustive]
2041/// Input for [`issue_access_token`](crate::S2::issue_access_token).
2042pub struct IssueAccessTokenInput {
2043    /// Access token ID.
2044    pub id: AccessTokenId,
2045    /// Expiration time.
2046    ///
2047    /// Defaults to the expiration time of requestor's access token passed via
2048    /// [`S2Config`](S2Config::new).
2049    pub expires_at: Option<S2DateTime>,
2050    /// Whether to automatically prefix stream names during creation and strip the prefix during
2051    /// listing.
2052    ///
2053    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
2054    /// prefix.
2055    ///
2056    /// Defaults to `false`.
2057    pub auto_prefix_streams: bool,
2058    /// Scope of the token.
2059    pub scope: AccessTokenScopeInput,
2060}
2061
2062impl IssueAccessTokenInput {
2063    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
2064    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2065        Self {
2066            id,
2067            expires_at: None,
2068            auto_prefix_streams: false,
2069            scope,
2070        }
2071    }
2072
2073    /// Set the expiration time.
2074    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2075        Self {
2076            expires_at: Some(expires_at),
2077            ..self
2078        }
2079    }
2080
2081    /// Set whether to automatically prefix stream names during creation and strip the prefix during
2082    /// listing.
2083    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2084        Self {
2085            auto_prefix_streams,
2086            ..self
2087        }
2088    }
2089}
2090
2091impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2092    fn from(value: IssueAccessTokenInput) -> Self {
2093        Self {
2094            id: value.id,
2095            expires_at: value.expires_at.map(Into::into),
2096            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2097            scope: value.scope.into(),
2098        }
2099    }
2100}
2101
2102#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2103/// Interval to accumulate over for timeseries metric sets.
2104pub enum TimeseriesInterval {
2105    /// Minute.
2106    Minute,
2107    /// Hour.
2108    Hour,
2109    /// Day.
2110    Day,
2111}
2112
2113impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2114    fn from(value: TimeseriesInterval) -> Self {
2115        match value {
2116            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2117            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2118            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2119        }
2120    }
2121}
2122
2123impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2124    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2125        match value {
2126            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2127            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2128            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2129        }
2130    }
2131}
2132
2133#[derive(Debug, Clone, Copy)]
2134#[non_exhaustive]
2135/// Time range as Unix epoch seconds.
2136pub struct TimeRange {
2137    /// Start timestamp (inclusive).
2138    pub start: u32,
2139    /// End timestamp (exclusive).
2140    pub end: u32,
2141}
2142
2143impl TimeRange {
2144    /// Create a new [`TimeRange`] with the given start and end timestamps.
2145    pub fn new(start: u32, end: u32) -> Self {
2146        Self { start, end }
2147    }
2148}
2149
2150#[derive(Debug, Clone, Copy)]
2151#[non_exhaustive]
2152/// Time range as Unix epoch seconds and accumulation interval.
2153pub struct TimeRangeAndInterval {
2154    /// Start timestamp (inclusive).
2155    pub start: u32,
2156    /// End timestamp (exclusive).
2157    pub end: u32,
2158    /// Interval to accumulate over for timeseries metric sets.
2159    ///
2160    /// Default is dependent on the requested metric set.
2161    pub interval: Option<TimeseriesInterval>,
2162}
2163
2164impl TimeRangeAndInterval {
2165    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2166    pub fn new(start: u32, end: u32) -> Self {
2167        Self {
2168            start,
2169            end,
2170            interval: None,
2171        }
2172    }
2173
2174    /// Set the interval to accumulate over for timeseries metric sets.
2175    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2176        Self {
2177            interval: Some(interval),
2178            ..self
2179        }
2180    }
2181}
2182
2183#[derive(Debug, Clone, Copy)]
2184/// Account metric set to return.
2185pub enum AccountMetricSet {
2186    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
2187    /// specified time range.
2188    ActiveBasins(TimeRange),
2189    /// Returns [`AccumulationMetric`]s, one per account operation type.
2190    ///
2191    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2192    /// per interval over the requested time range.
2193    ///
2194    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2195    AccountOps(TimeRangeAndInterval),
2196}
2197
2198#[derive(Debug, Clone)]
2199#[non_exhaustive]
2200/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
2201pub struct GetAccountMetricsInput {
2202    /// Metric set to return.
2203    pub set: AccountMetricSet,
2204}
2205
2206impl GetAccountMetricsInput {
2207    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2208    pub fn new(set: AccountMetricSet) -> Self {
2209        Self { set }
2210    }
2211}
2212
2213impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2214    fn from(value: GetAccountMetricsInput) -> Self {
2215        let (set, start, end, interval) = match value.set {
2216            AccountMetricSet::ActiveBasins(args) => (
2217                api::metrics::AccountMetricSet::ActiveBasins,
2218                args.start,
2219                args.end,
2220                None,
2221            ),
2222            AccountMetricSet::AccountOps(args) => (
2223                api::metrics::AccountMetricSet::AccountOps,
2224                args.start,
2225                args.end,
2226                args.interval,
2227            ),
2228        };
2229        Self {
2230            set,
2231            start: Some(start),
2232            end: Some(end),
2233            interval: interval.map(Into::into),
2234        }
2235    }
2236}
2237
2238#[derive(Debug, Clone, Copy)]
2239/// Basin metric set to return.
2240pub enum BasinMetricSet {
2241    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
2242    /// in the basin, with one observed value for each hour over the requested time range.
2243    Storage(TimeRange),
2244    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
2245    ///
2246    /// Each metric represents a timeseries of the number of append operations across all streams
2247    /// in the basin, with one accumulated value per interval over the requested time range.
2248    ///
2249    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2250    /// [`minute`](TimeseriesInterval::Minute).
2251    AppendOps(TimeRangeAndInterval),
2252    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
2253    ///
2254    /// Each metric represents a timeseries of the number of read operations across all streams
2255    /// in the basin, with one accumulated value per interval over the requested time range.
2256    ///
2257    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2258    /// [`minute`](TimeseriesInterval::Minute).
2259    ReadOps(TimeRangeAndInterval),
2260    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
2261    /// across all streams in the basin, with one accumulated value per interval
2262    /// over the requested time range.
2263    ///
2264    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2265    /// [`minute`](TimeseriesInterval::Minute).
2266    ReadThroughput(TimeRangeAndInterval),
2267    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
2268    /// across all streams in the basin, with one accumulated value per interval
2269    /// over the requested time range.
2270    ///
2271    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2272    /// [`minute`](TimeseriesInterval::Minute).
2273    AppendThroughput(TimeRangeAndInterval),
2274    /// Returns [`AccumulationMetric`]s, one per basin operation type.
2275    ///
2276    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2277    /// per interval over the requested time range.
2278    ///
2279    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2280    BasinOps(TimeRangeAndInterval),
2281}
2282
2283#[derive(Debug, Clone)]
2284#[non_exhaustive]
2285/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
2286pub struct GetBasinMetricsInput {
2287    /// Basin name.
2288    pub name: BasinName,
2289    /// Metric set to return.
2290    pub set: BasinMetricSet,
2291}
2292
2293impl GetBasinMetricsInput {
2294    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2295    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2296        Self { name, set }
2297    }
2298}
2299
2300impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2301    fn from(value: GetBasinMetricsInput) -> Self {
2302        let (set, start, end, interval) = match value.set {
2303            BasinMetricSet::Storage(args) => (
2304                api::metrics::BasinMetricSet::Storage,
2305                args.start,
2306                args.end,
2307                None,
2308            ),
2309            BasinMetricSet::AppendOps(args) => (
2310                api::metrics::BasinMetricSet::AppendOps,
2311                args.start,
2312                args.end,
2313                args.interval,
2314            ),
2315            BasinMetricSet::ReadOps(args) => (
2316                api::metrics::BasinMetricSet::ReadOps,
2317                args.start,
2318                args.end,
2319                args.interval,
2320            ),
2321            BasinMetricSet::ReadThroughput(args) => (
2322                api::metrics::BasinMetricSet::ReadThroughput,
2323                args.start,
2324                args.end,
2325                args.interval,
2326            ),
2327            BasinMetricSet::AppendThroughput(args) => (
2328                api::metrics::BasinMetricSet::AppendThroughput,
2329                args.start,
2330                args.end,
2331                args.interval,
2332            ),
2333            BasinMetricSet::BasinOps(args) => (
2334                api::metrics::BasinMetricSet::BasinOps,
2335                args.start,
2336                args.end,
2337                args.interval,
2338            ),
2339        };
2340        (
2341            value.name,
2342            api::metrics::BasinMetricSetRequest {
2343                set,
2344                start: Some(start),
2345                end: Some(end),
2346                interval: interval.map(Into::into),
2347            },
2348        )
2349    }
2350}
2351
2352#[derive(Debug, Clone, Copy)]
2353/// Stream metric set to return.
2354pub enum StreamMetricSet {
2355    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
2356    /// with one observed value for each minute over the requested time range.
2357    Storage(TimeRange),
2358}
2359
2360#[derive(Debug, Clone)]
2361#[non_exhaustive]
2362/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
2363pub struct GetStreamMetricsInput {
2364    /// Basin name.
2365    pub basin_name: BasinName,
2366    /// Stream name.
2367    pub stream_name: StreamName,
2368    /// Metric set to return.
2369    pub set: StreamMetricSet,
2370}
2371
2372impl GetStreamMetricsInput {
2373    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2374    /// set.
2375    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2376        Self {
2377            basin_name,
2378            stream_name,
2379            set,
2380        }
2381    }
2382}
2383
2384impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2385    fn from(value: GetStreamMetricsInput) -> Self {
2386        let (set, start, end, interval) = match value.set {
2387            StreamMetricSet::Storage(args) => (
2388                api::metrics::StreamMetricSet::Storage,
2389                args.start,
2390                args.end,
2391                None,
2392            ),
2393        };
2394        (
2395            value.basin_name,
2396            value.stream_name,
2397            api::metrics::StreamMetricSetRequest {
2398                set,
2399                start: Some(start),
2400                end: Some(end),
2401                interval,
2402            },
2403        )
2404    }
2405}
2406
2407#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2408/// Unit in which metric values are measured.
2409pub enum MetricUnit {
2410    /// Size in bytes.
2411    Bytes,
2412    /// Number of operations.
2413    Operations,
2414}
2415
2416impl From<api::metrics::MetricUnit> for MetricUnit {
2417    fn from(value: api::metrics::MetricUnit) -> Self {
2418        match value {
2419            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2420            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2421        }
2422    }
2423}
2424
2425#[derive(Debug, Clone)]
2426#[non_exhaustive]
2427/// Single named value.
2428pub struct ScalarMetric {
2429    /// Metric name.
2430    pub name: String,
2431    /// Unit for the metric value.
2432    pub unit: MetricUnit,
2433    /// Metric value.
2434    pub value: f64,
2435}
2436
2437#[derive(Debug, Clone)]
2438#[non_exhaustive]
2439/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2440/// specified interval.
2441pub struct AccumulationMetric {
2442    /// Timeseries name.
2443    pub name: String,
2444    /// Unit for the accumulated values.
2445    pub unit: MetricUnit,
2446    /// The interval at which datapoints are accumulated.
2447    pub interval: TimeseriesInterval,
2448    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
2449    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
2450    /// one `interval`.
2451    pub values: Vec<(u32, f64)>,
2452}
2453
2454#[derive(Debug, Clone)]
2455#[non_exhaustive]
2456/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2457pub struct GaugeMetric {
2458    /// Timeseries name.
2459    pub name: String,
2460    /// Unit for the instantaneous values.
2461    pub unit: MetricUnit,
2462    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
2463    /// instant of the `timestamp` (in Unix epoch seconds).
2464    pub values: Vec<(u32, f64)>,
2465}
2466
2467#[derive(Debug, Clone)]
2468#[non_exhaustive]
2469/// Set of string labels.
2470pub struct LabelMetric {
2471    /// Label name.
2472    pub name: String,
2473    /// Label values.
2474    pub values: Vec<String>,
2475}
2476
2477#[derive(Debug, Clone)]
2478/// Individual metric in a returned metric set.
2479pub enum Metric {
2480    /// Single named value.
2481    Scalar(ScalarMetric),
2482    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2483    /// specified interval.
2484    Accumulation(AccumulationMetric),
2485    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2486    Gauge(GaugeMetric),
2487    /// Set of string labels.
2488    Label(LabelMetric),
2489}
2490
2491impl From<api::metrics::Metric> for Metric {
2492    fn from(value: api::metrics::Metric) -> Self {
2493        match value {
2494            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2495                name: sm.name.into(),
2496                unit: sm.unit.into(),
2497                value: sm.value,
2498            }),
2499            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2500                name: am.name.into(),
2501                unit: am.unit.into(),
2502                interval: am.interval.into(),
2503                values: am.values,
2504            }),
2505            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2506                name: gm.name.into(),
2507                unit: gm.unit.into(),
2508                values: gm.values,
2509            }),
2510            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2511                name: lm.name.into(),
2512                values: lm.values,
2513            }),
2514        }
2515    }
2516}
2517
2518#[derive(Debug, Clone, Default)]
2519#[non_exhaustive]
2520/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
2521pub struct ListStreamsInput {
2522    /// Filter streams whose names begin with this value.
2523    ///
2524    /// Defaults to `""`.
2525    pub prefix: StreamNamePrefix,
2526    /// Filter streams whose names are lexicographically greater than this value.
2527    ///
2528    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
2529    ///
2530    /// Defaults to `""`.
2531    pub start_after: StreamNameStartAfter,
2532    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
2533    ///
2534    /// Defaults to `1000`.
2535    pub limit: Option<usize>,
2536}
2537
2538impl ListStreamsInput {
2539    /// Create a new [`ListStreamsInput`] with default values.
2540    pub fn new() -> Self {
2541        Self::default()
2542    }
2543
2544    /// Set the prefix used to filter streams whose names begin with this value.
2545    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2546        Self { prefix, ..self }
2547    }
2548
2549    /// Set the value used to filter streams whose names are lexicographically greater than this
2550    /// value.
2551    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2552        Self {
2553            start_after,
2554            ..self
2555        }
2556    }
2557
2558    /// Set the limit on number of streams to return in a page.
2559    pub fn with_limit(self, limit: usize) -> Self {
2560        Self {
2561            limit: Some(limit),
2562            ..self
2563        }
2564    }
2565}
2566
2567impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2568    fn from(value: ListStreamsInput) -> Self {
2569        Self {
2570            prefix: Some(value.prefix),
2571            start_after: Some(value.start_after),
2572            limit: value.limit,
2573        }
2574    }
2575}
2576
2577#[derive(Debug, Clone, Default)]
2578/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
2579pub struct ListAllStreamsInput {
2580    /// Filter streams whose names begin with this value.
2581    ///
2582    /// Defaults to `""`.
2583    pub prefix: StreamNamePrefix,
2584    /// Filter streams whose names are lexicographically greater than this value.
2585    ///
2586    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
2587    ///
2588    /// Defaults to `""`.
2589    pub start_after: StreamNameStartAfter,
2590    /// Whether to include streams that are being deleted.
2591    ///
2592    /// Defaults to `false`.
2593    pub include_deleted: bool,
2594}
2595
2596impl ListAllStreamsInput {
2597    /// Create a new [`ListAllStreamsInput`] with default values.
2598    pub fn new() -> Self {
2599        Self::default()
2600    }
2601
2602    /// Set the prefix used to filter streams whose names begin with this value.
2603    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2604        Self { prefix, ..self }
2605    }
2606
2607    /// Set the value used to filter streams whose names are lexicographically greater than this
2608    /// value.
2609    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2610        Self {
2611            start_after,
2612            ..self
2613        }
2614    }
2615
2616    /// Set whether to include streams that are being deleted.
2617    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2618        Self {
2619            include_deleted,
2620            ..self
2621        }
2622    }
2623}
2624
2625#[derive(Debug, Clone, PartialEq, Eq)]
2626#[non_exhaustive]
2627/// Stream information.
2628pub struct StreamInfo {
2629    /// Stream name.
2630    pub name: StreamName,
2631    /// Creation time.
2632    pub created_at: S2DateTime,
2633    /// Deletion time if the stream is being deleted.
2634    pub deleted_at: Option<S2DateTime>,
2635    /// Encryption algorithm for this stream, if encryption is enabled.
2636    pub cipher: Option<EncryptionAlgorithm>,
2637}
2638
2639impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2640    type Error = ValidationError;
2641
2642    fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2643        Ok(Self {
2644            name: value.name,
2645            created_at: value.created_at.try_into()?,
2646            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2647            cipher: value.cipher.map(Into::into),
2648        })
2649    }
2650}
2651
2652#[derive(Debug, Clone)]
2653#[non_exhaustive]
2654/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
2655pub struct CreateStreamInput {
2656    /// Stream name.
2657    pub name: StreamName,
2658    /// Configuration for the stream.
2659    ///
2660    /// See [`StreamConfig`] for defaults.
2661    pub config: Option<StreamConfig>,
2662    idempotency_token: String,
2663}
2664
2665impl CreateStreamInput {
2666    /// Create a new [`CreateStreamInput`] with the given stream name.
2667    pub fn new(name: StreamName) -> Self {
2668        Self {
2669            name,
2670            config: None,
2671            idempotency_token: idempotency_token(),
2672        }
2673    }
2674
2675    /// Set the configuration for the stream.
2676    pub fn with_config(self, config: StreamConfig) -> Self {
2677        Self {
2678            config: Some(config),
2679            ..self
2680        }
2681    }
2682}
2683
2684impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2685    fn from(value: CreateStreamInput) -> Self {
2686        (
2687            api::stream::CreateStreamRequest {
2688                stream: value.name,
2689                config: value.config.map(Into::into),
2690            },
2691            value.idempotency_token,
2692        )
2693    }
2694}
2695
2696#[derive(Debug, Clone)]
2697#[non_exhaustive]
2698/// Input for [`create_or_reconfigure_stream`](crate::S2Basin::create_or_reconfigure_stream)
2699/// operation.
2700#[doc(hidden)]
2701#[cfg(feature = "_hidden")]
2702pub struct CreateOrReconfigureStreamInput {
2703    /// Stream name.
2704    pub name: StreamName,
2705    /// Reconfiguration for the stream.
2706    ///
2707    /// If `None`, the stream is created with default configuration or left unchanged if it exists.
2708    pub config: Option<StreamReconfiguration>,
2709}
2710
2711#[cfg(feature = "_hidden")]
2712impl CreateOrReconfigureStreamInput {
2713    /// Create a new [`CreateOrReconfigureStreamInput`] with the given stream name.
2714    pub fn new(name: StreamName) -> Self {
2715        Self { name, config: None }
2716    }
2717
2718    /// Set the reconfiguration for the stream.
2719    pub fn with_config(self, config: StreamReconfiguration) -> Self {
2720        Self {
2721            config: Some(config),
2722            ..self
2723        }
2724    }
2725}
2726
2727#[cfg(feature = "_hidden")]
2728impl From<CreateOrReconfigureStreamInput>
2729    for (StreamName, Option<api::config::StreamReconfiguration>)
2730{
2731    fn from(value: CreateOrReconfigureStreamInput) -> Self {
2732        (value.name, value.config.map(Into::into))
2733    }
2734}
2735
2736#[derive(Debug, Clone)]
2737#[non_exhaustive]
2738/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
2739pub struct DeleteStreamInput {
2740    /// Stream name.
2741    pub name: StreamName,
2742    /// Whether to ignore `Not Found` error if the stream doesn't exist.
2743    pub ignore_not_found: bool,
2744}
2745
2746impl DeleteStreamInput {
2747    /// Create a new [`DeleteStreamInput`] with the given stream name.
2748    pub fn new(name: StreamName) -> Self {
2749        Self {
2750            name,
2751            ignore_not_found: false,
2752        }
2753    }
2754
2755    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2756    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2757        Self {
2758            ignore_not_found,
2759            ..self
2760        }
2761    }
2762}
2763
2764#[derive(Debug, Clone)]
2765#[non_exhaustive]
2766/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
2767pub struct ReconfigureStreamInput {
2768    /// Stream name.
2769    pub name: StreamName,
2770    /// Reconfiguration for [`StreamConfig`].
2771    pub config: StreamReconfiguration,
2772}
2773
2774impl ReconfigureStreamInput {
2775    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2776    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2777        Self { name, config }
2778    }
2779}
2780
2781#[derive(Debug, Clone, PartialEq, Eq)]
2782/// Token for fencing appends to a stream.
2783///
2784/// **Note:** It must not exceed 36 bytes in length.
2785///
2786/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
2787pub struct FencingToken(String);
2788
2789impl FencingToken {
2790    /// Generate a random alphanumeric fencing token of `n` bytes.
2791    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2792        rand::rng()
2793            .sample_iter(&rand::distr::Alphanumeric)
2794            .take(n)
2795            .map(char::from)
2796            .collect::<String>()
2797            .parse()
2798    }
2799}
2800
2801impl FromStr for FencingToken {
2802    type Err = ValidationError;
2803
2804    fn from_str(s: &str) -> Result<Self, Self::Err> {
2805        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2806            return Err(ValidationError(format!(
2807                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2808            )));
2809        }
2810        Ok(FencingToken(s.to_string()))
2811    }
2812}
2813
2814impl std::fmt::Display for FencingToken {
2815    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2816        write!(f, "{}", self.0)
2817    }
2818}
2819
2820impl Deref for FencingToken {
2821    type Target = str;
2822
2823    fn deref(&self) -> &Self::Target {
2824        &self.0
2825    }
2826}
2827
2828#[derive(Debug, Clone, Copy, PartialEq)]
2829#[non_exhaustive]
2830/// A position in a stream.
2831pub struct StreamPosition {
2832    /// Sequence number assigned by the service.
2833    pub seq_num: u64,
2834    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
2835    /// User-specified timestamps are passed through as-is.
2836    pub timestamp: u64,
2837}
2838
2839impl std::fmt::Display for StreamPosition {
2840    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2841        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2842    }
2843}
2844
2845impl From<api::stream::proto::StreamPosition> for StreamPosition {
2846    fn from(value: api::stream::proto::StreamPosition) -> Self {
2847        Self {
2848            seq_num: value.seq_num,
2849            timestamp: value.timestamp,
2850        }
2851    }
2852}
2853
2854impl From<api::stream::StreamPosition> for StreamPosition {
2855    fn from(value: api::stream::StreamPosition) -> Self {
2856        Self {
2857            seq_num: value.seq_num,
2858            timestamp: value.timestamp,
2859        }
2860    }
2861}
2862
2863#[derive(Debug, Clone, PartialEq)]
2864#[non_exhaustive]
2865/// A name-value pair.
2866pub struct Header {
2867    /// Name.
2868    pub name: Bytes,
2869    /// Value.
2870    pub value: Bytes,
2871}
2872
2873impl Header {
2874    /// Create a new [`Header`] with the given name and value.
2875    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2876        Self {
2877            name: name.into(),
2878            value: value.into(),
2879        }
2880    }
2881}
2882
2883impl From<Header> for api::stream::proto::Header {
2884    fn from(value: Header) -> Self {
2885        Self {
2886            name: value.name,
2887            value: value.value,
2888        }
2889    }
2890}
2891
2892impl From<api::stream::proto::Header> for Header {
2893    fn from(value: api::stream::proto::Header) -> Self {
2894        Self {
2895            name: value.name,
2896            value: value.value,
2897        }
2898    }
2899}
2900
2901#[derive(Debug, Clone, PartialEq)]
2902/// A record to append.
2903pub struct AppendRecord {
2904    body: Bytes,
2905    headers: Vec<Header>,
2906    timestamp: Option<u64>,
2907}
2908
2909impl AppendRecord {
2910    fn validate(self) -> Result<Self, ValidationError> {
2911        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2912            Err(ValidationError(format!(
2913                "metered_bytes: {} exceeds {}",
2914                self.metered_bytes(),
2915                RECORD_BATCH_MAX.bytes
2916            )))
2917        } else {
2918            Ok(self)
2919        }
2920    }
2921
2922    /// Create a new [`AppendRecord`] with the given record body.
2923    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2924        let record = Self {
2925            body: body.into(),
2926            headers: Vec::default(),
2927            timestamp: None,
2928        };
2929        record.validate()
2930    }
2931
2932    /// Set the headers for this record.
2933    pub fn with_headers(
2934        self,
2935        headers: impl IntoIterator<Item = Header>,
2936    ) -> Result<Self, ValidationError> {
2937        let record = Self {
2938            headers: headers.into_iter().collect(),
2939            ..self
2940        };
2941        record.validate()
2942    }
2943
2944    /// Set the timestamp for this record.
2945    ///
2946    /// Precise semantics depend on [`StreamConfig::timestamping`].
2947    pub fn with_timestamp(self, timestamp: u64) -> Self {
2948        Self {
2949            timestamp: Some(timestamp),
2950            ..self
2951        }
2952    }
2953
2954    /// Get the body of this record.
2955    pub fn body(&self) -> &[u8] {
2956        &self.body
2957    }
2958
2959    /// Get the headers of this record.
2960    pub fn headers(&self) -> &[Header] {
2961        &self.headers
2962    }
2963
2964    /// Get the timestamp of this record.
2965    pub fn timestamp(&self) -> Option<u64> {
2966        self.timestamp
2967    }
2968}
2969
2970impl From<AppendRecord> for api::stream::proto::AppendRecord {
2971    fn from(value: AppendRecord) -> Self {
2972        Self {
2973            timestamp: value.timestamp,
2974            headers: value.headers.into_iter().map(Into::into).collect(),
2975            body: value.body,
2976        }
2977    }
2978}
2979
2980/// Metered byte size calculation.
2981///
2982/// Formula for a record:
2983/// ```text
2984/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
2985/// ```
2986pub trait MeteredBytes {
2987    /// Returns the metered byte size.
2988    fn metered_bytes(&self) -> usize;
2989}
2990
2991macro_rules! metered_bytes_impl {
2992    ($ty:ty) => {
2993        impl MeteredBytes for $ty {
2994            fn metered_bytes(&self) -> usize {
2995                8 + (2 * self.headers.len())
2996                    + self
2997                        .headers
2998                        .iter()
2999                        .map(|h| h.name.len() + h.value.len())
3000                        .sum::<usize>()
3001                    + self.body.len()
3002            }
3003        }
3004    };
3005}
3006
3007metered_bytes_impl!(AppendRecord);
3008
3009#[derive(Debug, Clone)]
3010/// A batch of records to append atomically.
3011///
3012/// **Note:** It must contain at least `1` record and no more than `1000`.
3013/// The total size of the batch must not exceed `1MiB` in metered bytes.
3014///
3015/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
3016/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
3017/// that takes care of the abovementioned constraints.
3018pub struct AppendRecordBatch {
3019    records: Vec<AppendRecord>,
3020    metered_bytes: usize,
3021}
3022
3023impl AppendRecordBatch {
3024    pub(crate) fn with_capacity(capacity: usize) -> Self {
3025        Self {
3026            records: Vec::with_capacity(capacity),
3027            metered_bytes: 0,
3028        }
3029    }
3030
3031    pub(crate) fn push(&mut self, record: AppendRecord) {
3032        self.metered_bytes += record.metered_bytes();
3033        self.records.push(record);
3034    }
3035
3036    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
3037    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3038    where
3039        I: IntoIterator<Item = AppendRecord>,
3040    {
3041        let mut records = Vec::new();
3042        let mut metered_bytes = 0;
3043
3044        for record in iter {
3045            metered_bytes += record.metered_bytes();
3046            records.push(record);
3047
3048            if metered_bytes > RECORD_BATCH_MAX.bytes {
3049                return Err(ValidationError(format!(
3050                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
3051                    RECORD_BATCH_MAX.bytes
3052                )));
3053            }
3054
3055            if records.len() > RECORD_BATCH_MAX.count {
3056                return Err(ValidationError(format!(
3057                    "number of records in the batch exceeds {}",
3058                    RECORD_BATCH_MAX.count
3059                )));
3060            }
3061        }
3062
3063        if records.is_empty() {
3064            return Err(ValidationError("batch is empty".into()));
3065        }
3066
3067        Ok(Self {
3068            records,
3069            metered_bytes,
3070        })
3071    }
3072}
3073
3074impl Deref for AppendRecordBatch {
3075    type Target = [AppendRecord];
3076
3077    fn deref(&self) -> &Self::Target {
3078        &self.records
3079    }
3080}
3081
3082impl MeteredBytes for AppendRecordBatch {
3083    fn metered_bytes(&self) -> usize {
3084        self.metered_bytes
3085    }
3086}
3087
3088#[derive(Debug, Clone)]
3089/// Command to signal an operation.
3090pub enum Command {
3091    /// Fence operation.
3092    Fence {
3093        /// Fencing token.
3094        fencing_token: FencingToken,
3095    },
3096    /// Trim operation.
3097    Trim {
3098        /// Trim point.
3099        trim_point: u64,
3100    },
3101}
3102
3103#[derive(Debug, Clone)]
3104#[non_exhaustive]
3105/// Command record for signaling operations to the service.
3106///
3107/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
3108pub struct CommandRecord {
3109    /// Command to signal an operation.
3110    pub command: Command,
3111    /// Timestamp for this record.
3112    pub timestamp: Option<u64>,
3113}
3114
3115impl CommandRecord {
3116    const FENCE: &[u8] = b"fence";
3117    const TRIM: &[u8] = b"trim";
3118
3119    /// Create a fence command record with the given fencing token.
3120    ///
3121    /// Fencing is strongly consistent, and subsequent appends that specify a
3122    /// fencing token will fail if it does not match.
3123    pub fn fence(fencing_token: FencingToken) -> Self {
3124        Self {
3125            command: Command::Fence { fencing_token },
3126            timestamp: None,
3127        }
3128    }
3129
3130    /// Create a trim command record with the given trim point.
3131    ///
3132    /// Trim point is the desired earliest sequence number for the stream.
3133    ///
3134    /// Trimming is eventually consistent, and trimmed records may be visible
3135    /// for a brief period.
3136    pub fn trim(trim_point: u64) -> Self {
3137        Self {
3138            command: Command::Trim { trim_point },
3139            timestamp: None,
3140        }
3141    }
3142
3143    /// Set the timestamp for this record.
3144    pub fn with_timestamp(self, timestamp: u64) -> Self {
3145        Self {
3146            timestamp: Some(timestamp),
3147            ..self
3148        }
3149    }
3150}
3151
3152impl From<CommandRecord> for AppendRecord {
3153    fn from(value: CommandRecord) -> Self {
3154        let (header_value, body) = match value.command {
3155            Command::Fence { fencing_token } => (
3156                CommandRecord::FENCE,
3157                Bytes::copy_from_slice(fencing_token.as_bytes()),
3158            ),
3159            Command::Trim { trim_point } => (
3160                CommandRecord::TRIM,
3161                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3162            ),
3163        };
3164        Self {
3165            body,
3166            headers: vec![Header::new("", header_value)],
3167            timestamp: value.timestamp,
3168        }
3169    }
3170}
3171
3172#[derive(Debug, Clone)]
3173#[non_exhaustive]
3174/// Input for [`append`](crate::S2Stream::append) operation and
3175/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
3176pub struct AppendInput {
3177    /// Batch of records to append atomically.
3178    pub records: AppendRecordBatch,
3179    /// Expected sequence number for the first record in the batch.
3180    ///
3181    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
3182    pub match_seq_num: Option<u64>,
3183    /// Fencing token to match against the stream's current fencing token.
3184    ///
3185    /// If unspecified, no matching is performed. If specified and mismatched,
3186    /// the append fails. A stream defaults to `""` as its fencing token.
3187    pub fencing_token: Option<FencingToken>,
3188}
3189
3190impl AppendInput {
3191    /// Create a new [`AppendInput`] with the given batch of records.
3192    pub fn new(records: AppendRecordBatch) -> Self {
3193        Self {
3194            records,
3195            match_seq_num: None,
3196            fencing_token: None,
3197        }
3198    }
3199
3200    /// Set the expected sequence number for the first record in the batch.
3201    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3202        Self {
3203            match_seq_num: Some(match_seq_num),
3204            ..self
3205        }
3206    }
3207
3208    /// Set the fencing token to match against the stream's current fencing token.
3209    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3210        Self {
3211            fencing_token: Some(fencing_token),
3212            ..self
3213        }
3214    }
3215}
3216
3217impl From<AppendInput> for api::stream::proto::AppendInput {
3218    fn from(value: AppendInput) -> Self {
3219        Self {
3220            records: value.records.iter().cloned().map(Into::into).collect(),
3221            match_seq_num: value.match_seq_num,
3222            fencing_token: value.fencing_token.map(|t| t.to_string()),
3223        }
3224    }
3225}
3226
3227#[derive(Debug, Clone, PartialEq)]
3228#[non_exhaustive]
3229/// Acknowledgement for an [`AppendInput`].
3230pub struct AppendAck {
3231    /// Sequence number and timestamp of the first record that was appended.
3232    pub start: StreamPosition,
3233    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
3234    /// that was appended.
3235    ///
3236    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
3237    /// appended.
3238    pub end: StreamPosition,
3239    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3240    /// the last record on the stream.
3241    ///
3242    /// This can be greater than the `end` position in case of concurrent appends.
3243    pub tail: StreamPosition,
3244}
3245
3246impl From<api::stream::proto::AppendAck> for AppendAck {
3247    fn from(value: api::stream::proto::AppendAck) -> Self {
3248        Self {
3249            start: value.start.unwrap_or_default().into(),
3250            end: value.end.unwrap_or_default().into(),
3251            tail: value.tail.unwrap_or_default().into(),
3252        }
3253    }
3254}
3255
3256#[derive(Debug, Clone, Copy)]
3257/// Starting position for reading from a stream.
3258pub enum ReadFrom {
3259    /// Read from this sequence number.
3260    SeqNum(u64),
3261    /// Read from this timestamp.
3262    Timestamp(u64),
3263    /// Read from N records before the tail.
3264    TailOffset(u64),
3265}
3266
3267impl Default for ReadFrom {
3268    fn default() -> Self {
3269        Self::SeqNum(0)
3270    }
3271}
3272
3273#[derive(Debug, Default, Clone)]
3274#[non_exhaustive]
3275/// Where to start reading.
3276pub struct ReadStart {
3277    /// Starting position.
3278    ///
3279    /// Defaults to reading from sequence number `0`.
3280    pub from: ReadFrom,
3281    /// Whether to start from tail if the requested starting position is beyond it.
3282    ///
3283    /// Defaults to `false` (errors if position is beyond tail).
3284    pub clamp_to_tail: bool,
3285}
3286
3287impl ReadStart {
3288    /// Create a new [`ReadStart`] with default values.
3289    pub fn new() -> Self {
3290        Self::default()
3291    }
3292
3293    /// Set the starting position.
3294    pub fn with_from(self, from: ReadFrom) -> Self {
3295        Self { from, ..self }
3296    }
3297
3298    /// Set whether to start from tail if the requested starting position is beyond it.
3299    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3300        Self {
3301            clamp_to_tail,
3302            ..self
3303        }
3304    }
3305}
3306
3307impl From<ReadStart> for api::stream::ReadStart {
3308    fn from(value: ReadStart) -> Self {
3309        let (seq_num, timestamp, tail_offset) = match value.from {
3310            ReadFrom::SeqNum(n) => (Some(n), None, None),
3311            ReadFrom::Timestamp(t) => (None, Some(t), None),
3312            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3313        };
3314        Self {
3315            seq_num,
3316            timestamp,
3317            tail_offset,
3318            clamp: if value.clamp_to_tail {
3319                Some(true)
3320            } else {
3321                None
3322            },
3323        }
3324    }
3325}
3326
3327#[derive(Debug, Clone, Default)]
3328#[non_exhaustive]
3329/// Limits on how much to read.
3330pub struct ReadLimits {
3331    /// Limit on number of records.
3332    ///
3333    /// Defaults to `1000` for non-streaming read.
3334    pub count: Option<usize>,
3335    /// Limit on total metered bytes of records.
3336    ///
3337    /// Defaults to `1MiB` for non-streaming read.
3338    pub bytes: Option<usize>,
3339}
3340
3341impl ReadLimits {
3342    /// Create a new [`ReadLimits`] with default values.
3343    pub fn new() -> Self {
3344        Self::default()
3345    }
3346
3347    /// Set the limit on number of records.
3348    pub fn with_count(self, count: usize) -> Self {
3349        Self {
3350            count: Some(count),
3351            ..self
3352        }
3353    }
3354
3355    /// Set the limit on total metered bytes of records.
3356    pub fn with_bytes(self, bytes: usize) -> Self {
3357        Self {
3358            bytes: Some(bytes),
3359            ..self
3360        }
3361    }
3362}
3363
3364#[derive(Debug, Clone, Default)]
3365#[non_exhaustive]
3366/// When to stop reading.
3367pub struct ReadStop {
3368    /// Limits on how much to read.
3369    ///
3370    /// See [`ReadLimits`] for defaults.
3371    pub limits: ReadLimits,
3372    /// Timestamp at which to stop (exclusive).
3373    ///
3374    /// Defaults to `None`.
3375    pub until: Option<RangeTo<u64>>,
3376    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
3377    /// seconds for [`read`](crate::S2Stream::read).
3378    ///
3379    /// Defaults to:
3380    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
3381    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
3382    ///   is specified.
3383    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
3384    ///   `until` is specified.
3385    pub wait: Option<u32>,
3386}
3387
3388impl ReadStop {
3389    /// Create a new [`ReadStop`] with default values.
3390    pub fn new() -> Self {
3391        Self::default()
3392    }
3393
3394    /// Set the limits on how much to read.
3395    pub fn with_limits(self, limits: ReadLimits) -> Self {
3396        Self { limits, ..self }
3397    }
3398
3399    /// Set the timestamp at which to stop (exclusive).
3400    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3401        Self {
3402            until: Some(until),
3403            ..self
3404        }
3405    }
3406
3407    /// Set the duration in seconds to wait for new records before stopping.
3408    pub fn with_wait(self, wait: u32) -> Self {
3409        Self {
3410            wait: Some(wait),
3411            ..self
3412        }
3413    }
3414}
3415
3416impl From<ReadStop> for api::stream::ReadEnd {
3417    fn from(value: ReadStop) -> Self {
3418        Self {
3419            count: value.limits.count,
3420            bytes: value.limits.bytes,
3421            until: value.until.map(|r| r.end),
3422            wait: value.wait,
3423        }
3424    }
3425}
3426
3427#[derive(Debug, Clone, Default)]
3428#[non_exhaustive]
3429/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
3430/// operations.
3431pub struct ReadInput {
3432    /// Where to start reading.
3433    ///
3434    /// See [`ReadStart`] for defaults.
3435    pub start: ReadStart,
3436    /// When to stop reading.
3437    ///
3438    /// See [`ReadStop`] for defaults.
3439    pub stop: ReadStop,
3440    /// Whether to filter out command records from the stream when reading.
3441    ///
3442    /// Defaults to `false`.
3443    pub ignore_command_records: bool,
3444}
3445
3446impl ReadInput {
3447    /// Create a new [`ReadInput`] with default values.
3448    pub fn new() -> Self {
3449        Self::default()
3450    }
3451
3452    /// Set where to start reading.
3453    pub fn with_start(self, start: ReadStart) -> Self {
3454        Self { start, ..self }
3455    }
3456
3457    /// Set when to stop reading.
3458    pub fn with_stop(self, stop: ReadStop) -> Self {
3459        Self { stop, ..self }
3460    }
3461
3462    /// Set whether to filter out command records from the stream when reading.
3463    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3464        Self {
3465            ignore_command_records,
3466            ..self
3467        }
3468    }
3469}
3470
3471#[derive(Debug, Clone)]
3472#[non_exhaustive]
3473/// Record that is durably sequenced on a stream.
3474pub struct SequencedRecord {
3475    /// Sequence number assigned to this record.
3476    pub seq_num: u64,
3477    /// Body of this record.
3478    pub body: Bytes,
3479    /// Headers for this record.
3480    pub headers: Vec<Header>,
3481    /// Timestamp for this record.
3482    pub timestamp: u64,
3483}
3484
3485impl SequencedRecord {
3486    /// Whether this is a command record.
3487    pub fn is_command_record(&self) -> bool {
3488        self.headers.len() == 1 && *self.headers[0].name == *b""
3489    }
3490}
3491
3492impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3493    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3494        Self {
3495            seq_num: value.seq_num,
3496            body: value.body,
3497            headers: value.headers.into_iter().map(Into::into).collect(),
3498            timestamp: value.timestamp,
3499        }
3500    }
3501}
3502
3503metered_bytes_impl!(SequencedRecord);
3504
3505#[derive(Debug, Clone)]
3506#[non_exhaustive]
3507/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
3508/// [`read_session`](crate::S2Stream::read_session).
3509pub struct ReadBatch {
3510    /// Records that are durably sequenced on the stream.
3511    ///
3512    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
3513    /// - the [`stop condition`](ReadInput::stop) was already met, or
3514    /// - all records in the batch were command records and
3515    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
3516    pub records: Vec<SequencedRecord>,
3517    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3518    /// the last record.
3519    ///
3520    /// It will only be present when reading recent records.
3521    pub tail: Option<StreamPosition>,
3522}
3523
3524impl ReadBatch {
3525    pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3526        Self {
3527            records: batch.records.into_iter().map(Into::into).collect(),
3528            tail: batch.tail.map(Into::into),
3529        }
3530    }
3531}
3532
3533/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
3534pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3535
3536#[derive(Debug, Clone, thiserror::Error)]
3537/// Why an append condition check failed.
3538pub enum AppendConditionFailed {
3539    #[error("fencing token mismatch, expected: {0}")]
3540    /// Fencing token did not match. Contains the expected fencing token.
3541    FencingTokenMismatch(FencingToken),
3542    #[error("sequence number mismatch, expected: {0}")]
3543    /// Sequence number did not match. Contains the expected sequence number.
3544    SeqNumMismatch(u64),
3545}
3546
3547impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3548    fn from(value: api::stream::AppendConditionFailed) -> Self {
3549        match value {
3550            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3551                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3552            }
3553            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3554                AppendConditionFailed::SeqNumMismatch(seq)
3555            }
3556        }
3557    }
3558}
3559
3560#[derive(Debug, Clone, thiserror::Error)]
3561/// Errors from S2 operations.
3562pub enum S2Error {
3563    #[error("{0}")]
3564    /// Client-side error.
3565    Client(String),
3566    #[error(transparent)]
3567    /// Validation error.
3568    Validation(#[from] ValidationError),
3569    #[error("{0}")]
3570    /// Append condition check failed. Contains the failure reason.
3571    AppendConditionFailed(AppendConditionFailed),
3572    #[error("read from an unwritten position. current tail: {0}")]
3573    /// Read from an unwritten position. Contains the current tail.
3574    ReadUnwritten(StreamPosition),
3575    #[error("{0}")]
3576    /// Other server-side error.
3577    Server(ErrorResponse),
3578}
3579
3580impl From<ApiError> for S2Error {
3581    fn from(err: ApiError) -> Self {
3582        match err {
3583            ApiError::ReadUnwritten(tail_response) => {
3584                Self::ReadUnwritten(tail_response.tail.into())
3585            }
3586            ApiError::AppendConditionFailed(condition_failed) => {
3587                Self::AppendConditionFailed(condition_failed.into())
3588            }
3589            ApiError::Server(_, response) => Self::Server(response.into()),
3590            other => Self::Client(other.to_string()),
3591        }
3592    }
3593}
3594
3595#[derive(Debug, Clone, thiserror::Error)]
3596#[error("{code}: {message}")]
3597#[non_exhaustive]
3598/// Error response from S2 server.
3599pub struct ErrorResponse {
3600    /// Error code.
3601    pub code: String,
3602    /// Error message.
3603    pub message: String,
3604}
3605
3606impl From<ApiErrorResponse> for ErrorResponse {
3607    fn from(response: ApiErrorResponse) -> Self {
3608        Self {
3609            code: response.code,
3610            message: response.message,
3611        }
3612    }
3613}
3614
3615fn idempotency_token() -> String {
3616    uuid::Uuid::new_v4().simple().to_string()
3617}