Skip to main content

s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    sync::Arc,
12    time::Duration,
13};
14
15use bytes::Bytes;
16use http::{
17    header::HeaderValue,
18    uri::{Authority, Scheme},
19};
20use rand::RngExt;
21use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
22/// Validation error.
23pub use s2_common::types::ValidationError;
24/// Access token ID.
25///
26/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
27pub use s2_common::types::access::AccessTokenId;
28/// See [`ListAccessTokensInput::prefix`].
29pub use s2_common::types::access::AccessTokenIdPrefix;
30/// See [`ListAccessTokensInput::start_after`].
31pub use s2_common::types::access::AccessTokenIdStartAfter;
32/// Basin name.
33///
34/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
35/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
36pub use s2_common::types::basin::BasinName;
37/// See [`ListBasinsInput::prefix`].
38pub use s2_common::types::basin::BasinNamePrefix;
39/// See [`ListBasinsInput::start_after`].
40pub use s2_common::types::basin::BasinNameStartAfter;
41/// Location name.
42///
43/// **Note:** It must be between 1 and 64 characters in length and can only comprise ASCII
44/// letters, numbers, colons, hyphens, and periods.
45pub use s2_common::types::location::LocationName;
46/// Stream name.
47///
48/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
49pub use s2_common::types::stream::StreamName;
50/// See [`ListStreamsInput::prefix`].
51pub use s2_common::types::stream::StreamNamePrefix;
52/// See [`ListStreamsInput::start_after`].
53pub use s2_common::types::stream::StreamNameStartAfter;
54pub use s2_common::{
55    caps::RECORD_BATCH_MAX,
56    encryption::{EncryptionAlgorithm, EncryptionKey},
57};
58
/// Number of bytes in one mebibyte (2^20).
pub(crate) const ONE_MIB: u32 = 1 << 20;
60
61use s2_common::{
62    maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH, types::resources::ProvisionResult,
63};
64use secrecy::SecretString;
65
66use crate::api::{ApiError, ApiErrorResponse};
67
68/// An RFC 3339 datetime.
69///
70/// It can be created in either of the following ways:
71/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
72/// - Convert from [`time::OffsetDateTime`] using [`TryFrom`]/[`TryInto`].
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub struct S2DateTime(time::OffsetDateTime);
75
76impl TryFrom<time::OffsetDateTime> for S2DateTime {
77    type Error = ValidationError;
78
79    fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
80        dt.format(&time::format_description::well_known::Rfc3339)
81            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
82        Ok(Self(dt))
83    }
84}
85
86impl From<S2DateTime> for time::OffsetDateTime {
87    fn from(dt: S2DateTime) -> Self {
88        dt.0
89    }
90}
91
92impl FromStr for S2DateTime {
93    type Err = ValidationError;
94
95    fn from_str(s: &str) -> Result<Self, Self::Err> {
96        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
97            .map(Self)
98            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
99    }
100}
101
102impl fmt::Display for S2DateTime {
103    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104        write!(
105            f,
106            "{}",
107            self.0
108                .format(&time::format_description::well_known::Rfc3339)
109                .expect("RFC3339 formatting should not fail for S2DateTime")
110        )
111    }
112}
113
114/// Authority for connecting to an S2 basin.
115#[derive(Debug, Clone, PartialEq)]
116pub(crate) enum BasinAuthority {
117    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
118    ParentZone(Authority),
119    /// Direct cell authority. Basin is expected to be hosted by this cell.
120    Direct(Authority),
121}
122
123/// Account endpoint.
124#[derive(Debug, Clone)]
125pub struct AccountEndpoint {
126    scheme: Scheme,
127    authority: Authority,
128}
129
130impl AccountEndpoint {
131    /// Create a new [`AccountEndpoint`] with the given endpoint.
132    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
133        endpoint.parse()
134    }
135}
136
137impl FromStr for AccountEndpoint {
138    type Err = ValidationError;
139
140    fn from_str(s: &str) -> Result<Self, Self::Err> {
141        let (scheme, authority) = match s.find("://") {
142            Some(idx) => {
143                let scheme: Scheme = s[..idx]
144                    .parse()
145                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
146                (scheme, &s[idx + 3..])
147            }
148            None => (Scheme::HTTPS, s),
149        };
150        Ok(Self {
151            scheme,
152            authority: authority
153                .parse()
154                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
155        })
156    }
157}
158
159/// Basin endpoint.
160#[derive(Debug, Clone)]
161pub struct BasinEndpoint {
162    scheme: Scheme,
163    authority: BasinAuthority,
164}
165
166impl BasinEndpoint {
167    /// Create a new [`BasinEndpoint`] with the given endpoint.
168    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
169        endpoint.parse()
170    }
171}
172
173impl FromStr for BasinEndpoint {
174    type Err = ValidationError;
175
176    fn from_str(s: &str) -> Result<Self, Self::Err> {
177        let (scheme, authority) = match s.find("://") {
178            Some(idx) => {
179                let scheme: Scheme = s[..idx]
180                    .parse()
181                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
182                (scheme, &s[idx + 3..])
183            }
184            None => (Scheme::HTTPS, s),
185        };
186        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
187            BasinAuthority::ParentZone(
188                authority
189                    .parse()
190                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
191            )
192        } else {
193            BasinAuthority::Direct(
194                authority
195                    .parse()
196                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
197            )
198        };
199        Ok(Self { scheme, authority })
200    }
201}
202
203#[derive(Debug, Clone)]
204#[non_exhaustive]
205/// Endpoints for the S2 environment.
206pub struct S2Endpoints {
207    pub(crate) scheme: Scheme,
208    pub(crate) account_authority: Authority,
209    pub(crate) basin_authority: BasinAuthority,
210}
211
212impl S2Endpoints {
213    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
214    pub fn new(
215        account_endpoint: AccountEndpoint,
216        basin_endpoint: BasinEndpoint,
217    ) -> Result<Self, ValidationError> {
218        if account_endpoint.scheme != basin_endpoint.scheme {
219            return Err("account and basin endpoints must have the same scheme".into());
220        }
221        Ok(Self {
222            scheme: account_endpoint.scheme,
223            account_authority: account_endpoint.authority,
224            basin_authority: basin_endpoint.authority,
225        })
226    }
227
228    /// Create a new [`S2Endpoints`] from environment variables.
229    ///
230    /// The following environment variables are expected to be set:
231    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
232    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
233    pub fn from_env() -> Result<Self, ValidationError> {
234        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
235            Ok(endpoint) => endpoint.parse()?,
236            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
237            Err(VarError::NotUnicode(_)) => {
238                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
239            }
240        };
241
242        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
243            Ok(endpoint) => endpoint.parse()?,
244            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
245            Err(VarError::NotUnicode(_)) => {
246                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
247            }
248        };
249
250        if account_endpoint.scheme != basin_endpoint.scheme {
251            return Err(
252                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
253            );
254        }
255
256        Ok(Self {
257            scheme: account_endpoint.scheme,
258            account_authority: account_endpoint.authority,
259            basin_authority: basin_endpoint.authority,
260        })
261    }
262
263    pub(crate) fn for_aws() -> Self {
264        Self {
265            scheme: Scheme::HTTPS,
266            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
267            basin_authority: BasinAuthority::ParentZone(
268                "b.s2.dev".try_into().expect("valid authority"),
269            ),
270        }
271    }
272}
273
/// Compression algorithm applied to request and response bodies.
#[derive(Debug, Clone, Copy)]
pub enum Compression {
    /// Bodies are not compressed.
    None,
    /// Bodies are compressed with gzip.
    Gzip,
    /// Bodies are compressed with zstd.
    Zstd,
}
284
285impl From<Compression> for CompressionAlgorithm {
286    fn from(value: Compression) -> Self {
287        match value {
288            Compression::None => CompressionAlgorithm::None,
289            Compression::Gzip => CompressionAlgorithm::Gzip,
290            Compression::Zstd => CompressionAlgorithm::Zstd,
291        }
292    }
293}
294
/// Retry policy for [`append`](crate::S2Stream::append) and
/// [`append_session`](crate::S2Stream::append_session) operations.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub enum AppendRetryPolicy {
    /// Retry every append. Choose this when duplicate records on the stream are acceptable.
    All,
    /// Retry only when it can be determined that the request had no side effects.
    ///
    /// A frame-level signal tells whether the HTTP transport consumed any body frames. If
    /// none were sent, the server never saw the request, so retrying is safe and cannot
    /// produce duplicate records.
    ///
    /// Certain server errors (`rate_limited`, `hot_server`) are likewise safe to retry
    /// whatever the frame signal says, because they guarantee that no mutation occurred.
    NoSideEffects,
}
313
314#[derive(Debug, Clone)]
315#[non_exhaustive]
316/// Configuration for retrying requests in case of transient failures.
317///
318/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
319/// ```text
320/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
321///     jitter = rand[0, base_delay]
322///     delay  = base_delay + jitter
323/// ````
324pub struct RetryConfig {
325    /// Total number of attempts including the initial try. A value of `1` means no retries.
326    ///
327    /// Defaults to `3`.
328    pub max_attempts: NonZeroU32,
329    /// Minimum base delay for retries.
330    ///
331    /// Defaults to `100ms`.
332    pub min_base_delay: Duration,
333    /// Maximum base delay for retries.
334    ///
335    /// Defaults to `1s`.
336    pub max_base_delay: Duration,
337    /// Retry policy for [`append`](crate::S2Stream::append) and
338    /// [`append_session`](crate::S2Stream::append_session) operations.
339    ///
340    /// Defaults to `All`.
341    pub append_retry_policy: AppendRetryPolicy,
342}
343
344impl Default for RetryConfig {
345    fn default() -> Self {
346        Self {
347            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
348            min_base_delay: Duration::from_millis(100),
349            max_base_delay: Duration::from_secs(1),
350            append_retry_policy: AppendRetryPolicy::All,
351        }
352    }
353}
354
355impl RetryConfig {
356    /// Create a new [`RetryConfig`] with default settings.
357    pub fn new() -> Self {
358        Self::default()
359    }
360
361    pub(crate) fn max_retries(&self) -> u32 {
362        self.max_attempts.get() - 1
363    }
364
365    /// Set the total number of attempts including the initial try.
366    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
367        Self {
368            max_attempts,
369            ..self
370        }
371    }
372
373    /// Set the minimum base delay for retries.
374    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
375        Self {
376            min_base_delay,
377            ..self
378        }
379    }
380
381    /// Set the maximum base delay for retries.
382    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
383        Self {
384            max_base_delay,
385            ..self
386        }
387    }
388
389    /// Set the retry policy for [`append`](crate::S2Stream::append) and
390    /// [`append_session`](crate::S2Stream::append_session) operations.
391    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
392        Self {
393            append_retry_policy,
394            ..self
395        }
396    }
397}
398
399#[derive(Debug, Clone)]
400#[non_exhaustive]
401/// Configuration for [`S2`](crate::S2).
402pub struct S2Config {
403    pub(crate) access_token: SecretString,
404    pub(crate) endpoints: S2Endpoints,
405    pub(crate) connection_timeout: Duration,
406    pub(crate) request_timeout: Duration,
407    pub(crate) retry: RetryConfig,
408    pub(crate) compression: Compression,
409    pub(crate) user_agent: HeaderValue,
410    pub(crate) insecure_skip_cert_verification: bool,
411    pub(crate) rustls_crypto_provider: Option<Arc<rustls::crypto::CryptoProvider>>,
412}
413
414impl S2Config {
415    /// Create a new [`S2Config`] with the given access token and default settings.
416    pub fn new(access_token: impl Into<String>) -> Self {
417        Self {
418            access_token: access_token.into().into(),
419            endpoints: S2Endpoints::for_aws(),
420            connection_timeout: Duration::from_secs(3),
421            request_timeout: Duration::from_secs(5),
422            retry: RetryConfig::new(),
423            compression: Compression::None,
424            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
425                .parse()
426                .expect("valid user agent"),
427            insecure_skip_cert_verification: false,
428            rustls_crypto_provider: default_rustls_crypto_provider(),
429        }
430    }
431
432    /// Set the S2 endpoints to connect to.
433    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
434        Self { endpoints, ..self }
435    }
436
437    /// Set the timeout for establishing a connection to the server.
438    ///
439    /// Defaults to `3s`.
440    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
441        Self {
442            connection_timeout,
443            ..self
444        }
445    }
446
447    /// Set the timeout for requests.
448    ///
449    /// Defaults to `5s`.
450    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
451        Self {
452            request_timeout,
453            ..self
454        }
455    }
456
457    /// Set the retry configuration for requests.
458    ///
459    /// See [`RetryConfig`] for defaults.
460    pub fn with_retry(self, retry: RetryConfig) -> Self {
461        Self { retry, ..self }
462    }
463
464    /// Set the compression algorithm for requests and responses.
465    ///
466    /// Defaults to no compression.
467    pub fn with_compression(self, compression: Compression) -> Self {
468        Self {
469            compression,
470            ..self
471        }
472    }
473
474    /// Skip TLS certificate verification (insecure).
475    ///
476    /// This is useful for connecting to endpoints with self-signed certificates
477    /// or certificates that don't match the hostname (similar to `curl -k`).
478    ///
479    /// # Warning
480    ///
481    /// This disables certificate verification and should only be used for
482    /// testing or development purposes. **Never use this in production.**
483    ///
484    /// Defaults to `false`.
485    pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
486        Self {
487            insecure_skip_cert_verification: skip,
488            ..self
489        }
490    }
491
492    /// Use a specific rustls crypto provider for SDK TLS connections.
493    ///
494    /// With default features enabled, the SDK uses the `aws-lc-rs` provider.
495    /// With default features disabled, the SDK uses rustls's process-global
496    /// provider if one has been installed, or returns an error otherwise.
497    ///
498    /// Use this when your application needs a specific rustls provider, such as
499    /// `ring` or a custom [`rustls::crypto::CryptoProvider`]. The corresponding
500    /// rustls provider feature must be enabled in the dependency graph.
501    pub fn with_rustls_crypto_provider(
502        self,
503        provider: impl Into<Arc<rustls::crypto::CryptoProvider>>,
504    ) -> Self {
505        Self {
506            rustls_crypto_provider: Some(provider.into()),
507            ..self
508        }
509    }
510
511    /// Use rustls's `aws-lc-rs` crypto provider.
512    ///
513    /// Requires the `rustls-aws-lc-rs` crate feature.
514    #[cfg(feature = "rustls-aws-lc-rs")]
515    pub fn with_rustls_aws_lc_rs_crypto_provider(self) -> Self {
516        self.with_rustls_crypto_provider(rustls::crypto::aws_lc_rs::default_provider())
517    }
518
519    /// Use rustls's `ring` crypto provider.
520    ///
521    /// Requires the `rustls-ring` crate feature.
522    #[cfg(feature = "rustls-ring")]
523    pub fn with_rustls_ring_crypto_provider(self) -> Self {
524        self.with_rustls_crypto_provider(rustls::crypto::ring::default_provider())
525    }
526
527    #[doc(hidden)]
528    #[cfg(feature = "_hidden")]
529    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
530        let user_agent = user_agent
531            .into()
532            .parse()
533            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
534        Ok(Self { user_agent, ..self })
535    }
536}
537
538#[cfg(feature = "rustls-aws-lc-rs")]
539fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
540    Some(Arc::new(rustls::crypto::aws_lc_rs::default_provider()))
541}
542
543#[cfg(all(not(feature = "rustls-aws-lc-rs"), feature = "rustls-ring"))]
544fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
545    Some(Arc::new(rustls::crypto::ring::default_provider()))
546}
547
548#[cfg(all(not(feature = "rustls-aws-lc-rs"), not(feature = "rustls-ring")))]
549fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
550    None
551}
552
/// A page of values.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// The values contained in this page.
    pub values: Vec<T>,
    /// `true` when further pages are available.
    pub has_more: bool,
}
562
563impl<T> Page<T> {
564    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
565        Self {
566            values: values.into(),
567            has_more,
568        }
569    }
570}
571
/// Storage class for recent appends.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    /// Standard storage class, offering append latencies under `500ms`.
    Standard,
    /// Express storage class, offering append latencies under `50ms`.
    Express,
}
580
581impl From<api::config::StorageClass> for StorageClass {
582    fn from(value: api::config::StorageClass) -> Self {
583        match value {
584            api::config::StorageClass::Standard => StorageClass::Standard,
585            api::config::StorageClass::Express => StorageClass::Express,
586        }
587    }
588}
589
590impl From<StorageClass> for api::config::StorageClass {
591    fn from(value: StorageClass) -> Self {
592        match value {
593            StorageClass::Standard => api::config::StorageClass::Standard,
594            StorageClass::Express => api::config::StorageClass::Express,
595        }
596    }
597}
598
/// Retention policy for records in a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Maximum record age in seconds; older records are trimmed automatically.
    Age(u64),
    /// Records are kept indefinitely unless they are explicitly trimmed.
    Infinite,
}
607
608impl From<api::config::RetentionPolicy> for RetentionPolicy {
609    fn from(value: api::config::RetentionPolicy) -> Self {
610        match value {
611            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
612            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
613        }
614    }
615}
616
617impl From<RetentionPolicy> for api::config::RetentionPolicy {
618    fn from(value: RetentionPolicy) -> Self {
619        match value {
620            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
621            RetentionPolicy::Infinite => {
622                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
623            }
624        }
625    }
626}
627
/// Timestamping mode for appends, which influences how timestamps are handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    /// Use the client-specified timestamp when present; otherwise use the arrival time.
    ClientPrefer,
    /// A client-specified timestamp is mandatory; appends without one are rejected.
    ClientRequire,
    /// Always use the arrival time, ignoring any client-specified timestamp.
    Arrival,
}
638
639impl From<api::config::TimestampingMode> for TimestampingMode {
640    fn from(value: api::config::TimestampingMode) -> Self {
641        match value {
642            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
643            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
644            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
645        }
646    }
647}
648
649impl From<TimestampingMode> for api::config::TimestampingMode {
650    fn from(value: TimestampingMode) -> Self {
651        match value {
652            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
653            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
654            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
655        }
656    }
657}
658
659#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
660#[non_exhaustive]
661/// Configuration for timestamping behavior.
662pub struct TimestampingConfig {
663    /// Timestamping mode for appends that influences how timestamps are handled.
664    ///
665    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
666    pub mode: Option<TimestampingMode>,
667    /// Whether client-specified timestamps are allowed to exceed the arrival time.
668    ///
669    /// Defaults to `false` (client timestamps are capped at the arrival time).
670    pub uncapped: Option<bool>,
671}
672
673impl TimestampingConfig {
674    /// Create a new [`TimestampingConfig`] with default settings.
675    pub fn new() -> Self {
676        Self::default()
677    }
678
679    /// Set the timestamping mode for appends that influences how timestamps are handled.
680    pub fn with_mode(self, mode: TimestampingMode) -> Self {
681        Self {
682            mode: Some(mode),
683            ..self
684        }
685    }
686
687    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
688    pub fn with_uncapped(self, uncapped: bool) -> Self {
689        Self {
690            uncapped: Some(uncapped),
691            ..self
692        }
693    }
694}
695
696impl From<api::config::TimestampingConfig> for TimestampingConfig {
697    fn from(value: api::config::TimestampingConfig) -> Self {
698        Self {
699            mode: value.mode.map(Into::into),
700            uncapped: value.uncapped,
701        }
702    }
703}
704
705impl From<TimestampingConfig> for api::config::TimestampingConfig {
706    fn from(value: TimestampingConfig) -> Self {
707        Self {
708            mode: value.mode.map(Into::into),
709            uncapped: value.uncapped,
710        }
711    }
712}
713
/// Configuration for automatically deleting a stream once it becomes empty.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age, in seconds, that an empty stream must reach before it can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}
723
724impl DeleteOnEmptyConfig {
725    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
726    pub fn new() -> Self {
727        Self::default()
728    }
729
730    /// Set the minimum age in seconds before an empty stream can be deleted.
731    pub fn with_min_age(self, min_age: Duration) -> Self {
732        Self {
733            min_age_secs: min_age.as_secs(),
734        }
735    }
736}
737
738impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
739    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
740        Self {
741            min_age_secs: value.min_age_secs,
742        }
743    }
744}
745
746impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
747    fn from(value: DeleteOnEmptyConfig) -> Self {
748        Self {
749            min_age_secs: value.min_age_secs,
750        }
751    }
752}
753
754#[derive(Debug, Clone, Default, PartialEq, Eq)]
755#[non_exhaustive]
756/// Configuration for a stream.
757pub struct StreamConfig {
758    /// Storage class for the stream.
759    ///
760    /// Defaults to [`Express`](StorageClass::Express).
761    pub storage_class: Option<StorageClass>,
762    /// Retention policy for records in the stream.
763    ///
764    /// Defaults to `7 days` of retention.
765    pub retention_policy: Option<RetentionPolicy>,
766    /// Configuration for timestamping behavior.
767    ///
768    /// See [`TimestampingConfig`] for defaults.
769    pub timestamping: Option<TimestampingConfig>,
770    /// Configuration for automatically deleting the stream when it becomes empty.
771    ///
772    /// See [`DeleteOnEmptyConfig`] for defaults.
773    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
774}
775
776impl StreamConfig {
777    /// Create a new [`StreamConfig`] with default settings.
778    pub fn new() -> Self {
779        Self::default()
780    }
781
782    /// Set the storage class for the stream.
783    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
784        Self {
785            storage_class: Some(storage_class),
786            ..self
787        }
788    }
789
790    /// Set the retention policy for records in the stream.
791    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
792        Self {
793            retention_policy: Some(retention_policy),
794            ..self
795        }
796    }
797
798    /// Set the configuration for timestamping behavior.
799    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
800        Self {
801            timestamping: Some(timestamping),
802            ..self
803        }
804    }
805
806    /// Set the configuration for automatically deleting the stream when it becomes empty.
807    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
808        Self {
809            delete_on_empty: Some(delete_on_empty),
810            ..self
811        }
812    }
813}
814
815impl From<api::config::StreamConfig> for StreamConfig {
816    fn from(value: api::config::StreamConfig) -> Self {
817        Self {
818            storage_class: value.storage_class.map(Into::into),
819            retention_policy: value.retention_policy.map(Into::into),
820            timestamping: value.timestamping.map(Into::into),
821            delete_on_empty: value.delete_on_empty.map(Into::into),
822        }
823    }
824}
825
826impl From<StreamConfig> for api::config::StreamConfig {
827    fn from(value: StreamConfig) -> Self {
828        Self {
829            storage_class: value.storage_class.map(Into::into),
830            retention_policy: value.retention_policy.map(Into::into),
831            timestamping: value.timestamping.map(Into::into),
832            delete_on_empty: value.delete_on_empty.map(Into::into),
833        }
834    }
835}
836
837#[derive(Debug, Clone, Default, PartialEq, Eq)]
838#[non_exhaustive]
839/// Configuration for a basin.
840pub struct BasinConfig {
841    /// Default configuration for all streams in the basin.
842    ///
843    /// See [`StreamConfig`] for defaults.
844    pub default_stream_config: Option<StreamConfig>,
845    /// Encryption algorithm to apply to newly created streams in the basin.
846    pub stream_cipher: Option<EncryptionAlgorithm>,
847    /// Whether to create stream on append if it doesn't exist using default stream configuration.
848    ///
849    /// Defaults to `false`.
850    pub create_stream_on_append: bool,
851    /// Whether to create stream on read if it doesn't exist using default stream configuration.
852    ///
853    /// Defaults to `false`.
854    pub create_stream_on_read: bool,
855}
856
857impl BasinConfig {
858    /// Create a new [`BasinConfig`] with default settings.
859    pub fn new() -> Self {
860        Self::default()
861    }
862
863    /// Set the default configuration for all streams in the basin.
864    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
865        Self {
866            default_stream_config: Some(config),
867            ..self
868        }
869    }
870
871    /// Set the encryption algorithm to apply to newly created streams in the basin.
872    pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
873        Self {
874            stream_cipher: Some(stream_cipher),
875            ..self
876        }
877    }
878
879    /// Set whether to create stream on append if it doesn't exist using default stream
880    /// configuration.
881    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
882        Self {
883            create_stream_on_append,
884            ..self
885        }
886    }
887
888    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
889    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
890        Self {
891            create_stream_on_read,
892            ..self
893        }
894    }
895}
896
897impl From<api::config::BasinConfig> for BasinConfig {
898    fn from(value: api::config::BasinConfig) -> Self {
899        Self {
900            default_stream_config: value.default_stream_config.map(Into::into),
901            stream_cipher: value.stream_cipher.map(Into::into),
902            create_stream_on_append: value.create_stream_on_append,
903            create_stream_on_read: value.create_stream_on_read,
904        }
905    }
906}
907
908impl From<BasinConfig> for api::config::BasinConfig {
909    fn from(value: BasinConfig) -> Self {
910        Self {
911            default_stream_config: value.default_stream_config.map(Into::into),
912            stream_cipher: value.stream_cipher.map(Into::into),
913            create_stream_on_append: value.create_stream_on_append,
914            create_stream_on_read: value.create_stream_on_read,
915        }
916    }
917}
918
919#[derive(Debug, Clone)]
920#[non_exhaustive]
921/// Input for [`create_basin`](crate::S2::create_basin) operation.
922pub struct CreateBasinInput {
923    /// Basin name.
924    pub name: BasinName,
925    /// Configuration for the basin.
926    ///
927    /// See [`BasinConfig`] for defaults.
928    pub config: Option<BasinConfig>,
929    /// Location of the basin.
930    ///
931    /// If omitted when creating, uses the default location for the account.
932    pub location: Option<LocationName>,
933    idempotency_token: String,
934}
935
936impl CreateBasinInput {
937    /// Create a new [`CreateBasinInput`] with the given basin name.
938    pub fn new(name: BasinName) -> Self {
939        Self {
940            name,
941            config: None,
942            location: None,
943            idempotency_token: idempotency_token(),
944        }
945    }
946
947    /// Set the configuration for the basin.
948    pub fn with_config(self, config: BasinConfig) -> Self {
949        Self {
950            config: Some(config),
951            ..self
952        }
953    }
954
955    /// Set the location of the basin.
956    pub fn with_location<S>(self, location: S) -> Result<Self, ValidationError>
957    where
958        S: TryInto<LocationName>,
959        S::Error: fmt::Display,
960    {
961        let location = location
962            .try_into()
963            .map_err(|e| ValidationError(e.to_string()))?;
964        Ok(Self {
965            location: Some(location),
966            ..self
967        })
968    }
969}
970
971impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
972    fn from(value: CreateBasinInput) -> Self {
973        (
974            api::basin::CreateBasinRequest {
975                basin: value.name,
976                config: value.config.map(Into::into),
977                location: value.location,
978            },
979            value.idempotency_token,
980        )
981    }
982}
983
984#[derive(Debug, Clone)]
985#[non_exhaustive]
986/// Input for [`ensure_basin`](crate::S2::ensure_basin) operation.
987pub struct EnsureBasinInput {
988    /// Basin name.
989    pub name: BasinName,
990    /// Configuration for the basin.
991    ///
992    /// See [`BasinConfig`] for defaults.
993    pub config: Option<BasinConfig>,
994    /// Location of the basin.
995    ///
996    /// If omitted when creating, uses the default location for the account. Cannot be changed once
997    /// set.
998    pub location: Option<LocationName>,
999}
1000
1001impl EnsureBasinInput {
1002    /// Create a new [`EnsureBasinInput`] with the given basin name.
1003    pub fn new(name: BasinName) -> Self {
1004        Self {
1005            name,
1006            config: None,
1007            location: None,
1008        }
1009    }
1010
1011    /// Set the configuration for the basin.
1012    pub fn with_config(self, config: BasinConfig) -> Self {
1013        Self {
1014            config: Some(config),
1015            ..self
1016        }
1017    }
1018
1019    /// Set the location of the basin.
1020    pub fn with_location<S>(self, location: S) -> Result<Self, ValidationError>
1021    where
1022        S: TryInto<LocationName>,
1023        S::Error: fmt::Display,
1024    {
1025        let location = location
1026            .try_into()
1027            .map_err(|e| ValidationError(e.to_string()))?;
1028        Ok(Self {
1029            location: Some(location),
1030            ..self
1031        })
1032    }
1033}
1034
1035impl From<EnsureBasinInput> for (BasinName, Option<api::basin::EnsureBasinRequest>) {
1036    fn from(value: EnsureBasinInput) -> Self {
1037        let config = value.config;
1038        let request = if config.is_some() || value.location.is_some() {
1039            Some(api::basin::EnsureBasinRequest {
1040                config: config.map(Into::into),
1041                location: value.location,
1042            })
1043        } else {
1044            None
1045        };
1046        (value.name, request)
1047    }
1048}
1049
/// Output for `ensure` operations ([`ensure_basin`](crate::S2::ensure_basin),
/// [`ensure_stream`](crate::S2Basin::ensure_stream)).
///
/// Every variant carries the value of type `T` that describes the resource.
#[derive(Debug, Clone)]
pub enum EnsureOutput<T> {
    /// The resource did not exist and was created.
    Created(T),
    /// The resource already existed, and its config was updated.
    ConfigUpdated(T),
    /// The resource already existed, and its config is unchanged.
    ConfigUnchanged(T),
}
1061
1062impl<T> From<ProvisionResult<T>> for EnsureOutput<T> {
1063    fn from(result: ProvisionResult<T>) -> Self {
1064        match result {
1065            ProvisionResult::Created(info) => EnsureOutput::Created(info),
1066            ProvisionResult::Updated(info) => EnsureOutput::ConfigUpdated(info),
1067            ProvisionResult::Noop(info) => EnsureOutput::ConfigUnchanged(info),
1068        }
1069    }
1070}
1071
1072#[derive(Debug, Clone, Default)]
1073#[non_exhaustive]
1074/// Input for [`list_basins`](crate::S2::list_basins) operation.
1075pub struct ListBasinsInput {
1076    /// Filter basins whose names begin with this value.
1077    ///
1078    /// Defaults to `""`.
1079    pub prefix: BasinNamePrefix,
1080    /// Filter basins whose names are lexicographically greater than this value.
1081    ///
1082    /// Defaults to `""`.
1083    pub start_after: BasinNameStartAfter,
1084    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
1085    ///
1086    /// Defaults to `1000`.
1087    pub limit: Option<usize>,
1088}
1089
1090impl ListBasinsInput {
1091    /// Create a new [`ListBasinsInput`] with default values.
1092    pub fn new() -> Self {
1093        Self::default()
1094    }
1095
1096    /// Set the prefix used to filter basins whose names begin with this value.
1097    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1098        Self { prefix, ..self }
1099    }
1100
1101    /// Set the value used to filter basins whose names are lexicographically greater than this
1102    /// value.
1103    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1104        Self {
1105            start_after,
1106            ..self
1107        }
1108    }
1109
1110    /// Set the limit on number of basins to return in a page.
1111    pub fn with_limit(self, limit: usize) -> Self {
1112        Self {
1113            limit: Some(limit),
1114            ..self
1115        }
1116    }
1117}
1118
1119impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1120    fn from(value: ListBasinsInput) -> Self {
1121        Self {
1122            prefix: Some(value.prefix),
1123            start_after: Some(value.start_after),
1124            limit: value.limit,
1125        }
1126    }
1127}
1128
1129#[derive(Debug, Clone, Default)]
1130/// Input for [`list_all_basins`](crate::S2::list_all_basins) operation.
1131pub struct ListAllBasinsInput {
1132    /// Filter basins whose names begin with this value.
1133    ///
1134    /// Defaults to `""`.
1135    pub prefix: BasinNamePrefix,
1136    /// Filter basins whose names are lexicographically greater than this value.
1137    ///
1138    /// Defaults to `""`.
1139    pub start_after: BasinNameStartAfter,
1140    /// Whether to include basins that are being deleted.
1141    ///
1142    /// Defaults to `false`.
1143    pub include_deleted: bool,
1144}
1145
1146impl ListAllBasinsInput {
1147    /// Create a new [`ListAllBasinsInput`] with default values.
1148    pub fn new() -> Self {
1149        Self::default()
1150    }
1151
1152    /// Set the prefix used to filter basins whose names begin with this value.
1153    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1154        Self { prefix, ..self }
1155    }
1156
1157    /// Set the value used to filter basins whose names are lexicographically greater than this
1158    /// value.
1159    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1160        Self {
1161            start_after,
1162            ..self
1163        }
1164    }
1165
1166    /// Set whether to include basins that are being deleted.
1167    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1168        Self {
1169            include_deleted,
1170            ..self
1171        }
1172    }
1173}
1174
1175#[derive(Debug, Clone, PartialEq, Eq)]
1176#[non_exhaustive]
1177/// Basin information.
1178pub struct BasinInfo {
1179    /// Basin name.
1180    pub name: BasinName,
1181    /// Location of the basin.
1182    pub location: Option<LocationName>,
1183    /// Creation time.
1184    pub created_at: S2DateTime,
1185    /// Deletion time if the basin is being deleted.
1186    pub deleted_at: Option<S2DateTime>,
1187}
1188
1189impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1190    type Error = ValidationError;
1191
1192    fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1193        Ok(Self {
1194            name: value.name,
1195            location: value.location,
1196            created_at: value.created_at.try_into()?,
1197            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1198        })
1199    }
1200}
1201
1202#[derive(Debug, Clone)]
1203#[non_exhaustive]
1204/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
1205pub struct DeleteBasinInput {
1206    /// Basin name.
1207    pub name: BasinName,
1208    /// Whether to ignore `Not Found` error if the basin doesn't exist.
1209    pub ignore_not_found: bool,
1210}
1211
1212impl DeleteBasinInput {
1213    /// Create a new [`DeleteBasinInput`] with the given basin name.
1214    pub fn new(name: BasinName) -> Self {
1215        Self {
1216            name,
1217            ignore_not_found: false,
1218        }
1219    }
1220
1221    /// Set whether to ignore `Not Found` error if the basin is not existing.
1222    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1223        Self {
1224            ignore_not_found,
1225            ..self
1226        }
1227    }
1228}
1229
1230#[derive(Debug, Clone, Default)]
1231#[non_exhaustive]
1232/// Reconfiguration for [`TimestampingConfig`].
1233pub struct TimestampingReconfiguration {
1234    /// Override for the existing [`mode`](TimestampingConfig::mode).
1235    pub mode: Maybe<Option<TimestampingMode>>,
1236    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
1237    pub uncapped: Maybe<Option<bool>>,
1238}
1239
1240impl TimestampingReconfiguration {
1241    /// Create a new [`TimestampingReconfiguration`].
1242    pub fn new() -> Self {
1243        Self::default()
1244    }
1245
1246    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1247    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1248        Self {
1249            mode: Maybe::Specified(Some(mode)),
1250            ..self
1251        }
1252    }
1253
1254    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1255    pub fn with_uncapped(self, uncapped: bool) -> Self {
1256        Self {
1257            uncapped: Maybe::Specified(Some(uncapped)),
1258            ..self
1259        }
1260    }
1261}
1262
1263impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1264    fn from(value: TimestampingReconfiguration) -> Self {
1265        Self {
1266            mode: value.mode.map(|m| m.map(Into::into)),
1267            uncapped: value.uncapped,
1268        }
1269    }
1270}
1271
1272#[derive(Debug, Clone, Default)]
1273#[non_exhaustive]
1274/// Reconfiguration for [`DeleteOnEmptyConfig`].
1275pub struct DeleteOnEmptyReconfiguration {
1276    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1277    pub min_age_secs: Maybe<Option<u64>>,
1278}
1279
1280impl DeleteOnEmptyReconfiguration {
1281    /// Create a new [`DeleteOnEmptyReconfiguration`].
1282    pub fn new() -> Self {
1283        Self::default()
1284    }
1285
1286    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1287    pub fn with_min_age(self, min_age: Duration) -> Self {
1288        Self {
1289            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1290        }
1291    }
1292}
1293
1294impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1295    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1296        Self {
1297            min_age_secs: value.min_age_secs,
1298        }
1299    }
1300}
1301
1302#[derive(Debug, Clone, Default)]
1303#[non_exhaustive]
1304/// Reconfiguration for [`StreamConfig`].
1305pub struct StreamReconfiguration {
1306    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
1307    pub storage_class: Maybe<Option<StorageClass>>,
1308    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
1309    pub retention_policy: Maybe<Option<RetentionPolicy>>,
1310    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
1311    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1312    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1313    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1314}
1315
1316impl StreamReconfiguration {
1317    /// Create a new [`StreamReconfiguration`].
1318    pub fn new() -> Self {
1319        Self::default()
1320    }
1321
1322    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1323    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1324        Self {
1325            storage_class: Maybe::Specified(Some(storage_class)),
1326            ..self
1327        }
1328    }
1329
1330    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1331    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1332        Self {
1333            retention_policy: Maybe::Specified(Some(retention_policy)),
1334            ..self
1335        }
1336    }
1337
1338    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1339    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1340        Self {
1341            timestamping: Maybe::Specified(Some(timestamping)),
1342            ..self
1343        }
1344    }
1345
1346    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1347    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1348        Self {
1349            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1350            ..self
1351        }
1352    }
1353}
1354
1355impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1356    fn from(value: StreamReconfiguration) -> Self {
1357        Self {
1358            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1359            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1360            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1361            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1362        }
1363    }
1364}
1365
1366#[derive(Debug, Clone, Default)]
1367#[non_exhaustive]
1368/// Reconfiguration for [`BasinConfig`].
1369pub struct BasinReconfiguration {
1370    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
1371    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1372    /// Override for the existing [`stream_cipher`](BasinConfig::stream_cipher).
1373    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
1374    /// Override for the existing
1375    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1376    pub create_stream_on_append: Maybe<bool>,
1377    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1378    pub create_stream_on_read: Maybe<bool>,
1379}
1380
1381impl BasinReconfiguration {
1382    /// Create a new [`BasinReconfiguration`].
1383    pub fn new() -> Self {
1384        Self::default()
1385    }
1386
1387    /// Set the override for the existing
1388    /// [`default_stream_config`](BasinConfig::default_stream_config).
1389    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1390        Self {
1391            default_stream_config: Maybe::Specified(Some(config)),
1392            ..self
1393        }
1394    }
1395
1396    /// Set the override for the existing [`stream_cipher`](BasinConfig::stream_cipher).
1397    pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
1398        Self {
1399            stream_cipher: Maybe::Specified(Some(stream_cipher)),
1400            ..self
1401        }
1402    }
1403
1404    /// Set the override for the existing
1405    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1406    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1407        Self {
1408            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1409            ..self
1410        }
1411    }
1412
1413    /// Set the override for the existing
1414    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1415    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1416        Self {
1417            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1418            ..self
1419        }
1420    }
1421}
1422
1423impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1424    fn from(value: BasinReconfiguration) -> Self {
1425        Self {
1426            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1427            stream_cipher: value.stream_cipher.map(|m| m.map(Into::into)),
1428            create_stream_on_append: value.create_stream_on_append,
1429            create_stream_on_read: value.create_stream_on_read,
1430        }
1431    }
1432}
1433
1434#[derive(Debug, Clone)]
1435#[non_exhaustive]
1436/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
1437pub struct ReconfigureBasinInput {
1438    /// Basin name.
1439    pub name: BasinName,
1440    /// Reconfiguration for [`BasinConfig`].
1441    pub config: BasinReconfiguration,
1442}
1443
1444impl ReconfigureBasinInput {
1445    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1446    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1447        Self { name, config }
1448    }
1449}
1450
1451#[derive(Debug, Clone, Default)]
1452#[non_exhaustive]
1453/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
1454pub struct ListAccessTokensInput {
1455    /// Filter access tokens whose IDs begin with this value.
1456    ///
1457    /// Defaults to `""`.
1458    pub prefix: AccessTokenIdPrefix,
1459    /// Filter access tokens whose IDs are lexicographically greater than this value.
1460    ///
1461    /// Defaults to `""`.
1462    pub start_after: AccessTokenIdStartAfter,
1463    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
1464    ///
1465    /// Defaults to `1000`.
1466    pub limit: Option<usize>,
1467}
1468
1469impl ListAccessTokensInput {
1470    /// Create a new [`ListAccessTokensInput`] with default values.
1471    pub fn new() -> Self {
1472        Self::default()
1473    }
1474
1475    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1476    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1477        Self { prefix, ..self }
1478    }
1479
1480    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1481    /// value.
1482    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1483        Self {
1484            start_after,
1485            ..self
1486        }
1487    }
1488
1489    /// Set the limit on number of access tokens to return in a page.
1490    pub fn with_limit(self, limit: usize) -> Self {
1491        Self {
1492            limit: Some(limit),
1493            ..self
1494        }
1495    }
1496}
1497
1498impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1499    fn from(value: ListAccessTokensInput) -> Self {
1500        Self {
1501            prefix: Some(value.prefix),
1502            start_after: Some(value.start_after),
1503            limit: value.limit,
1504        }
1505    }
1506}
1507
1508#[derive(Debug, Clone, Default)]
1509/// Input for [`list_all_access_tokens`](crate::S2::list_all_access_tokens) operation.
1510pub struct ListAllAccessTokensInput {
1511    /// Filter access tokens whose IDs begin with this value.
1512    ///
1513    /// Defaults to `""`.
1514    pub prefix: AccessTokenIdPrefix,
1515    /// Filter access tokens whose IDs are lexicographically greater than this value.
1516    ///
1517    /// Defaults to `""`.
1518    pub start_after: AccessTokenIdStartAfter,
1519}
1520
1521impl ListAllAccessTokensInput {
1522    /// Create a new [`ListAllAccessTokensInput`] with default values.
1523    pub fn new() -> Self {
1524        Self::default()
1525    }
1526
1527    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1528    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1529        Self { prefix, ..self }
1530    }
1531
1532    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1533    /// this value.
1534    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1535        Self {
1536            start_after,
1537            ..self
1538        }
1539    }
1540}
1541
1542#[derive(Debug, Clone, PartialEq, Eq)]
1543#[non_exhaustive]
1544/// Location information.
1545pub struct LocationInfo {
1546    /// Location name.
1547    pub name: LocationName,
1548    /// Location represents a private placement, limited by account.
1549    pub is_private: bool,
1550}
1551
1552impl From<api::location::LocationInfo> for LocationInfo {
1553    fn from(value: api::location::LocationInfo) -> Self {
1554        Self {
1555            name: value.name,
1556            is_private: value.is_private,
1557        }
1558    }
1559}
1560
1561#[derive(Debug, Clone)]
1562#[non_exhaustive]
1563/// Access token information.
1564pub struct AccessTokenInfo {
1565    /// Access token ID.
1566    pub id: AccessTokenId,
1567    /// Expiration time.
1568    pub expires_at: S2DateTime,
1569    /// Whether to automatically prefix stream names during creation and strip the prefix during
1570    /// listing.
1571    pub auto_prefix_streams: bool,
1572    /// Scope of the access token.
1573    pub scope: AccessTokenScope,
1574}
1575
1576impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1577    type Error = ValidationError;
1578
1579    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1580        let expires_at = value
1581            .expires_at
1582            .map(S2DateTime::try_from)
1583            .transpose()?
1584            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1585        Ok(Self {
1586            id: value.id,
1587            expires_at,
1588            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1589            scope: value.scope.into(),
1590        })
1591    }
1592}
1593
1594#[derive(Debug, Clone)]
1595/// Pattern for matching basins.
1596///
1597/// See [`AccessTokenScope::basins`].
1598pub enum BasinMatcher {
1599    /// Match no basins.
1600    None,
1601    /// Match exactly this basin.
1602    Exact(BasinName),
1603    /// Match all basins with this prefix.
1604    Prefix(BasinNamePrefix),
1605}
1606
1607#[derive(Debug, Clone)]
1608/// Pattern for matching streams.
1609///
1610/// See [`AccessTokenScope::streams`].
1611pub enum StreamMatcher {
1612    /// Match no streams.
1613    None,
1614    /// Match exactly this stream.
1615    Exact(StreamName),
1616    /// Match all streams with this prefix.
1617    Prefix(StreamNamePrefix),
1618}
1619
1620#[derive(Debug, Clone)]
1621/// Pattern for matching access tokens.
1622///
1623/// See [`AccessTokenScope::access_tokens`].
1624pub enum AccessTokenMatcher {
1625    /// Match no access tokens.
1626    None,
1627    /// Match exactly this access token.
1628    Exact(AccessTokenId),
1629    /// Match all access tokens with this prefix.
1630    Prefix(AccessTokenIdPrefix),
1631}
1632
/// Permissions indicating allowed operations.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Whether reading is permitted.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Whether writing is permitted.
    ///
    /// Defaults to `false`.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Build a [`ReadWritePermissions`] with default values (neither read nor write).
    pub fn new() -> Self {
        Default::default()
    }

    /// Build permissions that allow reading but not writing.
    pub fn read_only() -> Self {
        Self {
            read: true,
            ..Self::default()
        }
    }

    /// Build permissions that allow writing but not reading.
    pub fn write_only() -> Self {
        Self {
            write: true,
            ..Self::default()
        }
    }

    /// Build permissions that allow both reading and writing.
    pub fn read_write() -> Self {
        Self {
            write: true,
            ..Self::read_only()
        }
    }
}
1677
1678impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1679    fn from(value: ReadWritePermissions) -> Self {
1680        Self {
1681            read: Some(value.read),
1682            write: Some(value.write),
1683        }
1684    }
1685}
1686
1687impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1688    fn from(value: api::access::ReadWritePermissions) -> Self {
1689        Self {
1690            read: value.read.unwrap_or_default(),
1691            write: value.write.unwrap_or_default(),
1692        }
1693    }
1694}
1695
1696#[derive(Debug, Clone, Default)]
1697#[non_exhaustive]
1698/// Permissions at the operation group level.
1699///
1700/// See [`AccessTokenScope::op_group_perms`].
1701pub struct OperationGroupPermissions {
1702    /// Account-level access permissions.
1703    ///
1704    /// Defaults to `None`.
1705    pub account: Option<ReadWritePermissions>,
1706    /// Basin-level access permissions.
1707    ///
1708    /// Defaults to `None`.
1709    pub basin: Option<ReadWritePermissions>,
1710    /// Stream-level access permissions.
1711    ///
1712    /// Defaults to `None`.
1713    pub stream: Option<ReadWritePermissions>,
1714}
1715
1716impl OperationGroupPermissions {
1717    /// Create a new [`OperationGroupPermissions`] with default values.
1718    pub fn new() -> Self {
1719        Self::default()
1720    }
1721
1722    /// Create read-only permissions for all groups.
1723    pub fn read_only_all() -> Self {
1724        Self {
1725            account: Some(ReadWritePermissions::read_only()),
1726            basin: Some(ReadWritePermissions::read_only()),
1727            stream: Some(ReadWritePermissions::read_only()),
1728        }
1729    }
1730
1731    /// Create write-only permissions for all groups.
1732    pub fn write_only_all() -> Self {
1733        Self {
1734            account: Some(ReadWritePermissions::write_only()),
1735            basin: Some(ReadWritePermissions::write_only()),
1736            stream: Some(ReadWritePermissions::write_only()),
1737        }
1738    }
1739
1740    /// Create read-write permissions for all groups.
1741    pub fn read_write_all() -> Self {
1742        Self {
1743            account: Some(ReadWritePermissions::read_write()),
1744            basin: Some(ReadWritePermissions::read_write()),
1745            stream: Some(ReadWritePermissions::read_write()),
1746        }
1747    }
1748
1749    /// Set account-level access permissions.
1750    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1751        Self {
1752            account: Some(account),
1753            ..self
1754        }
1755    }
1756
1757    /// Set basin-level access permissions.
1758    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1759        Self {
1760            basin: Some(basin),
1761            ..self
1762        }
1763    }
1764
1765    /// Set stream-level access permissions.
1766    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1767        Self {
1768            stream: Some(stream),
1769            ..self
1770        }
1771    }
1772}
1773
1774impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1775    fn from(value: OperationGroupPermissions) -> Self {
1776        Self {
1777            account: value.account.map(Into::into),
1778            basin: value.basin.map(Into::into),
1779            stream: value.stream.map(Into::into),
1780        }
1781    }
1782}
1783
1784impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1785    fn from(value: api::access::PermittedOperationGroups) -> Self {
1786        Self {
1787            account: value.account.map(Into::into),
1788            basin: value.basin.map(Into::into),
1789            stream: value.stream.map(Into::into),
1790        }
1791    }
1792}
1793
/// Individual operation that can be permitted.
///
/// See [`AccessTokenScope::ops`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    /// Listing basins.
    ListBasins,
    /// Creating a basin.
    CreateBasin,
    /// Getting the configuration of a basin.
    GetBasinConfig,
    /// Deleting a basin.
    DeleteBasin,
    /// Reconfiguring a basin.
    ReconfigureBasin,
    /// Listing access tokens.
    ListAccessTokens,
    /// Issuing an access token.
    IssueAccessToken,
    /// Revoking an access token.
    RevokeAccessToken,
    /// Getting account metrics.
    GetAccountMetrics,
    /// Getting basin metrics.
    GetBasinMetrics,
    /// Getting stream metrics.
    GetStreamMetrics,
    /// Listing streams.
    ListStreams,
    /// Creating a stream.
    CreateStream,
    /// Getting the configuration of a stream.
    GetStreamConfig,
    /// Deleting a stream.
    DeleteStream,
    /// Reconfiguring a stream.
    ReconfigureStream,
    /// Checking the tail of a stream.
    CheckTail,
    /// Appending records to a stream.
    Append,
    /// Reading records from a stream.
    Read,
    /// Trimming records on a stream.
    Trim,
    /// Setting the fencing token on a stream.
    Fence,
    /// Listing locations.
    ListLocations,
    /// Getting the default location.
    GetDefaultLocation,
    /// Setting the default location.
    SetDefaultLocation,
}
1848
1849impl From<Operation> for api::access::Operation {
1850    fn from(value: Operation) -> Self {
1851        match value {
1852            Operation::ListBasins => api::access::Operation::ListBasins,
1853            Operation::CreateBasin => api::access::Operation::CreateBasin,
1854            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1855            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1856            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1857            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1858            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1859            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1860            Operation::ListStreams => api::access::Operation::ListStreams,
1861            Operation::CreateStream => api::access::Operation::CreateStream,
1862            Operation::DeleteStream => api::access::Operation::DeleteStream,
1863            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1864            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1865            Operation::CheckTail => api::access::Operation::CheckTail,
1866            Operation::Append => api::access::Operation::Append,
1867            Operation::Read => api::access::Operation::Read,
1868            Operation::Trim => api::access::Operation::Trim,
1869            Operation::Fence => api::access::Operation::Fence,
1870            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1871            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1872            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1873            Operation::ListLocations => api::access::Operation::ListLocations,
1874            Operation::GetDefaultLocation => api::access::Operation::GetDefaultLocation,
1875            Operation::SetDefaultLocation => api::access::Operation::SetDefaultLocation,
1876        }
1877    }
1878}
1879
1880impl From<api::access::Operation> for Operation {
1881    fn from(value: api::access::Operation) -> Self {
1882        match value {
1883            api::access::Operation::ListBasins => Operation::ListBasins,
1884            api::access::Operation::CreateBasin => Operation::CreateBasin,
1885            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1886            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1887            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1888            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1889            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1890            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1891            api::access::Operation::ListStreams => Operation::ListStreams,
1892            api::access::Operation::CreateStream => Operation::CreateStream,
1893            api::access::Operation::DeleteStream => Operation::DeleteStream,
1894            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1895            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1896            api::access::Operation::CheckTail => Operation::CheckTail,
1897            api::access::Operation::Append => Operation::Append,
1898            api::access::Operation::Read => Operation::Read,
1899            api::access::Operation::Trim => Operation::Trim,
1900            api::access::Operation::Fence => Operation::Fence,
1901            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1902            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1903            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1904            api::access::Operation::ListLocations => Operation::ListLocations,
1905            api::access::Operation::GetDefaultLocation => Operation::GetDefaultLocation,
1906            api::access::Operation::SetDefaultLocation => Operation::SetDefaultLocation,
1907        }
1908    }
1909}
1910
1911#[derive(Debug, Clone)]
1912#[non_exhaustive]
1913/// Scope of an access token.
1914///
1915/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
1916/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
1917/// final set must not be empty.
1918///
1919/// See [`IssueAccessTokenInput::scope`].
1920pub struct AccessTokenScopeInput {
1921    basins: Option<BasinMatcher>,
1922    streams: Option<StreamMatcher>,
1923    access_tokens: Option<AccessTokenMatcher>,
1924    op_group_perms: Option<OperationGroupPermissions>,
1925    ops: HashSet<Operation>,
1926}
1927
1928impl AccessTokenScopeInput {
1929    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1930    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1931        Self {
1932            basins: None,
1933            streams: None,
1934            access_tokens: None,
1935            op_group_perms: None,
1936            ops: ops.into_iter().collect(),
1937        }
1938    }
1939
1940    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1941    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1942        Self {
1943            basins: None,
1944            streams: None,
1945            access_tokens: None,
1946            op_group_perms: Some(op_group_perms),
1947            ops: HashSet::default(),
1948        }
1949    }
1950
1951    /// Set the permitted operations.
1952    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1953        Self {
1954            ops: ops.into_iter().collect(),
1955            ..self
1956        }
1957    }
1958
1959    /// Set the access permissions at the operation group level.
1960    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1961        Self {
1962            op_group_perms: Some(op_group_perms),
1963            ..self
1964        }
1965    }
1966
1967    /// Set the permitted basins.
1968    ///
1969    /// Defaults to no basins.
1970    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1971        Self {
1972            basins: Some(basins),
1973            ..self
1974        }
1975    }
1976
1977    /// Set the permitted streams.
1978    ///
1979    /// Defaults to no streams.
1980    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1981        Self {
1982            streams: Some(streams),
1983            ..self
1984        }
1985    }
1986
1987    /// Set the permitted access tokens.
1988    ///
1989    /// Defaults to no access tokens.
1990    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1991        Self {
1992            access_tokens: Some(access_tokens),
1993            ..self
1994        }
1995    }
1996}
1997
1998#[derive(Debug, Clone)]
1999#[non_exhaustive]
2000/// Scope of an access token.
2001pub struct AccessTokenScope {
2002    /// Permitted basins.
2003    pub basins: Option<BasinMatcher>,
2004    /// Permitted streams.
2005    pub streams: Option<StreamMatcher>,
2006    /// Permitted access tokens.
2007    pub access_tokens: Option<AccessTokenMatcher>,
2008    /// Permissions at the operation group level.
2009    pub op_group_perms: Option<OperationGroupPermissions>,
2010    /// Permitted operations.
2011    pub ops: HashSet<Operation>,
2012}
2013
2014impl From<api::access::AccessTokenScope> for AccessTokenScope {
2015    fn from(value: api::access::AccessTokenScope) -> Self {
2016        Self {
2017            basins: value.basins.map(|rs| match rs {
2018                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2019                    BasinMatcher::Exact(e)
2020                }
2021                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2022                    BasinMatcher::None
2023                }
2024                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
2025            }),
2026            streams: value.streams.map(|rs| match rs {
2027                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2028                    StreamMatcher::Exact(e)
2029                }
2030                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2031                    StreamMatcher::None
2032                }
2033                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
2034            }),
2035            access_tokens: value.access_tokens.map(|rs| match rs {
2036                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2037                    AccessTokenMatcher::Exact(e)
2038                }
2039                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2040                    AccessTokenMatcher::None
2041                }
2042                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
2043            }),
2044            op_group_perms: value.op_groups.map(Into::into),
2045            ops: value
2046                .ops
2047                .map(|ops| ops.into_iter().map(Into::into).collect())
2048                .unwrap_or_default(),
2049        }
2050    }
2051}
2052
2053impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
2054    fn from(value: AccessTokenScopeInput) -> Self {
2055        Self {
2056            basins: value.basins.map(|rs| match rs {
2057                BasinMatcher::None => {
2058                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2059                }
2060                BasinMatcher::Exact(e) => {
2061                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2062                }
2063                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2064            }),
2065            streams: value.streams.map(|rs| match rs {
2066                StreamMatcher::None => {
2067                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2068                }
2069                StreamMatcher::Exact(e) => {
2070                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2071                }
2072                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2073            }),
2074            access_tokens: value.access_tokens.map(|rs| match rs {
2075                AccessTokenMatcher::None => {
2076                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2077                }
2078                AccessTokenMatcher::Exact(e) => {
2079                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2080                }
2081                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2082            }),
2083            op_groups: value.op_group_perms.map(Into::into),
2084            ops: if value.ops.is_empty() {
2085                None
2086            } else {
2087                Some(value.ops.into_iter().map(Into::into).collect())
2088            },
2089        }
2090    }
2091}
2092
2093#[derive(Debug, Clone)]
2094#[non_exhaustive]
2095/// Input for [`issue_access_token`](crate::S2::issue_access_token).
2096pub struct IssueAccessTokenInput {
2097    /// Access token ID.
2098    pub id: AccessTokenId,
2099    /// Expiration time.
2100    ///
2101    /// Defaults to the expiration time of requestor's access token passed via
2102    /// [`S2Config`](S2Config::new).
2103    pub expires_at: Option<S2DateTime>,
2104    /// Whether to automatically prefix stream names during creation and strip the prefix during
2105    /// listing.
2106    ///
2107    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
2108    /// prefix.
2109    ///
2110    /// Defaults to `false`.
2111    pub auto_prefix_streams: bool,
2112    /// Scope of the token.
2113    pub scope: AccessTokenScopeInput,
2114}
2115
2116impl IssueAccessTokenInput {
2117    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
2118    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2119        Self {
2120            id,
2121            expires_at: None,
2122            auto_prefix_streams: false,
2123            scope,
2124        }
2125    }
2126
2127    /// Set the expiration time.
2128    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2129        Self {
2130            expires_at: Some(expires_at),
2131            ..self
2132        }
2133    }
2134
2135    /// Set whether to automatically prefix stream names during creation and strip the prefix during
2136    /// listing.
2137    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2138        Self {
2139            auto_prefix_streams,
2140            ..self
2141        }
2142    }
2143}
2144
2145impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2146    fn from(value: IssueAccessTokenInput) -> Self {
2147        Self {
2148            id: value.id,
2149            expires_at: value.expires_at.map(Into::into),
2150            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2151            scope: value.scope.into(),
2152        }
2153    }
2154}
2155
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Accumulation interval for timeseries metric sets.
pub enum TimeseriesInterval {
    /// One minute.
    Minute,
    /// One hour.
    Hour,
    /// One day.
    Day,
}
2166
2167impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2168    fn from(value: TimeseriesInterval) -> Self {
2169        match value {
2170            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2171            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2172            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2173        }
2174    }
2175}
2176
2177impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2178    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2179        match value {
2180            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2181            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2182            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2183        }
2184    }
2185}
2186
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range expressed in Unix epoch seconds.
pub struct TimeRange {
    /// Inclusive start timestamp.
    pub start: u32,
    /// Exclusive end timestamp.
    pub end: u32,
}

