Skip to main content

s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    sync::Arc,
12    time::Duration,
13};
14
15use bytes::Bytes;
16use http::{
17    header::HeaderValue,
18    uri::{Authority, Scheme},
19};
20use rand::RngExt;
21use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
22/// Validation error.
23pub use s2_common::types::ValidationError;
24/// Access token ID.
25///
26/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
27pub use s2_common::types::access::AccessTokenId;
28/// See [`ListAccessTokensInput::prefix`].
29pub use s2_common::types::access::AccessTokenIdPrefix;
30/// See [`ListAccessTokensInput::start_after`].
31pub use s2_common::types::access::AccessTokenIdStartAfter;
32/// Basin name.
33///
34/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
35/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
36pub use s2_common::types::basin::BasinName;
37/// See [`ListBasinsInput::prefix`].
38pub use s2_common::types::basin::BasinNamePrefix;
39/// See [`ListBasinsInput::start_after`].
40pub use s2_common::types::basin::BasinNameStartAfter;
41/// Location name.
42///
43/// **Note:** It must be between 1 and 64 characters in length and can only comprise ASCII
44/// letters, numbers, colons, hyphens, and periods.
45pub use s2_common::types::location::LocationName;
46/// Stream name.
47///
48/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
49pub use s2_common::types::stream::StreamName;
50/// See [`ListStreamsInput::prefix`].
51pub use s2_common::types::stream::StreamNamePrefix;
52/// See [`ListStreamsInput::start_after`].
53pub use s2_common::types::stream::StreamNameStartAfter;
54pub use s2_common::{
55    caps::RECORD_BATCH_MAX,
56    encryption::{EncryptionAlgorithm, EncryptionKey},
57};
58
59pub(crate) const ONE_MIB: u32 = 1024 * 1024;
60
61use s2_common::{
62    maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH, types::resources::ProvisionResult,
63};
64use secrecy::SecretString;
65
66use crate::api::{ApiError, ApiErrorResponse};
67
68/// An RFC 3339 datetime.
69///
70/// It can be created in either of the following ways:
71/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
72/// - Convert from [`time::OffsetDateTime`] using [`TryFrom`]/[`TryInto`].
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub struct S2DateTime(time::OffsetDateTime);
75
76impl TryFrom<time::OffsetDateTime> for S2DateTime {
77    type Error = ValidationError;
78
79    fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
80        dt.format(&time::format_description::well_known::Rfc3339)
81            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
82        Ok(Self(dt))
83    }
84}
85
86impl From<S2DateTime> for time::OffsetDateTime {
87    fn from(dt: S2DateTime) -> Self {
88        dt.0
89    }
90}
91
92impl FromStr for S2DateTime {
93    type Err = ValidationError;
94
95    fn from_str(s: &str) -> Result<Self, Self::Err> {
96        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
97            .map(Self)
98            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
99    }
100}
101
102impl fmt::Display for S2DateTime {
103    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104        write!(
105            f,
106            "{}",
107            self.0
108                .format(&time::format_description::well_known::Rfc3339)
109                .expect("RFC3339 formatting should not fail for S2DateTime")
110        )
111    }
112}
113
114/// Authority for connecting to an S2 basin.
115#[derive(Debug, Clone, PartialEq)]
116pub(crate) enum BasinAuthority {
117    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
118    ParentZone(Authority),
119    /// Direct cell authority. Basin is expected to be hosted by this cell.
120    Direct(Authority),
121}
122
123/// Account endpoint.
124#[derive(Debug, Clone)]
125pub struct AccountEndpoint {
126    scheme: Scheme,
127    authority: Authority,
128}
129
130impl AccountEndpoint {
131    /// Create a new [`AccountEndpoint`] with the given endpoint.
132    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
133        endpoint.parse()
134    }
135}
136
137impl FromStr for AccountEndpoint {
138    type Err = ValidationError;
139
140    fn from_str(s: &str) -> Result<Self, Self::Err> {
141        let (scheme, authority) = match s.find("://") {
142            Some(idx) => {
143                let scheme: Scheme = s[..idx]
144                    .parse()
145                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
146                (scheme, &s[idx + 3..])
147            }
148            None => (Scheme::HTTPS, s),
149        };
150        Ok(Self {
151            scheme,
152            authority: authority
153                .parse()
154                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
155        })
156    }
157}
158
159/// Basin endpoint.
160#[derive(Debug, Clone)]
161pub struct BasinEndpoint {
162    scheme: Scheme,
163    authority: BasinAuthority,
164}
165
166impl BasinEndpoint {
167    /// Create a new [`BasinEndpoint`] with the given endpoint.
168    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
169        endpoint.parse()
170    }
171}
172
173impl FromStr for BasinEndpoint {
174    type Err = ValidationError;
175
176    fn from_str(s: &str) -> Result<Self, Self::Err> {
177        let (scheme, authority) = match s.find("://") {
178            Some(idx) => {
179                let scheme: Scheme = s[..idx]
180                    .parse()
181                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
182                (scheme, &s[idx + 3..])
183            }
184            None => (Scheme::HTTPS, s),
185        };
186        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
187            BasinAuthority::ParentZone(
188                authority
189                    .parse()
190                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
191            )
192        } else {
193            BasinAuthority::Direct(
194                authority
195                    .parse()
196                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
197            )
198        };
199        Ok(Self { scheme, authority })
200    }
201}
202
203#[derive(Debug, Clone)]
204#[non_exhaustive]
205/// Endpoints for the S2 environment.
206pub struct S2Endpoints {
207    pub(crate) scheme: Scheme,
208    pub(crate) account_authority: Authority,
209    pub(crate) basin_authority: BasinAuthority,
210}
211
212impl S2Endpoints {
213    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
214    pub fn new(
215        account_endpoint: AccountEndpoint,
216        basin_endpoint: BasinEndpoint,
217    ) -> Result<Self, ValidationError> {
218        if account_endpoint.scheme != basin_endpoint.scheme {
219            return Err("account and basin endpoints must have the same scheme".into());
220        }
221        Ok(Self {
222            scheme: account_endpoint.scheme,
223            account_authority: account_endpoint.authority,
224            basin_authority: basin_endpoint.authority,
225        })
226    }
227
228    /// Create a new [`S2Endpoints`] from environment variables.
229    ///
230    /// The following environment variables are expected to be set:
231    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
232    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
233    pub fn from_env() -> Result<Self, ValidationError> {
234        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
235            Ok(endpoint) => endpoint.parse()?,
236            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
237            Err(VarError::NotUnicode(_)) => {
238                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
239            }
240        };
241
242        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
243            Ok(endpoint) => endpoint.parse()?,
244            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
245            Err(VarError::NotUnicode(_)) => {
246                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
247            }
248        };
249
250        if account_endpoint.scheme != basin_endpoint.scheme {
251            return Err(
252                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
253            );
254        }
255
256        Ok(Self {
257            scheme: account_endpoint.scheme,
258            account_authority: account_endpoint.authority,
259            basin_authority: basin_endpoint.authority,
260        })
261    }
262
263    pub(crate) fn for_aws() -> Self {
264        Self {
265            scheme: Scheme::HTTPS,
266            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
267            basin_authority: BasinAuthority::ParentZone(
268                "b.s2.dev".try_into().expect("valid authority"),
269            ),
270        }
271    }
272}
273
/// Compression algorithm for request and response bodies.
///
/// Values are cheap to copy and can be compared for equality, like the other
/// fieldless configuration enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
284
285impl From<Compression> for CompressionAlgorithm {
286    fn from(value: Compression) -> Self {
287        match value {
288            Compression::None => CompressionAlgorithm::None,
289            Compression::Gzip => CompressionAlgorithm::Gzip,
290            Compression::Zstd => CompressionAlgorithm::Zstd,
291        }
292    }
293}
294
/// Retry policy for [`append`](crate::S2Stream::append) and
/// [`append_session`](crate::S2Stream::append_session) operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AppendRetryPolicy {
    /// Retry all appends. Use when duplicate records on the stream are acceptable.
    All,
    /// Retry when it can be determined that the request had no side effects.
    ///
    /// Uses a frame-level signal to detect whether any body frames were consumed
    /// by the HTTP transport. If no frames were sent, the server never saw the
    /// request, so retry is safe and will not cause duplicate records.
    ///
    /// Certain server errors (`rate_limited`, `hot_server`) are also safe to
    /// retry regardless of frame signal state, since they guarantee no mutation
    /// occurred.
    NoSideEffects,
}
313
314#[derive(Debug, Clone)]
315#[non_exhaustive]
316/// Configuration for retrying requests in case of transient failures.
317///
318/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
319/// ```text
320/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
321///     jitter = rand[0, base_delay]
322///     delay  = base_delay + jitter
323/// ````
324pub struct RetryConfig {
325    /// Total number of attempts including the initial try. A value of `1` means no retries.
326    ///
327    /// Defaults to `3`.
328    pub max_attempts: NonZeroU32,
329    /// Minimum base delay for retries.
330    ///
331    /// Defaults to `100ms`.
332    pub min_base_delay: Duration,
333    /// Maximum base delay for retries.
334    ///
335    /// Defaults to `1s`.
336    pub max_base_delay: Duration,
337    /// Retry policy for [`append`](crate::S2Stream::append) and
338    /// [`append_session`](crate::S2Stream::append_session) operations.
339    ///
340    /// Defaults to `All`.
341    pub append_retry_policy: AppendRetryPolicy,
342}
343
344impl Default for RetryConfig {
345    fn default() -> Self {
346        Self {
347            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
348            min_base_delay: Duration::from_millis(100),
349            max_base_delay: Duration::from_secs(1),
350            append_retry_policy: AppendRetryPolicy::All,
351        }
352    }
353}
354
355impl RetryConfig {
356    /// Create a new [`RetryConfig`] with default settings.
357    pub fn new() -> Self {
358        Self::default()
359    }
360
361    pub(crate) fn max_retries(&self) -> u32 {
362        self.max_attempts.get() - 1
363    }
364
365    /// Set the total number of attempts including the initial try.
366    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
367        Self {
368            max_attempts,
369            ..self
370        }
371    }
372
373    /// Set the minimum base delay for retries.
374    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
375        Self {
376            min_base_delay,
377            ..self
378        }
379    }
380
381    /// Set the maximum base delay for retries.
382    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
383        Self {
384            max_base_delay,
385            ..self
386        }
387    }
388
389    /// Set the retry policy for [`append`](crate::S2Stream::append) and
390    /// [`append_session`](crate::S2Stream::append_session) operations.
391    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
392        Self {
393            append_retry_policy,
394            ..self
395        }
396    }
397}
398
399#[derive(Debug, Clone)]
400#[non_exhaustive]
401/// Configuration for [`S2`](crate::S2).
402pub struct S2Config {
403    pub(crate) access_token: SecretString,
404    pub(crate) endpoints: S2Endpoints,
405    pub(crate) connection_timeout: Duration,
406    pub(crate) request_timeout: Duration,
407    pub(crate) retry: RetryConfig,
408    pub(crate) compression: Compression,
409    pub(crate) user_agent: HeaderValue,
410    pub(crate) insecure_skip_cert_verification: bool,
411    pub(crate) rustls_crypto_provider: Option<Arc<rustls::crypto::CryptoProvider>>,
412}
413
414impl S2Config {
415    /// Create a new [`S2Config`] with the given access token and default settings.
416    pub fn new(access_token: impl Into<String>) -> Self {
417        Self {
418            access_token: access_token.into().into(),
419            endpoints: S2Endpoints::for_aws(),
420            connection_timeout: Duration::from_secs(3),
421            request_timeout: Duration::from_secs(5),
422            retry: RetryConfig::new(),
423            compression: Compression::None,
424            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
425                .parse()
426                .expect("valid user agent"),
427            insecure_skip_cert_verification: false,
428            rustls_crypto_provider: default_rustls_crypto_provider(),
429        }
430    }
431
432    /// Set the S2 endpoints to connect to.
433    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
434        Self { endpoints, ..self }
435    }
436
437    /// Set the timeout for establishing a connection to the server.
438    ///
439    /// Defaults to `3s`.
440    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
441        Self {
442            connection_timeout,
443            ..self
444        }
445    }
446
447    /// Set the timeout for requests.
448    ///
449    /// Defaults to `5s`.
450    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
451        Self {
452            request_timeout,
453            ..self
454        }
455    }
456
457    /// Set the retry configuration for requests.
458    ///
459    /// See [`RetryConfig`] for defaults.
460    pub fn with_retry(self, retry: RetryConfig) -> Self {
461        Self { retry, ..self }
462    }
463
464    /// Set the compression algorithm for requests and responses.
465    ///
466    /// Defaults to no compression.
467    pub fn with_compression(self, compression: Compression) -> Self {
468        Self {
469            compression,
470            ..self
471        }
472    }
473
474    /// Skip TLS certificate verification (insecure).
475    ///
476    /// This is useful for connecting to endpoints with self-signed certificates
477    /// or certificates that don't match the hostname (similar to `curl -k`).
478    ///
479    /// # Warning
480    ///
481    /// This disables certificate verification and should only be used for
482    /// testing or development purposes. **Never use this in production.**
483    ///
484    /// Defaults to `false`.
485    pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
486        Self {
487            insecure_skip_cert_verification: skip,
488            ..self
489        }
490    }
491
492    /// Use a specific rustls crypto provider for SDK TLS connections.
493    ///
494    /// With default features enabled, the SDK uses the `aws-lc-rs` provider.
495    /// With default features disabled, the SDK uses rustls's process-global
496    /// provider if one has been installed, or returns an error otherwise.
497    ///
498    /// Use this when your application needs a specific rustls provider, such as
499    /// `ring` or a custom [`rustls::crypto::CryptoProvider`]. The corresponding
500    /// rustls provider feature must be enabled in the dependency graph.
501    pub fn with_rustls_crypto_provider(
502        self,
503        provider: impl Into<Arc<rustls::crypto::CryptoProvider>>,
504    ) -> Self {
505        Self {
506            rustls_crypto_provider: Some(provider.into()),
507            ..self
508        }
509    }
510
511    /// Use rustls's `aws-lc-rs` crypto provider.
512    ///
513    /// Requires the `rustls-aws-lc-rs` crate feature.
514    #[cfg(feature = "rustls-aws-lc-rs")]
515    pub fn with_rustls_aws_lc_rs_crypto_provider(self) -> Self {
516        self.with_rustls_crypto_provider(rustls::crypto::aws_lc_rs::default_provider())
517    }
518
519    /// Use rustls's `ring` crypto provider.
520    ///
521    /// Requires the `rustls-ring` crate feature.
522    #[cfg(feature = "rustls-ring")]
523    pub fn with_rustls_ring_crypto_provider(self) -> Self {
524        self.with_rustls_crypto_provider(rustls::crypto::ring::default_provider())
525    }
526
527    #[doc(hidden)]
528    #[cfg(feature = "_hidden")]
529    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
530        let user_agent = user_agent
531            .into()
532            .parse()
533            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
534        Ok(Self { user_agent, ..self })
535    }
536}
537
538#[cfg(feature = "rustls-aws-lc-rs")]
539fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
540    Some(Arc::new(rustls::crypto::aws_lc_rs::default_provider()))
541}
542
543#[cfg(all(not(feature = "rustls-aws-lc-rs"), feature = "rustls-ring"))]
544fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
545    Some(Arc::new(rustls::crypto::ring::default_provider()))
546}
547
548#[cfg(all(not(feature = "rustls-aws-lc-rs"), not(feature = "rustls-ring")))]
549fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
550    None
551}
552
/// One page of results from a paginated listing.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// The items contained in this page.
    pub values: Vec<T>,
    /// `true` when further pages exist beyond this one.
    pub has_more: bool,
}
562
563impl<T> Page<T> {
564    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
565        Self {
566            values: values.into(),
567            has_more,
568        }
569    }
570}
571
/// Storage class for recent appends.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    /// Standard class, offering append latencies under `500ms`.
    Standard,
    /// Express class, offering append latencies under `50ms`.
    Express,
}
580
581impl From<api::config::StorageClass> for StorageClass {
582    fn from(value: api::config::StorageClass) -> Self {
583        match value {
584            api::config::StorageClass::Standard => StorageClass::Standard,
585            api::config::StorageClass::Express => StorageClass::Express,
586        }
587    }
588}
589
590impl From<StorageClass> for api::config::StorageClass {
591    fn from(value: StorageClass) -> Self {
592        match value {
593            StorageClass::Standard => api::config::StorageClass::Standard,
594            StorageClass::Express => api::config::StorageClass::Express,
595        }
596    }
597}
598
/// Retention policy for records in a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Maximum record age, in seconds. Older records are trimmed automatically.
    Age(u64),
    /// Keep records indefinitely unless they are trimmed explicitly.
    Infinite,
}
607
608impl From<api::config::RetentionPolicy> for RetentionPolicy {
609    fn from(value: api::config::RetentionPolicy) -> Self {
610        match value {
611            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
612            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
613        }
614    }
615}
616
617impl From<RetentionPolicy> for api::config::RetentionPolicy {
618    fn from(value: RetentionPolicy) -> Self {
619        match value {
620            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
621            RetentionPolicy::Infinite => {
622                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
623            }
624        }
625    }
626}
627
/// Timestamping mode for appends; it determines how record timestamps are handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    /// Use the client-specified timestamp when present, otherwise the arrival time.
    ClientPrefer,
    /// Demand a client-specified timestamp; appends without one are rejected.
    ClientRequire,
    /// Always use the arrival time, ignoring any client-specified timestamp.
    Arrival,
}
638
639impl From<api::config::TimestampingMode> for TimestampingMode {
640    fn from(value: api::config::TimestampingMode) -> Self {
641        match value {
642            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
643            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
644            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
645        }
646    }
647}
648
649impl From<TimestampingMode> for api::config::TimestampingMode {
650    fn from(value: TimestampingMode) -> Self {
651        match value {
652            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
653            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
654            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
655        }
656    }
657}
658
659#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
660#[non_exhaustive]
661/// Configuration for timestamping behavior.
662pub struct TimestampingConfig {
663    /// Timestamping mode for appends that influences how timestamps are handled.
664    ///
665    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
666    pub mode: Option<TimestampingMode>,
667    /// Whether client-specified timestamps are allowed to exceed the arrival time.
668    ///
669    /// Defaults to `false` (client timestamps are capped at the arrival time).
670    pub uncapped: Option<bool>,
671}
672
673impl TimestampingConfig {
674    /// Create a new [`TimestampingConfig`] with default settings.
675    pub fn new() -> Self {
676        Self::default()
677    }
678
679    /// Set the timestamping mode for appends that influences how timestamps are handled.
680    pub fn with_mode(self, mode: TimestampingMode) -> Self {
681        Self {
682            mode: Some(mode),
683            ..self
684        }
685    }
686
687    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
688    pub fn with_uncapped(self, uncapped: bool) -> Self {
689        Self {
690            uncapped: Some(uncapped),
691            ..self
692        }
693    }
694}
695
696impl From<api::config::TimestampingConfig> for TimestampingConfig {
697    fn from(value: api::config::TimestampingConfig) -> Self {
698        Self {
699            mode: value.mode.map(Into::into),
700            uncapped: value.uncapped,
701        }
702    }
703}
704
705impl From<TimestampingConfig> for api::config::TimestampingConfig {
706    fn from(value: TimestampingConfig) -> Self {
707        Self {
708            mode: value.mode.map(Into::into),
709            uncapped: value.uncapped,
710        }
711    }
712}
713
/// Configuration for automatically deleting a stream once it becomes empty.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age, in seconds, an empty stream must reach before it can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}
723
724impl DeleteOnEmptyConfig {
725    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
726    pub fn new() -> Self {
727        Self::default()
728    }
729
730    /// Set the minimum age in seconds before an empty stream can be deleted.
731    pub fn with_min_age(self, min_age: Duration) -> Self {
732        Self {
733            min_age_secs: min_age.as_secs(),
734        }
735    }
736}
737
738impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
739    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
740        Self {
741            min_age_secs: value.min_age_secs,
742        }
743    }
744}
745
746impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
747    fn from(value: DeleteOnEmptyConfig) -> Self {
748        Self {
749            min_age_secs: value.min_age_secs,
750        }
751    }
752}
753
754#[derive(Debug, Clone, Default, PartialEq, Eq)]
755#[non_exhaustive]
756/// Configuration for a stream.
757pub struct StreamConfig {
758    /// Storage class for the stream.
759    ///
760    /// Defaults to [`Express`](StorageClass::Express).
761    pub storage_class: Option<StorageClass>,
762    /// Retention policy for records in the stream.
763    ///
764    /// Defaults to `7 days` of retention.
765    pub retention_policy: Option<RetentionPolicy>,
766    /// Configuration for timestamping behavior.
767    ///
768    /// See [`TimestampingConfig`] for defaults.
769    pub timestamping: Option<TimestampingConfig>,
770    /// Configuration for automatically deleting the stream when it becomes empty.
771    ///
772    /// See [`DeleteOnEmptyConfig`] for defaults.
773    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
774}
775
776impl StreamConfig {
777    /// Create a new [`StreamConfig`] with default settings.
778    pub fn new() -> Self {
779        Self::default()
780    }
781
782    /// Set the storage class for the stream.
783    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
784        Self {
785            storage_class: Some(storage_class),
786            ..self
787        }
788    }
789
790    /// Set the retention policy for records in the stream.
791    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
792        Self {
793            retention_policy: Some(retention_policy),
794            ..self
795        }
796    }
797
798    /// Set the configuration for timestamping behavior.
799    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
800        Self {
801            timestamping: Some(timestamping),
802            ..self
803        }
804    }
805
806    /// Set the configuration for automatically deleting the stream when it becomes empty.
807    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
808        Self {
809            delete_on_empty: Some(delete_on_empty),
810            ..self
811        }
812    }
813}
814
815impl From<api::config::StreamConfig> for StreamConfig {
816    fn from(value: api::config::StreamConfig) -> Self {
817        Self {
818            storage_class: value.storage_class.map(Into::into),
819            retention_policy: value.retention_policy.map(Into::into),
820            timestamping: value.timestamping.map(Into::into),
821            delete_on_empty: value.delete_on_empty.map(Into::into),
822        }
823    }
824}
825
826impl From<StreamConfig> for api::config::StreamConfig {
827    fn from(value: StreamConfig) -> Self {
828        Self {
829            storage_class: value.storage_class.map(Into::into),
830            retention_policy: value.retention_policy.map(Into::into),
831            timestamping: value.timestamping.map(Into::into),
832            delete_on_empty: value.delete_on_empty.map(Into::into),
833        }
834    }
835}
836
837#[derive(Debug, Clone, Default, PartialEq, Eq)]
838#[non_exhaustive]
839/// Configuration for a basin.
840pub struct BasinConfig {
841    /// Default configuration for all streams in the basin.
842    ///
843    /// See [`StreamConfig`] for defaults.
844    pub default_stream_config: Option<StreamConfig>,
845    /// Encryption algorithm to apply to newly created streams in the basin.
846    pub stream_cipher: Option<EncryptionAlgorithm>,
847    /// Whether to create stream on append if it doesn't exist using default stream configuration.
848    ///
849    /// Defaults to `false`.
850    pub create_stream_on_append: bool,
851    /// Whether to create stream on read if it doesn't exist using default stream configuration.
852    ///
853    /// Defaults to `false`.
854    pub create_stream_on_read: bool,
855}
856
857impl BasinConfig {
858    /// Create a new [`BasinConfig`] with default settings.
859    pub fn new() -> Self {
860        Self::default()
861    }
862
863    /// Set the default configuration for all streams in the basin.
864    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
865        Self {
866            default_stream_config: Some(config),
867            ..self
868        }
869    }
870
871    /// Set the encryption algorithm to apply to newly created streams in the basin.
872    pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
873        Self {
874            stream_cipher: Some(stream_cipher),
875            ..self
876        }
877    }
878
879    /// Set whether to create stream on append if it doesn't exist using default stream
880    /// configuration.
881    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
882        Self {
883            create_stream_on_append,
884            ..self
885        }
886    }
887
888    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
889    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
890        Self {
891            create_stream_on_read,
892            ..self
893        }
894    }
895}
896
897impl From<api::config::BasinConfig> for BasinConfig {
898    fn from(value: api::config::BasinConfig) -> Self {
899        Self {
900            default_stream_config: value.default_stream_config.map(Into::into),
901            stream_cipher: value.stream_cipher.map(Into::into),
902            create_stream_on_append: value.create_stream_on_append,
903            create_stream_on_read: value.create_stream_on_read,
904        }
905    }
906}
907
908impl From<BasinConfig> for api::config::BasinConfig {
909    fn from(value: BasinConfig) -> Self {
910        Self {
911            default_stream_config: value.default_stream_config.map(Into::into),
912            stream_cipher: value.stream_cipher.map(Into::into),
913            create_stream_on_append: value.create_stream_on_append,
914            create_stream_on_read: value.create_stream_on_read,
915        }
916    }
917}
918
919#[derive(Debug, Clone)]
920#[non_exhaustive]
921/// Input for [`create_basin`](crate::S2::create_basin) operation.
922pub struct CreateBasinInput {
923    /// Basin name.
924    pub name: BasinName,
925    /// Configuration for the basin.
926    ///
927    /// See [`BasinConfig`] for defaults.
928    pub config: Option<BasinConfig>,
929    /// Location of the basin.
930    ///
931    /// If omitted when creating, uses the default location for the account.
932    pub location: Option<LocationName>,
933    idempotency_token: String,
934}
935
936impl CreateBasinInput {
937    /// Create a new [`CreateBasinInput`] with the given basin name.
938    pub fn new(name: BasinName) -> Self {
939        Self {
940            name,
941            config: None,
942            location: None,
943            idempotency_token: idempotency_token(),
944        }
945    }
946
947    /// Set the configuration for the basin.
948    pub fn with_config(self, config: BasinConfig) -> Self {
949        Self {
950            config: Some(config),
951            ..self
952        }
953    }
954
955    /// Set the location of the basin.
956    pub fn with_location<S>(self, location: S) -> Result<Self, ValidationError>
957    where
958        S: TryInto<LocationName>,
959        S::Error: fmt::Display,
960    {
961        let location = location
962            .try_into()
963            .map_err(|e| ValidationError(e.to_string()))?;
964        Ok(Self {
965            location: Some(location),
966            ..self
967        })
968    }
969}
970
971impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
972    fn from(value: CreateBasinInput) -> Self {
973        (
974            api::basin::CreateBasinRequest {
975                basin: value.name,
976                config: value.config.map(Into::into),
977                location: value.location,
978            },
979            value.idempotency_token,
980        )
981    }
982}
983
984#[derive(Debug, Clone)]
985#[non_exhaustive]
986/// Input for [`ensure_basin`](crate::S2::ensure_basin) operation.
987pub struct EnsureBasinInput {
988    /// Basin name.
989    pub name: BasinName,
990    /// Configuration for the basin.
991    ///
992    /// See [`BasinConfig`] for defaults.
993    pub config: Option<BasinConfig>,
994    /// Location of the basin.
995    ///
996    /// If omitted when creating, uses the default location for the account. Cannot be changed once
997    /// set.
998    pub location: Option<LocationName>,
999}
1000
1001impl EnsureBasinInput {
1002    /// Create a new [`EnsureBasinInput`] with the given basin name.
1003    pub fn new(name: BasinName) -> Self {
1004        Self {
1005            name,
1006            config: None,
1007            location: None,
1008        }
1009    }
1010
1011    /// Set the configuration for the basin.
1012    pub fn with_config(self, config: BasinConfig) -> Self {
1013        Self {
1014            config: Some(config),
1015            ..self
1016        }
1017    }
1018
1019    /// Set the location of the basin.
1020    pub fn with_location<S>(self, location: S) -> Result<Self, ValidationError>
1021    where
1022        S: TryInto<LocationName>,
1023        S::Error: fmt::Display,
1024    {
1025        let location = location
1026            .try_into()
1027            .map_err(|e| ValidationError(e.to_string()))?;
1028        Ok(Self {
1029            location: Some(location),
1030            ..self
1031        })
1032    }
1033}
1034
1035impl From<EnsureBasinInput> for (BasinName, Option<api::basin::EnsureBasinRequest>) {
1036    fn from(value: EnsureBasinInput) -> Self {
1037        let config = value.config;
1038        let request = if config.is_some() || value.location.is_some() {
1039            Some(api::basin::EnsureBasinRequest {
1040                config: config.map(Into::into),
1041                location: value.location,
1042            })
1043        } else {
1044            None
1045        };
1046        (value.name, request)
1047    }
1048}
1049
#[derive(Debug, Clone)]
/// Output for `ensure` operations ([`ensure_basin`](crate::S2::ensure_basin),
/// [`ensure_stream`](crate::S2Basin::ensure_stream)).
///
/// Each variant carries the resource value `T` and records what the operation did.
pub enum EnsureOutput<T> {
    /// Resource did not exist and was created.
    Created(T),
    /// Resource already existed, and its config was updated.
    ConfigUpdated(T),
    /// Resource already existed, and its config is unchanged.
    ConfigUnchanged(T),
}
1061
1062impl<T> From<ProvisionResult<T>> for EnsureOutput<T> {
1063    fn from(result: ProvisionResult<T>) -> Self {
1064        match result {
1065            ProvisionResult::Created(info) => EnsureOutput::Created(info),
1066            ProvisionResult::Updated(info) => EnsureOutput::ConfigUpdated(info),
1067            ProvisionResult::Noop(info) => EnsureOutput::ConfigUnchanged(info),
1068        }
1069    }
1070}
1071
1072#[derive(Debug, Clone, Default)]
1073#[non_exhaustive]
1074/// Input for [`list_basins`](crate::S2::list_basins) operation.
1075pub struct ListBasinsInput {
1076    /// Filter basins whose names begin with this value.
1077    ///
1078    /// Defaults to `""`.
1079    pub prefix: BasinNamePrefix,
1080    /// Filter basins whose names are lexicographically greater than this value.
1081    ///
1082    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
1083    ///
1084    /// Defaults to `""`.
1085    pub start_after: BasinNameStartAfter,
1086    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
1087    ///
1088    /// Defaults to `1000`.
1089    pub limit: Option<usize>,
1090}
1091
1092impl ListBasinsInput {
1093    /// Create a new [`ListBasinsInput`] with default values.
1094    pub fn new() -> Self {
1095        Self::default()
1096    }
1097
1098    /// Set the prefix used to filter basins whose names begin with this value.
1099    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1100        Self { prefix, ..self }
1101    }
1102
1103    /// Set the value used to filter basins whose names are lexicographically greater than this
1104    /// value.
1105    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1106        Self {
1107            start_after,
1108            ..self
1109        }
1110    }
1111
1112    /// Set the limit on number of basins to return in a page.
1113    pub fn with_limit(self, limit: usize) -> Self {
1114        Self {
1115            limit: Some(limit),
1116            ..self
1117        }
1118    }
1119}
1120
1121impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1122    fn from(value: ListBasinsInput) -> Self {
1123        Self {
1124            prefix: Some(value.prefix),
1125            start_after: Some(value.start_after),
1126            limit: value.limit,
1127        }
1128    }
1129}
1130
1131#[derive(Debug, Clone, Default)]
1132/// Input for [`list_all_basins`](crate::S2::list_all_basins) operation.
1133pub struct ListAllBasinsInput {
1134    /// Filter basins whose names begin with this value.
1135    ///
1136    /// Defaults to `""`.
1137    pub prefix: BasinNamePrefix,
1138    /// Filter basins whose names are lexicographically greater than this value.
1139    ///
1140    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
1141    ///
1142    /// Defaults to `""`.
1143    pub start_after: BasinNameStartAfter,
1144    /// Whether to include basins that are being deleted.
1145    ///
1146    /// Defaults to `false`.
1147    pub include_deleted: bool,
1148}
1149
1150impl ListAllBasinsInput {
1151    /// Create a new [`ListAllBasinsInput`] with default values.
1152    pub fn new() -> Self {
1153        Self::default()
1154    }
1155
1156    /// Set the prefix used to filter basins whose names begin with this value.
1157    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1158        Self { prefix, ..self }
1159    }
1160
1161    /// Set the value used to filter basins whose names are lexicographically greater than this
1162    /// value.
1163    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1164        Self {
1165            start_after,
1166            ..self
1167        }
1168    }
1169
1170    /// Set whether to include basins that are being deleted.
1171    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1172        Self {
1173            include_deleted,
1174            ..self
1175        }
1176    }
1177}
1178
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// Basin information.
pub struct BasinInfo {
    /// Basin name.
    pub name: BasinName,
    /// Location of the basin, if one is reported.
    pub location: Option<LocationName>,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time if the basin is being deleted.
    pub deleted_at: Option<S2DateTime>,
}
1192
1193impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1194    type Error = ValidationError;
1195
1196    fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1197        Ok(Self {
1198            name: value.name,
1199            location: value.location,
1200            created_at: value.created_at.try_into()?,
1201            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1202        })
1203    }
1204}
1205
1206#[derive(Debug, Clone)]
1207#[non_exhaustive]
1208/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
1209pub struct DeleteBasinInput {
1210    /// Basin name.
1211    pub name: BasinName,
1212    /// Whether to ignore `Not Found` error if the basin doesn't exist.
1213    pub ignore_not_found: bool,
1214}
1215
1216impl DeleteBasinInput {
1217    /// Create a new [`DeleteBasinInput`] with the given basin name.
1218    pub fn new(name: BasinName) -> Self {
1219        Self {
1220            name,
1221            ignore_not_found: false,
1222        }
1223    }
1224
1225    /// Set whether to ignore `Not Found` error if the basin is not existing.
1226    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1227        Self {
1228            ignore_not_found,
1229            ..self
1230        }
1231    }
1232}
1233
1234#[derive(Debug, Clone, Default)]
1235#[non_exhaustive]
1236/// Reconfiguration for [`TimestampingConfig`].
1237pub struct TimestampingReconfiguration {
1238    /// Override for the existing [`mode`](TimestampingConfig::mode).
1239    pub mode: Maybe<Option<TimestampingMode>>,
1240    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
1241    pub uncapped: Maybe<Option<bool>>,
1242}
1243
1244impl TimestampingReconfiguration {
1245    /// Create a new [`TimestampingReconfiguration`].
1246    pub fn new() -> Self {
1247        Self::default()
1248    }
1249
1250    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1251    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1252        Self {
1253            mode: Maybe::Specified(Some(mode)),
1254            ..self
1255        }
1256    }
1257
1258    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1259    pub fn with_uncapped(self, uncapped: bool) -> Self {
1260        Self {
1261            uncapped: Maybe::Specified(Some(uncapped)),
1262            ..self
1263        }
1264    }
1265}
1266
1267impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1268    fn from(value: TimestampingReconfiguration) -> Self {
1269        Self {
1270            mode: value.mode.map(|m| m.map(Into::into)),
1271            uncapped: value.uncapped,
1272        }
1273    }
1274}
1275
1276#[derive(Debug, Clone, Default)]
1277#[non_exhaustive]
1278/// Reconfiguration for [`DeleteOnEmptyConfig`].
1279pub struct DeleteOnEmptyReconfiguration {
1280    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1281    pub min_age_secs: Maybe<Option<u64>>,
1282}
1283
1284impl DeleteOnEmptyReconfiguration {
1285    /// Create a new [`DeleteOnEmptyReconfiguration`].
1286    pub fn new() -> Self {
1287        Self::default()
1288    }
1289
1290    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1291    pub fn with_min_age(self, min_age: Duration) -> Self {
1292        Self {
1293            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1294        }
1295    }
1296}
1297
1298impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1299    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1300        Self {
1301            min_age_secs: value.min_age_secs,
1302        }
1303    }
1304}
1305
1306#[derive(Debug, Clone, Default)]
1307#[non_exhaustive]
1308/// Reconfiguration for [`StreamConfig`].
1309pub struct StreamReconfiguration {
1310    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
1311    pub storage_class: Maybe<Option<StorageClass>>,
1312    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
1313    pub retention_policy: Maybe<Option<RetentionPolicy>>,
1314    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
1315    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1316    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1317    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1318}
1319
1320impl StreamReconfiguration {
1321    /// Create a new [`StreamReconfiguration`].
1322    pub fn new() -> Self {
1323        Self::default()
1324    }
1325
1326    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1327    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1328        Self {
1329            storage_class: Maybe::Specified(Some(storage_class)),
1330            ..self
1331        }
1332    }
1333
1334    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1335    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1336        Self {
1337            retention_policy: Maybe::Specified(Some(retention_policy)),
1338            ..self
1339        }
1340    }
1341
1342    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1343    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1344        Self {
1345            timestamping: Maybe::Specified(Some(timestamping)),
1346            ..self
1347        }
1348    }
1349
1350    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1351    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1352        Self {
1353            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1354            ..self
1355        }
1356    }
1357}
1358
1359impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1360    fn from(value: StreamReconfiguration) -> Self {
1361        Self {
1362            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1363            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1364            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1365            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1366        }
1367    }
1368}
1369
1370#[derive(Debug, Clone, Default)]
1371#[non_exhaustive]
1372/// Reconfiguration for [`BasinConfig`].
1373pub struct BasinReconfiguration {
1374    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
1375    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1376    /// Override for the existing [`stream_cipher`](BasinConfig::stream_cipher).
1377    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
1378    /// Override for the existing
1379    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1380    pub create_stream_on_append: Maybe<bool>,
1381    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1382    pub create_stream_on_read: Maybe<bool>,
1383}
1384
1385impl BasinReconfiguration {
1386    /// Create a new [`BasinReconfiguration`].
1387    pub fn new() -> Self {
1388        Self::default()
1389    }
1390
1391    /// Set the override for the existing
1392    /// [`default_stream_config`](BasinConfig::default_stream_config).
1393    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1394        Self {
1395            default_stream_config: Maybe::Specified(Some(config)),
1396            ..self
1397        }
1398    }
1399
1400    /// Set the override for the existing [`stream_cipher`](BasinConfig::stream_cipher).
1401    pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
1402        Self {
1403            stream_cipher: Maybe::Specified(Some(stream_cipher)),
1404            ..self
1405        }
1406    }
1407
1408    /// Set the override for the existing
1409    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1410    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1411        Self {
1412            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1413            ..self
1414        }
1415    }
1416
1417    /// Set the override for the existing
1418    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1419    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1420        Self {
1421            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1422            ..self
1423        }
1424    }
1425}
1426
1427impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1428    fn from(value: BasinReconfiguration) -> Self {
1429        Self {
1430            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1431            stream_cipher: value.stream_cipher.map(|m| m.map(Into::into)),
1432            create_stream_on_append: value.create_stream_on_append,
1433            create_stream_on_read: value.create_stream_on_read,
1434        }
1435    }
1436}
1437
1438#[derive(Debug, Clone)]
1439#[non_exhaustive]
1440/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
1441pub struct ReconfigureBasinInput {
1442    /// Basin name.
1443    pub name: BasinName,
1444    /// Reconfiguration for [`BasinConfig`].
1445    pub config: BasinReconfiguration,
1446}
1447
1448impl ReconfigureBasinInput {
1449    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1450    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1451        Self { name, config }
1452    }
1453}
1454
1455#[derive(Debug, Clone, Default)]
1456#[non_exhaustive]
1457/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
1458pub struct ListAccessTokensInput {
1459    /// Filter access tokens whose IDs begin with this value.
1460    ///
1461    /// Defaults to `""`.
1462    pub prefix: AccessTokenIdPrefix,
1463    /// Filter access tokens whose IDs are lexicographically greater than this value.
1464    ///
1465    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
1466    ///
1467    /// Defaults to `""`.
1468    pub start_after: AccessTokenIdStartAfter,
1469    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
1470    ///
1471    /// Defaults to `1000`.
1472    pub limit: Option<usize>,
1473}
1474
1475impl ListAccessTokensInput {
1476    /// Create a new [`ListAccessTokensInput`] with default values.
1477    pub fn new() -> Self {
1478        Self::default()
1479    }
1480
1481    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1482    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1483        Self { prefix, ..self }
1484    }
1485
1486    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1487    /// value.
1488    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1489        Self {
1490            start_after,
1491            ..self
1492        }
1493    }
1494
1495    /// Set the limit on number of access tokens to return in a page.
1496    pub fn with_limit(self, limit: usize) -> Self {
1497        Self {
1498            limit: Some(limit),
1499            ..self
1500        }
1501    }
1502}
1503
1504impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1505    fn from(value: ListAccessTokensInput) -> Self {
1506        Self {
1507            prefix: Some(value.prefix),
1508            start_after: Some(value.start_after),
1509            limit: value.limit,
1510        }
1511    }
1512}
1513
1514#[derive(Debug, Clone, Default)]
1515/// Input for [`list_all_access_tokens`](crate::S2::list_all_access_tokens) operation.
1516pub struct ListAllAccessTokensInput {
1517    /// Filter access tokens whose IDs begin with this value.
1518    ///
1519    /// Defaults to `""`.
1520    pub prefix: AccessTokenIdPrefix,
1521    /// Filter access tokens whose IDs are lexicographically greater than this value.
1522    ///
1523    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
1524    ///
1525    /// Defaults to `""`.
1526    pub start_after: AccessTokenIdStartAfter,
1527}
1528
1529impl ListAllAccessTokensInput {
1530    /// Create a new [`ListAllAccessTokensInput`] with default values.
1531    pub fn new() -> Self {
1532        Self::default()
1533    }
1534
1535    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1536    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1537        Self { prefix, ..self }
1538    }
1539
1540    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1541    /// this value.
1542    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1543        Self {
1544            start_after,
1545            ..self
1546        }
1547    }
1548}
1549
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// Location information.
pub struct LocationInfo {
    /// Location name.
    pub name: LocationName,
    /// Whether the location represents a private placement, limited by account.
    pub is_private: bool,
}
1559
1560impl From<api::location::LocationInfo> for LocationInfo {
1561    fn from(value: api::location::LocationInfo) -> Self {
1562        Self {
1563            name: value.name,
1564            is_private: value.is_private,
1565        }
1566    }
1567}
1568
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Access token information.
pub struct AccessTokenInfo {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    pub expires_at: S2DateTime,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    pub auto_prefix_streams: bool,
    /// Scope of the access token.
    pub scope: AccessTokenScope,
}
1583
1584impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1585    type Error = ValidationError;
1586
1587    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1588        let expires_at = value
1589            .expires_at
1590            .map(S2DateTime::try_from)
1591            .transpose()?
1592            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1593        Ok(Self {
1594            id: value.id,
1595            expires_at,
1596            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1597            scope: value.scope.into(),
1598        })
1599    }
1600}
1601
#[derive(Debug, Clone)]
/// Pattern for matching basins.
///
/// See [`AccessTokenScope::basins`].
pub enum BasinMatcher {
    /// Match no basins.
    None,
    /// Match exactly the basin with this name.
    Exact(BasinName),
    /// Match all basins whose names begin with this prefix.
    Prefix(BasinNamePrefix),
}
1614
#[derive(Debug, Clone)]
/// Pattern for matching streams.
///
/// See [`AccessTokenScope::streams`].
pub enum StreamMatcher {
    /// Match no streams.
    None,
    /// Match exactly the stream with this name.
    Exact(StreamName),
    /// Match all streams whose names begin with this prefix.
    Prefix(StreamNamePrefix),
}
1627
#[derive(Debug, Clone)]
/// Pattern for matching access tokens.
///
/// See [`AccessTokenScope::access_tokens`].
pub enum AccessTokenMatcher {
    /// Match no access tokens.
    None,
    /// Match exactly the access token with this ID.
    Exact(AccessTokenId),
    /// Match all access tokens whose IDs begin with this prefix.
    Prefix(AccessTokenIdPrefix),
}
1640
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions indicating allowed operations.
pub struct ReadWritePermissions {
    /// Whether reading is permitted.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Whether writing is permitted.
    ///
    /// Defaults to `false`.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Construct [`ReadWritePermissions`] with default values (neither read nor write).
    pub fn new() -> Self {
        Default::default()
    }

