Skip to main content

s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    sync::Arc,
12    time::Duration,
13};
14
15use bytes::Bytes;
16use http::{
17    header::HeaderValue,
18    uri::{Authority, Scheme},
19};
20use rand::RngExt;
21use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
22pub use s2_common::caps::RECORD_BATCH_MAX;
23/// Encryption algorithm.
24pub use s2_common::encryption::EncryptionAlgorithm;
25/// Encryption key for stream operations.
26pub use s2_common::encryption::EncryptionKey;
27/// Validation error.
28pub use s2_common::types::ValidationError;
29/// Access token ID.
30///
31/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
32pub use s2_common::types::access::AccessTokenId;
33/// See [`ListAccessTokensInput::prefix`].
34pub use s2_common::types::access::AccessTokenIdPrefix;
35/// See [`ListAccessTokensInput::start_after`].
36pub use s2_common::types::access::AccessTokenIdStartAfter;
37/// Basin name.
38///
39/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
40/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
41pub use s2_common::types::basin::BasinName;
42/// See [`ListBasinsInput::prefix`].
43pub use s2_common::types::basin::BasinNamePrefix;
44/// See [`ListBasinsInput::start_after`].
45pub use s2_common::types::basin::BasinNameStartAfter;
46/// Stream name.
47///
48/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
49pub use s2_common::types::stream::StreamName;
50/// See [`ListStreamsInput::prefix`].
51pub use s2_common::types::stream::StreamNamePrefix;
52/// See [`ListStreamsInput::start_after`].
53pub use s2_common::types::stream::StreamNameStartAfter;
54
55pub(crate) const ONE_MIB: u32 = 1024 * 1024;
56
57use s2_common::{
58    maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH, types::resources::ProvisionResult,
59};
60use secrecy::SecretString;
61
62use crate::api::{ApiError, ApiErrorResponse};
63
64/// An RFC 3339 datetime.
65///
66/// It can be created in either of the following ways:
67/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
68/// - Convert from [`time::OffsetDateTime`] using [`TryFrom`]/[`TryInto`].
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub struct S2DateTime(time::OffsetDateTime);
71
72impl TryFrom<time::OffsetDateTime> for S2DateTime {
73    type Error = ValidationError;
74
75    fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
76        dt.format(&time::format_description::well_known::Rfc3339)
77            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
78        Ok(Self(dt))
79    }
80}
81
82impl From<S2DateTime> for time::OffsetDateTime {
83    fn from(dt: S2DateTime) -> Self {
84        dt.0
85    }
86}
87
88impl FromStr for S2DateTime {
89    type Err = ValidationError;
90
91    fn from_str(s: &str) -> Result<Self, Self::Err> {
92        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
93            .map(Self)
94            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
95    }
96}
97
98impl fmt::Display for S2DateTime {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        write!(
101            f,
102            "{}",
103            self.0
104                .format(&time::format_description::well_known::Rfc3339)
105                .expect("RFC3339 formatting should not fail for S2DateTime")
106        )
107    }
108}
109
110/// Authority for connecting to an S2 basin.
111#[derive(Debug, Clone, PartialEq)]
112pub(crate) enum BasinAuthority {
113    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
114    ParentZone(Authority),
115    /// Direct cell authority. Basin is expected to be hosted by this cell.
116    Direct(Authority),
117}
118
119/// Account endpoint.
120#[derive(Debug, Clone)]
121pub struct AccountEndpoint {
122    scheme: Scheme,
123    authority: Authority,
124}
125
126impl AccountEndpoint {
127    /// Create a new [`AccountEndpoint`] with the given endpoint.
128    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
129        endpoint.parse()
130    }
131}
132
133impl FromStr for AccountEndpoint {
134    type Err = ValidationError;
135
136    fn from_str(s: &str) -> Result<Self, Self::Err> {
137        let (scheme, authority) = match s.find("://") {
138            Some(idx) => {
139                let scheme: Scheme = s[..idx]
140                    .parse()
141                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
142                (scheme, &s[idx + 3..])
143            }
144            None => (Scheme::HTTPS, s),
145        };
146        Ok(Self {
147            scheme,
148            authority: authority
149                .parse()
150                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
151        })
152    }
153}
154
155/// Basin endpoint.
156#[derive(Debug, Clone)]
157pub struct BasinEndpoint {
158    scheme: Scheme,
159    authority: BasinAuthority,
160}
161
162impl BasinEndpoint {
163    /// Create a new [`BasinEndpoint`] with the given endpoint.
164    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
165        endpoint.parse()
166    }
167}
168
169impl FromStr for BasinEndpoint {
170    type Err = ValidationError;
171
172    fn from_str(s: &str) -> Result<Self, Self::Err> {
173        let (scheme, authority) = match s.find("://") {
174            Some(idx) => {
175                let scheme: Scheme = s[..idx]
176                    .parse()
177                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
178                (scheme, &s[idx + 3..])
179            }
180            None => (Scheme::HTTPS, s),
181        };
182        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
183            BasinAuthority::ParentZone(
184                authority
185                    .parse()
186                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
187            )
188        } else {
189            BasinAuthority::Direct(
190                authority
191                    .parse()
192                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
193            )
194        };
195        Ok(Self { scheme, authority })
196    }
197}
198
199#[derive(Debug, Clone)]
200#[non_exhaustive]
201/// Endpoints for the S2 environment.
202pub struct S2Endpoints {
203    pub(crate) scheme: Scheme,
204    pub(crate) account_authority: Authority,
205    pub(crate) basin_authority: BasinAuthority,
206}
207
208impl S2Endpoints {
209    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
210    pub fn new(
211        account_endpoint: AccountEndpoint,
212        basin_endpoint: BasinEndpoint,
213    ) -> Result<Self, ValidationError> {
214        if account_endpoint.scheme != basin_endpoint.scheme {
215            return Err("account and basin endpoints must have the same scheme".into());
216        }
217        Ok(Self {
218            scheme: account_endpoint.scheme,
219            account_authority: account_endpoint.authority,
220            basin_authority: basin_endpoint.authority,
221        })
222    }
223
224    /// Create a new [`S2Endpoints`] from environment variables.
225    ///
226    /// The following environment variables are expected to be set:
227    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
228    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
229    pub fn from_env() -> Result<Self, ValidationError> {
230        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
231            Ok(endpoint) => endpoint.parse()?,
232            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
233            Err(VarError::NotUnicode(_)) => {
234                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
235            }
236        };
237
238        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
239            Ok(endpoint) => endpoint.parse()?,
240            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
241            Err(VarError::NotUnicode(_)) => {
242                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
243            }
244        };
245
246        if account_endpoint.scheme != basin_endpoint.scheme {
247            return Err(
248                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
249            );
250        }
251
252        Ok(Self {
253            scheme: account_endpoint.scheme,
254            account_authority: account_endpoint.authority,
255            basin_authority: basin_endpoint.authority,
256        })
257    }
258
259    pub(crate) fn for_aws() -> Self {
260        Self {
261            scheme: Scheme::HTTPS,
262            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
263            basin_authority: BasinAuthority::ParentZone(
264                "b.s2.dev".try_into().expect("valid authority"),
265            ),
266        }
267    }
268}
269
270#[derive(Debug, Clone, Copy)]
271/// Compression algorithm for request and response bodies.
272pub enum Compression {
273    /// No compression.
274    None,
275    /// Gzip compression.
276    Gzip,
277    /// Zstd compression.
278    Zstd,
279}
280
281impl From<Compression> for CompressionAlgorithm {
282    fn from(value: Compression) -> Self {
283        match value {
284            Compression::None => CompressionAlgorithm::None,
285            Compression::Gzip => CompressionAlgorithm::Gzip,
286            Compression::Zstd => CompressionAlgorithm::Zstd,
287        }
288    }
289}
290
291#[derive(Debug, Clone, Copy, PartialEq)]
292#[non_exhaustive]
293/// Retry policy for [`append`](crate::S2Stream::append) and
294/// [`append_session`](crate::S2Stream::append_session) operations.
295pub enum AppendRetryPolicy {
296    /// Retry all appends. Use when duplicate records on the stream are acceptable.
297    All,
298    /// Retry when it can be determined that the request had no side effects.
299    ///
300    /// Uses a frame-level signal to detect whether any body frames were consumed
301    /// by the HTTP transport. If no frames were sent, the server never saw the
302    /// request, so retry is safe and will not cause duplicate records.
303    ///
304    /// Certain server errors (`rate_limited`, `hot_server`) are also safe to
305    /// retry regardless of frame signal state, since they guarantee no mutation
306    /// occurred.
307    NoSideEffects,
308}
309
310#[derive(Debug, Clone)]
311#[non_exhaustive]
312/// Configuration for retrying requests in case of transient failures.
313///
314/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
315/// ```text
316/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
317///     jitter = rand[0, base_delay]
318///     delay  = base_delay + jitter
319/// ````
320pub struct RetryConfig {
321    /// Total number of attempts including the initial try. A value of `1` means no retries.
322    ///
323    /// Defaults to `3`.
324    pub max_attempts: NonZeroU32,
325    /// Minimum base delay for retries.
326    ///
327    /// Defaults to `100ms`.
328    pub min_base_delay: Duration,
329    /// Maximum base delay for retries.
330    ///
331    /// Defaults to `1s`.
332    pub max_base_delay: Duration,
333    /// Retry policy for [`append`](crate::S2Stream::append) and
334    /// [`append_session`](crate::S2Stream::append_session) operations.
335    ///
336    /// Defaults to `All`.
337    pub append_retry_policy: AppendRetryPolicy,
338}
339
340impl Default for RetryConfig {
341    fn default() -> Self {
342        Self {
343            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
344            min_base_delay: Duration::from_millis(100),
345            max_base_delay: Duration::from_secs(1),
346            append_retry_policy: AppendRetryPolicy::All,
347        }
348    }
349}
350
351impl RetryConfig {
352    /// Create a new [`RetryConfig`] with default settings.
353    pub fn new() -> Self {
354        Self::default()
355    }
356
357    pub(crate) fn max_retries(&self) -> u32 {
358        self.max_attempts.get() - 1
359    }
360
361    /// Set the total number of attempts including the initial try.
362    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
363        Self {
364            max_attempts,
365            ..self
366        }
367    }
368
369    /// Set the minimum base delay for retries.
370    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
371        Self {
372            min_base_delay,
373            ..self
374        }
375    }
376
377    /// Set the maximum base delay for retries.
378    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
379        Self {
380            max_base_delay,
381            ..self
382        }
383    }
384
385    /// Set the retry policy for [`append`](crate::S2Stream::append) and
386    /// [`append_session`](crate::S2Stream::append_session) operations.
387    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
388        Self {
389            append_retry_policy,
390            ..self
391        }
392    }
393}
394
395#[derive(Debug, Clone)]
396#[non_exhaustive]
397/// Configuration for [`S2`](crate::S2).
398pub struct S2Config {
399    pub(crate) access_token: SecretString,
400    pub(crate) endpoints: S2Endpoints,
401    pub(crate) connection_timeout: Duration,
402    pub(crate) request_timeout: Duration,
403    pub(crate) retry: RetryConfig,
404    pub(crate) compression: Compression,
405    pub(crate) user_agent: HeaderValue,
406    pub(crate) insecure_skip_cert_verification: bool,
407    pub(crate) rustls_crypto_provider: Option<Arc<rustls::crypto::CryptoProvider>>,
408}
409
410impl S2Config {
411    /// Create a new [`S2Config`] with the given access token and default settings.
412    pub fn new(access_token: impl Into<String>) -> Self {
413        Self {
414            access_token: access_token.into().into(),
415            endpoints: S2Endpoints::for_aws(),
416            connection_timeout: Duration::from_secs(3),
417            request_timeout: Duration::from_secs(5),
418            retry: RetryConfig::new(),
419            compression: Compression::None,
420            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
421                .parse()
422                .expect("valid user agent"),
423            insecure_skip_cert_verification: false,
424            rustls_crypto_provider: default_rustls_crypto_provider(),
425        }
426    }
427
428    /// Set the S2 endpoints to connect to.
429    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
430        Self { endpoints, ..self }
431    }
432
433    /// Set the timeout for establishing a connection to the server.
434    ///
435    /// Defaults to `3s`.
436    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
437        Self {
438            connection_timeout,
439            ..self
440        }
441    }
442
443    /// Set the timeout for requests.
444    ///
445    /// Defaults to `5s`.
446    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
447        Self {
448            request_timeout,
449            ..self
450        }
451    }
452
453    /// Set the retry configuration for requests.
454    ///
455    /// See [`RetryConfig`] for defaults.
456    pub fn with_retry(self, retry: RetryConfig) -> Self {
457        Self { retry, ..self }
458    }
459
460    /// Set the compression algorithm for requests and responses.
461    ///
462    /// Defaults to no compression.
463    pub fn with_compression(self, compression: Compression) -> Self {
464        Self {
465            compression,
466            ..self
467        }
468    }
469
470    /// Skip TLS certificate verification (insecure).
471    ///
472    /// This is useful for connecting to endpoints with self-signed certificates
473    /// or certificates that don't match the hostname (similar to `curl -k`).
474    ///
475    /// # Warning
476    ///
477    /// This disables certificate verification and should only be used for
478    /// testing or development purposes. **Never use this in production.**
479    ///
480    /// Defaults to `false`.
481    pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
482        Self {
483            insecure_skip_cert_verification: skip,
484            ..self
485        }
486    }
487
488    /// Use a specific rustls crypto provider for SDK TLS connections.
489    ///
490    /// With default features enabled, the SDK uses the `aws-lc-rs` provider.
491    /// With default features disabled, the SDK uses rustls's process-global
492    /// provider if one has been installed, or returns an error otherwise.
493    ///
494    /// Use this when your application needs a specific rustls provider, such as
495    /// `ring` or a custom [`rustls::crypto::CryptoProvider`]. The corresponding
496    /// rustls provider feature must be enabled in the dependency graph.
497    pub fn with_rustls_crypto_provider(
498        self,
499        provider: impl Into<Arc<rustls::crypto::CryptoProvider>>,
500    ) -> Self {
501        Self {
502            rustls_crypto_provider: Some(provider.into()),
503            ..self
504        }
505    }
506
507    /// Use rustls's `aws-lc-rs` crypto provider.
508    ///
509    /// Requires the `rustls-aws-lc-rs` crate feature.
510    #[cfg(feature = "rustls-aws-lc-rs")]
511    pub fn with_rustls_aws_lc_rs_crypto_provider(self) -> Self {
512        self.with_rustls_crypto_provider(rustls::crypto::aws_lc_rs::default_provider())
513    }
514
515    /// Use rustls's `ring` crypto provider.
516    ///
517    /// Requires the `rustls-ring` crate feature.
518    #[cfg(feature = "rustls-ring")]
519    pub fn with_rustls_ring_crypto_provider(self) -> Self {
520        self.with_rustls_crypto_provider(rustls::crypto::ring::default_provider())
521    }
522
523    #[doc(hidden)]
524    #[cfg(feature = "_hidden")]
525    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
526        let user_agent = user_agent
527            .into()
528            .parse()
529            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
530        Ok(Self { user_agent, ..self })
531    }
532}
533
534#[cfg(feature = "rustls-aws-lc-rs")]
535fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
536    Some(Arc::new(rustls::crypto::aws_lc_rs::default_provider()))
537}
538
539#[cfg(all(not(feature = "rustls-aws-lc-rs"), feature = "rustls-ring"))]
540fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
541    Some(Arc::new(rustls::crypto::ring::default_provider()))
542}
543
544#[cfg(all(not(feature = "rustls-aws-lc-rs"), not(feature = "rustls-ring")))]
545fn default_rustls_crypto_provider() -> Option<Arc<rustls::crypto::CryptoProvider>> {
546    None
547}
548
549#[derive(Debug, Default, Clone, PartialEq, Eq)]
550#[non_exhaustive]
551/// A page of values.
552pub struct Page<T> {
553    /// Values in this page.
554    pub values: Vec<T>,
555    /// Whether there are more pages.
556    pub has_more: bool,
557}
558
559impl<T> Page<T> {
560    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
561        Self {
562            values: values.into(),
563            has_more,
564        }
565    }
566}
567
568#[derive(Debug, Clone, Copy, PartialEq, Eq)]
569/// Storage class for recent appends.
570pub enum StorageClass {
571    /// Standard storage class that offers append latencies under `500ms`.
572    Standard,
573    /// Express storage class that offers append latencies under `50ms`.
574    Express,
575}
576
577impl From<api::config::StorageClass> for StorageClass {
578    fn from(value: api::config::StorageClass) -> Self {
579        match value {
580            api::config::StorageClass::Standard => StorageClass::Standard,
581            api::config::StorageClass::Express => StorageClass::Express,
582        }
583    }
584}
585
586impl From<StorageClass> for api::config::StorageClass {
587    fn from(value: StorageClass) -> Self {
588        match value {
589            StorageClass::Standard => api::config::StorageClass::Standard,
590            StorageClass::Express => api::config::StorageClass::Express,
591        }
592    }
593}
594
595#[derive(Debug, Clone, Copy, PartialEq, Eq)]
596/// Retention policy for records in a stream.
597pub enum RetentionPolicy {
598    /// Age in seconds. Records older than this age are automatically trimmed.
599    Age(u64),
600    /// Records are retained indefinitely unless explicitly trimmed.
601    Infinite,
602}
603
604impl From<api::config::RetentionPolicy> for RetentionPolicy {
605    fn from(value: api::config::RetentionPolicy) -> Self {
606        match value {
607            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
608            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
609        }
610    }
611}
612
613impl From<RetentionPolicy> for api::config::RetentionPolicy {
614    fn from(value: RetentionPolicy) -> Self {
615        match value {
616            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
617            RetentionPolicy::Infinite => {
618                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
619            }
620        }
621    }
622}
623
624#[derive(Debug, Clone, Copy, PartialEq, Eq)]
625/// Timestamping mode for appends that influences how timestamps are handled.
626pub enum TimestampingMode {
627    /// Prefer client-specified timestamp if present otherwise use arrival time.
628    ClientPrefer,
629    /// Require a client-specified timestamp and reject the append if it is missing.
630    ClientRequire,
631    /// Use the arrival time and ignore any client-specified timestamp.
632    Arrival,
633}
634
635impl From<api::config::TimestampingMode> for TimestampingMode {
636    fn from(value: api::config::TimestampingMode) -> Self {
637        match value {
638            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
639            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
640            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
641        }
642    }
643}
644
645impl From<TimestampingMode> for api::config::TimestampingMode {
646    fn from(value: TimestampingMode) -> Self {
647        match value {
648            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
649            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
650            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
651        }
652    }
653}
654
655#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
656#[non_exhaustive]
657/// Configuration for timestamping behavior.
658pub struct TimestampingConfig {
659    /// Timestamping mode for appends that influences how timestamps are handled.
660    ///
661    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
662    pub mode: Option<TimestampingMode>,
663    /// Whether client-specified timestamps are allowed to exceed the arrival time.
664    ///
665    /// Defaults to `false` (client timestamps are capped at the arrival time).
666    pub uncapped: Option<bool>,
667}
668
669impl TimestampingConfig {
670    /// Create a new [`TimestampingConfig`] with default settings.
671    pub fn new() -> Self {
672        Self::default()
673    }
674
675    /// Set the timestamping mode for appends that influences how timestamps are handled.
676    pub fn with_mode(self, mode: TimestampingMode) -> Self {
677        Self {
678            mode: Some(mode),
679            ..self
680        }
681    }
682
683    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
684    pub fn with_uncapped(self, uncapped: bool) -> Self {
685        Self {
686            uncapped: Some(uncapped),
687            ..self
688        }
689    }
690}
691
692impl From<api::config::TimestampingConfig> for TimestampingConfig {
693    fn from(value: api::config::TimestampingConfig) -> Self {
694        Self {
695            mode: value.mode.map(Into::into),
696            uncapped: value.uncapped,
697        }
698    }
699}
700
701impl From<TimestampingConfig> for api::config::TimestampingConfig {
702    fn from(value: TimestampingConfig) -> Self {
703        Self {
704            mode: value.mode.map(Into::into),
705            uncapped: value.uncapped,
706        }
707    }
708}
709
710#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
711#[non_exhaustive]
712/// Configuration for automatically deleting a stream when it becomes empty.
713pub struct DeleteOnEmptyConfig {
714    /// Minimum age in seconds before an empty stream can be deleted.
715    ///
716    /// Defaults to `0` (disables automatic deletion).
717    pub min_age_secs: u64,
718}
719
720impl DeleteOnEmptyConfig {
721    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
722    pub fn new() -> Self {
723        Self::default()
724    }
725
726    /// Set the minimum age in seconds before an empty stream can be deleted.
727    pub fn with_min_age(self, min_age: Duration) -> Self {
728        Self {
729            min_age_secs: min_age.as_secs(),
730        }
731    }
732}
733
734impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
735    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
736        Self {
737            min_age_secs: value.min_age_secs,
738        }
739    }
740}
741
742impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
743    fn from(value: DeleteOnEmptyConfig) -> Self {
744        Self {
745            min_age_secs: value.min_age_secs,
746        }
747    }
748}
749
750#[derive(Debug, Clone, Default, PartialEq, Eq)]
751#[non_exhaustive]
752/// Configuration for a stream.
753pub struct StreamConfig {
754    /// Storage class for the stream.
755    ///
756    /// Defaults to [`Express`](StorageClass::Express).
757    pub storage_class: Option<StorageClass>,
758    /// Retention policy for records in the stream.
759    ///
760    /// Defaults to `7 days` of retention.
761    pub retention_policy: Option<RetentionPolicy>,
762    /// Configuration for timestamping behavior.
763    ///
764    /// See [`TimestampingConfig`] for defaults.
765    pub timestamping: Option<TimestampingConfig>,
766    /// Configuration for automatically deleting the stream when it becomes empty.
767    ///
768    /// See [`DeleteOnEmptyConfig`] for defaults.
769    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
770}
771
772impl StreamConfig {
773    /// Create a new [`StreamConfig`] with default settings.
774    pub fn new() -> Self {
775        Self::default()
776    }
777
778    /// Set the storage class for the stream.
779    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
780        Self {
781            storage_class: Some(storage_class),
782            ..self
783        }
784    }
785
786    /// Set the retention policy for records in the stream.
787    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
788        Self {
789            retention_policy: Some(retention_policy),
790            ..self
791        }
792    }
793
794    /// Set the configuration for timestamping behavior.
795    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
796        Self {
797            timestamping: Some(timestamping),
798            ..self
799        }
800    }
801
802    /// Set the configuration for automatically deleting the stream when it becomes empty.
803    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
804        Self {
805            delete_on_empty: Some(delete_on_empty),
806            ..self
807        }
808    }
809}
810
811impl From<api::config::StreamConfig> for StreamConfig {
812    fn from(value: api::config::StreamConfig) -> Self {
813        Self {
814            storage_class: value.storage_class.map(Into::into),
815            retention_policy: value.retention_policy.map(Into::into),
816            timestamping: value.timestamping.map(Into::into),
817            delete_on_empty: value.delete_on_empty.map(Into::into),
818        }
819    }
820}
821
822impl From<StreamConfig> for api::config::StreamConfig {
823    fn from(value: StreamConfig) -> Self {
824        Self {
825            storage_class: value.storage_class.map(Into::into),
826            retention_policy: value.retention_policy.map(Into::into),
827            timestamping: value.timestamping.map(Into::into),
828            delete_on_empty: value.delete_on_empty.map(Into::into),
829        }
830    }
831}
832
833#[derive(Debug, Clone, Default, PartialEq, Eq)]
834#[non_exhaustive]
835/// Configuration for a basin.
836pub struct BasinConfig {
837    /// Default configuration for all streams in the basin.
838    ///
839    /// See [`StreamConfig`] for defaults.
840    pub default_stream_config: Option<StreamConfig>,
841    /// Encryption algorithm to apply to newly created streams in the basin.
842    pub stream_cipher: Option<EncryptionAlgorithm>,
843    /// Whether to create stream on append if it doesn't exist using default stream configuration.
844    ///
845    /// Defaults to `false`.
846    pub create_stream_on_append: bool,
847    /// Whether to create stream on read if it doesn't exist using default stream configuration.
848    ///
849    /// Defaults to `false`.
850    pub create_stream_on_read: bool,
851}
852
853impl BasinConfig {
854    /// Create a new [`BasinConfig`] with default settings.
855    pub fn new() -> Self {
856        Self::default()
857    }
858
859    /// Set the default configuration for all streams in the basin.
860    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
861        Self {
862            default_stream_config: Some(config),
863            ..self
864        }
865    }
866
867    /// Set the encryption algorithm to apply to newly created streams in the basin.
868    pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
869        Self {
870            stream_cipher: Some(stream_cipher),
871            ..self
872        }
873    }
874
875    /// Set whether to create stream on append if it doesn't exist using default stream
876    /// configuration.
877    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
878        Self {
879            create_stream_on_append,
880            ..self
881        }
882    }
883
884    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
885    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
886        Self {
887            create_stream_on_read,
888            ..self
889        }
890    }
891}
892
893impl From<api::config::BasinConfig> for BasinConfig {
894    fn from(value: api::config::BasinConfig) -> Self {
895        Self {
896            default_stream_config: value.default_stream_config.map(Into::into),
897            stream_cipher: value.stream_cipher.map(Into::into),
898            create_stream_on_append: value.create_stream_on_append,
899            create_stream_on_read: value.create_stream_on_read,
900        }
901    }
902}
903
904impl From<BasinConfig> for api::config::BasinConfig {
905    fn from(value: BasinConfig) -> Self {
906        Self {
907            default_stream_config: value.default_stream_config.map(Into::into),
908            stream_cipher: value.stream_cipher.map(Into::into),
909            create_stream_on_append: value.create_stream_on_append,
910            create_stream_on_read: value.create_stream_on_read,
911        }
912    }
913}
914
915#[derive(Debug, Clone, PartialEq, Eq)]
916/// Scope of a basin.
917#[non_exhaustive]
918pub enum BasinScope {
919    /// AWS `us-east-1` region.
920    AwsUsEast1,
921    /// AWS `us-west-2` region.
922    AwsUsWest2,
923    /// AWS `eu-north-1` region.
924    AwsEuNorth1,
925}
926
927impl From<api::basin::BasinScope> for BasinScope {
928    fn from(value: api::basin::BasinScope) -> Self {
929        match value {
930            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
931            api::basin::BasinScope::AwsUsWest2 => BasinScope::AwsUsWest2,
932            api::basin::BasinScope::AwsEuNorth1 => BasinScope::AwsEuNorth1,
933        }
934    }
935}
936
937impl From<BasinScope> for api::basin::BasinScope {
938    fn from(value: BasinScope) -> Self {
939        match value {
940            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
941            BasinScope::AwsUsWest2 => api::basin::BasinScope::AwsUsWest2,
942            BasinScope::AwsEuNorth1 => api::basin::BasinScope::AwsEuNorth1,
943        }
944    }
945}
946
947#[derive(Debug, Clone)]
948#[non_exhaustive]
949/// Input for [`create_basin`](crate::S2::create_basin) operation.
950pub struct CreateBasinInput {
951    /// Basin name.
952    pub name: BasinName,
953    /// Configuration for the basin.
954    ///
955    /// See [`BasinConfig`] for defaults.
956    pub config: Option<BasinConfig>,
957    /// Scope of the basin.
958    ///
959    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1). Cannot be changed once created.
960    pub scope: Option<BasinScope>,
961    idempotency_token: String,
962}
963
964impl CreateBasinInput {
965    /// Create a new [`CreateBasinInput`] with the given basin name.
966    pub fn new(name: BasinName) -> Self {
967        Self {
968            name,
969            config: None,
970            scope: None,
971            idempotency_token: idempotency_token(),
972        }
973    }
974
975    /// Set the configuration for the basin.
976    pub fn with_config(self, config: BasinConfig) -> Self {
977        Self {
978            config: Some(config),
979            ..self
980        }
981    }
982
983    /// Set the scope of the basin.
984    pub fn with_scope(self, scope: BasinScope) -> Self {
985        Self {
986            scope: Some(scope),
987            ..self
988        }
989    }
990}
991
992impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
993    fn from(value: CreateBasinInput) -> Self {
994        (
995            api::basin::CreateBasinRequest {
996                basin: value.name,
997                config: value.config.map(Into::into),
998                scope: value.scope.map(Into::into),
999            },
1000            value.idempotency_token,
1001        )
1002    }
1003}
1004
1005#[derive(Debug, Clone)]
1006#[non_exhaustive]
1007/// Input for [`ensure_basin`](crate::S2::ensure_basin) operation.
1008pub struct EnsureBasinInput {
1009    /// Basin name.
1010    pub name: BasinName,
1011    /// Configuration for the basin.
1012    ///
1013    /// See [`BasinConfig`] for defaults.
1014    pub config: Option<BasinConfig>,
1015    /// Scope of the basin.
1016    ///
1017    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1). Cannot be changed once created.
1018    pub scope: Option<BasinScope>,
1019}
1020
1021impl EnsureBasinInput {
1022    /// Create a new [`EnsureBasinInput`] with the given basin name.
1023    pub fn new(name: BasinName) -> Self {
1024        Self {
1025            name,
1026            config: None,
1027            scope: None,
1028        }
1029    }
1030
1031    /// Set the configuration for the basin.
1032    pub fn with_config(self, config: BasinConfig) -> Self {
1033        Self {
1034            config: Some(config),
1035            ..self
1036        }
1037    }
1038
1039    /// Set the scope of the basin.
1040    pub fn with_scope(self, scope: BasinScope) -> Self {
1041        Self {
1042            scope: Some(scope),
1043            ..self
1044        }
1045    }
1046}
1047
1048impl From<EnsureBasinInput> for (BasinName, Option<api::basin::EnsureBasinRequest>) {
1049    fn from(value: EnsureBasinInput) -> Self {
1050        let config = value.config;
1051        let request = if config.is_some() || value.scope.is_some() {
1052            Some(api::basin::EnsureBasinRequest {
1053                config: config.map(Into::into),
1054                scope: value.scope.map(Into::into),
1055            })
1056        } else {
1057            None
1058        };
1059        (value.name, request)
1060    }
1061}
1062
1063#[derive(Debug, Clone)]
1064/// Output for `ensure` operations ([`ensure_basin`](crate::S2::ensure_basin),
1065/// [`ensure_stream`](crate::S2Basin::ensure_stream)).
1066pub enum EnsureOutput<T> {
1067    /// Resource created.
1068    Created(T),
1069    /// Resource already existed, and its config was updated.
1070    ConfigUpdated(T),
1071    /// Resource already existed, and its config is unchanged.
1072    ConfigUnchanged(T),
1073}
1074
1075impl<T> From<ProvisionResult<T>> for EnsureOutput<T> {
1076    fn from(result: ProvisionResult<T>) -> Self {
1077        match result {
1078            ProvisionResult::Created(info) => EnsureOutput::Created(info),
1079            ProvisionResult::Updated(info) => EnsureOutput::ConfigUpdated(info),
1080            ProvisionResult::Noop(info) => EnsureOutput::ConfigUnchanged(info),
1081        }
1082    }
1083}
1084
1085#[derive(Debug, Clone, Default)]
1086#[non_exhaustive]
1087/// Input for [`list_basins`](crate::S2::list_basins) operation.
1088pub struct ListBasinsInput {
1089    /// Filter basins whose names begin with this value.
1090    ///
1091    /// Defaults to `""`.
1092    pub prefix: BasinNamePrefix,
1093    /// Filter basins whose names are lexicographically greater than this value.
1094    ///
1095    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
1096    ///
1097    /// Defaults to `""`.
1098    pub start_after: BasinNameStartAfter,
1099    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
1100    ///
1101    /// Defaults to `1000`.
1102    pub limit: Option<usize>,
1103}
1104
1105impl ListBasinsInput {
1106    /// Create a new [`ListBasinsInput`] with default values.
1107    pub fn new() -> Self {
1108        Self::default()
1109    }
1110
1111    /// Set the prefix used to filter basins whose names begin with this value.
1112    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1113        Self { prefix, ..self }
1114    }
1115
1116    /// Set the value used to filter basins whose names are lexicographically greater than this
1117    /// value.
1118    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1119        Self {
1120            start_after,
1121            ..self
1122        }
1123    }
1124
1125    /// Set the limit on number of basins to return in a page.
1126    pub fn with_limit(self, limit: usize) -> Self {
1127        Self {
1128            limit: Some(limit),
1129            ..self
1130        }
1131    }
1132}
1133
1134impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1135    fn from(value: ListBasinsInput) -> Self {
1136        Self {
1137            prefix: Some(value.prefix),
1138            start_after: Some(value.start_after),
1139            limit: value.limit,
1140        }
1141    }
1142}
1143
1144#[derive(Debug, Clone, Default)]
1145/// Input for [`list_all_basins`](crate::S2::list_all_basins) operation.
1146pub struct ListAllBasinsInput {
1147    /// Filter basins whose names begin with this value.
1148    ///
1149    /// Defaults to `""`.
1150    pub prefix: BasinNamePrefix,
1151    /// Filter basins whose names are lexicographically greater than this value.
1152    ///
1153    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
1154    ///
1155    /// Defaults to `""`.
1156    pub start_after: BasinNameStartAfter,
1157    /// Whether to include basins that are being deleted.
1158    ///
1159    /// Defaults to `false`.
1160    pub include_deleted: bool,
1161}
1162
1163impl ListAllBasinsInput {
1164    /// Create a new [`ListAllBasinsInput`] with default values.
1165    pub fn new() -> Self {
1166        Self::default()
1167    }
1168
1169    /// Set the prefix used to filter basins whose names begin with this value.
1170    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1171        Self { prefix, ..self }
1172    }
1173
1174    /// Set the value used to filter basins whose names are lexicographically greater than this
1175    /// value.
1176    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1177        Self {
1178            start_after,
1179            ..self
1180        }
1181    }
1182
1183    /// Set whether to include basins that are being deleted.
1184    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1185        Self {
1186            include_deleted,
1187            ..self
1188        }
1189    }
1190}
1191
1192#[derive(Debug, Clone, PartialEq, Eq)]
1193#[non_exhaustive]
1194/// Basin information.
1195pub struct BasinInfo {
1196    /// Basin name.
1197    pub name: BasinName,
1198    /// Scope of the basin.
1199    pub scope: Option<BasinScope>,
1200    /// Creation time.
1201    pub created_at: S2DateTime,
1202    /// Deletion time if the basin is being deleted.
1203    pub deleted_at: Option<S2DateTime>,
1204}
1205
1206impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1207    type Error = ValidationError;
1208
1209    fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1210        Ok(Self {
1211            name: value.name,
1212            scope: value.scope.map(Into::into),
1213            created_at: value.created_at.try_into()?,
1214            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1215        })
1216    }
1217}
1218
1219#[derive(Debug, Clone)]
1220#[non_exhaustive]
1221/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
1222pub struct DeleteBasinInput {
1223    /// Basin name.
1224    pub name: BasinName,
1225    /// Whether to ignore `Not Found` error if the basin doesn't exist.
1226    pub ignore_not_found: bool,
1227}
1228
1229impl DeleteBasinInput {
1230    /// Create a new [`DeleteBasinInput`] with the given basin name.
1231    pub fn new(name: BasinName) -> Self {
1232        Self {
1233            name,
1234            ignore_not_found: false,
1235        }
1236    }
1237
1238    /// Set whether to ignore `Not Found` error if the basin is not existing.
1239    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1240        Self {
1241            ignore_not_found,
1242            ..self
1243        }
1244    }
1245}
1246
1247#[derive(Debug, Clone, Default)]
1248#[non_exhaustive]
1249/// Reconfiguration for [`TimestampingConfig`].
1250pub struct TimestampingReconfiguration {
1251    /// Override for the existing [`mode`](TimestampingConfig::mode).
1252    pub mode: Maybe<Option<TimestampingMode>>,
1253    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
1254    pub uncapped: Maybe<Option<bool>>,
1255}
1256
1257impl TimestampingReconfiguration {
1258    /// Create a new [`TimestampingReconfiguration`].
1259    pub fn new() -> Self {
1260        Self::default()
1261    }
1262
1263    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1264    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1265        Self {
1266            mode: Maybe::Specified(Some(mode)),
1267            ..self
1268        }
1269    }
1270
1271    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1272    pub fn with_uncapped(self, uncapped: bool) -> Self {
1273        Self {
1274            uncapped: Maybe::Specified(Some(uncapped)),
1275            ..self
1276        }
1277    }
1278}
1279
1280impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1281    fn from(value: TimestampingReconfiguration) -> Self {
1282        Self {
1283            mode: value.mode.map(|m| m.map(Into::into)),
1284            uncapped: value.uncapped,
1285        }
1286    }
1287}
1288
1289#[derive(Debug, Clone, Default)]
1290#[non_exhaustive]
1291/// Reconfiguration for [`DeleteOnEmptyConfig`].
1292pub struct DeleteOnEmptyReconfiguration {
1293    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1294    pub min_age_secs: Maybe<Option<u64>>,
1295}
1296
1297impl DeleteOnEmptyReconfiguration {
1298    /// Create a new [`DeleteOnEmptyReconfiguration`].
1299    pub fn new() -> Self {
1300        Self::default()
1301    }
1302
1303    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1304    pub fn with_min_age(self, min_age: Duration) -> Self {
1305        Self {
1306            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1307        }
1308    }
1309}
1310
1311impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1312    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1313        Self {
1314            min_age_secs: value.min_age_secs,
1315        }
1316    }
1317}
1318
1319#[derive(Debug, Clone, Default)]
1320#[non_exhaustive]
1321/// Reconfiguration for [`StreamConfig`].
1322pub struct StreamReconfiguration {
1323    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
1324    pub storage_class: Maybe<Option<StorageClass>>,
1325    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
1326    pub retention_policy: Maybe<Option<RetentionPolicy>>,
1327    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
1328    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1329    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1330    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1331}
1332
1333impl StreamReconfiguration {
1334    /// Create a new [`StreamReconfiguration`].
1335    pub fn new() -> Self {
1336        Self::default()
1337    }
1338
1339    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1340    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1341        Self {
1342            storage_class: Maybe::Specified(Some(storage_class)),
1343            ..self
1344        }
1345    }
1346
1347    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1348    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1349        Self {
1350            retention_policy: Maybe::Specified(Some(retention_policy)),
1351            ..self
1352        }
1353    }
1354
1355    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1356    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1357        Self {
1358            timestamping: Maybe::Specified(Some(timestamping)),
1359            ..self
1360        }
1361    }
1362
1363    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1364    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1365        Self {
1366            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1367            ..self
1368        }
1369    }
1370}
1371
1372impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1373    fn from(value: StreamReconfiguration) -> Self {
1374        Self {
1375            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1376            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1377            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1378            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1379        }
1380    }
1381}
1382
1383#[derive(Debug, Clone, Default)]
1384#[non_exhaustive]
1385/// Reconfiguration for [`BasinConfig`].
1386pub struct BasinReconfiguration {
1387    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
1388    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1389    /// Override for the existing [`stream_cipher`](BasinConfig::stream_cipher).
1390    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
1391    /// Override for the existing
1392    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1393    pub create_stream_on_append: Maybe<bool>,
1394    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1395    pub create_stream_on_read: Maybe<bool>,
1396}
1397
1398impl BasinReconfiguration {
1399    /// Create a new [`BasinReconfiguration`].
1400    pub fn new() -> Self {
1401        Self::default()
1402    }
1403
1404    /// Set the override for the existing
1405    /// [`default_stream_config`](BasinConfig::default_stream_config).
1406    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1407        Self {
1408            default_stream_config: Maybe::Specified(Some(config)),
1409            ..self
1410        }
1411    }
1412
1413    /// Set the override for the existing [`stream_cipher`](BasinConfig::stream_cipher).
1414    pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
1415        Self {
1416            stream_cipher: Maybe::Specified(Some(stream_cipher)),
1417            ..self
1418        }
1419    }
1420
1421    /// Set the override for the existing
1422    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1423    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1424        Self {
1425            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1426            ..self
1427        }
1428    }
1429
1430    /// Set the override for the existing
1431    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1432    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1433        Self {
1434            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1435            ..self
1436        }
1437    }
1438}
1439
1440impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1441    fn from(value: BasinReconfiguration) -> Self {
1442        Self {
1443            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1444            stream_cipher: value.stream_cipher.map(|m| m.map(Into::into)),
1445            create_stream_on_append: value.create_stream_on_append,
1446            create_stream_on_read: value.create_stream_on_read,
1447        }
1448    }
1449}
1450
1451#[derive(Debug, Clone)]
1452#[non_exhaustive]
1453/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
1454pub struct ReconfigureBasinInput {
1455    /// Basin name.
1456    pub name: BasinName,
1457    /// Reconfiguration for [`BasinConfig`].
1458    pub config: BasinReconfiguration,
1459}
1460
1461impl ReconfigureBasinInput {
1462    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1463    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1464        Self { name, config }
1465    }
1466}
1467
1468#[derive(Debug, Clone, Default)]
1469#[non_exhaustive]
1470/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
1471pub struct ListAccessTokensInput {
1472    /// Filter access tokens whose IDs begin with this value.
1473    ///
1474    /// Defaults to `""`.
1475    pub prefix: AccessTokenIdPrefix,
1476    /// Filter access tokens whose IDs are lexicographically greater than this value.
1477    ///
1478    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
1479    ///
1480    /// Defaults to `""`.
1481    pub start_after: AccessTokenIdStartAfter,
1482    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
1483    ///
1484    /// Defaults to `1000`.
1485    pub limit: Option<usize>,
1486}
1487
1488impl ListAccessTokensInput {
1489    /// Create a new [`ListAccessTokensInput`] with default values.
1490    pub fn new() -> Self {
1491        Self::default()
1492    }
1493
1494    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1495    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1496        Self { prefix, ..self }
1497    }
1498
1499    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1500    /// value.
1501    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1502        Self {
1503            start_after,
1504            ..self
1505        }
1506    }
1507
1508    /// Set the limit on number of access tokens to return in a page.
1509    pub fn with_limit(self, limit: usize) -> Self {
1510        Self {
1511            limit: Some(limit),
1512            ..self
1513        }
1514    }
1515}
1516
1517impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1518    fn from(value: ListAccessTokensInput) -> Self {
1519        Self {
1520            prefix: Some(value.prefix),
1521            start_after: Some(value.start_after),
1522            limit: value.limit,
1523        }
1524    }
1525}
1526
1527#[derive(Debug, Clone, Default)]
1528/// Input for [`list_all_access_tokens`](crate::S2::list_all_access_tokens) operation.
1529pub struct ListAllAccessTokensInput {
1530    /// Filter access tokens whose IDs begin with this value.
1531    ///
1532    /// Defaults to `""`.
1533    pub prefix: AccessTokenIdPrefix,
1534    /// Filter access tokens whose IDs are lexicographically greater than this value.
1535    ///
1536    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
1537    ///
1538    /// Defaults to `""`.
1539    pub start_after: AccessTokenIdStartAfter,
1540}
1541
1542impl ListAllAccessTokensInput {
1543    /// Create a new [`ListAllAccessTokensInput`] with default values.
1544    pub fn new() -> Self {
1545        Self::default()
1546    }
1547
1548    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1549    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1550        Self { prefix, ..self }
1551    }
1552
1553    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1554    /// this value.
1555    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1556        Self {
1557            start_after,
1558            ..self
1559        }
1560    }
1561}
1562
1563#[derive(Debug, Clone)]
1564#[non_exhaustive]
1565/// Access token information.
1566pub struct AccessTokenInfo {
1567    /// Access token ID.
1568    pub id: AccessTokenId,
1569    /// Expiration time.
1570    pub expires_at: S2DateTime,
1571    /// Whether to automatically prefix stream names during creation and strip the prefix during
1572    /// listing.
1573    pub auto_prefix_streams: bool,
1574    /// Scope of the access token.
1575    pub scope: AccessTokenScope,
1576}
1577
1578impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1579    type Error = ValidationError;
1580
1581    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1582        let expires_at = value
1583            .expires_at
1584            .map(S2DateTime::try_from)
1585            .transpose()?
1586            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1587        Ok(Self {
1588            id: value.id,
1589            expires_at,
1590            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1591            scope: value.scope.into(),
1592        })
1593    }
1594}
1595
1596#[derive(Debug, Clone)]
1597/// Pattern for matching basins.
1598///
1599/// See [`AccessTokenScope::basins`].
1600pub enum BasinMatcher {
1601    /// Match no basins.
1602    None,
1603    /// Match exactly this basin.
1604    Exact(BasinName),
1605    /// Match all basins with this prefix.
1606    Prefix(BasinNamePrefix),
1607}
1608
1609#[derive(Debug, Clone)]
1610/// Pattern for matching streams.
1611///
1612/// See [`AccessTokenScope::streams`].
1613pub enum StreamMatcher {
1614    /// Match no streams.
1615    None,
1616    /// Match exactly this stream.
1617    Exact(StreamName),
1618    /// Match all streams with this prefix.
1619    Prefix(StreamNamePrefix),
1620}
1621
1622#[derive(Debug, Clone)]
1623/// Pattern for matching access tokens.
1624///
1625/// See [`AccessTokenScope::access_tokens`].
1626pub enum AccessTokenMatcher {
1627    /// Match no access tokens.
1628    None,
1629    /// Match exactly this access token.
1630    Exact(AccessTokenId),
1631    /// Match all access tokens with this prefix.
1632    Prefix(AccessTokenIdPrefix),
1633}
1634
1635#[derive(Debug, Clone, Default)]
1636#[non_exhaustive]
1637/// Permissions indicating allowed operations.
1638pub struct ReadWritePermissions {
1639    /// Read permission.
1640    ///
1641    /// Defaults to `false`.
1642    pub read: bool,
1643    /// Write permission.
1644    ///
1645    /// Defaults to `false`.
1646    pub write: bool,
1647}
1648
1649impl ReadWritePermissions {
1650    /// Create a new [`ReadWritePermissions`] with default values.
1651    pub fn new() -> Self {
1652        Self::default()
1653    }
1654
1655    /// Create read-only permissions.
1656    pub fn read_only() -> Self {
1657        Self {
1658            read: true,
1659            write: false,
1660        }
1661    }
1662
1663    /// Create write-only permissions.
1664    pub fn write_only() -> Self {
1665        Self {
1666            read: false,
1667            write: true,
1668        }
1669    }
1670
1671    /// Create read-write permissions.
1672    pub fn read_write() -> Self {
1673        Self {
1674            read: true,
1675            write: true,
1676        }
1677    }
1678}
1679
1680impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1681    fn from(value: ReadWritePermissions) -> Self {
1682        Self {
1683            read: Some(value.read),
1684            write: Some(value.write),
1685        }
1686    }
1687}
1688
1689impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1690    fn from(value: api::access::ReadWritePermissions) -> Self {
1691        Self {
1692            read: value.read.unwrap_or_default(),
1693            write: value.write.unwrap_or_default(),
1694        }
1695    }
1696}
1697
1698#[derive(Debug, Clone, Default)]
1699#[non_exhaustive]
1700/// Permissions at the operation group level.
1701///
1702/// See [`AccessTokenScope::op_group_perms`].
1703pub struct OperationGroupPermissions {
1704    /// Account-level access permissions.
1705    ///
1706    /// Defaults to `None`.
1707    pub account: Option<ReadWritePermissions>,
1708    /// Basin-level access permissions.
1709    ///
1710    /// Defaults to `None`.
1711    pub basin: Option<ReadWritePermissions>,
1712    /// Stream-level access permissions.
1713    ///
1714    /// Defaults to `None`.
1715    pub stream: Option<ReadWritePermissions>,
1716}
1717
1718impl OperationGroupPermissions {
1719    /// Create a new [`OperationGroupPermissions`] with default values.
1720    pub fn new() -> Self {
1721        Self::default()
1722    }
1723
1724    /// Create read-only permissions for all groups.
1725    pub fn read_only_all() -> Self {
1726        Self {
1727            account: Some(ReadWritePermissions::read_only()),
1728            basin: Some(ReadWritePermissions::read_only()),
1729            stream: Some(ReadWritePermissions::read_only()),
1730        }
1731    }
1732
1733    /// Create write-only permissions for all groups.
1734    pub fn write_only_all() -> Self {
1735        Self {
1736            account: Some(ReadWritePermissions::write_only()),
1737            basin: Some(ReadWritePermissions::write_only()),
1738            stream: Some(ReadWritePermissions::write_only()),
1739        }
1740    }
1741
1742    /// Create read-write permissions for all groups.
1743    pub fn read_write_all() -> Self {
1744        Self {
1745            account: Some(ReadWritePermissions::read_write()),
1746            basin: Some(ReadWritePermissions::read_write()),
1747            stream: Some(ReadWritePermissions::read_write()),
1748        }
1749    }
1750
1751    /// Set account-level access permissions.
1752    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1753        Self {
1754            account: Some(account),
1755            ..self
1756        }
1757    }
1758
1759    /// Set basin-level access permissions.
1760    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1761        Self {
1762            basin: Some(basin),
1763            ..self
1764        }
1765    }
1766
1767    /// Set stream-level access permissions.
1768    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1769        Self {
1770            stream: Some(stream),
1771            ..self
1772        }
1773    }
1774}
1775
1776impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1777    fn from(value: OperationGroupPermissions) -> Self {
1778        Self {
1779            account: value.account.map(Into::into),
1780            basin: value.basin.map(Into::into),
1781            stream: value.stream.map(Into::into),
1782        }
1783    }
1784}
1785
1786impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1787    fn from(value: api::access::PermittedOperationGroups) -> Self {
1788        Self {
1789            account: value.account.map(Into::into),
1790            basin: value.basin.map(Into::into),
1791            stream: value.stream.map(Into::into),
1792        }
1793    }
1794}
1795
1796#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1797/// Individual operation that can be permitted.
1798///
1799/// See [`AccessTokenScope::ops`].
1800pub enum Operation {
1801    /// List basins.
1802    ListBasins,
1803    /// Create a basin.
1804    CreateBasin,
1805    /// Get basin configuration.
1806    GetBasinConfig,
1807    /// Delete a basin.
1808    DeleteBasin,
1809    /// Reconfigure a basin.
1810    ReconfigureBasin,
1811    /// List access tokens.
1812    ListAccessTokens,
1813    /// Issue an access token.
1814    IssueAccessToken,
1815    /// Revoke an access token.
1816    RevokeAccessToken,
1817    /// Get account metrics.
1818    GetAccountMetrics,
1819    /// Get basin metrics.
1820    GetBasinMetrics,
1821    /// Get stream metrics.
1822    GetStreamMetrics,
1823    /// List streams.
1824    ListStreams,
1825    /// Create a stream.
1826    CreateStream,
1827    /// Get stream configuration.
1828    GetStreamConfig,
1829    /// Delete a stream.
1830    DeleteStream,
1831    /// Reconfigure a stream.
1832    ReconfigureStream,
1833    /// Check the tail of a stream.
1834    CheckTail,
1835    /// Append records to a stream.
1836    Append,
1837    /// Read records from a stream.
1838    Read,
1839    /// Trim records on a stream.
1840    Trim,
1841    /// Set the fencing token on a stream.
1842    Fence,
1843}
1844
1845impl From<Operation> for api::access::Operation {
1846    fn from(value: Operation) -> Self {
1847        match value {
1848            Operation::ListBasins => api::access::Operation::ListBasins,
1849            Operation::CreateBasin => api::access::Operation::CreateBasin,
1850            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1851            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1852            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1853            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1854            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1855            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1856            Operation::ListStreams => api::access::Operation::ListStreams,
1857            Operation::CreateStream => api::access::Operation::CreateStream,
1858            Operation::DeleteStream => api::access::Operation::DeleteStream,
1859            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1860            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1861            Operation::CheckTail => api::access::Operation::CheckTail,
1862            Operation::Append => api::access::Operation::Append,
1863            Operation::Read => api::access::Operation::Read,
1864            Operation::Trim => api::access::Operation::Trim,
1865            Operation::Fence => api::access::Operation::Fence,
1866            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1867            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1868            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1869        }
1870    }
1871}
1872
1873impl From<api::access::Operation> for Operation {
1874    fn from(value: api::access::Operation) -> Self {
1875        match value {
1876            api::access::Operation::ListBasins => Operation::ListBasins,
1877            api::access::Operation::CreateBasin => Operation::CreateBasin,
1878            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1879            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1880            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1881            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1882            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1883            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1884            api::access::Operation::ListStreams => Operation::ListStreams,
1885            api::access::Operation::CreateStream => Operation::CreateStream,
1886            api::access::Operation::DeleteStream => Operation::DeleteStream,
1887            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1888            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1889            api::access::Operation::CheckTail => Operation::CheckTail,
1890            api::access::Operation::Append => Operation::Append,
1891            api::access::Operation::Read => Operation::Read,
1892            api::access::Operation::Trim => Operation::Trim,
1893            api::access::Operation::Fence => Operation::Fence,
1894            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1895            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1896            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1897        }
1898    }
1899}
1900
1901#[derive(Debug, Clone)]
1902#[non_exhaustive]
1903/// Scope of an access token.
1904///
1905/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
1906/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
1907/// final set must not be empty.
1908///
1909/// See [`IssueAccessTokenInput::scope`].
1910pub struct AccessTokenScopeInput {
1911    basins: Option<BasinMatcher>,
1912    streams: Option<StreamMatcher>,
1913    access_tokens: Option<AccessTokenMatcher>,
1914    op_group_perms: Option<OperationGroupPermissions>,
1915    ops: HashSet<Operation>,
1916}
1917
1918impl AccessTokenScopeInput {
1919    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1920    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1921        Self {
1922            basins: None,
1923            streams: None,
1924            access_tokens: None,
1925            op_group_perms: None,
1926            ops: ops.into_iter().collect(),
1927        }
1928    }
1929
1930    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1931    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1932        Self {
1933            basins: None,
1934            streams: None,
1935            access_tokens: None,
1936            op_group_perms: Some(op_group_perms),
1937            ops: HashSet::default(),
1938        }
1939    }
1940
1941    /// Set the permitted operations.
1942    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1943        Self {
1944            ops: ops.into_iter().collect(),
1945            ..self
1946        }
1947    }
1948
1949    /// Set the access permissions at the operation group level.
1950    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1951        Self {
1952            op_group_perms: Some(op_group_perms),
1953            ..self
1954        }
1955    }
1956
1957    /// Set the permitted basins.
1958    ///
1959    /// Defaults to no basins.
1960    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1961        Self {
1962            basins: Some(basins),
1963            ..self
1964        }
1965    }
1966
1967    /// Set the permitted streams.
1968    ///
1969    /// Defaults to no streams.
1970    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1971        Self {
1972            streams: Some(streams),
1973            ..self
1974        }
1975    }
1976
1977    /// Set the permitted access tokens.
1978    ///
1979    /// Defaults to no access tokens.
1980    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1981        Self {
1982            access_tokens: Some(access_tokens),
1983            ..self
1984        }
1985    }
1986}
1987
1988#[derive(Debug, Clone)]
1989#[non_exhaustive]
1990/// Scope of an access token.
1991pub struct AccessTokenScope {
1992    /// Permitted basins.
1993    pub basins: Option<BasinMatcher>,
1994    /// Permitted streams.
1995    pub streams: Option<StreamMatcher>,
1996    /// Permitted access tokens.
1997    pub access_tokens: Option<AccessTokenMatcher>,
1998    /// Permissions at the operation group level.
1999    pub op_group_perms: Option<OperationGroupPermissions>,
2000    /// Permitted operations.
2001    pub ops: HashSet<Operation>,
2002}
2003
2004impl From<api::access::AccessTokenScope> for AccessTokenScope {
2005    fn from(value: api::access::AccessTokenScope) -> Self {
2006        Self {
2007            basins: value.basins.map(|rs| match rs {
2008                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2009                    BasinMatcher::Exact(e)
2010                }
2011                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2012                    BasinMatcher::None
2013                }
2014                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
2015            }),
2016            streams: value.streams.map(|rs| match rs {
2017                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2018                    StreamMatcher::Exact(e)
2019                }
2020                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2021                    StreamMatcher::None
2022                }
2023                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
2024            }),
2025            access_tokens: value.access_tokens.map(|rs| match rs {
2026                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2027                    AccessTokenMatcher::Exact(e)
2028                }
2029                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2030                    AccessTokenMatcher::None
2031                }
2032                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
2033            }),
2034            op_group_perms: value.op_groups.map(Into::into),
2035            ops: value
2036                .ops
2037                .map(|ops| ops.into_iter().map(Into::into).collect())
2038                .unwrap_or_default(),
2039        }
2040    }
2041}
2042
2043impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
2044    fn from(value: AccessTokenScopeInput) -> Self {
2045        Self {
2046            basins: value.basins.map(|rs| match rs {
2047                BasinMatcher::None => {
2048                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2049                }
2050                BasinMatcher::Exact(e) => {
2051                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2052                }
2053                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2054            }),
2055            streams: value.streams.map(|rs| match rs {
2056                StreamMatcher::None => {
2057                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2058                }
2059                StreamMatcher::Exact(e) => {
2060                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2061                }
2062                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2063            }),
2064            access_tokens: value.access_tokens.map(|rs| match rs {
2065                AccessTokenMatcher::None => {
2066                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2067                }
2068                AccessTokenMatcher::Exact(e) => {
2069                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2070                }
2071                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2072            }),
2073            op_groups: value.op_group_perms.map(Into::into),
2074            ops: if value.ops.is_empty() {
2075                None
2076            } else {
2077                Some(value.ops.into_iter().map(Into::into).collect())
2078            },
2079        }
2080    }
2081}
2082
2083#[derive(Debug, Clone)]
2084#[non_exhaustive]
2085/// Input for [`issue_access_token`](crate::S2::issue_access_token).
2086pub struct IssueAccessTokenInput {
2087    /// Access token ID.
2088    pub id: AccessTokenId,
2089    /// Expiration time.
2090    ///
2091    /// Defaults to the expiration time of requestor's access token passed via
2092    /// [`S2Config`](S2Config::new).
2093    pub expires_at: Option<S2DateTime>,
2094    /// Whether to automatically prefix stream names during creation and strip the prefix during
2095    /// listing.
2096    ///
2097    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
2098    /// prefix.
2099    ///
2100    /// Defaults to `false`.
2101    pub auto_prefix_streams: bool,
2102    /// Scope of the token.
2103    pub scope: AccessTokenScopeInput,
2104}
2105
2106impl IssueAccessTokenInput {
2107    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
2108    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2109        Self {
2110            id,
2111            expires_at: None,
2112            auto_prefix_streams: false,
2113            scope,
2114        }
2115    }
2116
2117    /// Set the expiration time.
2118    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2119        Self {
2120            expires_at: Some(expires_at),
2121            ..self
2122        }
2123    }
2124
2125    /// Set whether to automatically prefix stream names during creation and strip the prefix during
2126    /// listing.
2127    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2128        Self {
2129            auto_prefix_streams,
2130            ..self
2131        }
2132    }
2133}
2134
2135impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2136    fn from(value: IssueAccessTokenInput) -> Self {
2137        Self {
2138            id: value.id,
2139            expires_at: value.expires_at.map(Into::into),
2140            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2141            scope: value.scope.into(),
2142        }
2143    }
2144}
2145
2146#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2147/// Interval to accumulate over for timeseries metric sets.
2148pub enum TimeseriesInterval {
2149    /// Minute.
2150    Minute,
2151    /// Hour.
2152    Hour,
2153    /// Day.
2154    Day,
2155}
2156
2157impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2158    fn from(value: TimeseriesInterval) -> Self {
2159        match value {
2160            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2161            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2162            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2163        }
2164    }
2165}
2166
2167impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2168    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2169        match value {
2170            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2171            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2172            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2173        }
2174    }
2175}
2176
2177#[derive(Debug, Clone, Copy)]
2178#[non_exhaustive]
2179/// Time range as Unix epoch seconds.
2180pub struct TimeRange {
2181    /// Start timestamp (inclusive).
2182    pub start: u32,
2183    /// End timestamp (exclusive).
2184    pub end: u32,
2185}
2186
2187impl TimeRange {
2188    /// Create a new [`TimeRange`] with the given start and end timestamps.
2189    pub fn new(start: u32, end: u32) -> Self {
2190        Self { start, end }
2191    }
2192}
2193
2194#[derive(Debug, Clone, Copy)]
2195#[non_exhaustive]
2196/// Time range as Unix epoch seconds and accumulation interval.
2197pub struct TimeRangeAndInterval {
2198    /// Start timestamp (inclusive).
2199    pub start: u32,
2200    /// End timestamp (exclusive).
2201    pub end: u32,
2202    /// Interval to accumulate over for timeseries metric sets.
2203    ///
2204    /// Default is dependent on the requested metric set.
2205    pub interval: Option<TimeseriesInterval>,
2206}
2207
2208impl TimeRangeAndInterval {
2209    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2210    pub fn new(start: u32, end: u32) -> Self {
2211        Self {
2212            start,
2213            end,
2214            interval: None,
2215        }
2216    }
2217
2218    /// Set the interval to accumulate over for timeseries metric sets.
2219    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2220        Self {
2221            interval: Some(interval),
2222            ..self
2223        }
2224    }
2225}
2226
2227#[derive(Debug, Clone, Copy)]
2228/// Account metric set to return.
2229pub enum AccountMetricSet {
2230    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
2231    /// specified time range.
2232    ActiveBasins(TimeRange),
2233    /// Returns [`AccumulationMetric`]s, one per account operation type.
2234    ///
2235    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2236    /// per interval over the requested time range.
2237    ///
2238    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2239    AccountOps(TimeRangeAndInterval),
2240}
2241
2242#[derive(Debug, Clone)]
2243#[non_exhaustive]
2244/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
2245pub struct GetAccountMetricsInput {
2246    /// Metric set to return.
2247    pub set: AccountMetricSet,
2248}
2249
2250impl GetAccountMetricsInput {
2251    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2252    pub fn new(set: AccountMetricSet) -> Self {
2253        Self { set }
2254    }
2255}
2256
2257impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2258    fn from(value: GetAccountMetricsInput) -> Self {
2259        let (set, start, end, interval) = match value.set {
2260            AccountMetricSet::ActiveBasins(args) => (
2261                api::metrics::AccountMetricSet::ActiveBasins,
2262                args.start,
2263                args.end,
2264                None,
2265            ),
2266            AccountMetricSet::AccountOps(args) => (
2267                api::metrics::AccountMetricSet::AccountOps,
2268                args.start,
2269                args.end,
2270                args.interval,
2271            ),
2272        };
2273        Self {
2274            set,
2275            start: Some(start),
2276            end: Some(end),
2277            interval: interval.map(Into::into),
2278        }
2279    }
2280}
2281
2282#[derive(Debug, Clone, Copy)]
2283/// Basin metric set to return.
2284pub enum BasinMetricSet {
2285    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
2286    /// in the basin, with one observed value for each hour over the requested time range.
2287    Storage(TimeRange),
2288    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
2289    ///
2290    /// Each metric represents a timeseries of the number of append operations across all streams
2291    /// in the basin, with one accumulated value per interval over the requested time range.
2292    ///
2293    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2294    /// [`minute`](TimeseriesInterval::Minute).
2295    AppendOps(TimeRangeAndInterval),
2296    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
2297    ///
2298    /// Each metric represents a timeseries of the number of read operations across all streams
2299    /// in the basin, with one accumulated value per interval over the requested time range.
2300    ///
2301    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2302    /// [`minute`](TimeseriesInterval::Minute).
2303    ReadOps(TimeRangeAndInterval),
2304    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
2305    /// across all streams in the basin, with one accumulated value per interval
2306    /// over the requested time range.
2307    ///
2308    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2309    /// [`minute`](TimeseriesInterval::Minute).
2310    ReadThroughput(TimeRangeAndInterval),
2311    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
2312    /// across all streams in the basin, with one accumulated value per interval
2313    /// over the requested time range.
2314    ///
2315    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2316    /// [`minute`](TimeseriesInterval::Minute).
2317    AppendThroughput(TimeRangeAndInterval),
2318    /// Returns [`AccumulationMetric`]s, one per basin operation type.
2319    ///
2320    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2321    /// per interval over the requested time range.
2322    ///
2323    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2324    BasinOps(TimeRangeAndInterval),
2325}
2326
2327#[derive(Debug, Clone)]
2328#[non_exhaustive]
2329/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
2330pub struct GetBasinMetricsInput {
2331    /// Basin name.
2332    pub name: BasinName,
2333    /// Metric set to return.
2334    pub set: BasinMetricSet,
2335}
2336
2337impl GetBasinMetricsInput {
2338    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2339    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2340        Self { name, set }
2341    }
2342}
2343
2344impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2345    fn from(value: GetBasinMetricsInput) -> Self {
2346        let (set, start, end, interval) = match value.set {
2347            BasinMetricSet::Storage(args) => (
2348                api::metrics::BasinMetricSet::Storage,
2349                args.start,
2350                args.end,
2351                None,
2352            ),
2353            BasinMetricSet::AppendOps(args) => (
2354                api::metrics::BasinMetricSet::AppendOps,
2355                args.start,
2356                args.end,
2357                args.interval,
2358            ),
2359            BasinMetricSet::ReadOps(args) => (
2360                api::metrics::BasinMetricSet::ReadOps,
2361                args.start,
2362                args.end,
2363                args.interval,
2364            ),
2365            BasinMetricSet::ReadThroughput(args) => (
2366                api::metrics::BasinMetricSet::ReadThroughput,
2367                args.start,
2368                args.end,
2369                args.interval,
2370            ),
2371            BasinMetricSet::AppendThroughput(args) => (
2372                api::metrics::BasinMetricSet::AppendThroughput,
2373                args.start,
2374                args.end,
2375                args.interval,
2376            ),
2377            BasinMetricSet::BasinOps(args) => (
2378                api::metrics::BasinMetricSet::BasinOps,
2379                args.start,
2380                args.end,
2381                args.interval,
2382            ),
2383        };
2384        (
2385            value.name,
2386            api::metrics::BasinMetricSetRequest {
2387                set,
2388                start: Some(start),
2389                end: Some(end),
2390                interval: interval.map(Into::into),
2391            },
2392        )
2393    }
2394}
2395
2396#[derive(Debug, Clone, Copy)]
2397/// Stream metric set to return.
2398pub enum StreamMetricSet {
2399    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
2400    /// with one observed value for each minute over the requested time range.
2401    Storage(TimeRange),
2402}
2403
2404#[derive(Debug, Clone)]
2405#[non_exhaustive]
2406/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
2407pub struct GetStreamMetricsInput {
2408    /// Basin name.
2409    pub basin_name: BasinName,
2410    /// Stream name.
2411    pub stream_name: StreamName,
2412    /// Metric set to return.
2413    pub set: StreamMetricSet,
2414}
2415
2416impl GetStreamMetricsInput {
2417    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2418    /// set.
2419    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2420        Self {
2421            basin_name,
2422            stream_name,
2423            set,
2424        }
2425    }
2426}
2427
2428impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2429    fn from(value: GetStreamMetricsInput) -> Self {
2430        let (set, start, end, interval) = match value.set {
2431            StreamMetricSet::Storage(args) => (
2432                api::metrics::StreamMetricSet::Storage,
2433                args.start,
2434                args.end,
2435                None,
2436            ),
2437        };
2438        (
2439            value.basin_name,
2440            value.stream_name,
2441            api::metrics::StreamMetricSetRequest {
2442                set,
2443                start: Some(start),
2444                end: Some(end),
2445                interval,
2446            },
2447        )
2448    }
2449}
2450
2451#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2452/// Unit in which metric values are measured.
2453pub enum MetricUnit {
2454    /// Size in bytes.
2455    Bytes,
2456    /// Number of operations.
2457    Operations,
2458}
2459
2460impl From<api::metrics::MetricUnit> for MetricUnit {
2461    fn from(value: api::metrics::MetricUnit) -> Self {
2462        match value {
2463            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2464            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2465        }
2466    }
2467}
2468
2469#[derive(Debug, Clone)]
2470#[non_exhaustive]
2471/// Single named value.
2472pub struct ScalarMetric {
2473    /// Metric name.
2474    pub name: String,
2475    /// Unit for the metric value.
2476    pub unit: MetricUnit,
2477    /// Metric value.
2478    pub value: f64,
2479}
2480
2481#[derive(Debug, Clone)]
2482#[non_exhaustive]
2483/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2484/// specified interval.
2485pub struct AccumulationMetric {
2486    /// Timeseries name.
2487    pub name: String,
2488    /// Unit for the accumulated values.
2489    pub unit: MetricUnit,
2490    /// The interval at which datapoints are accumulated.
2491    pub interval: TimeseriesInterval,
2492    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
2493    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
2494    /// one `interval`.
2495    pub values: Vec<(u32, f64)>,
2496}
2497
2498#[derive(Debug, Clone)]
2499#[non_exhaustive]
2500/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2501pub struct GaugeMetric {
2502    /// Timeseries name.
2503    pub name: String,
2504    /// Unit for the instantaneous values.
2505    pub unit: MetricUnit,
2506    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
2507    /// instant of the `timestamp` (in Unix epoch seconds).
2508    pub values: Vec<(u32, f64)>,
2509}
2510
2511#[derive(Debug, Clone)]
2512#[non_exhaustive]
2513/// Set of string labels.
2514pub struct LabelMetric {
2515    /// Label name.
2516    pub name: String,
2517    /// Label values.
2518    pub values: Vec<String>,
2519}
2520
2521#[derive(Debug, Clone)]
2522/// Individual metric in a returned metric set.
2523pub enum Metric {
2524    /// Single named value.
2525    Scalar(ScalarMetric),
2526    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2527    /// specified interval.
2528    Accumulation(AccumulationMetric),
2529    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2530    Gauge(GaugeMetric),
2531    /// Set of string labels.
2532    Label(LabelMetric),
2533}
2534
2535impl From<api::metrics::Metric> for Metric {
2536    fn from(value: api::metrics::Metric) -> Self {
2537        match value {
2538            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2539                name: sm.name.into(),
2540                unit: sm.unit.into(),
2541                value: sm.value,
2542            }),
2543            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2544                name: am.name.into(),
2545                unit: am.unit.into(),
2546                interval: am.interval.into(),
2547                values: am.values,
2548            }),
2549            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2550                name: gm.name.into(),
2551                unit: gm.unit.into(),
2552                values: gm.values,
2553            }),
2554            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2555                name: lm.name.into(),
2556                values: lm.values,
2557            }),
2558        }
2559    }
2560}
2561
2562#[derive(Debug, Clone, Default)]
2563#[non_exhaustive]
2564/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
2565pub struct ListStreamsInput {
2566    /// Filter streams whose names begin with this value.
2567    ///
2568    /// Defaults to `""`.
2569    pub prefix: StreamNamePrefix,
2570    /// Filter streams whose names are lexicographically greater than this value.
2571    ///
2572    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
2573    ///
2574    /// Defaults to `""`.
2575    pub start_after: StreamNameStartAfter,
2576    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
2577    ///
2578    /// Defaults to `1000`.
2579    pub limit: Option<usize>,
2580}
2581
2582impl ListStreamsInput {
2583    /// Create a new [`ListStreamsInput`] with default values.
2584    pub fn new() -> Self {
2585        Self::default()
2586    }
2587
2588    /// Set the prefix used to filter streams whose names begin with this value.
2589    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2590        Self { prefix, ..self }
2591    }
2592
2593    /// Set the value used to filter streams whose names are lexicographically greater than this
2594    /// value.
2595    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2596        Self {
2597            start_after,
2598            ..self
2599        }
2600    }
2601
2602    /// Set the limit on number of streams to return in a page.
2603    pub fn with_limit(self, limit: usize) -> Self {
2604        Self {
2605            limit: Some(limit),
2606            ..self
2607        }
2608    }
2609}
2610
2611impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2612    fn from(value: ListStreamsInput) -> Self {
2613        Self {
2614            prefix: Some(value.prefix),
2615            start_after: Some(value.start_after),
2616            limit: value.limit,
2617        }
2618    }
2619}
2620
2621#[derive(Debug, Clone, Default)]
2622/// Input for [`list_all_streams`](crate::S2Basin::list_all_streams) operation.
2623pub struct ListAllStreamsInput {
2624    /// Filter streams whose names begin with this value.
2625    ///
2626    /// Defaults to `""`.
2627    pub prefix: StreamNamePrefix,
2628    /// Filter streams whose names are lexicographically greater than this value.
2629    ///
2630    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
2631    ///
2632    /// Defaults to `""`.
2633    pub start_after: StreamNameStartAfter,
2634    /// Whether to include streams that are being deleted.
2635    ///
2636    /// Defaults to `false`.
2637    pub include_deleted: bool,
2638}
2639
2640impl ListAllStreamsInput {
2641    /// Create a new [`ListAllStreamsInput`] with default values.
2642    pub fn new() -> Self {
2643        Self::default()
2644    }
2645
2646    /// Set the prefix used to filter streams whose names begin with this value.
2647    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2648        Self { prefix, ..self }
2649    }
2650
2651    /// Set the value used to filter streams whose names are lexicographically greater than this
2652    /// value.
2653    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2654        Self {
2655            start_after,
2656            ..self
2657        }
2658    }
2659
2660    /// Set whether to include streams that are being deleted.
2661    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2662        Self {
2663            include_deleted,
2664            ..self
2665        }
2666    }
2667}
2668
2669#[derive(Debug, Clone, PartialEq, Eq)]
2670#[non_exhaustive]
2671/// Stream information.
2672pub struct StreamInfo {
2673    /// Stream name.
2674    pub name: StreamName,
2675    /// Creation time.
2676    pub created_at: S2DateTime,
2677    /// Deletion time if the stream is being deleted.
2678    pub deleted_at: Option<S2DateTime>,
2679    /// Encryption algorithm for this stream, if encryption is enabled.
2680    pub cipher: Option<EncryptionAlgorithm>,
2681}
2682
2683impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2684    type Error = ValidationError;
2685
2686    fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2687        Ok(Self {
2688            name: value.name,
2689            created_at: value.created_at.try_into()?,
2690            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2691            cipher: value.cipher.map(Into::into),
2692        })
2693    }
2694}
2695
2696#[derive(Debug, Clone)]
2697#[non_exhaustive]
2698/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
2699pub struct CreateStreamInput {
2700    /// Stream name.
2701    pub name: StreamName,
2702    /// Configuration for the stream.
2703    ///
2704    /// See [`StreamConfig`] for defaults.
2705    pub config: Option<StreamConfig>,
2706    idempotency_token: String,
2707}
2708
2709impl CreateStreamInput {
2710    /// Create a new [`CreateStreamInput`] with the given stream name.
2711    pub fn new(name: StreamName) -> Self {
2712        Self {
2713            name,
2714            config: None,
2715            idempotency_token: idempotency_token(),
2716        }
2717    }
2718
2719    /// Set the configuration for the stream.
2720    pub fn with_config(self, config: StreamConfig) -> Self {
2721        Self {
2722            config: Some(config),
2723            ..self
2724        }
2725    }
2726}
2727
2728impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2729    fn from(value: CreateStreamInput) -> Self {
2730        (
2731            api::stream::CreateStreamRequest {
2732                stream: value.name,
2733                config: value.config.map(Into::into),
2734            },
2735            value.idempotency_token,
2736        )
2737    }
2738}
2739
2740#[derive(Debug, Clone)]
2741#[non_exhaustive]
2742/// Input for [`ensure_stream`](crate::S2Basin::ensure_stream)
2743/// operation.
2744pub struct EnsureStreamInput {
2745    /// Stream name.
2746    pub name: StreamName,
2747    /// Configuration for the stream.
2748    ///
2749    /// See [`StreamConfig`] for defaults.
2750    pub config: Option<StreamConfig>,
2751}
2752
2753impl EnsureStreamInput {
2754    /// Create a new [`EnsureStreamInput`] with the given stream name.
2755    pub fn new(name: StreamName) -> Self {
2756        Self { name, config: None }
2757    }
2758
2759    /// Set the configuration for the stream.
2760    pub fn with_config(self, config: StreamConfig) -> Self {
2761        Self {
2762            config: Some(config),
2763            ..self
2764        }
2765    }
2766}
2767
2768impl From<EnsureStreamInput> for (StreamName, Option<api::config::StreamConfig>) {
2769    fn from(value: EnsureStreamInput) -> Self {
2770        (value.name, value.config.map(Into::into))
2771    }
2772}
2773
2774#[derive(Debug, Clone)]
2775#[non_exhaustive]
2776/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
2777pub struct DeleteStreamInput {
2778    /// Stream name.
2779    pub name: StreamName,
2780    /// Whether to ignore `Not Found` error if the stream doesn't exist.
2781    pub ignore_not_found: bool,
2782}
2783
2784impl DeleteStreamInput {
2785    /// Create a new [`DeleteStreamInput`] with the given stream name.
2786    pub fn new(name: StreamName) -> Self {
2787        Self {
2788            name,
2789            ignore_not_found: false,
2790        }
2791    }
2792
2793    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2794    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2795        Self {
2796            ignore_not_found,
2797            ..self
2798        }
2799    }
2800}
2801
2802#[derive(Debug, Clone)]
2803#[non_exhaustive]
2804/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
2805pub struct ReconfigureStreamInput {
2806    /// Stream name.
2807    pub name: StreamName,
2808    /// Reconfiguration for [`StreamConfig`].
2809    pub config: StreamReconfiguration,
2810}
2811
2812impl ReconfigureStreamInput {
2813    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2814    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2815        Self { name, config }
2816    }
2817}
2818
2819#[derive(Debug, Clone, PartialEq, Eq)]
2820/// Token for fencing appends to a stream.
2821///
2822/// **Note:** It must not exceed 36 bytes in length.
2823///
2824/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
2825pub struct FencingToken(String);
2826
2827impl FencingToken {
2828    /// Generate a random alphanumeric fencing token of `n` bytes.
2829    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2830        rand::rng()
2831            .sample_iter(&rand::distr::Alphanumeric)
2832            .take(n)
2833            .map(char::from)
2834            .collect::<String>()
2835            .parse()
2836    }
2837}
2838
2839impl FromStr for FencingToken {
2840    type Err = ValidationError;
2841
2842    fn from_str(s: &str) -> Result<Self, Self::Err> {
2843        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2844            return Err(ValidationError(format!(
2845                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2846            )));
2847        }
2848        Ok(FencingToken(s.to_string()))
2849    }
2850}
2851
2852impl std::fmt::Display for FencingToken {
2853    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2854        write!(f, "{}", self.0)
2855    }
2856}
2857
2858impl Deref for FencingToken {
2859    type Target = str;
2860
2861    fn deref(&self) -> &Self::Target {
2862        &self.0
2863    }
2864}
2865
2866#[derive(Debug, Clone, Copy, PartialEq)]
2867#[non_exhaustive]
2868/// A position in a stream.
2869pub struct StreamPosition {
2870    /// Sequence number assigned by the service.
2871    pub seq_num: u64,
2872    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
2873    /// User-specified timestamps are passed through as-is.
2874    pub timestamp: u64,
2875}
2876
2877impl std::fmt::Display for StreamPosition {
2878    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2879        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2880    }
2881}
2882
2883impl From<api::stream::proto::StreamPosition> for StreamPosition {
2884    fn from(value: api::stream::proto::StreamPosition) -> Self {
2885        Self {
2886            seq_num: value.seq_num,
2887            timestamp: value.timestamp,
2888        }
2889    }
2890}
2891
2892impl From<api::stream::StreamPosition> for StreamPosition {
2893    fn from(value: api::stream::StreamPosition) -> Self {
2894        Self {
2895            seq_num: value.seq_num,
2896            timestamp: value.timestamp,
2897        }
2898    }
2899}
2900
2901#[derive(Debug, Clone, PartialEq)]
2902#[non_exhaustive]
2903/// A name-value pair.
2904pub struct Header {
2905    /// Name.
2906    pub name: Bytes,
2907    /// Value.
2908    pub value: Bytes,
2909}
2910
2911impl Header {
2912    /// Create a new [`Header`] with the given name and value.
2913    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2914        Self {
2915            name: name.into(),
2916            value: value.into(),
2917        }
2918    }
2919}
2920
2921impl From<Header> for api::stream::proto::Header {
2922    fn from(value: Header) -> Self {
2923        Self {
2924            name: value.name,
2925            value: value.value,
2926        }
2927    }
2928}
2929
2930impl From<api::stream::proto::Header> for Header {
2931    fn from(value: api::stream::proto::Header) -> Self {
2932        Self {
2933            name: value.name,
2934            value: value.value,
2935        }
2936    }
2937}
2938
2939#[derive(Debug, Clone, PartialEq)]
2940/// A record to append.
2941pub struct AppendRecord {
2942    body: Bytes,
2943    headers: Vec<Header>,
2944    timestamp: Option<u64>,
2945}
2946
2947impl AppendRecord {
2948    fn validate(self) -> Result<Self, ValidationError> {
2949        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2950            Err(ValidationError(format!(
2951                "metered_bytes: {} exceeds {}",
2952                self.metered_bytes(),
2953                RECORD_BATCH_MAX.bytes
2954            )))
2955        } else {
2956            Ok(self)
2957        }
2958    }
2959
2960    /// Create a new [`AppendRecord`] with the given record body.
2961    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2962        let record = Self {
2963            body: body.into(),
2964            headers: Vec::default(),
2965            timestamp: None,
2966        };
2967        record.validate()
2968    }
2969
2970    /// Set the headers for this record.
2971    pub fn with_headers(
2972        self,
2973        headers: impl IntoIterator<Item = Header>,
2974    ) -> Result<Self, ValidationError> {
2975        let record = Self {
2976            headers: headers.into_iter().collect(),
2977            ..self
2978        };
2979        record.validate()
2980    }
2981
2982    /// Set the timestamp for this record.
2983    ///
2984    /// Precise semantics depend on [`StreamConfig::timestamping`].
2985    pub fn with_timestamp(self, timestamp: u64) -> Self {
2986        Self {
2987            timestamp: Some(timestamp),
2988            ..self
2989        }
2990    }
2991
2992    /// Get the body of this record.
2993    pub fn body(&self) -> &[u8] {
2994        &self.body
2995    }
2996
2997    /// Get the headers of this record.
2998    pub fn headers(&self) -> &[Header] {
2999        &self.headers
3000    }
3001
3002    /// Get the timestamp of this record.
3003    pub fn timestamp(&self) -> Option<u64> {
3004        self.timestamp
3005    }
3006}
3007
3008impl From<AppendRecord> for api::stream::proto::AppendRecord {
3009    fn from(value: AppendRecord) -> Self {
3010        Self {
3011            timestamp: value.timestamp,
3012            headers: value.headers.into_iter().map(Into::into).collect(),
3013            body: value.body,
3014        }
3015    }
3016}
3017
3018/// Metered byte size calculation.
3019///
3020/// Formula for a record:
3021/// ```text
3022/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
3023/// ```
3024pub trait MeteredBytes {
3025    /// Returns the metered byte size.
3026    fn metered_bytes(&self) -> usize;
3027}
3028
3029macro_rules! metered_bytes_impl {
3030    ($ty:ty) => {
3031        impl MeteredBytes for $ty {
3032            fn metered_bytes(&self) -> usize {
3033                8 + (2 * self.headers.len())
3034                    + self
3035                        .headers
3036                        .iter()
3037                        .map(|h| h.name.len() + h.value.len())
3038                        .sum::<usize>()
3039                    + self.body.len()
3040            }
3041        }
3042    };
3043}
3044
3045metered_bytes_impl!(AppendRecord);
3046
3047#[derive(Debug, Clone)]
3048/// A batch of records to append atomically.
3049///
3050/// **Note:** It must contain at least `1` record and no more than `1000`.
3051/// The total size of the batch must not exceed `1MiB` in metered bytes.
3052///
3053/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
3054/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
3055/// that takes care of the abovementioned constraints.
3056pub struct AppendRecordBatch {
3057    records: Vec<AppendRecord>,
3058    metered_bytes: usize,
3059}
3060
3061impl AppendRecordBatch {
3062    pub(crate) fn with_capacity(capacity: usize) -> Self {
3063        Self {
3064            records: Vec::with_capacity(capacity),
3065            metered_bytes: 0,
3066        }
3067    }
3068
3069    pub(crate) fn push(&mut self, record: AppendRecord) {
3070        self.metered_bytes += record.metered_bytes();
3071        self.records.push(record);
3072    }
3073
3074    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
3075    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3076    where
3077        I: IntoIterator<Item = AppendRecord>,
3078    {
3079        let mut records = Vec::new();
3080        let mut metered_bytes = 0;
3081
3082        for record in iter {
3083            metered_bytes += record.metered_bytes();
3084            records.push(record);
3085
3086            if metered_bytes > RECORD_BATCH_MAX.bytes {
3087                return Err(ValidationError(format!(
3088                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
3089                    RECORD_BATCH_MAX.bytes
3090                )));
3091            }
3092
3093            if records.len() > RECORD_BATCH_MAX.count {
3094                return Err(ValidationError(format!(
3095                    "number of records in the batch exceeds {}",
3096                    RECORD_BATCH_MAX.count
3097                )));
3098            }
3099        }
3100
3101        if records.is_empty() {
3102            return Err(ValidationError("batch is empty".into()));
3103        }
3104
3105        Ok(Self {
3106            records,
3107            metered_bytes,
3108        })
3109    }
3110}
3111
3112impl Deref for AppendRecordBatch {
3113    type Target = [AppendRecord];
3114
3115    fn deref(&self) -> &Self::Target {
3116        &self.records
3117    }
3118}
3119
3120impl MeteredBytes for AppendRecordBatch {
3121    fn metered_bytes(&self) -> usize {
3122        self.metered_bytes
3123    }
3124}
3125
3126#[derive(Debug, Clone)]
3127/// Command to signal an operation.
3128pub enum Command {
3129    /// Fence operation.
3130    Fence {
3131        /// Fencing token.
3132        fencing_token: FencingToken,
3133    },
3134    /// Trim operation.
3135    Trim {
3136        /// Trim point.
3137        trim_point: u64,
3138    },
3139}
3140
3141#[derive(Debug, Clone)]
3142#[non_exhaustive]
3143/// Command record for signaling operations to the service.
3144///
3145/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
3146pub struct CommandRecord {
3147    /// Command to signal an operation.
3148    pub command: Command,
3149    /// Timestamp for this record.
3150    pub timestamp: Option<u64>,
3151}
3152
3153impl CommandRecord {
3154    const FENCE: &[u8] = b"fence";
3155    const TRIM: &[u8] = b"trim";
3156
3157    /// Create a fence command record with the given fencing token.
3158    ///
3159    /// Fencing is strongly consistent, and subsequent appends that specify a
3160    /// fencing token will fail if it does not match.
3161    pub fn fence(fencing_token: FencingToken) -> Self {
3162        Self {
3163            command: Command::Fence { fencing_token },
3164            timestamp: None,
3165        }
3166    }
3167
3168    /// Create a trim command record with the given trim point.
3169    ///
3170    /// Trim point is the desired earliest sequence number for the stream.
3171    ///
3172    /// Trimming is eventually consistent, and trimmed records may be visible
3173    /// for a brief period.
3174    pub fn trim(trim_point: u64) -> Self {
3175        Self {
3176            command: Command::Trim { trim_point },
3177            timestamp: None,
3178        }
3179    }
3180
3181    /// Set the timestamp for this record.
3182    pub fn with_timestamp(self, timestamp: u64) -> Self {
3183        Self {
3184            timestamp: Some(timestamp),
3185            ..self
3186        }
3187    }
3188}
3189
3190impl From<CommandRecord> for AppendRecord {
3191    fn from(value: CommandRecord) -> Self {
3192        let (header_value, body) = match value.command {
3193            Command::Fence { fencing_token } => (
3194                CommandRecord::FENCE,
3195                Bytes::copy_from_slice(fencing_token.as_bytes()),
3196            ),
3197            Command::Trim { trim_point } => (
3198                CommandRecord::TRIM,
3199                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3200            ),
3201        };
3202        Self {
3203            body,
3204            headers: vec![Header::new("", header_value)],
3205            timestamp: value.timestamp,
3206        }
3207    }
3208}
3209
3210#[derive(Debug, Clone)]
3211#[non_exhaustive]
3212/// Input for [`append`](crate::S2Stream::append) operation and
3213/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
3214pub struct AppendInput {
3215    /// Batch of records to append atomically.
3216    pub records: AppendRecordBatch,
3217    /// Expected sequence number for the first record in the batch.
3218    ///
3219    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
3220    pub match_seq_num: Option<u64>,
3221    /// Fencing token to match against the stream's current fencing token.
3222    ///
3223    /// If unspecified, no matching is performed. If specified and mismatched,
3224    /// the append fails. A stream defaults to `""` as its fencing token.
3225    pub fencing_token: Option<FencingToken>,
3226}
3227
3228impl AppendInput {
3229    /// Create a new [`AppendInput`] with the given batch of records.
3230    pub fn new(records: AppendRecordBatch) -> Self {
3231        Self {
3232            records,
3233            match_seq_num: None,
3234            fencing_token: None,
3235        }
3236    }
3237
3238    /// Set the expected sequence number for the first record in the batch.
3239    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3240        Self {
3241            match_seq_num: Some(match_seq_num),
3242            ..self
3243        }
3244    }
3245
3246    /// Set the fencing token to match against the stream's current fencing token.
3247    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3248        Self {
3249            fencing_token: Some(fencing_token),
3250            ..self
3251        }
3252    }
3253}
3254
3255impl From<AppendInput> for api::stream::proto::AppendInput {
3256    fn from(value: AppendInput) -> Self {
3257        Self {
3258            records: value.records.iter().cloned().map(Into::into).collect(),
3259            match_seq_num: value.match_seq_num,
3260            fencing_token: value.fencing_token.map(|t| t.to_string()),
3261        }
3262    }
3263}
3264
3265#[derive(Debug, Clone, PartialEq)]
3266#[non_exhaustive]
3267/// Acknowledgement for an [`AppendInput`].
3268pub struct AppendAck {
3269    /// Sequence number and timestamp of the first record that was appended.
3270    pub start: StreamPosition,
3271    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
3272    /// that was appended.
3273    ///
3274    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
3275    /// appended.
3276    pub end: StreamPosition,
3277    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3278    /// the last record on the stream.
3279    ///
3280    /// This can be greater than the `end` position in case of concurrent appends.
3281    pub tail: StreamPosition,
3282}
3283
3284impl From<api::stream::proto::AppendAck> for AppendAck {
3285    fn from(value: api::stream::proto::AppendAck) -> Self {
3286        Self {
3287            start: value.start.unwrap_or_default().into(),
3288            end: value.end.unwrap_or_default().into(),
3289            tail: value.tail.unwrap_or_default().into(),
3290        }
3291    }
3292}
3293
3294#[derive(Debug, Clone, Copy)]
3295/// Starting position for reading from a stream.
3296pub enum ReadFrom {
3297    /// Read from this sequence number.
3298    SeqNum(u64),
3299    /// Read from this timestamp.
3300    Timestamp(u64),
3301    /// Read from N records before the tail.
3302    TailOffset(u64),
3303}
3304
3305impl Default for ReadFrom {
3306    fn default() -> Self {
3307        Self::SeqNum(0)
3308    }
3309}
3310
3311#[derive(Debug, Default, Clone)]
3312#[non_exhaustive]
3313/// Where to start reading.
3314pub struct ReadStart {
3315    /// Starting position.
3316    ///
3317    /// Defaults to reading from sequence number `0`.
3318    pub from: ReadFrom,
3319    /// Whether to start from tail if the requested starting position is beyond it.
3320    ///
3321    /// Defaults to `false` (errors if position is beyond tail).
3322    pub clamp_to_tail: bool,
3323}
3324
3325impl ReadStart {
3326    /// Create a new [`ReadStart`] with default values.
3327    pub fn new() -> Self {
3328        Self::default()
3329    }
3330
3331    /// Set the starting position.
3332    pub fn with_from(self, from: ReadFrom) -> Self {
3333        Self { from, ..self }
3334    }
3335
3336    /// Set whether to start from tail if the requested starting position is beyond it.
3337    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3338        Self {
3339            clamp_to_tail,
3340            ..self
3341        }
3342    }
3343}
3344
3345impl From<ReadStart> for api::stream::ReadStart {
3346    fn from(value: ReadStart) -> Self {
3347        let (seq_num, timestamp, tail_offset) = match value.from {
3348            ReadFrom::SeqNum(n) => (Some(n), None, None),
3349            ReadFrom::Timestamp(t) => (None, Some(t), None),
3350            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3351        };
3352        Self {
3353            seq_num,
3354            timestamp,
3355            tail_offset,
3356            clamp: if value.clamp_to_tail {
3357                Some(true)
3358            } else {
3359                None
3360            },
3361        }
3362    }
3363}
3364
3365#[derive(Debug, Clone, Default)]
3366#[non_exhaustive]
3367/// Limits on how much to read.
3368pub struct ReadLimits {
3369    /// Limit on number of records.
3370    ///
3371    /// Defaults to `1000` for non-streaming read.
3372    pub count: Option<usize>,
3373    /// Limit on total metered bytes of records.
3374    ///
3375    /// Defaults to `1MiB` for non-streaming read.
3376    pub bytes: Option<usize>,
3377}
3378
3379impl ReadLimits {
3380    /// Create a new [`ReadLimits`] with default values.
3381    pub fn new() -> Self {
3382        Self::default()
3383    }
3384
3385    /// Set the limit on number of records.
3386    pub fn with_count(self, count: usize) -> Self {
3387        Self {
3388            count: Some(count),
3389            ..self
3390        }
3391    }
3392
3393    /// Set the limit on total metered bytes of records.
3394    pub fn with_bytes(self, bytes: usize) -> Self {
3395        Self {
3396            bytes: Some(bytes),
3397            ..self
3398        }
3399    }
3400}
3401
3402#[derive(Debug, Clone, Default)]
3403#[non_exhaustive]
3404/// When to stop reading.
3405pub struct ReadStop {
3406    /// Limits on how much to read.
3407    ///
3408    /// See [`ReadLimits`] for defaults.
3409    pub limits: ReadLimits,
3410    /// Timestamp at which to stop (exclusive).
3411    ///
3412    /// Defaults to `None`.
3413    pub until: Option<RangeTo<u64>>,
3414    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
3415    /// seconds for [`read`](crate::S2Stream::read).
3416    ///
3417    /// Defaults to:
3418    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
3419    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
3420    ///   is specified.
3421    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
3422    ///   `until` is specified.
3423    pub wait: Option<u32>,
3424}
3425
3426impl ReadStop {
3427    /// Create a new [`ReadStop`] with default values.
3428    pub fn new() -> Self {
3429        Self::default()
3430    }
3431
3432    /// Set the limits on how much to read.
3433    pub fn with_limits(self, limits: ReadLimits) -> Self {
3434        Self { limits, ..self }
3435    }
3436
3437    /// Set the timestamp at which to stop (exclusive).
3438    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3439        Self {
3440            until: Some(until),
3441            ..self
3442        }
3443    }
3444
3445    /// Set the duration in seconds to wait for new records before stopping.
3446    pub fn with_wait(self, wait: u32) -> Self {
3447        Self {
3448            wait: Some(wait),
3449            ..self
3450        }
3451    }
3452}
3453
3454impl From<ReadStop> for api::stream::ReadEnd {
3455    fn from(value: ReadStop) -> Self {
3456        Self {
3457            count: value.limits.count,
3458            bytes: value.limits.bytes,
3459            until: value.until.map(|r| r.end),
3460            wait: value.wait,
3461        }
3462    }
3463}
3464
3465#[derive(Debug, Clone, Default)]
3466#[non_exhaustive]
3467/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
3468/// operations.
3469pub struct ReadInput {
3470    /// Where to start reading.
3471    ///
3472    /// See [`ReadStart`] for defaults.
3473    pub start: ReadStart,
3474    /// When to stop reading.
3475    ///
3476    /// See [`ReadStop`] for defaults.
3477    pub stop: ReadStop,
3478    /// Whether to filter out command records from the stream when reading.
3479    ///
3480    /// Defaults to `false`.
3481    pub ignore_command_records: bool,
3482}
3483
3484impl ReadInput {
3485    /// Create a new [`ReadInput`] with default values.
3486    pub fn new() -> Self {
3487        Self::default()
3488    }
3489
3490    /// Set where to start reading.
3491    pub fn with_start(self, start: ReadStart) -> Self {
3492        Self { start, ..self }
3493    }
3494
3495    /// Set when to stop reading.
3496    pub fn with_stop(self, stop: ReadStop) -> Self {
3497        Self { stop, ..self }
3498    }
3499
3500    /// Set whether to filter out command records from the stream when reading.
3501    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3502        Self {
3503            ignore_command_records,
3504            ..self
3505        }
3506    }
3507}
3508
3509#[derive(Debug, Clone)]
3510#[non_exhaustive]
3511/// Record that is durably sequenced on a stream.
3512pub struct SequencedRecord {
3513    /// Sequence number assigned to this record.
3514    pub seq_num: u64,
3515    /// Body of this record.
3516    pub body: Bytes,
3517    /// Headers for this record.
3518    pub headers: Vec<Header>,
3519    /// Timestamp for this record.
3520    pub timestamp: u64,
3521}
3522
3523impl SequencedRecord {
3524    /// Whether this is a command record.
3525    pub fn is_command_record(&self) -> bool {
3526        self.headers.len() == 1 && *self.headers[0].name == *b""
3527    }
3528}
3529
3530impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3531    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3532        Self {
3533            seq_num: value.seq_num,
3534            body: value.body,
3535            headers: value.headers.into_iter().map(Into::into).collect(),
3536            timestamp: value.timestamp,
3537        }
3538    }
3539}
3540
3541metered_bytes_impl!(SequencedRecord);
3542
3543#[derive(Debug, Clone)]
3544#[non_exhaustive]
3545/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
3546/// [`read_session`](crate::S2Stream::read_session).
3547pub struct ReadBatch {
3548    /// Records that are durably sequenced on the stream.
3549    ///
3550    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
3551    /// - the [`stop condition`](ReadInput::stop) was already met, or
3552    /// - all records in the batch were command records and
3553    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
3554    pub records: Vec<SequencedRecord>,
3555    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3556    /// the last record.
3557    ///
3558    /// It will only be present when reading recent records.
3559    pub tail: Option<StreamPosition>,
3560}
3561
3562impl ReadBatch {
3563    pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3564        Self {
3565            records: batch.records.into_iter().map(Into::into).collect(),
3566            tail: batch.tail.map(Into::into),
3567        }
3568    }
3569}
3570
3571/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
3572pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3573
3574#[derive(Debug, Clone, thiserror::Error)]
3575/// Why an append condition check failed.
3576pub enum AppendConditionFailed {
3577    #[error("fencing token mismatch, expected: {0}")]
3578    /// Fencing token did not match. Contains the expected fencing token.
3579    FencingTokenMismatch(FencingToken),
3580    #[error("sequence number mismatch, expected: {0}")]
3581    /// Sequence number did not match. Contains the expected sequence number.
3582    SeqNumMismatch(u64),
3583}
3584
3585impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3586    fn from(value: api::stream::AppendConditionFailed) -> Self {
3587        match value {
3588            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3589                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3590            }
3591            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3592                AppendConditionFailed::SeqNumMismatch(seq)
3593            }
3594        }
3595    }
3596}
3597
3598#[derive(Debug, Clone, thiserror::Error)]
3599/// Errors from S2 operations.
3600pub enum S2Error {
3601    #[error("{0}")]
3602    /// Client-side error.
3603    Client(String),
3604    #[error(transparent)]
3605    /// Validation error.
3606    Validation(#[from] ValidationError),
3607    #[error("{0}")]
3608    /// Append condition check failed. Contains the failure reason.
3609    AppendConditionFailed(AppendConditionFailed),
3610    #[error("read from an unwritten position. current tail: {0}")]
3611    /// Read from an unwritten position. Contains the current tail.
3612    ReadUnwritten(StreamPosition),
3613    #[error("{0}")]
3614    /// Other server-side error.
3615    Server(ErrorResponse),
3616}
3617
3618impl From<ApiError> for S2Error {
3619    fn from(err: ApiError) -> Self {
3620        match err {
3621            ApiError::ReadUnwritten(tail_response) => {
3622                Self::ReadUnwritten(tail_response.tail.into())
3623            }
3624            ApiError::AppendConditionFailed(condition_failed) => {
3625                Self::AppendConditionFailed(condition_failed.into())
3626            }
3627            ApiError::Server(_, response) => Self::Server(response.into()),
3628            other => Self::Client(other.to_string()),
3629        }
3630    }
3631}
3632
3633#[derive(Debug, Clone, thiserror::Error)]
3634#[error("{code}: {message}")]
3635#[non_exhaustive]
3636/// Error response from S2 server.
3637pub struct ErrorResponse {
3638    /// Error code.
3639    pub code: String,
3640    /// Error message.
3641    pub message: String,
3642}
3643
3644impl From<ApiErrorResponse> for ErrorResponse {
3645    fn from(response: ApiErrorResponse) -> Self {
3646        Self {
3647            code: response.code,
3648            message: response.message,
3649        }
3650    }
3651}
3652
3653fn idempotency_token() -> String {
3654    uuid::Uuid::new_v4().simple().to_string()
3655}