impl TimeRange {
    /// Build a [`TimeRange`] from a start and an end timestamp.
    ///
    /// The values are stored as given.
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
2203
2204#[derive(Debug, Clone, Copy)]
2205#[non_exhaustive]
2206/// Time range as Unix epoch seconds and accumulation interval.
2207pub struct TimeRangeAndInterval {
2208    /// Start timestamp (inclusive).
2209    pub start: u32,
2210    /// End timestamp (exclusive).
2211    pub end: u32,
2212    /// Interval to accumulate over for timeseries metric sets.
2213    ///
2214    /// Default is dependent on the requested metric set.
2215    pub interval: Option<TimeseriesInterval>,
2216}
2217
2218impl TimeRangeAndInterval {
2219    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2220    pub fn new(start: u32, end: u32) -> Self {
2221        Self {
2222            start,
2223            end,
2224            interval: None,
2225        }
2226    }
2227
2228    /// Set the interval to accumulate over for timeseries metric sets.
2229    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2230        Self {
2231            interval: Some(interval),
2232            ..self
2233        }
2234    }
2235}
2236
2237#[derive(Debug, Clone, Copy)]
2238/// Account metric set to return.
2239pub enum AccountMetricSet {
2240    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
2241    /// specified time range.
2242    ActiveBasins(TimeRange),
2243    /// Returns [`AccumulationMetric`]s, one per account operation type.
2244    ///
2245    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2246    /// per interval over the requested time range.
2247    ///
2248    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2249    AccountOps(TimeRangeAndInterval),
2250}
2251
2252#[derive(Debug, Clone)]
2253#[non_exhaustive]
2254/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
2255pub struct GetAccountMetricsInput {
2256    /// Metric set to return.
2257    pub set: AccountMetricSet,
2258}
2259
2260impl GetAccountMetricsInput {
2261    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2262    pub fn new(set: AccountMetricSet) -> Self {
2263        Self { set }
2264    }
2265}
2266
2267impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2268    fn from(value: GetAccountMetricsInput) -> Self {
2269        let (set, start, end, interval) = match value.set {
2270            AccountMetricSet::ActiveBasins(args) => (
2271                api::metrics::AccountMetricSet::ActiveBasins,
2272                args.start,
2273                args.end,
2274                None,
2275            ),
2276            AccountMetricSet::AccountOps(args) => (
2277                api::metrics::AccountMetricSet::AccountOps,
2278                args.start,
2279                args.end,
2280                args.interval,
2281            ),
2282        };
2283        Self {
2284            set,
2285            start: Some(start),
2286            end: Some(end),
2287            interval: interval.map(Into::into),
2288        }
2289    }
2290}
2291
2292#[derive(Debug, Clone, Copy)]
2293/// Basin metric set to return.
2294pub enum BasinMetricSet {
2295    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
2296    /// in the basin, with one observed value for each hour over the requested time range.
2297    Storage(TimeRange),
2298    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
2299    ///
2300    /// Each metric represents a timeseries of the number of append operations across all streams
2301    /// in the basin, with one accumulated value per interval over the requested time range.
2302    ///
2303    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2304    /// [`minute`](TimeseriesInterval::Minute).
2305    AppendOps(TimeRangeAndInterval),
2306    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
2307    ///
2308    /// Each metric represents a timeseries of the number of read operations across all streams
2309    /// in the basin, with one accumulated value per interval over the requested time range.
2310    ///
2311    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2312    /// [`minute`](TimeseriesInterval::Minute).
2313    ReadOps(TimeRangeAndInterval),
2314    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
2315    /// across all streams in the basin, with one accumulated value per interval
2316    /// over the requested time range.
2317    ///
2318    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2319    /// [`minute`](TimeseriesInterval::Minute).
2320    ReadThroughput(TimeRangeAndInterval),
2321    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
2322    /// across all streams in the basin, with one accumulated value per interval
2323    /// over the requested time range.
2324    ///
2325    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2326    /// [`minute`](TimeseriesInterval::Minute).
2327    AppendThroughput(TimeRangeAndInterval),
2328    /// Returns [`AccumulationMetric`]s, one per basin operation type.
2329    ///
2330    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2331    /// per interval over the requested time range.
2332    ///
2333    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2334    BasinOps(TimeRangeAndInterval),
2335}
2336
2337#[derive(Debug, Clone)]
2338#[non_exhaustive]
2339/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
2340pub struct GetBasinMetricsInput {
2341    /// Basin name.
2342    pub name: BasinName,
2343    /// Metric set to return.
2344    pub set: BasinMetricSet,
2345}
2346
2347impl GetBasinMetricsInput {
2348    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2349    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2350        Self { name, set }
2351    }
2352}
2353
2354impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2355    fn from(value: GetBasinMetricsInput) -> Self {
2356        let (set, start, end, interval) = match value.set {
2357            BasinMetricSet::Storage(args) => (
2358                api::metrics::BasinMetricSet::Storage,
2359                args.start,
2360                args.end,
2361                None,
2362            ),
2363            BasinMetricSet::AppendOps(args) => (
2364                api::metrics::BasinMetricSet::AppendOps,
2365                args.start,
2366                args.end,
2367                args.interval,
2368            ),
2369            BasinMetricSet::ReadOps(args) => (
2370                api::metrics::BasinMetricSet::ReadOps,
2371                args.start,
2372                args.end,
2373                args.interval,
2374            ),
2375            BasinMetricSet::ReadThroughput(args) => (
2376                api::metrics::BasinMetricSet::ReadThroughput,
2377                args.start,
2378                args.end,
2379                args.interval,
2380            ),
2381            BasinMetricSet::AppendThroughput(args) => (
2382                api::metrics::BasinMetricSet::AppendThroughput,
2383                args.start,
2384                args.end,
2385                args.interval,
2386            ),
2387            BasinMetricSet::BasinOps(args) => (
2388                api::metrics::BasinMetricSet::BasinOps,
2389                args.start,
2390                args.end,
2391                args.interval,
2392            ),
2393        };
2394        (
2395            value.name,
2396            api::metrics::BasinMetricSetRequest {
2397                set,
2398                start: Some(start),
2399                end: Some(end),
2400                interval: interval.map(Into::into),
2401            },
2402        )
2403    }
2404}
2405
2406#[derive(Debug, Clone, Copy)]
2407/// Stream metric set to return.
2408pub enum StreamMetricSet {
2409    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
2410    /// with one observed value for each minute over the requested time range.
2411    Storage(TimeRange),
2412}
2413
2414#[derive(Debug, Clone)]
2415#[non_exhaustive]
2416/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
2417pub struct GetStreamMetricsInput {
2418    /// Basin name.
2419    pub basin_name: BasinName,
2420    /// Stream name.
2421    pub stream_name: StreamName,
2422    /// Metric set to return.
2423    pub set: StreamMetricSet,
2424}
2425
2426impl GetStreamMetricsInput {
2427    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2428    /// set.
2429    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2430        Self {
2431            basin_name,
2432            stream_name,
2433            set,
2434        }
2435    }
2436}
2437
2438impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2439    fn from(value: GetStreamMetricsInput) -> Self {
2440        let (set, start, end, interval) = match value.set {
2441            StreamMetricSet::Storage(args) => (
2442                api::metrics::StreamMetricSet::Storage,
2443                args.start,
2444                args.end,
2445                None,
2446            ),
2447        };
2448        (
2449            value.basin_name,
2450            value.stream_name,
2451            api::metrics::StreamMetricSetRequest {
2452                set,
2453                start: Some(start),
2454                end: Some(end),
2455                interval,
2456            },
2457        )
2458    }
2459}
2460
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Unit that metric values are measured in.
pub enum MetricUnit {
    /// A size, in bytes.
    Bytes,
    /// A count of operations.
    Operations,
}
2469
2470impl From<api::metrics::MetricUnit> for MetricUnit {
2471    fn from(value: api::metrics::MetricUnit) -> Self {
2472        match value {
2473            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2474            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2475        }
2476    }
2477}
2478
2479#[derive(Debug, Clone)]
2480#[non_exhaustive]
2481/// Single named value.
2482pub struct ScalarMetric {
2483    /// Metric name.
2484    pub name: String,
2485    /// Unit for the metric value.
2486    pub unit: MetricUnit,
2487    /// Metric value.
2488    pub value: f64,
2489}
2490
2491#[derive(Debug, Clone)]
2492#[non_exhaustive]
2493/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2494/// specified interval.
2495pub struct AccumulationMetric {
2496    /// Timeseries name.
2497    pub name: String,
2498    /// Unit for the accumulated values.
2499    pub unit: MetricUnit,
2500    /// The interval at which datapoints are accumulated.
2501    pub interval: TimeseriesInterval,
2502    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
2503    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
2504    /// one `interval`.
2505    pub values: Vec<(u32, f64)>,
2506}
2507
2508#[derive(Debug, Clone)]
2509#[non_exhaustive]
2510/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2511pub struct GaugeMetric {
2512    /// Timeseries name.
2513    pub name: String,
2514    /// Unit for the instantaneous values.
2515    pub unit: MetricUnit,
2516    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
2517    /// instant of the `timestamp` (in Unix epoch seconds).
2518    pub values: Vec<(u32, f64)>,
2519}
2520
#[derive(Debug, Clone)]
#[non_exhaustive]
/// A set of string labels.
pub struct LabelMetric {
    /// Name of the label.
    pub name: String,
    /// Values of the label.
    pub values: Vec<String>,
}
2530
2531#[derive(Debug, Clone)]
2532/// Individual metric in a returned metric set.
2533pub enum Metric {
2534    /// Single named value.
2535    Scalar(ScalarMetric),
2536    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2537    /// specified interval.
2538    Accumulation(AccumulationMetric),
2539    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2540    Gauge(GaugeMetric),
2541    /// Set of string labels.
2542    Label(LabelMetric),
2543}
2544
2545impl From<api::metrics::Metric> for Metric {
2546    fn from(value: api::metrics::Metric) -> Self {
2547        match value {
2548            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2549                name: sm.name.into(),
2550                unit: sm.unit.into(),
2551                value: sm.value,
2552            }),
2553            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2554                name: am.name.into(),
2555                unit: am.unit.into(),
2556                interval: am.interval.into(),
2557                values: am.values,
2558            }),
2559            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2560                name: gm.name.into(),
2561                unit: gm.unit.into(),
2562                values: gm.values,
2563            }),
2564            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2565                name: lm.name.into(),
2566                values: lm.values,
2567            }),
2568        }
2569    }
2570}
2571
2572#[derive(Debug, Clone, Default)]
2573#[non_exhaustive]
2574/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
2575pub struct ListStreamsInput {
2576    /// Filter streams whose names begin with this value.
2577    ///
2578    /// Defaults to `""`.
2579    pub prefix: StreamNamePrefix,
2580    /// Filter streams whose names are lexicographically greater than this value.
2581    ///
2582    /// Defaults to `""`.
2583    pub start_after: StreamNameStartAfter,
2584    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
2585    ///
2586    /// Defaults to `1000`.
2587    pub limit: Option<usize>,
2588}
2589
2590impl ListStreamsInput {
2591    /// Create a new [`ListStreamsInput`] with default values.
2592    pub fn new() -> Self {
2593        Self::default()
2594    }
2595
2596    /// Set the prefix used to filter streams whose names begin with this value.
2597    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2598        Self { prefix, ..self }
2599    }
2600
2601    /// Set the value used to filter streams whose names are lexicographically greater than this
2602    /// value.
2603    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2604        Self {
2605            start_after,
2606            ..self
2607        }
2608    }
2609
2610    /// Set the limit on number of streams to return in a page.
2611    pub fn with_limit(self, limit: usize) -> Self {
2612        Self {
2613            limit: Some(limit),
2614            ..self
2615        }
2616    }
2617}
2618
2619impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2620    fn from(value: ListStreamsInput) -> Self {
2621        Self {
2622            prefix: Some(value.prefix),
2623            start_after: Some(value.start_after),
2624            limit: value.limit,
2625        }
2626    }
2627}
2628
2629#[derive(Debug, Clone, Default)]
2630/// Input for [`list_all_streams`](crate::S2Basin::list_all_streams) operation.
2631pub struct ListAllStreamsInput {
2632    /// Filter streams whose names begin with this value.
2633    ///
2634    /// Defaults to `""`.
2635    pub prefix: StreamNamePrefix,
2636    /// Filter streams whose names are lexicographically greater than this value.
2637    ///
2638    /// Defaults to `""`.
2639    pub start_after: StreamNameStartAfter,
2640    /// Whether to include streams that are being deleted.
2641    ///
2642    /// Defaults to `false`.
2643    pub include_deleted: bool,
2644}
2645
2646impl ListAllStreamsInput {
2647    /// Create a new [`ListAllStreamsInput`] with default values.
2648    pub fn new() -> Self {
2649        Self::default()
2650    }
2651
2652    /// Set the prefix used to filter streams whose names begin with this value.
2653    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2654        Self { prefix, ..self }
2655    }
2656
2657    /// Set the value used to filter streams whose names are lexicographically greater than this
2658    /// value.
2659    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2660        Self {
2661            start_after,
2662            ..self
2663        }
2664    }
2665
2666    /// Set whether to include streams that are being deleted.
2667    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2668        Self {
2669            include_deleted,
2670            ..self
2671        }
2672    }
2673}
2674
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// Stream information.
///
/// Built from the API representation via [`TryFrom`]; see the conversion impl for the
/// validation that is applied to the timestamps.
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time if the stream is being deleted.
    ///
    /// `None` when no deletion time was reported for the stream.
    pub deleted_at: Option<S2DateTime>,
    /// Encryption algorithm for this stream, if encryption is enabled.
    pub cipher: Option<EncryptionAlgorithm>,
}
2688
2689impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2690    type Error = ValidationError;
2691
2692    fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2693        Ok(Self {
2694            name: value.name,
2695            created_at: value.created_at.try_into()?,
2696            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2697            cipher: value.cipher.map(Into::into),
2698        })
2699    }
2700}
2701
2702#[derive(Debug, Clone)]
2703#[non_exhaustive]
2704/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
2705pub struct CreateStreamInput {
2706    /// Stream name.
2707    pub name: StreamName,
2708    /// Configuration for the stream.
2709    ///
2710    /// See [`StreamConfig`] for defaults.
2711    pub config: Option<StreamConfig>,
2712    idempotency_token: String,
2713}
2714
2715impl CreateStreamInput {
2716    /// Create a new [`CreateStreamInput`] with the given stream name.
2717    pub fn new(name: StreamName) -> Self {
2718        Self {
2719            name,
2720            config: None,
2721            idempotency_token: idempotency_token(),
2722        }
2723    }
2724
2725    /// Set the configuration for the stream.
2726    pub fn with_config(self, config: StreamConfig) -> Self {
2727        Self {
2728            config: Some(config),
2729            ..self
2730        }
2731    }
2732}
2733
2734impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2735    fn from(value: CreateStreamInput) -> Self {
2736        (
2737            api::stream::CreateStreamRequest {
2738                stream: value.name,
2739                config: value.config.map(Into::into),
2740            },
2741            value.idempotency_token,
2742        )
2743    }
2744}
2745
2746#[derive(Debug, Clone)]
2747#[non_exhaustive]
2748/// Input for [`ensure_stream`](crate::S2Basin::ensure_stream)
2749/// operation.
2750pub struct EnsureStreamInput {
2751    /// Stream name.
2752    pub name: StreamName,
2753    /// Configuration for the stream.
2754    ///
2755    /// See [`StreamConfig`] for defaults.
2756    pub config: Option<StreamConfig>,
2757}
2758
2759impl EnsureStreamInput {
2760    /// Create a new [`EnsureStreamInput`] with the given stream name.
2761    pub fn new(name: StreamName) -> Self {
2762        Self { name, config: None }
2763    }
2764
2765    /// Set the configuration for the stream.
2766    pub fn with_config(self, config: StreamConfig) -> Self {
2767        Self {
2768            config: Some(config),
2769            ..self
2770        }
2771    }
2772}
2773
2774impl From<EnsureStreamInput> for (StreamName, Option<api::config::StreamConfig>) {
2775    fn from(value: EnsureStreamInput) -> Self {
2776        (value.name, value.config.map(Into::into))
2777    }
2778}
2779
2780#[derive(Debug, Clone)]
2781#[non_exhaustive]
2782/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
2783pub struct DeleteStreamInput {
2784    /// Stream name.
2785    pub name: StreamName,
2786    /// Whether to ignore `Not Found` error if the stream doesn't exist.
2787    pub ignore_not_found: bool,
2788}
2789
2790impl DeleteStreamInput {
2791    /// Create a new [`DeleteStreamInput`] with the given stream name.
2792    pub fn new(name: StreamName) -> Self {
2793        Self {
2794            name,
2795            ignore_not_found: false,
2796        }
2797    }
2798
2799    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2800    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2801        Self {
2802            ignore_not_found,
2803            ..self
2804        }
2805    }
2806}
2807
2808#[derive(Debug, Clone)]
2809#[non_exhaustive]
2810/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
2811pub struct ReconfigureStreamInput {
2812    /// Stream name.
2813    pub name: StreamName,
2814    /// Reconfiguration for [`StreamConfig`].
2815    pub config: StreamReconfiguration,
2816}
2817
2818impl ReconfigureStreamInput {
2819    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2820    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2821        Self { name, config }
2822    }
2823}
2824
2825#[derive(Debug, Clone, PartialEq, Eq)]
2826/// Token for fencing appends to a stream.
2827///
2828/// **Note:** It must not exceed 36 bytes in length.
2829///
2830/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
2831pub struct FencingToken(String);
2832
2833impl FencingToken {
2834    /// Generate a random alphanumeric fencing token of `n` bytes.
2835    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2836        rand::rng()
2837            .sample_iter(&rand::distr::Alphanumeric)
2838            .take(n)
2839            .map(char::from)
2840            .collect::<String>()
2841            .parse()
2842    }
2843}
2844
2845impl FromStr for FencingToken {
2846    type Err = ValidationError;
2847
2848    fn from_str(s: &str) -> Result<Self, Self::Err> {
2849        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2850            return Err(ValidationError(format!(
2851                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2852            )));
2853        }
2854        Ok(FencingToken(s.to_string()))
2855    }
2856}
2857
2858impl std::fmt::Display for FencingToken {
2859    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2860        write!(f, "{}", self.0)
2861    }
2862}
2863
2864impl Deref for FencingToken {
2865    type Target = str;
2866
2867    fn deref(&self) -> &Self::Target {
2868        &self.0
2869    }
2870}
2871
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
/// A position in a stream.
///
/// Both fields are plain integers, so equality is total and the type is `Eq` as well as
/// `PartialEq`.
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
    /// User-specified timestamps are passed through as-is.
    pub timestamp: u64,
}
2882
2883impl std::fmt::Display for StreamPosition {
2884    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2885        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2886    }
2887}
2888
2889impl From<api::stream::proto::StreamPosition> for StreamPosition {
2890    fn from(value: api::stream::proto::StreamPosition) -> Self {
2891        Self {
2892            seq_num: value.seq_num,
2893            timestamp: value.timestamp,
2894        }
2895    }
2896}
2897
2898impl From<api::stream::StreamPosition> for StreamPosition {
2899    fn from(value: api::stream::StreamPosition) -> Self {
2900        Self {
2901            seq_num: value.seq_num,
2902            timestamp: value.timestamp,
2903        }
2904    }
2905}
2906
2907#[derive(Debug, Clone, PartialEq)]
2908#[non_exhaustive]
2909/// A name-value pair.
2910pub struct Header {
2911    /// Name.
2912    pub name: Bytes,
2913    /// Value.
2914    pub value: Bytes,
2915}
2916
2917impl Header {
2918    /// Create a new [`Header`] with the given name and value.
2919    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2920        Self {
2921            name: name.into(),
2922            value: value.into(),
2923        }
2924    }
2925}
2926
2927impl From<Header> for api::stream::proto::Header {
2928    fn from(value: Header) -> Self {
2929        Self {
2930            name: value.name,
2931            value: value.value,
2932        }
2933    }
2934}
2935
2936impl From<api::stream::proto::Header> for Header {
2937    fn from(value: api::stream::proto::Header) -> Self {
2938        Self {
2939            name: value.name,
2940            value: value.value,
2941        }
2942    }
2943}
2944
2945#[derive(Debug, Clone, PartialEq)]
2946/// A record to append.
2947pub struct AppendRecord {
2948    body: Bytes,
2949    headers: Vec<Header>,
2950    timestamp: Option<u64>,
2951}
2952
2953impl AppendRecord {
2954    fn validate(self) -> Result<Self, ValidationError> {
2955        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2956            Err(ValidationError(format!(
2957                "metered_bytes: {} exceeds {}",
2958                self.metered_bytes(),
2959                RECORD_BATCH_MAX.bytes
2960            )))
2961        } else {
2962            Ok(self)
2963        }
2964    }
2965
2966    /// Create a new [`AppendRecord`] with the given record body.
2967    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2968        let record = Self {
2969            body: body.into(),
2970            headers: Vec::default(),
2971            timestamp: None,
2972        };
2973        record.validate()
2974    }
2975
2976    /// Set the headers for this record.
2977    pub fn with_headers(
2978        self,
2979        headers: impl IntoIterator<Item = Header>,
2980    ) -> Result<Self, ValidationError> {
2981        let record = Self {
2982            headers: headers.into_iter().collect(),
2983            ..self
2984        };
2985        record.validate()
2986    }
2987
2988    /// Set the timestamp for this record.
2989    ///
2990    /// Precise semantics depend on [`StreamConfig::timestamping`].
2991    pub fn with_timestamp(self, timestamp: u64) -> Self {
2992        Self {
2993            timestamp: Some(timestamp),
2994            ..self
2995        }
2996    }
2997
2998    /// Get the body of this record.
2999    pub fn body(&self) -> &[u8] {
3000        &self.body
3001    }
3002
3003    /// Get the headers of this record.
3004    pub fn headers(&self) -> &[Header] {
3005        &self.headers
3006    }
3007
3008    /// Get the timestamp of this record.
3009    pub fn timestamp(&self) -> Option<u64> {
3010        self.timestamp
3011    }
3012}
3013
3014impl From<AppendRecord> for api::stream::proto::AppendRecord {
3015    fn from(value: AppendRecord) -> Self {
3016        Self {
3017            timestamp: value.timestamp,
3018            headers: value.headers.into_iter().map(Into::into).collect(),
3019            body: value.body,
3020        }
3021    }
3022}
3023
/// Metered byte size calculation.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
///
/// Here `len(headers)` is the number of headers, and the other lengths are in bytes.
pub trait MeteredBytes {
    /// Returns the metered byte size.
    fn metered_bytes(&self) -> usize;
}
3034
3035macro_rules! metered_bytes_impl {
3036    ($ty:ty) => {
3037        impl MeteredBytes for $ty {
3038            fn metered_bytes(&self) -> usize {
3039                8 + (2 * self.headers.len())
3040                    + self
3041                        .headers
3042                        .iter()
3043                        .map(|h| h.name.len() + h.value.len())
3044                        .sum::<usize>()
3045                    + self.body.len()
3046            }
3047        }
3048    };
3049}
3050
3051metered_bytes_impl!(AppendRecord);
3052
3053#[derive(Debug, Clone)]
3054/// A batch of records to append atomically.
3055///
3056/// **Note:** It must contain at least `1` record and no more than `1000`.
3057/// The total size of the batch must not exceed `1MiB` in metered bytes.
3058///
3059/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
3060/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
3061/// that takes care of the abovementioned constraints.
3062pub struct AppendRecordBatch {
3063    records: Vec<AppendRecord>,
3064    metered_bytes: usize,
3065}
3066
3067impl AppendRecordBatch {
3068    pub(crate) fn with_capacity(capacity: usize) -> Self {
3069        Self {
3070            records: Vec::with_capacity(capacity),
3071            metered_bytes: 0,
3072        }
3073    }
3074
3075    pub(crate) fn push(&mut self, record: AppendRecord) {
3076        self.metered_bytes += record.metered_bytes();
3077        self.records.push(record);
3078    }
3079
3080    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
3081    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3082    where
3083        I: IntoIterator<Item = AppendRecord>,
3084    {
3085        let mut records = Vec::new();
3086        let mut metered_bytes = 0;
3087
3088        for record in iter {
3089            metered_bytes += record.metered_bytes();
3090            records.push(record);
3091
3092            if metered_bytes > RECORD_BATCH_MAX.bytes {
3093                return Err(ValidationError(format!(
3094                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
3095                    RECORD_BATCH_MAX.bytes
3096                )));
3097            }
3098
3099            if records.len() > RECORD_BATCH_MAX.count {
3100                return Err(ValidationError(format!(
3101                    "number of records in the batch exceeds {}",
3102                    RECORD_BATCH_MAX.count
3103                )));
3104            }
3105        }
3106
3107        if records.is_empty() {
3108            return Err(ValidationError("batch is empty".into()));
3109        }
3110
3111        Ok(Self {
3112            records,
3113            metered_bytes,
3114        })
3115    }
3116}
3117
3118impl Deref for AppendRecordBatch {
3119    type Target = [AppendRecord];
3120
3121    fn deref(&self) -> &Self::Target {
3122        &self.records
3123    }
3124}
3125
impl MeteredBytes for AppendRecordBatch {
    /// Returns the total tracked for the batch, without re-walking the records.
    fn metered_bytes(&self) -> usize {
        self.metered_bytes
    }
}
3131
#[derive(Debug, Clone)]
/// Command to signal an operation.
///
/// Wrapped by [`CommandRecord`], which can be converted into an [`AppendRecord`].
pub enum Command {
    /// Fence operation.
    Fence {
        /// Fencing token.
        fencing_token: FencingToken,
    },
    /// Trim operation.
    Trim {
        /// Trim point.
        trim_point: u64,
    },
}
3146
3147#[derive(Debug, Clone)]
3148#[non_exhaustive]
3149/// Command record for signaling operations to the service.
3150///
3151/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
3152pub struct CommandRecord {
3153    /// Command to signal an operation.
3154    pub command: Command,
3155    /// Timestamp for this record.
3156    pub timestamp: Option<u64>,
3157}
3158
3159impl CommandRecord {
3160    const FENCE: &[u8] = b"fence";
3161    const TRIM: &[u8] = b"trim";
3162
3163    /// Create a fence command record with the given fencing token.
3164    ///
3165    /// Fencing is strongly consistent, and subsequent appends that specify a
3166    /// fencing token will fail if it does not match.
3167    pub fn fence(fencing_token: FencingToken) -> Self {
3168        Self {
3169            command: Command::Fence { fencing_token },
3170            timestamp: None,
3171        }
3172    }
3173
3174    /// Create a trim command record with the given trim point.
3175    ///
3176    /// Trim point is the desired earliest sequence number for the stream.
3177    ///
3178    /// Trimming is eventually consistent, and trimmed records may be visible
3179    /// for a brief period.
3180    pub fn trim(trim_point: u64) -> Self {
3181        Self {
3182            command: Command::Trim { trim_point },
3183            timestamp: None,
3184        }
3185    }
3186
3187    /// Set the timestamp for this record.
3188    pub fn with_timestamp(self, timestamp: u64) -> Self {
3189        Self {
3190            timestamp: Some(timestamp),
3191            ..self
3192        }
3193    }
3194}
3195
3196impl From<CommandRecord> for AppendRecord {
3197    fn from(value: CommandRecord) -> Self {
3198        let (header_value, body) = match value.command {
3199            Command::Fence { fencing_token } => (
3200                CommandRecord::FENCE,
3201                Bytes::copy_from_slice(fencing_token.as_bytes()),
3202            ),
3203            Command::Trim { trim_point } => (
3204                CommandRecord::TRIM,
3205                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3206            ),
3207        };
3208        Self {
3209            body,
3210            headers: vec![Header::new("", header_value)],
3211            timestamp: value.timestamp,
3212        }
3213    }
3214}
3215
3216#[derive(Debug, Clone)]
3217#[non_exhaustive]
3218/// Input for [`append`](crate::S2Stream::append) operation and
3219/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
3220pub struct AppendInput {
3221    /// Batch of records to append atomically.
3222    pub records: AppendRecordBatch,
3223    /// Expected sequence number for the first record in the batch.
3224    ///
3225    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
3226    pub match_seq_num: Option<u64>,
3227    /// Fencing token to match against the stream's current fencing token.
3228    ///
3229    /// If unspecified, no matching is performed. If specified and mismatched,
3230    /// the append fails. A stream defaults to `""` as its fencing token.
3231    pub fencing_token: Option<FencingToken>,
3232}
3233
3234impl AppendInput {
3235    /// Create a new [`AppendInput`] with the given batch of records.
3236    pub fn new(records: AppendRecordBatch) -> Self {
3237        Self {
3238            records,
3239            match_seq_num: None,
3240            fencing_token: None,
3241        }
3242    }
3243
3244    /// Set the expected sequence number for the first record in the batch.
3245    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3246        Self {
3247            match_seq_num: Some(match_seq_num),
3248            ..self
3249        }
3250    }
3251
3252    /// Set the fencing token to match against the stream's current fencing token.
3253    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3254        Self {
3255            fencing_token: Some(fencing_token),
3256            ..self
3257        }
3258    }
3259}
3260
3261impl From<AppendInput> for api::stream::proto::AppendInput {
3262    fn from(value: AppendInput) -> Self {
3263        Self {
3264            records: value.records.iter().cloned().map(Into::into).collect(),
3265            match_seq_num: value.match_seq_num,
3266            fencing_token: value.fencing_token.map(|t| t.to_string()),
3267        }
3268    }
3269}
3270
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Acknowledgement for an [`AppendInput`].
///
/// All three positions are [`StreamPosition`]s, i.e. a sequence number paired with a
/// timestamp.
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
    /// that was appended.
    ///
    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
    /// appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record on the stream.
    ///
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
3289
3290impl From<api::stream::proto::AppendAck> for AppendAck {
3291    fn from(value: api::stream::proto::AppendAck) -> Self {
3292        Self {
3293            start: value.start.unwrap_or_default().into(),
3294            end: value.end.unwrap_or_default().into(),
3295            tail: value.tail.unwrap_or_default().into(),
3296        }
3297    }
3298}
3299
#[derive(Debug, Clone, Copy)]
/// Starting position for reading from a stream.
pub enum ReadFrom {
    /// Start at this sequence number.
    SeqNum(u64),
    /// Start at this timestamp.
    Timestamp(u64),
    /// Start N records before the tail.
    TailOffset(u64),
}