    /// Permissions that allow reading but not writing.
    pub fn read_only() -> Self {
        ReadWritePermissions {
            read: true,
            ..Default::default()
        }
    }

    /// Permissions that allow writing but not reading.
    pub fn write_only() -> Self {
        ReadWritePermissions {
            write: true,
            ..Default::default()
        }
    }

    /// Permissions that allow both reading and writing.
    pub fn read_write() -> Self {
        ReadWritePermissions {
            read: true,
            write: true,
        }
    }
}
1685
1686impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1687    fn from(value: ReadWritePermissions) -> Self {
1688        Self {
1689            read: Some(value.read),
1690            write: Some(value.write),
1691        }
1692    }
1693}
1694
1695impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1696    fn from(value: api::access::ReadWritePermissions) -> Self {
1697        Self {
1698            read: value.read.unwrap_or_default(),
1699            write: value.write.unwrap_or_default(),
1700        }
1701    }
1702}
1703
1704#[derive(Debug, Clone, Default)]
1705#[non_exhaustive]
1706/// Permissions at the operation group level.
1707///
1708/// See [`AccessTokenScope::op_group_perms`].
1709pub struct OperationGroupPermissions {
1710    /// Account-level access permissions.
1711    ///
1712    /// Defaults to `None`.
1713    pub account: Option<ReadWritePermissions>,
1714    /// Basin-level access permissions.
1715    ///
1716    /// Defaults to `None`.
1717    pub basin: Option<ReadWritePermissions>,
1718    /// Stream-level access permissions.
1719    ///
1720    /// Defaults to `None`.
1721    pub stream: Option<ReadWritePermissions>,
1722}
1723
1724impl OperationGroupPermissions {
1725    /// Create a new [`OperationGroupPermissions`] with default values.
1726    pub fn new() -> Self {
1727        Self::default()
1728    }
1729
1730    /// Create read-only permissions for all groups.
1731    pub fn read_only_all() -> Self {
1732        Self {
1733            account: Some(ReadWritePermissions::read_only()),
1734            basin: Some(ReadWritePermissions::read_only()),
1735            stream: Some(ReadWritePermissions::read_only()),
1736        }
1737    }
1738
1739    /// Create write-only permissions for all groups.
1740    pub fn write_only_all() -> Self {
1741        Self {
1742            account: Some(ReadWritePermissions::write_only()),
1743            basin: Some(ReadWritePermissions::write_only()),
1744            stream: Some(ReadWritePermissions::write_only()),
1745        }
1746    }
1747
1748    /// Create read-write permissions for all groups.
1749    pub fn read_write_all() -> Self {
1750        Self {
1751            account: Some(ReadWritePermissions::read_write()),
1752            basin: Some(ReadWritePermissions::read_write()),
1753            stream: Some(ReadWritePermissions::read_write()),
1754        }
1755    }
1756
1757    /// Set account-level access permissions.
1758    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1759        Self {
1760            account: Some(account),
1761            ..self
1762        }
1763    }
1764
1765    /// Set basin-level access permissions.
1766    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1767        Self {
1768            basin: Some(basin),
1769            ..self
1770        }
1771    }
1772
1773    /// Set stream-level access permissions.
1774    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1775        Self {
1776            stream: Some(stream),
1777            ..self
1778        }
1779    }
1780}
1781
1782impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1783    fn from(value: OperationGroupPermissions) -> Self {
1784        Self {
1785            account: value.account.map(Into::into),
1786            basin: value.basin.map(Into::into),
1787            stream: value.stream.map(Into::into),
1788        }
1789    }
1790}
1791
1792impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1793    fn from(value: api::access::PermittedOperationGroups) -> Self {
1794        Self {
1795            account: value.account.map(Into::into),
1796            basin: value.basin.map(Into::into),
1797            stream: value.stream.map(Into::into),
1798        }
1799    }
1800}
1801
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Individual operation that can be permitted.
///
/// See [`AccessTokenScope::ops`].
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics.
    GetAccountMetrics,
    /// Get basin metrics.
    GetBasinMetrics,
    /// Get stream metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
    /// List locations.
    ListLocations,
    /// Get the default location.
    GetDefaultLocation,
    /// Set the default location.
    SetDefaultLocation,
}
1856
1857impl From<Operation> for api::access::Operation {
1858    fn from(value: Operation) -> Self {
1859        match value {
1860            Operation::ListBasins => api::access::Operation::ListBasins,
1861            Operation::CreateBasin => api::access::Operation::CreateBasin,
1862            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1863            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1864            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1865            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1866            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1867            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1868            Operation::ListStreams => api::access::Operation::ListStreams,
1869            Operation::CreateStream => api::access::Operation::CreateStream,
1870            Operation::DeleteStream => api::access::Operation::DeleteStream,
1871            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1872            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1873            Operation::CheckTail => api::access::Operation::CheckTail,
1874            Operation::Append => api::access::Operation::Append,
1875            Operation::Read => api::access::Operation::Read,
1876            Operation::Trim => api::access::Operation::Trim,
1877            Operation::Fence => api::access::Operation::Fence,
1878            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1879            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1880            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1881            Operation::ListLocations => api::access::Operation::ListLocations,
1882            Operation::GetDefaultLocation => api::access::Operation::GetDefaultLocation,
1883            Operation::SetDefaultLocation => api::access::Operation::SetDefaultLocation,
1884        }
1885    }
1886}
1887
1888impl From<api::access::Operation> for Operation {
1889    fn from(value: api::access::Operation) -> Self {
1890        match value {
1891            api::access::Operation::ListBasins => Operation::ListBasins,
1892            api::access::Operation::CreateBasin => Operation::CreateBasin,
1893            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1894            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1895            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1896            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1897            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1898            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1899            api::access::Operation::ListStreams => Operation::ListStreams,
1900            api::access::Operation::CreateStream => Operation::CreateStream,
1901            api::access::Operation::DeleteStream => Operation::DeleteStream,
1902            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1903            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1904            api::access::Operation::CheckTail => Operation::CheckTail,
1905            api::access::Operation::Append => Operation::Append,
1906            api::access::Operation::Read => Operation::Read,
1907            api::access::Operation::Trim => Operation::Trim,
1908            api::access::Operation::Fence => Operation::Fence,
1909            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1910            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1911            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1912            api::access::Operation::ListLocations => Operation::ListLocations,
1913            api::access::Operation::GetDefaultLocation => Operation::GetDefaultLocation,
1914            api::access::Operation::SetDefaultLocation => Operation::SetDefaultLocation,
1915        }
1916    }
1917}
1918
1919#[derive(Debug, Clone)]
1920#[non_exhaustive]
1921/// Scope of an access token.
1922///
1923/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
1924/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
1925/// final set must not be empty.
1926///
1927/// See [`IssueAccessTokenInput::scope`].
1928pub struct AccessTokenScopeInput {
1929    basins: Option<BasinMatcher>,
1930    streams: Option<StreamMatcher>,
1931    access_tokens: Option<AccessTokenMatcher>,
1932    op_group_perms: Option<OperationGroupPermissions>,
1933    ops: HashSet<Operation>,
1934}
1935
1936impl AccessTokenScopeInput {
1937    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1938    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1939        Self {
1940            basins: None,
1941            streams: None,
1942            access_tokens: None,
1943            op_group_perms: None,
1944            ops: ops.into_iter().collect(),
1945        }
1946    }
1947
1948    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1949    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1950        Self {
1951            basins: None,
1952            streams: None,
1953            access_tokens: None,
1954            op_group_perms: Some(op_group_perms),
1955            ops: HashSet::default(),
1956        }
1957    }
1958
1959    /// Set the permitted operations.
1960    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1961        Self {
1962            ops: ops.into_iter().collect(),
1963            ..self
1964        }
1965    }
1966
1967    /// Set the access permissions at the operation group level.
1968    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1969        Self {
1970            op_group_perms: Some(op_group_perms),
1971            ..self
1972        }
1973    }
1974
1975    /// Set the permitted basins.
1976    ///
1977    /// Defaults to no basins.
1978    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1979        Self {
1980            basins: Some(basins),
1981            ..self
1982        }
1983    }
1984
1985    /// Set the permitted streams.
1986    ///
1987    /// Defaults to no streams.
1988    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1989        Self {
1990            streams: Some(streams),
1991            ..self
1992        }
1993    }
1994
1995    /// Set the permitted access tokens.
1996    ///
1997    /// Defaults to no access tokens.
1998    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1999        Self {
2000            access_tokens: Some(access_tokens),
2001            ..self
2002        }
2003    }
2004}
2005
2006#[derive(Debug, Clone)]
2007#[non_exhaustive]
2008/// Scope of an access token.
2009pub struct AccessTokenScope {
2010    /// Permitted basins.
2011    pub basins: Option<BasinMatcher>,
2012    /// Permitted streams.
2013    pub streams: Option<StreamMatcher>,
2014    /// Permitted access tokens.
2015    pub access_tokens: Option<AccessTokenMatcher>,
2016    /// Permissions at the operation group level.
2017    pub op_group_perms: Option<OperationGroupPermissions>,
2018    /// Permitted operations.
2019    pub ops: HashSet<Operation>,
2020}
2021
2022impl From<api::access::AccessTokenScope> for AccessTokenScope {
2023    fn from(value: api::access::AccessTokenScope) -> Self {
2024        Self {
2025            basins: value.basins.map(|rs| match rs {
2026                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2027                    BasinMatcher::Exact(e)
2028                }
2029                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2030                    BasinMatcher::None
2031                }
2032                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
2033            }),
2034            streams: value.streams.map(|rs| match rs {
2035                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2036                    StreamMatcher::Exact(e)
2037                }
2038                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2039                    StreamMatcher::None
2040                }
2041                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
2042            }),
2043            access_tokens: value.access_tokens.map(|rs| match rs {
2044                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2045                    AccessTokenMatcher::Exact(e)
2046                }
2047                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2048                    AccessTokenMatcher::None
2049                }
2050                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
2051            }),
2052            op_group_perms: value.op_groups.map(Into::into),
2053            ops: value
2054                .ops
2055                .map(|ops| ops.into_iter().map(Into::into).collect())
2056                .unwrap_or_default(),
2057        }
2058    }
2059}
2060
2061impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
2062    fn from(value: AccessTokenScopeInput) -> Self {
2063        Self {
2064            basins: value.basins.map(|rs| match rs {
2065                BasinMatcher::None => {
2066                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2067                }
2068                BasinMatcher::Exact(e) => {
2069                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2070                }
2071                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2072            }),
2073            streams: value.streams.map(|rs| match rs {
2074                StreamMatcher::None => {
2075                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2076                }
2077                StreamMatcher::Exact(e) => {
2078                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2079                }
2080                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2081            }),
2082            access_tokens: value.access_tokens.map(|rs| match rs {
2083                AccessTokenMatcher::None => {
2084                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2085                }
2086                AccessTokenMatcher::Exact(e) => {
2087                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2088                }
2089                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2090            }),
2091            op_groups: value.op_group_perms.map(Into::into),
2092            ops: if value.ops.is_empty() {
2093                None
2094            } else {
2095                Some(value.ops.into_iter().map(Into::into).collect())
2096            },
2097        }
2098    }
2099}
2100
2101#[derive(Debug, Clone)]
2102#[non_exhaustive]
2103/// Input for [`issue_access_token`](crate::S2::issue_access_token).
2104pub struct IssueAccessTokenInput {
2105    /// Access token ID.
2106    pub id: AccessTokenId,
2107    /// Expiration time.
2108    ///
2109    /// Defaults to the expiration time of requestor's access token passed via
2110    /// [`S2Config`](S2Config::new).
2111    pub expires_at: Option<S2DateTime>,
2112    /// Whether to automatically prefix stream names during creation and strip the prefix during
2113    /// listing.
2114    ///
2115    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
2116    /// prefix.
2117    ///
2118    /// Defaults to `false`.
2119    pub auto_prefix_streams: bool,
2120    /// Scope of the token.
2121    pub scope: AccessTokenScopeInput,
2122}
2123
2124impl IssueAccessTokenInput {
2125    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
2126    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2127        Self {
2128            id,
2129            expires_at: None,
2130            auto_prefix_streams: false,
2131            scope,
2132        }
2133    }
2134
2135    /// Set the expiration time.
2136    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2137        Self {
2138            expires_at: Some(expires_at),
2139            ..self
2140        }
2141    }
2142
2143    /// Set whether to automatically prefix stream names during creation and strip the prefix during
2144    /// listing.
2145    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2146        Self {
2147            auto_prefix_streams,
2148            ..self
2149        }
2150    }
2151}
2152
2153impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2154    fn from(value: IssueAccessTokenInput) -> Self {
2155        Self {
2156            id: value.id,
2157            expires_at: value.expires_at.map(Into::into),
2158            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2159            scope: value.scope.into(),
2160        }
2161    }
2162}
2163
/// Interval to accumulate over for timeseries metric sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeseriesInterval {
    /// Accumulate per minute.
    Minute,
    /// Accumulate per hour.
    Hour,
    /// Accumulate per day.
    Day,
}
2174
2175impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2176    fn from(value: TimeseriesInterval) -> Self {
2177        match value {
2178            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2179            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2180            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2181        }
2182    }
2183}
2184
2185impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2186    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2187        match value {
2188            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2189            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2190            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2191        }
2192    }
2193}
2194
/// Time range as Unix epoch seconds.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Timestamp at which the range starts (inclusive).
    pub start: u32,
    /// Timestamp at which the range ends (exclusive).
    pub end: u32,
}
2204
2205impl TimeRange {
2206    /// Create a new [`TimeRange`] with the given start and end timestamps.
2207    pub fn new(start: u32, end: u32) -> Self {
2208        Self { start, end }
2209    }
2210}
2211
2212#[derive(Debug, Clone, Copy)]
2213#[non_exhaustive]
2214/// Time range as Unix epoch seconds and accumulation interval.
2215pub struct TimeRangeAndInterval {
2216    /// Start timestamp (inclusive).
2217    pub start: u32,
2218    /// End timestamp (exclusive).
2219    pub end: u32,
2220    /// Interval to accumulate over for timeseries metric sets.
2221    ///
2222    /// Default is dependent on the requested metric set.
2223    pub interval: Option<TimeseriesInterval>,
2224}
2225
2226impl TimeRangeAndInterval {
2227    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2228    pub fn new(start: u32, end: u32) -> Self {
2229        Self {
2230            start,
2231            end,
2232            interval: None,
2233        }
2234    }
2235
2236    /// Set the interval to accumulate over for timeseries metric sets.
2237    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2238        Self {
2239            interval: Some(interval),
2240            ..self
2241        }
2242    }
2243}
2244
2245#[derive(Debug, Clone, Copy)]
2246/// Account metric set to return.
2247pub enum AccountMetricSet {
2248    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
2249    /// specified time range.
2250    ActiveBasins(TimeRange),
2251    /// Returns [`AccumulationMetric`]s, one per account operation type.
2252    ///
2253    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2254    /// per interval over the requested time range.
2255    ///
2256    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2257    AccountOps(TimeRangeAndInterval),
2258}
2259
2260#[derive(Debug, Clone)]
2261#[non_exhaustive]
2262/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
2263pub struct GetAccountMetricsInput {
2264    /// Metric set to return.
2265    pub set: AccountMetricSet,
2266}
2267
2268impl GetAccountMetricsInput {
2269    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2270    pub fn new(set: AccountMetricSet) -> Self {
2271        Self { set }
2272    }
2273}
2274
2275impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2276    fn from(value: GetAccountMetricsInput) -> Self {
2277        let (set, start, end, interval) = match value.set {
2278            AccountMetricSet::ActiveBasins(args) => (
2279                api::metrics::AccountMetricSet::ActiveBasins,
2280                args.start,
2281                args.end,
2282                None,
2283            ),
2284            AccountMetricSet::AccountOps(args) => (
2285                api::metrics::AccountMetricSet::AccountOps,
2286                args.start,
2287                args.end,
2288                args.interval,
2289            ),
2290        };
2291        Self {
2292            set,
2293            start: Some(start),
2294            end: Some(end),
2295            interval: interval.map(Into::into),
2296        }
2297    }
2298}
2299
2300#[derive(Debug, Clone, Copy)]
2301/// Basin metric set to return.
2302pub enum BasinMetricSet {
2303    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
2304    /// in the basin, with one observed value for each hour over the requested time range.
2305    Storage(TimeRange),
2306    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
2307    ///
2308    /// Each metric represents a timeseries of the number of append operations across all streams
2309    /// in the basin, with one accumulated value per interval over the requested time range.
2310    ///
2311    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2312    /// [`minute`](TimeseriesInterval::Minute).
2313    AppendOps(TimeRangeAndInterval),
2314    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
2315    ///
2316    /// Each metric represents a timeseries of the number of read operations across all streams
2317    /// in the basin, with one accumulated value per interval over the requested time range.
2318    ///
2319    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2320    /// [`minute`](TimeseriesInterval::Minute).
2321    ReadOps(TimeRangeAndInterval),
2322    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
2323    /// across all streams in the basin, with one accumulated value per interval
2324    /// over the requested time range.
2325    ///
2326    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2327    /// [`minute`](TimeseriesInterval::Minute).
2328    ReadThroughput(TimeRangeAndInterval),
2329    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
2330    /// across all streams in the basin, with one accumulated value per interval
2331    /// over the requested time range.
2332    ///
2333    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2334    /// [`minute`](TimeseriesInterval::Minute).
2335    AppendThroughput(TimeRangeAndInterval),
2336    /// Returns [`AccumulationMetric`]s, one per basin operation type.
2337    ///
2338    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2339    /// per interval over the requested time range.
2340    ///
2341    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2342    BasinOps(TimeRangeAndInterval),
2343}
2344
2345#[derive(Debug, Clone)]
2346#[non_exhaustive]
2347/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
2348pub struct GetBasinMetricsInput {
2349    /// Basin name.
2350    pub name: BasinName,
2351    /// Metric set to return.
2352    pub set: BasinMetricSet,
2353}
2354
2355impl GetBasinMetricsInput {
2356    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2357    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2358        Self { name, set }
2359    }
2360}
2361
2362impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2363    fn from(value: GetBasinMetricsInput) -> Self {
2364        let (set, start, end, interval) = match value.set {
2365            BasinMetricSet::Storage(args) => (
2366                api::metrics::BasinMetricSet::Storage,
2367                args.start,
2368                args.end,
2369                None,
2370            ),
2371            BasinMetricSet::AppendOps(args) => (
2372                api::metrics::BasinMetricSet::AppendOps,
2373                args.start,
2374                args.end,
2375                args.interval,
2376            ),
2377            BasinMetricSet::ReadOps(args) => (
2378                api::metrics::BasinMetricSet::ReadOps,
2379                args.start,
2380                args.end,
2381                args.interval,
2382            ),
2383            BasinMetricSet::ReadThroughput(args) => (
2384                api::metrics::BasinMetricSet::ReadThroughput,
2385                args.start,
2386                args.end,
2387                args.interval,
2388            ),
2389            BasinMetricSet::AppendThroughput(args) => (
2390                api::metrics::BasinMetricSet::AppendThroughput,
2391                args.start,
2392                args.end,
2393                args.interval,
2394            ),
2395            BasinMetricSet::BasinOps(args) => (
2396                api::metrics::BasinMetricSet::BasinOps,
2397                args.start,
2398                args.end,
2399                args.interval,
2400            ),
2401        };
2402        (
2403            value.name,
2404            api::metrics::BasinMetricSetRequest {
2405                set,
2406                start: Some(start),
2407                end: Some(end),
2408                interval: interval.map(Into::into),
2409            },
2410        )
2411    }
2412}
2413
2414#[derive(Debug, Clone, Copy)]
2415/// Stream metric set to return.
2416pub enum StreamMetricSet {
2417    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
2418    /// with one observed value for each minute over the requested time range.
2419    Storage(TimeRange),
2420}
2421
2422#[derive(Debug, Clone)]
2423#[non_exhaustive]
2424/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
2425pub struct GetStreamMetricsInput {
2426    /// Basin name.
2427    pub basin_name: BasinName,
2428    /// Stream name.
2429    pub stream_name: StreamName,
2430    /// Metric set to return.
2431    pub set: StreamMetricSet,
2432}
2433
2434impl GetStreamMetricsInput {
2435    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2436    /// set.
2437    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2438        Self {
2439            basin_name,
2440            stream_name,
2441            set,
2442        }
2443    }
2444}
2445
2446impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2447    fn from(value: GetStreamMetricsInput) -> Self {
2448        let (set, start, end, interval) = match value.set {
2449            StreamMetricSet::Storage(args) => (
2450                api::metrics::StreamMetricSet::Storage,
2451                args.start,
2452                args.end,
2453                None,
2454            ),
2455        };
2456        (
2457            value.basin_name,
2458            value.stream_name,
2459            api::metrics::StreamMetricSetRequest {
2460                set,
2461                start: Some(start),
2462                end: Some(end),
2463                interval,
2464            },
2465        )
2466    }
2467}
2468
/// Unit in which metric values are measured.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricUnit {
    /// Values are sizes in bytes.
    Bytes,
    /// Values are numbers of operations.
    Operations,
}
2477
2478impl From<api::metrics::MetricUnit> for MetricUnit {
2479    fn from(value: api::metrics::MetricUnit) -> Self {
2480        match value {
2481            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2482            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2483        }
2484    }
2485}
2486
2487#[derive(Debug, Clone)]
2488#[non_exhaustive]
2489/// Single named value.
2490pub struct ScalarMetric {
2491    /// Metric name.
2492    pub name: String,
2493    /// Unit for the metric value.
2494    pub unit: MetricUnit,
2495    /// Metric value.
2496    pub value: f64,
2497}
2498
2499#[derive(Debug, Clone)]
2500#[non_exhaustive]
2501/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2502/// specified interval.
2503pub struct AccumulationMetric {
2504    /// Timeseries name.
2505    pub name: String,
2506    /// Unit for the accumulated values.
2507    pub unit: MetricUnit,
2508    /// The interval at which datapoints are accumulated.
2509    pub interval: TimeseriesInterval,
2510    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
2511    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
2512    /// one `interval`.
2513    pub values: Vec<(u32, f64)>,
2514}
2515
2516#[derive(Debug, Clone)]
2517#[non_exhaustive]
2518/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2519pub struct GaugeMetric {
2520    /// Timeseries name.
2521    pub name: String,
2522    /// Unit for the instantaneous values.
2523    pub unit: MetricUnit,
2524    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
2525    /// instant of the `timestamp` (in Unix epoch seconds).
2526    pub values: Vec<(u32, f64)>,
2527}
2528
/// Set of string labels.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Name of the label.
    pub name: String,
    /// Values of the label.
    pub values: Vec<String>,
}
2538
2539#[derive(Debug, Clone)]
2540/// Individual metric in a returned metric set.
2541pub enum Metric {
2542    /// Single named value.
2543    Scalar(ScalarMetric),
2544    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2545    /// specified interval.
2546    Accumulation(AccumulationMetric),
2547    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2548    Gauge(GaugeMetric),
2549    /// Set of string labels.
2550    Label(LabelMetric),
2551}
2552
2553impl From<api::metrics::Metric> for Metric {
2554    fn from(value: api::metrics::Metric) -> Self {
2555        match value {
2556            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2557                name: sm.name.into(),
2558                unit: sm.unit.into(),
2559                value: sm.value,
2560            }),
2561            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2562                name: am.name.into(),
2563                unit: am.unit.into(),
2564                interval: am.interval.into(),
2565                values: am.values,
2566            }),
2567            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2568                name: gm.name.into(),
2569                unit: gm.unit.into(),
2570                values: gm.values,
2571            }),
2572            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2573                name: lm.name.into(),
2574                values: lm.values,
2575            }),
2576        }
2577    }
2578}
2579
2580#[derive(Debug, Clone, Default)]
2581#[non_exhaustive]
2582/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
2583pub struct ListStreamsInput {
2584    /// Filter streams whose names begin with this value.
2585    ///
2586    /// Defaults to `""`.
2587    pub prefix: StreamNamePrefix,
2588    /// Filter streams whose names are lexicographically greater than this value.
2589    ///
2590    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
2591    ///
2592    /// Defaults to `""`.
2593    pub start_after: StreamNameStartAfter,
2594    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
2595    ///
2596    /// Defaults to `1000`.
2597    pub limit: Option<usize>,
2598}
2599
2600impl ListStreamsInput {
2601    /// Create a new [`ListStreamsInput`] with default values.
2602    pub fn new() -> Self {
2603        Self::default()
2604    }
2605
2606    /// Set the prefix used to filter streams whose names begin with this value.
2607    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2608        Self { prefix, ..self }
2609    }
2610
2611    /// Set the value used to filter streams whose names are lexicographically greater than this
2612    /// value.
2613    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2614        Self {
2615            start_after,
2616            ..self
2617        }
2618    }
2619
2620    /// Set the limit on number of streams to return in a page.
2621    pub fn with_limit(self, limit: usize) -> Self {
2622        Self {
2623            limit: Some(limit),
2624            ..self
2625        }
2626    }
2627}
2628
2629impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2630    fn from(value: ListStreamsInput) -> Self {
2631        Self {
2632            prefix: Some(value.prefix),
2633            start_after: Some(value.start_after),
2634            limit: value.limit,
2635        }
2636    }
2637}
2638
2639#[derive(Debug, Clone, Default)]
2640/// Input for [`list_all_streams`](crate::S2Basin::list_all_streams) operation.
2641pub struct ListAllStreamsInput {
2642    /// Filter streams whose names begin with this value.
2643    ///
2644    /// Defaults to `""`.
2645    pub prefix: StreamNamePrefix,
2646    /// Filter streams whose names are lexicographically greater than this value.
2647    ///
2648    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
2649    ///
2650    /// Defaults to `""`.
2651    pub start_after: StreamNameStartAfter,
2652    /// Whether to include streams that are being deleted.
2653    ///
2654    /// Defaults to `false`.
2655    pub include_deleted: bool,
2656}
2657
2658impl ListAllStreamsInput {
2659    /// Create a new [`ListAllStreamsInput`] with default values.
2660    pub fn new() -> Self {
2661        Self::default()
2662    }
2663
2664    /// Set the prefix used to filter streams whose names begin with this value.
2665    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2666        Self { prefix, ..self }
2667    }
2668
2669    /// Set the value used to filter streams whose names are lexicographically greater than this
2670    /// value.
2671    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2672        Self {
2673            start_after,
2674            ..self
2675        }
2676    }
2677
2678    /// Set whether to include streams that are being deleted.
2679    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2680        Self {
2681            include_deleted,
2682            ..self
2683        }
2684    }
2685}
2686
2687#[derive(Debug, Clone, PartialEq, Eq)]
2688#[non_exhaustive]
2689/// Stream information.
2690pub struct StreamInfo {
2691    /// Stream name.
2692    pub name: StreamName,
2693    /// Creation time.
2694    pub created_at: S2DateTime,
2695    /// Deletion time if the stream is being deleted.
2696    pub deleted_at: Option<S2DateTime>,
2697    /// Encryption algorithm for this stream, if encryption is enabled.
2698    pub cipher: Option<EncryptionAlgorithm>,
2699}
2700
2701impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2702    type Error = ValidationError;
2703
2704    fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2705        Ok(Self {
2706            name: value.name,
2707            created_at: value.created_at.try_into()?,
2708            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2709            cipher: value.cipher.map(Into::into),
2710        })
2711    }
2712}
2713
2714#[derive(Debug, Clone)]
2715#[non_exhaustive]
2716/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
2717pub struct CreateStreamInput {
2718    /// Stream name.
2719    pub name: StreamName,
2720    /// Configuration for the stream.
2721    ///
2722    /// See [`StreamConfig`] for defaults.
2723    pub config: Option<StreamConfig>,
2724    idempotency_token: String,
2725}
2726
2727impl CreateStreamInput {
2728    /// Create a new [`CreateStreamInput`] with the given stream name.
2729    pub fn new(name: StreamName) -> Self {
2730        Self {
2731            name,
2732            config: None,
2733            idempotency_token: idempotency_token(),
2734        }
2735    }
2736
2737    /// Set the configuration for the stream.
2738    pub fn with_config(self, config: StreamConfig) -> Self {
2739        Self {
2740            config: Some(config),
2741            ..self
2742        }
2743    }
2744}
2745
2746impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2747    fn from(value: CreateStreamInput) -> Self {
2748        (
2749            api::stream::CreateStreamRequest {
2750                stream: value.name,
2751                config: value.config.map(Into::into),
2752            },
2753            value.idempotency_token,
2754        )
2755    }
2756}
2757
2758#[derive(Debug, Clone)]
2759#[non_exhaustive]
2760/// Input for [`ensure_stream`](crate::S2Basin::ensure_stream)
2761/// operation.
2762pub struct EnsureStreamInput {
2763    /// Stream name.
2764    pub name: StreamName,
2765    /// Configuration for the stream.
2766    ///
2767    /// See [`StreamConfig`] for defaults.
2768    pub config: Option<StreamConfig>,
2769}
2770
2771impl EnsureStreamInput {
2772    /// Create a new [`EnsureStreamInput`] with the given stream name.
2773    pub fn new(name: StreamName) -> Self {
2774        Self { name, config: None }
2775    }
2776
2777    /// Set the configuration for the stream.
2778    pub fn with_config(self, config: StreamConfig) -> Self {
2779        Self {
2780            config: Some(config),
2781            ..self
2782        }
2783    }
2784}
2785
2786impl From<EnsureStreamInput> for (StreamName, Option<api::config::StreamConfig>) {
2787    fn from(value: EnsureStreamInput) -> Self {
2788        (value.name, value.config.map(Into::into))
2789    }
2790}
2791
2792#[derive(Debug, Clone)]
2793#[non_exhaustive]
2794/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
2795pub struct DeleteStreamInput {
2796    /// Stream name.
2797    pub name: StreamName,
2798    /// Whether to ignore `Not Found` error if the stream doesn't exist.
2799    pub ignore_not_found: bool,
2800}
2801
2802impl DeleteStreamInput {
2803    /// Create a new [`DeleteStreamInput`] with the given stream name.
2804    pub fn new(name: StreamName) -> Self {
2805        Self {
2806            name,
2807            ignore_not_found: false,
2808        }
2809    }
2810
2811    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2812    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2813        Self {
2814            ignore_not_found,
2815            ..self
2816        }
2817    }
2818}
2819
2820#[derive(Debug, Clone)]
2821#[non_exhaustive]
2822/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
2823pub struct ReconfigureStreamInput {
2824    /// Stream name.
2825    pub name: StreamName,
2826    /// Reconfiguration for [`StreamConfig`].
2827    pub config: StreamReconfiguration,
2828}
2829
2830impl ReconfigureStreamInput {
2831    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2832    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2833        Self { name, config }
2834    }
2835}
2836
2837#[derive(Debug, Clone, PartialEq, Eq)]
2838/// Token for fencing appends to a stream.
2839///
2840/// **Note:** It must not exceed 36 bytes in length.
2841///
2842/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
2843pub struct FencingToken(String);
2844
2845impl FencingToken {
2846    /// Generate a random alphanumeric fencing token of `n` bytes.
2847    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2848        rand::rng()
2849            .sample_iter(&rand::distr::Alphanumeric)
2850            .take(n)
2851            .map(char::from)
2852            .collect::<String>()
2853            .parse()
2854    }
2855}
2856
2857impl FromStr for FencingToken {
2858    type Err = ValidationError;
2859
2860    fn from_str(s: &str) -> Result<Self, Self::Err> {
2861        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2862            return Err(ValidationError(format!(
2863                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2864            )));
2865        }
2866        Ok(FencingToken(s.to_string()))
2867    }
2868}
2869
2870impl std::fmt::Display for FencingToken {
2871    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2872        write!(f, "{}", self.0)
2873    }
2874}
2875
2876impl Deref for FencingToken {
2877    type Target = str;
2878
2879    fn deref(&self) -> &Self::Target {
2880        &self.0
2881    }
2882}
2883
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// Position of a record in a stream.
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp. When assigned by the service, it is the number of milliseconds since the Unix
    /// epoch. User-specified timestamps are passed through as-is.
    pub timestamp: u64,
}

impl std::fmt::Display for StreamPosition {
    /// Render the position as `seq_num=<seq_num>, timestamp=<timestamp>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = self;
        write!(f, "seq_num={seq_num}, timestamp={timestamp}")
    }
}
2900
2901impl From<api::stream::proto::StreamPosition> for StreamPosition {
2902    fn from(value: api::stream::proto::StreamPosition) -> Self {
2903        Self {
2904            seq_num: value.seq_num,
2905            timestamp: value.timestamp,
2906        }
2907    }
2908}
2909
2910impl From<api::stream::StreamPosition> for StreamPosition {
2911    fn from(value: api::stream::StreamPosition) -> Self {
2912        Self {
2913            seq_num: value.seq_num,
2914            timestamp: value.timestamp,
2915        }
2916    }
2917}
2918
2919#[derive(Debug, Clone, PartialEq)]
2920#[non_exhaustive]
2921/// A name-value pair.
2922pub struct Header {
2923    /// Name.
2924    pub name: Bytes,
2925    /// Value.
2926    pub value: Bytes,
2927}
2928
2929impl Header {
2930    /// Create a new [`Header`] with the given name and value.
2931    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2932        Self {
2933            name: name.into(),
2934            value: value.into(),
2935        }
2936    }
2937}
2938
2939impl From<Header> for api::stream::proto::Header {
2940    fn from(value: Header) -> Self {
2941        Self {
2942            name: value.name,
2943            value: value.value,
2944        }
2945    }
2946}
2947
2948impl From<api::stream::proto::Header> for Header {
2949    fn from(value: api::stream::proto::Header) -> Self {
2950        Self {
2951            name: value.name,
2952            value: value.value,
2953        }
2954    }
2955}
2956
2957#[derive(Debug, Clone, PartialEq)]
2958/// A record to append.
2959pub struct AppendRecord {
2960    body: Bytes,
2961    headers: Vec<Header>,
2962    timestamp: Option<u64>,
2963}
2964
2965impl AppendRecord {
2966    fn validate(self) -> Result<Self, ValidationError> {
2967        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2968            Err(ValidationError(format!(
2969                "metered_bytes: {} exceeds {}",
2970                self.metered_bytes(),
2971                RECORD_BATCH_MAX.bytes
2972            )))
2973        } else {
2974            Ok(self)
2975        }
2976    }
2977
2978    /// Create a new [`AppendRecord`] with the given record body.
2979    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2980        let record = Self {
2981            body: body.into(),
2982            headers: Vec::default(),
2983            timestamp: None,
2984        };
2985        record.validate()
2986    }
2987
2988    /// Set the headers for this record.
2989    pub fn with_headers(
2990        self,
2991        headers: impl IntoIterator<Item = Header>,
2992    ) -> Result<Self, ValidationError> {
2993        let record = Self {
2994            headers: headers.into_iter().collect(),
2995            ..self
2996        };
2997        record.validate()
2998    }
2999
3000    /// Set the timestamp for this record.
3001    ///
3002    /// Precise semantics depend on [`StreamConfig::timestamping`].
3003    pub fn with_timestamp(self, timestamp: u64) -> Self {
3004        Self {
3005            timestamp: Some(timestamp),
3006            ..self
3007        }
3008    }
3009
3010    /// Get the body of this record.
3011    pub fn body(&self) -> &[u8] {
3012        &self.body
3013    }
3014
3015    /// Get the headers of this record.
3016    pub fn headers(&self) -> &[Header] {
3017        &self.headers
3018    }
3019
3020    /// Get the timestamp of this record.
3021    pub fn timestamp(&self) -> Option<u64> {
3022        self.timestamp
3023    }
3024}
3025
3026impl From<AppendRecord> for api::stream::proto::AppendRecord {
3027    fn from(value: AppendRecord) -> Self {
3028        Self {
3029            timestamp: value.timestamp,
3030            headers: value.headers.into_iter().map(Into::into).collect(),
3031            body: value.body,
3032        }
3033    }
3034}
3035
/// Calculation of the metered byte size.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
pub trait MeteredBytes {
    /// Returns the metered size in bytes.
    fn metered_bytes(&self) -> usize;
}