impl Default for ReadFrom {
    /// The default is to read from sequence number `0`.
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3316
3317#[derive(Debug, Default, Clone)]
3318#[non_exhaustive]
3319/// Where to start reading.
3320pub struct ReadStart {
3321    /// Starting position.
3322    ///
3323    /// Defaults to reading from sequence number `0`.
3324    pub from: ReadFrom,
3325    /// Whether to start from tail if the requested starting position is beyond it.
3326    ///
3327    /// Defaults to `false` (errors if position is beyond tail).
3328    pub clamp_to_tail: bool,
3329}
3330
3331impl ReadStart {
3332    /// Create a new [`ReadStart`] with default values.
3333    pub fn new() -> Self {
3334        Self::default()
3335    }
3336
3337    /// Set the starting position.
3338    pub fn with_from(self, from: ReadFrom) -> Self {
3339        Self { from, ..self }
3340    }
3341
3342    /// Set whether to start from tail if the requested starting position is beyond it.
3343    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3344        Self {
3345            clamp_to_tail,
3346            ..self
3347        }
3348    }
3349}
3350
3351impl From<ReadStart> for api::stream::ReadStart {
3352    fn from(value: ReadStart) -> Self {
3353        let (seq_num, timestamp, tail_offset) = match value.from {
3354            ReadFrom::SeqNum(n) => (Some(n), None, None),
3355            ReadFrom::Timestamp(t) => (None, Some(t), None),
3356            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3357        };
3358        Self {
3359            seq_num,
3360            timestamp,
3361            tail_offset,
3362            clamp: if value.clamp_to_tail {
3363                Some(true)
3364            } else {
3365                None
3366            },
3367        }
3368    }
3369}
3370
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Limits on how much to read.
pub struct ReadLimits {
    /// Maximum number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Maximum total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Create a new [`ReadLimits`] with neither limit set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Set the limit on number of records.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Set the limit on total metered bytes of records.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3407
3408#[derive(Debug, Clone, Default)]
3409#[non_exhaustive]
3410/// When to stop reading.
3411pub struct ReadStop {
3412    /// Limits on how much to read.
3413    ///
3414    /// See [`ReadLimits`] for defaults.
3415    pub limits: ReadLimits,
3416    /// Timestamp at which to stop (exclusive).
3417    ///
3418    /// Defaults to `None`.
3419    pub until: Option<RangeTo<u64>>,
3420    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
3421    /// seconds for [`read`](crate::S2Stream::read).
3422    ///
3423    /// Defaults to:
3424    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
3425    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
3426    ///   is specified.
3427    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
3428    ///   `until` is specified.
3429    pub wait: Option<u32>,
3430}
3431
3432impl ReadStop {
3433    /// Create a new [`ReadStop`] with default values.
3434    pub fn new() -> Self {
3435        Self::default()
3436    }
3437
3438    /// Set the limits on how much to read.
3439    pub fn with_limits(self, limits: ReadLimits) -> Self {
3440        Self { limits, ..self }
3441    }
3442
3443    /// Set the timestamp at which to stop (exclusive).
3444    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3445        Self {
3446            until: Some(until),
3447            ..self
3448        }
3449    }
3450
3451    /// Set the duration in seconds to wait for new records before stopping.
3452    pub fn with_wait(self, wait: u32) -> Self {
3453        Self {
3454            wait: Some(wait),
3455            ..self
3456        }
3457    }
3458}
3459
3460impl From<ReadStop> for api::stream::ReadEnd {
3461    fn from(value: ReadStop) -> Self {
3462        Self {
3463            count: value.limits.count,
3464            bytes: value.limits.bytes,
3465            until: value.until.map(|r| r.end),
3466            wait: value.wait,
3467        }
3468    }
3469}
3470
3471#[derive(Debug, Clone, Default)]
3472#[non_exhaustive]
3473/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
3474/// operations.
3475pub struct ReadInput {
3476    /// Where to start reading.
3477    ///
3478    /// See [`ReadStart`] for defaults.
3479    pub start: ReadStart,
3480    /// When to stop reading.
3481    ///
3482    /// See [`ReadStop`] for defaults.
3483    pub stop: ReadStop,
3484    /// Whether to filter out command records from the stream when reading.
3485    ///
3486    /// Defaults to `false`.
3487    pub ignore_command_records: bool,
3488}
3489
3490impl ReadInput {
3491    /// Create a new [`ReadInput`] with default values.
3492    pub fn new() -> Self {
3493        Self::default()
3494    }
3495
3496    /// Set where to start reading.
3497    pub fn with_start(self, start: ReadStart) -> Self {
3498        Self { start, ..self }
3499    }
3500
3501    /// Set when to stop reading.
3502    pub fn with_stop(self, stop: ReadStop) -> Self {
3503        Self { stop, ..self }
3504    }
3505
3506    /// Set whether to filter out command records from the stream when reading.
3507    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3508        Self {
3509            ignore_command_records,
3510            ..self
3511        }
3512    }
3513}
3514
3515#[derive(Debug, Clone)]
3516#[non_exhaustive]
3517/// Record that is durably sequenced on a stream.
3518pub struct SequencedRecord {
3519    /// Sequence number assigned to this record.
3520    pub seq_num: u64,
3521    /// Body of this record.
3522    pub body: Bytes,
3523    /// Headers for this record.
3524    pub headers: Vec<Header>,
3525    /// Timestamp for this record.
3526    pub timestamp: u64,
3527}
3528
3529impl SequencedRecord {
3530    /// Whether this is a command record.
3531    pub fn is_command_record(&self) -> bool {
3532        self.headers.len() == 1 && *self.headers[0].name == *b""
3533    }
3534}
3535
3536impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3537    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3538        Self {
3539            seq_num: value.seq_num,
3540            body: value.body,
3541            headers: value.headers.into_iter().map(Into::into).collect(),
3542            timestamp: value.timestamp,
3543        }
3544    }
3545}
3546
// Sequenced records are metered by the same macro-generated formula as `AppendRecord`.
metered_bytes_impl!(SequencedRecord);
3548
3549#[derive(Debug, Clone)]
3550#[non_exhaustive]
3551/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
3552/// [`read_session`](crate::S2Stream::read_session).
3553pub struct ReadBatch {
3554    /// Records that are durably sequenced on the stream.
3555    ///
3556    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
3557    /// - the [`stop condition`](ReadInput::stop) was already met, or
3558    /// - all records in the batch were command records and
3559    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
3560    pub records: Vec<SequencedRecord>,
3561    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3562    /// the last record.
3563    ///
3564    /// It will only be present when reading recent records.
3565    pub tail: Option<StreamPosition>,
3566}
3567
3568impl ReadBatch {
3569    pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3570        Self {
3571            records: batch.records.into_iter().map(Into::into).collect(),
3572            tail: batch.tail.map(Into::into),
3573        }
3574    }
3575}
3576
/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
///
/// The stream is boxed, pinned, and `Send`.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3579
#[derive(Debug, Clone, thiserror::Error)]
/// Why an append condition check failed.
///
/// Each variant carries the value that was expected, which is also included in the
/// error message.
pub enum AppendConditionFailed {
    #[error("fencing token mismatch, expected: {0}")]
    /// Fencing token did not match. Contains the expected fencing token.
    FencingTokenMismatch(FencingToken),
    #[error("sequence number mismatch, expected: {0}")]
    /// Sequence number did not match. Contains the expected sequence number.
    SeqNumMismatch(u64),
}
3590
3591impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3592    fn from(value: api::stream::AppendConditionFailed) -> Self {
3593        match value {
3594            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3595                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3596            }
3597            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3598                AppendConditionFailed::SeqNumMismatch(seq)
3599            }
3600        }
3601    }
3602}
3603
#[derive(Debug, Clone, thiserror::Error)]
/// Errors from S2 operations.
///
/// The message of each variant is that of the value it wraps, except for
/// [`ReadUnwritten`](S2Error::ReadUnwritten), which adds a prefix before the tail.
pub enum S2Error {
    #[error("{0}")]
    /// Client-side error.
    Client(String),
    #[error(transparent)]
    /// Validation error.
    Validation(#[from] ValidationError),
    #[error("{0}")]
    /// Append condition check failed. Contains the failure reason.
    AppendConditionFailed(AppendConditionFailed),
    #[error("read from an unwritten position. current tail: {0}")]
    /// Read from an unwritten position. Contains the current tail.
    ReadUnwritten(StreamPosition),
    #[error("{0}")]
    /// Other server-side error.
    Server(ErrorResponse),
}
3623
3624impl From<ApiError> for S2Error {
3625    fn from(err: ApiError) -> Self {
3626        match err {
3627            ApiError::ReadUnwritten(tail_response) => {
3628                Self::ReadUnwritten(tail_response.tail.into())
3629            }
3630            ApiError::AppendConditionFailed(condition_failed) => {
3631                Self::AppendConditionFailed(condition_failed.into())
3632            }
3633            ApiError::Server(_, response) => Self::Server(response.into()),
3634            other => Self::Client(other.to_string()),
3635        }
3636    }
3637}
3638
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
/// Error response from S2 server.
///
/// Displayed as `<code>: <message>`.
pub struct ErrorResponse {
    /// Error code.
    pub code: String,
    /// Error message.
    pub message: String,
}
3649
3650impl From<ApiErrorResponse> for ErrorResponse {
3651    fn from(response: ApiErrorResponse) -> Self {
3652        Self {
3653            code: response.code,
3654            message: response.message,
3655        }
3656    }
3657}
3658
3659fn idempotency_token() -> String {
3660    uuid::Uuid::new_v4().simple().to_string()
3661}