// Implements `MeteredBytes` for a record-like type that exposes `headers` (each header having
// `name` and `value`) and `body`, following the per-record formula documented on the trait.
macro_rules! metered_bytes_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> usize {
                // Every header costs 2 bytes on top of the lengths of its name and value.
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| 2 + header.name.len() + header.value.len())
                    .sum();
                8 + header_bytes + self.body.len()
            }
        }
    };
}
3062
// Meter `AppendRecord` from its `headers` and `body` fields via the shared macro.
metered_bytes_impl!(AppendRecord);
3064
3065#[derive(Debug, Clone)]
3066/// A batch of records to append atomically.
3067///
3068/// **Note:** It must contain at least `1` record and no more than `1000`.
3069/// The total size of the batch must not exceed `1MiB` in metered bytes.
3070///
3071/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
3072/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
3073/// that takes care of the abovementioned constraints.
3074pub struct AppendRecordBatch {
3075    records: Vec<AppendRecord>,
3076    metered_bytes: usize,
3077}
3078
3079impl AppendRecordBatch {
3080    pub(crate) fn with_capacity(capacity: usize) -> Self {
3081        Self {
3082            records: Vec::with_capacity(capacity),
3083            metered_bytes: 0,
3084        }
3085    }
3086
3087    pub(crate) fn push(&mut self, record: AppendRecord) {
3088        self.metered_bytes += record.metered_bytes();
3089        self.records.push(record);
3090    }
3091
3092    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
3093    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3094    where
3095        I: IntoIterator<Item = AppendRecord>,
3096    {
3097        let mut records = Vec::new();
3098        let mut metered_bytes = 0;
3099
3100        for record in iter {
3101            metered_bytes += record.metered_bytes();
3102            records.push(record);
3103
3104            if metered_bytes > RECORD_BATCH_MAX.bytes {
3105                return Err(ValidationError(format!(
3106                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
3107                    RECORD_BATCH_MAX.bytes
3108                )));
3109            }
3110
3111            if records.len() > RECORD_BATCH_MAX.count {
3112                return Err(ValidationError(format!(
3113                    "number of records in the batch exceeds {}",
3114                    RECORD_BATCH_MAX.count
3115                )));
3116            }
3117        }
3118
3119        if records.is_empty() {
3120            return Err(ValidationError("batch is empty".into()));
3121        }
3122
3123        Ok(Self {
3124            records,
3125            metered_bytes,
3126        })
3127    }
3128}
3129
3130impl Deref for AppendRecordBatch {
3131    type Target = [AppendRecord];
3132
3133    fn deref(&self) -> &Self::Target {
3134        &self.records
3135    }
3136}
3137
3138impl MeteredBytes for AppendRecordBatch {
3139    fn metered_bytes(&self) -> usize {
3140        self.metered_bytes
3141    }
3142}
3143
#[derive(Debug, Clone)]
/// Command to signal an operation.
///
/// Carried by a [`CommandRecord`]; see [`CommandRecord::fence`] and [`CommandRecord::trim`].
pub enum Command {
    /// Fence operation.
    Fence {
        /// Fencing token.
        fencing_token: FencingToken,
    },
    /// Trim operation.
    Trim {
        /// Trim point: the desired earliest sequence number for the stream.
        trim_point: u64,
    },
}
3158
3159#[derive(Debug, Clone)]
3160#[non_exhaustive]
3161/// Command record for signaling operations to the service.
3162///
3163/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
3164pub struct CommandRecord {
3165    /// Command to signal an operation.
3166    pub command: Command,
3167    /// Timestamp for this record.
3168    pub timestamp: Option<u64>,
3169}
3170
3171impl CommandRecord {
3172    const FENCE: &[u8] = b"fence";
3173    const TRIM: &[u8] = b"trim";
3174
3175    /// Create a fence command record with the given fencing token.
3176    ///
3177    /// Fencing is strongly consistent, and subsequent appends that specify a
3178    /// fencing token will fail if it does not match.
3179    pub fn fence(fencing_token: FencingToken) -> Self {
3180        Self {
3181            command: Command::Fence { fencing_token },
3182            timestamp: None,
3183        }
3184    }
3185
3186    /// Create a trim command record with the given trim point.
3187    ///
3188    /// Trim point is the desired earliest sequence number for the stream.
3189    ///
3190    /// Trimming is eventually consistent, and trimmed records may be visible
3191    /// for a brief period.
3192    pub fn trim(trim_point: u64) -> Self {
3193        Self {
3194            command: Command::Trim { trim_point },
3195            timestamp: None,
3196        }
3197    }
3198
3199    /// Set the timestamp for this record.
3200    pub fn with_timestamp(self, timestamp: u64) -> Self {
3201        Self {
3202            timestamp: Some(timestamp),
3203            ..self
3204        }
3205    }
3206}
3207
3208impl From<CommandRecord> for AppendRecord {
3209    fn from(value: CommandRecord) -> Self {
3210        let (header_value, body) = match value.command {
3211            Command::Fence { fencing_token } => (
3212                CommandRecord::FENCE,
3213                Bytes::copy_from_slice(fencing_token.as_bytes()),
3214            ),
3215            Command::Trim { trim_point } => (
3216                CommandRecord::TRIM,
3217                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3218            ),
3219        };
3220        Self {
3221            body,
3222            headers: vec![Header::new("", header_value)],
3223            timestamp: value.timestamp,
3224        }
3225    }
3226}
3227
3228#[derive(Debug, Clone)]
3229#[non_exhaustive]
3230/// Input for [`append`](crate::S2Stream::append) operation and
3231/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
3232pub struct AppendInput {
3233    /// Batch of records to append atomically.
3234    pub records: AppendRecordBatch,
3235    /// Expected sequence number for the first record in the batch.
3236    ///
3237    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
3238    pub match_seq_num: Option<u64>,
3239    /// Fencing token to match against the stream's current fencing token.
3240    ///
3241    /// If unspecified, no matching is performed. If specified and mismatched,
3242    /// the append fails. A stream defaults to `""` as its fencing token.
3243    pub fencing_token: Option<FencingToken>,
3244}
3245
3246impl AppendInput {
3247    /// Create a new [`AppendInput`] with the given batch of records.
3248    pub fn new(records: AppendRecordBatch) -> Self {
3249        Self {
3250            records,
3251            match_seq_num: None,
3252            fencing_token: None,
3253        }
3254    }
3255
3256    /// Set the expected sequence number for the first record in the batch.
3257    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3258        Self {
3259            match_seq_num: Some(match_seq_num),
3260            ..self
3261        }
3262    }
3263
3264    /// Set the fencing token to match against the stream's current fencing token.
3265    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3266        Self {
3267            fencing_token: Some(fencing_token),
3268            ..self
3269        }
3270    }
3271}
3272
3273impl From<AppendInput> for api::stream::proto::AppendInput {
3274    fn from(value: AppendInput) -> Self {
3275        Self {
3276            records: value.records.iter().cloned().map(Into::into).collect(),
3277            match_seq_num: value.match_seq_num,
3278            fencing_token: value.fencing_token.map(|t| t.to_string()),
3279        }
3280    }
3281}
3282
3283#[derive(Debug, Clone, PartialEq)]
3284#[non_exhaustive]
3285/// Acknowledgement for an [`AppendInput`].
3286pub struct AppendAck {
3287    /// Sequence number and timestamp of the first record that was appended.
3288    pub start: StreamPosition,
3289    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
3290    /// that was appended.
3291    ///
3292    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
3293    /// appended.
3294    pub end: StreamPosition,
3295    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3296    /// the last record on the stream.
3297    ///
3298    /// This can be greater than the `end` position in case of concurrent appends.
3299    pub tail: StreamPosition,
3300}
3301
3302impl From<api::stream::proto::AppendAck> for AppendAck {
3303    fn from(value: api::stream::proto::AppendAck) -> Self {
3304        Self {
3305            start: value.start.unwrap_or_default().into(),
3306            end: value.end.unwrap_or_default().into(),
3307            tail: value.tail.unwrap_or_default().into(),
3308        }
3309    }
3310}
3311
#[derive(Debug, Clone, Copy)]
/// Position to start reading a stream from.
pub enum ReadFrom {
    /// Start at this sequence number.
    SeqNum(u64),
    /// Start at this timestamp.
    Timestamp(u64),
    /// Start N records before the tail.
    TailOffset(u64),
}

impl Default for ReadFrom {
    /// Defaults to sequence number `0`.
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3328
3329#[derive(Debug, Default, Clone)]
3330#[non_exhaustive]
3331/// Where to start reading.
3332pub struct ReadStart {
3333    /// Starting position.
3334    ///
3335    /// Defaults to reading from sequence number `0`.
3336    pub from: ReadFrom,
3337    /// Whether to start from tail if the requested starting position is beyond it.
3338    ///
3339    /// Defaults to `false` (errors if position is beyond tail).
3340    pub clamp_to_tail: bool,
3341}
3342
3343impl ReadStart {
3344    /// Create a new [`ReadStart`] with default values.
3345    pub fn new() -> Self {
3346        Self::default()
3347    }
3348
3349    /// Set the starting position.
3350    pub fn with_from(self, from: ReadFrom) -> Self {
3351        Self { from, ..self }
3352    }
3353
3354    /// Set whether to start from tail if the requested starting position is beyond it.
3355    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3356        Self {
3357            clamp_to_tail,
3358            ..self
3359        }
3360    }
3361}
3362
3363impl From<ReadStart> for api::stream::ReadStart {
3364    fn from(value: ReadStart) -> Self {
3365        let (seq_num, timestamp, tail_offset) = match value.from {
3366            ReadFrom::SeqNum(n) => (Some(n), None, None),
3367            ReadFrom::Timestamp(t) => (None, Some(t), None),
3368            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3369        };
3370        Self {
3371            seq_num,
3372            timestamp,
3373            tail_offset,
3374            clamp: if value.clamp_to_tail {
3375                Some(true)
3376            } else {
3377                None
3378            },
3379        }
3380    }
3381}
3382
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Limits on how much a read returns.
pub struct ReadLimits {
    /// Maximum number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Maximum total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Build a [`ReadLimits`] with neither limit set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Set the limit on the number of records.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Set the limit on the total metered bytes of records.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3419
3420#[derive(Debug, Clone, Default)]
3421#[non_exhaustive]
3422/// When to stop reading.
3423pub struct ReadStop {
3424    /// Limits on how much to read.
3425    ///
3426    /// See [`ReadLimits`] for defaults.
3427    pub limits: ReadLimits,
3428    /// Timestamp at which to stop (exclusive).
3429    ///
3430    /// Defaults to `None`.
3431    pub until: Option<RangeTo<u64>>,
3432    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
3433    /// seconds for [`read`](crate::S2Stream::read).
3434    ///
3435    /// Defaults to:
3436    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
3437    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
3438    ///   is specified.
3439    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
3440    ///   `until` is specified.
3441    pub wait: Option<u32>,
3442}
3443
3444impl ReadStop {
3445    /// Create a new [`ReadStop`] with default values.
3446    pub fn new() -> Self {
3447        Self::default()
3448    }
3449
3450    /// Set the limits on how much to read.
3451    pub fn with_limits(self, limits: ReadLimits) -> Self {
3452        Self { limits, ..self }
3453    }
3454
3455    /// Set the timestamp at which to stop (exclusive).
3456    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3457        Self {
3458            until: Some(until),
3459            ..self
3460        }
3461    }
3462
3463    /// Set the duration in seconds to wait for new records before stopping.
3464    pub fn with_wait(self, wait: u32) -> Self {
3465        Self {
3466            wait: Some(wait),
3467            ..self
3468        }
3469    }
3470}
3471
3472impl From<ReadStop> for api::stream::ReadEnd {
3473    fn from(value: ReadStop) -> Self {
3474        Self {
3475            count: value.limits.count,
3476            bytes: value.limits.bytes,
3477            until: value.until.map(|r| r.end),
3478            wait: value.wait,
3479        }
3480    }
3481}
3482
3483#[derive(Debug, Clone, Default)]
3484#[non_exhaustive]
3485/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
3486/// operations.
3487pub struct ReadInput {
3488    /// Where to start reading.
3489    ///
3490    /// See [`ReadStart`] for defaults.
3491    pub start: ReadStart,
3492    /// When to stop reading.
3493    ///
3494    /// See [`ReadStop`] for defaults.
3495    pub stop: ReadStop,
3496    /// Whether to filter out command records from the stream when reading.
3497    ///
3498    /// Defaults to `false`.
3499    pub ignore_command_records: bool,
3500}
3501
3502impl ReadInput {
3503    /// Create a new [`ReadInput`] with default values.
3504    pub fn new() -> Self {
3505        Self::default()
3506    }
3507
3508    /// Set where to start reading.
3509    pub fn with_start(self, start: ReadStart) -> Self {
3510        Self { start, ..self }
3511    }
3512
3513    /// Set when to stop reading.
3514    pub fn with_stop(self, stop: ReadStop) -> Self {
3515        Self { stop, ..self }
3516    }
3517
3518    /// Set whether to filter out command records from the stream when reading.
3519    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3520        Self {
3521            ignore_command_records,
3522            ..self
3523        }
3524    }
3525}
3526
3527#[derive(Debug, Clone)]
3528#[non_exhaustive]
3529/// Record that is durably sequenced on a stream.
3530pub struct SequencedRecord {
3531    /// Sequence number assigned to this record.
3532    pub seq_num: u64,
3533    /// Body of this record.
3534    pub body: Bytes,
3535    /// Headers for this record.
3536    pub headers: Vec<Header>,
3537    /// Timestamp for this record.
3538    pub timestamp: u64,
3539}
3540
3541impl SequencedRecord {
3542    /// Whether this is a command record.
3543    pub fn is_command_record(&self) -> bool {
3544        self.headers.len() == 1 && *self.headers[0].name == *b""
3545    }
3546}
3547
3548impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3549    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3550        Self {
3551            seq_num: value.seq_num,
3552            body: value.body,
3553            headers: value.headers.into_iter().map(Into::into).collect(),
3554            timestamp: value.timestamp,
3555        }
3556    }
3557}
3558
// Meter `SequencedRecord` from its `headers` and `body` fields via the shared macro.
metered_bytes_impl!(SequencedRecord);
3560
3561#[derive(Debug, Clone)]
3562#[non_exhaustive]
3563/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
3564/// [`read_session`](crate::S2Stream::read_session).
3565pub struct ReadBatch {
3566    /// Records that are durably sequenced on the stream.
3567    ///
3568    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
3569    /// - the [`stop condition`](ReadInput::stop) was already met, or
3570    /// - all records in the batch were command records and
3571    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
3572    pub records: Vec<SequencedRecord>,
3573    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3574    /// the last record.
3575    ///
3576    /// It will only be present when reading recent records.
3577    pub tail: Option<StreamPosition>,
3578}
3579
3580impl ReadBatch {
3581    pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3582        Self {
3583            records: batch.records.into_iter().map(Into::into).collect(),
3584            tail: batch.tail.map(Into::into),
3585        }
3586    }
3587}
3588
/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
///
/// The stream is boxed, pinned and [`Send`].
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3591
3592#[derive(Debug, Clone, thiserror::Error)]
3593/// Why an append condition check failed.
3594pub enum AppendConditionFailed {
3595    #[error("fencing token mismatch, expected: {0}")]
3596    /// Fencing token did not match. Contains the expected fencing token.
3597    FencingTokenMismatch(FencingToken),
3598    #[error("sequence number mismatch, expected: {0}")]
3599    /// Sequence number did not match. Contains the expected sequence number.
3600    SeqNumMismatch(u64),
3601}
3602
3603impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3604    fn from(value: api::stream::AppendConditionFailed) -> Self {
3605        match value {
3606            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3607                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3608            }
3609            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3610                AppendConditionFailed::SeqNumMismatch(seq)
3611            }
3612        }
3613    }
3614}
3615
3616#[derive(Debug, Clone, thiserror::Error)]
3617/// Errors from S2 operations.
3618pub enum S2Error {
3619    #[error("{0}")]
3620    /// Client-side error.
3621    Client(String),
3622    #[error(transparent)]
3623    /// Validation error.
3624    Validation(#[from] ValidationError),
3625    #[error("{0}")]
3626    /// Append condition check failed. Contains the failure reason.
3627    AppendConditionFailed(AppendConditionFailed),
3628    #[error("read from an unwritten position. current tail: {0}")]
3629    /// Read from an unwritten position. Contains the current tail.
3630    ReadUnwritten(StreamPosition),
3631    #[error("{0}")]
3632    /// Other server-side error.
3633    Server(ErrorResponse),
3634}
3635
3636impl From<ApiError> for S2Error {
3637    fn from(err: ApiError) -> Self {
3638        match err {
3639            ApiError::ReadUnwritten(tail_response) => {
3640                Self::ReadUnwritten(tail_response.tail.into())
3641            }
3642            ApiError::AppendConditionFailed(condition_failed) => {
3643                Self::AppendConditionFailed(condition_failed.into())
3644            }
3645            ApiError::Server(_, response) => Self::Server(response.into()),
3646            other => Self::Client(other.to_string()),
3647        }
3648    }
3649}
3650
3651#[derive(Debug, Clone, thiserror::Error)]
3652#[error("{code}: {message}")]
3653#[non_exhaustive]
3654/// Error response from S2 server.
3655pub struct ErrorResponse {
3656    /// Error code.
3657    pub code: String,
3658    /// Error message.
3659    pub message: String,
3660}
3661
3662impl From<ApiErrorResponse> for ErrorResponse {
3663    fn from(response: ApiErrorResponse) -> Self {
3664        Self {
3665            code: response.code,
3666            message: response.message,
3667        }
3668    }
3669}
3670
3671fn idempotency_token() -> String {
3672    uuid::Uuid::new_v4().simple().to_string()
3673}