Skip to main content

s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Encryption algorithm.
23pub use s2_common::encryption::EncryptionAlgorithm;
24/// Encryption mode, including plaintext.
25pub use s2_common::encryption::EncryptionMode;
26/// Encryption spec for stream operations.
27pub use s2_common::encryption::EncryptionSpec;
28/// Validation error.
29pub use s2_common::types::ValidationError;
30/// Access token ID.
31///
32/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
33pub use s2_common::types::access::AccessTokenId;
34/// See [`ListAccessTokensInput::prefix`].
35pub use s2_common::types::access::AccessTokenIdPrefix;
36/// See [`ListAccessTokensInput::start_after`].
37pub use s2_common::types::access::AccessTokenIdStartAfter;
38/// Basin name.
39///
40/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
41/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
42pub use s2_common::types::basin::BasinName;
43/// See [`ListBasinsInput::prefix`].
44pub use s2_common::types::basin::BasinNamePrefix;
45/// See [`ListBasinsInput::start_after`].
46pub use s2_common::types::basin::BasinNameStartAfter;
47/// Stream name.
48///
49/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
50pub use s2_common::types::stream::StreamName;
51/// See [`ListStreamsInput::prefix`].
52pub use s2_common::types::stream::StreamNamePrefix;
53/// See [`ListStreamsInput::start_after`].
54pub use s2_common::types::stream::StreamNameStartAfter;
55
/// Number of bytes in one mebibyte (2^20).
pub(crate) const ONE_MIB: u32 = 1 << 20;
57
58use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
59use secrecy::SecretString;
60
61use crate::api::{ApiError, ApiErrorResponse};
62
63/// An RFC 3339 datetime.
64///
65/// It can be created in either of the following ways:
66/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
67/// - Convert from [`time::OffsetDateTime`] using [`TryFrom`]/[`TryInto`].
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69pub struct S2DateTime(time::OffsetDateTime);
70
71impl TryFrom<time::OffsetDateTime> for S2DateTime {
72    type Error = ValidationError;
73
74    fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
75        dt.format(&time::format_description::well_known::Rfc3339)
76            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
77        Ok(Self(dt))
78    }
79}
80
81impl From<S2DateTime> for time::OffsetDateTime {
82    fn from(dt: S2DateTime) -> Self {
83        dt.0
84    }
85}
86
87impl FromStr for S2DateTime {
88    type Err = ValidationError;
89
90    fn from_str(s: &str) -> Result<Self, Self::Err> {
91        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
92            .map(Self)
93            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
94    }
95}
96
97impl fmt::Display for S2DateTime {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        write!(
100            f,
101            "{}",
102            self.0
103                .format(&time::format_description::well_known::Rfc3339)
104                .expect("RFC3339 formatting should not fail for S2DateTime")
105        )
106    }
107}
108
/// Authority for connecting to an S2 basin.
///
/// The variant records how the wrapped [`Authority`] is meant to be used; both variants
/// carry the same kind of value.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
    ParentZone(Authority),
    /// Direct cell authority. Basin is expected to be hosted by this cell.
    Direct(Authority),
}
117
118/// Account endpoint.
119#[derive(Debug, Clone)]
120pub struct AccountEndpoint {
121    scheme: Scheme,
122    authority: Authority,
123}
124
125impl AccountEndpoint {
126    /// Create a new [`AccountEndpoint`] with the given endpoint.
127    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
128        endpoint.parse()
129    }
130}
131
132impl FromStr for AccountEndpoint {
133    type Err = ValidationError;
134
135    fn from_str(s: &str) -> Result<Self, Self::Err> {
136        let (scheme, authority) = match s.find("://") {
137            Some(idx) => {
138                let scheme: Scheme = s[..idx]
139                    .parse()
140                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
141                (scheme, &s[idx + 3..])
142            }
143            None => (Scheme::HTTPS, s),
144        };
145        Ok(Self {
146            scheme,
147            authority: authority
148                .parse()
149                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
150        })
151    }
152}
153
154/// Basin endpoint.
155#[derive(Debug, Clone)]
156pub struct BasinEndpoint {
157    scheme: Scheme,
158    authority: BasinAuthority,
159}
160
161impl BasinEndpoint {
162    /// Create a new [`BasinEndpoint`] with the given endpoint.
163    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
164        endpoint.parse()
165    }
166}
167
168impl FromStr for BasinEndpoint {
169    type Err = ValidationError;
170
171    fn from_str(s: &str) -> Result<Self, Self::Err> {
172        let (scheme, authority) = match s.find("://") {
173            Some(idx) => {
174                let scheme: Scheme = s[..idx]
175                    .parse()
176                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
177                (scheme, &s[idx + 3..])
178            }
179            None => (Scheme::HTTPS, s),
180        };
181        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
182            BasinAuthority::ParentZone(
183                authority
184                    .parse()
185                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
186            )
187        } else {
188            BasinAuthority::Direct(
189                authority
190                    .parse()
191                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
192            )
193        };
194        Ok(Self { scheme, authority })
195    }
196}
197
198#[derive(Debug, Clone)]
199#[non_exhaustive]
200/// Endpoints for the S2 environment.
201pub struct S2Endpoints {
202    pub(crate) scheme: Scheme,
203    pub(crate) account_authority: Authority,
204    pub(crate) basin_authority: BasinAuthority,
205}
206
207impl S2Endpoints {
208    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
209    pub fn new(
210        account_endpoint: AccountEndpoint,
211        basin_endpoint: BasinEndpoint,
212    ) -> Result<Self, ValidationError> {
213        if account_endpoint.scheme != basin_endpoint.scheme {
214            return Err("account and basin endpoints must have the same scheme".into());
215        }
216        Ok(Self {
217            scheme: account_endpoint.scheme,
218            account_authority: account_endpoint.authority,
219            basin_authority: basin_endpoint.authority,
220        })
221    }
222
223    /// Create a new [`S2Endpoints`] from environment variables.
224    ///
225    /// The following environment variables are expected to be set:
226    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
227    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
228    pub fn from_env() -> Result<Self, ValidationError> {
229        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
230            Ok(endpoint) => endpoint.parse()?,
231            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
232            Err(VarError::NotUnicode(_)) => {
233                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
234            }
235        };
236
237        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
238            Ok(endpoint) => endpoint.parse()?,
239            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
240            Err(VarError::NotUnicode(_)) => {
241                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
242            }
243        };
244
245        if account_endpoint.scheme != basin_endpoint.scheme {
246            return Err(
247                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
248            );
249        }
250
251        Ok(Self {
252            scheme: account_endpoint.scheme,
253            account_authority: account_endpoint.authority,
254            basin_authority: basin_endpoint.authority,
255        })
256    }
257
258    pub(crate) fn for_aws() -> Self {
259        Self {
260            scheme: Scheme::HTTPS,
261            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
262            basin_authority: BasinAuthority::ParentZone(
263                "b.s2.dev".try_into().expect("valid authority"),
264            ),
265        }
266    }
267}
268
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Compression algorithm for request and response bodies.
///
/// Values can be compared with `==`, like the other plain enums in this module.
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
279
280impl From<Compression> for CompressionAlgorithm {
281    fn from(value: Compression) -> Self {
282        match value {
283            Compression::None => CompressionAlgorithm::None,
284            Compression::Gzip => CompressionAlgorithm::Gzip,
285            Compression::Zstd => CompressionAlgorithm::Zstd,
286        }
287    }
288}
289
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
/// Retry policy for [`append`](crate::S2Stream::append) and
/// [`append_session`](crate::S2Stream::append_session) operations.
pub enum AppendRetryPolicy {
    /// Retry all appends. Use when duplicate records on the stream are acceptable.
    All,
    /// Retry when it can be determined that the request had no side effects.
    ///
    /// Uses a frame-level signal to detect whether any body frames were consumed
    /// by the HTTP transport. If no frames were sent, the server never saw the
    /// request, so retry is safe and will not cause duplicate records.
    ///
    /// Certain server errors (`rate_limited`, `hot_server`) are also safe to
    /// retry regardless of frame signal state, since they guarantee no mutation
    /// occurred.
    NoSideEffects,
}
308
309#[derive(Debug, Clone)]
310#[non_exhaustive]
311/// Configuration for retrying requests in case of transient failures.
312///
313/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
314/// ```text
315/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
316///     jitter = rand[0, base_delay]
317///     delay  = base_delay + jitter
318/// ````
319pub struct RetryConfig {
320    /// Total number of attempts including the initial try. A value of `1` means no retries.
321    ///
322    /// Defaults to `3`.
323    pub max_attempts: NonZeroU32,
324    /// Minimum base delay for retries.
325    ///
326    /// Defaults to `100ms`.
327    pub min_base_delay: Duration,
328    /// Maximum base delay for retries.
329    ///
330    /// Defaults to `1s`.
331    pub max_base_delay: Duration,
332    /// Retry policy for [`append`](crate::S2Stream::append) and
333    /// [`append_session`](crate::S2Stream::append_session) operations.
334    ///
335    /// Defaults to `All`.
336    pub append_retry_policy: AppendRetryPolicy,
337}
338
339impl Default for RetryConfig {
340    fn default() -> Self {
341        Self {
342            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
343            min_base_delay: Duration::from_millis(100),
344            max_base_delay: Duration::from_secs(1),
345            append_retry_policy: AppendRetryPolicy::All,
346        }
347    }
348}
349
350impl RetryConfig {
351    /// Create a new [`RetryConfig`] with default settings.
352    pub fn new() -> Self {
353        Self::default()
354    }
355
356    pub(crate) fn max_retries(&self) -> u32 {
357        self.max_attempts.get() - 1
358    }
359
360    /// Set the total number of attempts including the initial try.
361    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
362        Self {
363            max_attempts,
364            ..self
365        }
366    }
367
368    /// Set the minimum base delay for retries.
369    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
370        Self {
371            min_base_delay,
372            ..self
373        }
374    }
375
376    /// Set the maximum base delay for retries.
377    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
378        Self {
379            max_base_delay,
380            ..self
381        }
382    }
383
384    /// Set the retry policy for [`append`](crate::S2Stream::append) and
385    /// [`append_session`](crate::S2Stream::append_session) operations.
386    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
387        Self {
388            append_retry_policy,
389            ..self
390        }
391    }
392}
393
394#[derive(Debug, Clone)]
395#[non_exhaustive]
396/// Configuration for [`S2`](crate::S2).
397pub struct S2Config {
398    pub(crate) access_token: SecretString,
399    pub(crate) endpoints: S2Endpoints,
400    pub(crate) connection_timeout: Duration,
401    pub(crate) request_timeout: Duration,
402    pub(crate) retry: RetryConfig,
403    pub(crate) compression: Compression,
404    pub(crate) user_agent: HeaderValue,
405    pub(crate) insecure_skip_cert_verification: bool,
406}
407
408impl S2Config {
409    /// Create a new [`S2Config`] with the given access token and default settings.
410    pub fn new(access_token: impl Into<String>) -> Self {
411        Self {
412            access_token: access_token.into().into(),
413            endpoints: S2Endpoints::for_aws(),
414            connection_timeout: Duration::from_secs(3),
415            request_timeout: Duration::from_secs(5),
416            retry: RetryConfig::new(),
417            compression: Compression::None,
418            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
419                .parse()
420                .expect("valid user agent"),
421            insecure_skip_cert_verification: false,
422        }
423    }
424
425    /// Set the S2 endpoints to connect to.
426    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
427        Self { endpoints, ..self }
428    }
429
430    /// Set the timeout for establishing a connection to the server.
431    ///
432    /// Defaults to `3s`.
433    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
434        Self {
435            connection_timeout,
436            ..self
437        }
438    }
439
440    /// Set the timeout for requests.
441    ///
442    /// Defaults to `5s`.
443    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
444        Self {
445            request_timeout,
446            ..self
447        }
448    }
449
450    /// Set the retry configuration for requests.
451    ///
452    /// See [`RetryConfig`] for defaults.
453    pub fn with_retry(self, retry: RetryConfig) -> Self {
454        Self { retry, ..self }
455    }
456
457    /// Set the compression algorithm for requests and responses.
458    ///
459    /// Defaults to no compression.
460    pub fn with_compression(self, compression: Compression) -> Self {
461        Self {
462            compression,
463            ..self
464        }
465    }
466
467    /// Skip TLS certificate verification (insecure).
468    ///
469    /// This is useful for connecting to endpoints with self-signed certificates
470    /// or certificates that don't match the hostname (similar to `curl -k`).
471    ///
472    /// # Warning
473    ///
474    /// This disables certificate verification and should only be used for
475    /// testing or development purposes. **Never use this in production.**
476    ///
477    /// Defaults to `false`.
478    pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
479        Self {
480            insecure_skip_cert_verification: skip,
481            ..self
482        }
483    }
484
485    #[doc(hidden)]
486    #[cfg(feature = "_hidden")]
487    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
488        let user_agent = user_agent
489            .into()
490            .parse()
491            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
492        Ok(Self { user_agent, ..self })
493    }
494}
495
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// One page of results from a paginated listing.
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether there are more pages.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Build a page from anything convertible into a `Vec<T>` plus the `has_more` flag.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values: Vec<T> = values.into();
        Self { values, has_more }
    }
}
514
515#[derive(Debug, Clone, Copy, PartialEq, Eq)]
516/// Storage class for recent appends.
517pub enum StorageClass {
518    /// Standard storage class that offers append latencies under `500ms`.
519    Standard,
520    /// Express storage class that offers append latencies under `50ms`.
521    Express,
522}
523
524impl From<api::config::StorageClass> for StorageClass {
525    fn from(value: api::config::StorageClass) -> Self {
526        match value {
527            api::config::StorageClass::Standard => StorageClass::Standard,
528            api::config::StorageClass::Express => StorageClass::Express,
529        }
530    }
531}
532
533impl From<StorageClass> for api::config::StorageClass {
534    fn from(value: StorageClass) -> Self {
535        match value {
536            StorageClass::Standard => api::config::StorageClass::Standard,
537            StorageClass::Express => api::config::StorageClass::Express,
538        }
539    }
540}
541
542#[derive(Debug, Clone, Copy, PartialEq, Eq)]
543/// Retention policy for records in a stream.
544pub enum RetentionPolicy {
545    /// Age in seconds. Records older than this age are automatically trimmed.
546    Age(u64),
547    /// Records are retained indefinitely unless explicitly trimmed.
548    Infinite,
549}
550
551impl From<api::config::RetentionPolicy> for RetentionPolicy {
552    fn from(value: api::config::RetentionPolicy) -> Self {
553        match value {
554            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
555            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
556        }
557    }
558}
559
560impl From<RetentionPolicy> for api::config::RetentionPolicy {
561    fn from(value: RetentionPolicy) -> Self {
562        match value {
563            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
564            RetentionPolicy::Infinite => {
565                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
566            }
567        }
568    }
569}
570
571#[derive(Debug, Clone, Copy, PartialEq, Eq)]
572/// Timestamping mode for appends that influences how timestamps are handled.
573pub enum TimestampingMode {
574    /// Prefer client-specified timestamp if present otherwise use arrival time.
575    ClientPrefer,
576    /// Require a client-specified timestamp and reject the append if it is missing.
577    ClientRequire,
578    /// Use the arrival time and ignore any client-specified timestamp.
579    Arrival,
580}
581
582impl From<api::config::TimestampingMode> for TimestampingMode {
583    fn from(value: api::config::TimestampingMode) -> Self {
584        match value {
585            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
586            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
587            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
588        }
589    }
590}
591
592impl From<TimestampingMode> for api::config::TimestampingMode {
593    fn from(value: TimestampingMode) -> Self {
594        match value {
595            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
596            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
597            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
598        }
599    }
600}
601
602#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
603#[non_exhaustive]
604/// Configuration for timestamping behavior.
605pub struct TimestampingConfig {
606    /// Timestamping mode for appends that influences how timestamps are handled.
607    ///
608    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
609    pub mode: Option<TimestampingMode>,
610    /// Whether client-specified timestamps are allowed to exceed the arrival time.
611    ///
612    /// Defaults to `false` (client timestamps are capped at the arrival time).
613    pub uncapped: bool,
614}
615
616impl TimestampingConfig {
617    /// Create a new [`TimestampingConfig`] with default settings.
618    pub fn new() -> Self {
619        Self::default()
620    }
621
622    /// Set the timestamping mode for appends that influences how timestamps are handled.
623    pub fn with_mode(self, mode: TimestampingMode) -> Self {
624        Self {
625            mode: Some(mode),
626            ..self
627        }
628    }
629
630    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
631    pub fn with_uncapped(self, uncapped: bool) -> Self {
632        Self { uncapped, ..self }
633    }
634}
635
636impl From<api::config::TimestampingConfig> for TimestampingConfig {
637    fn from(value: api::config::TimestampingConfig) -> Self {
638        Self {
639            mode: value.mode.map(Into::into),
640            uncapped: value.uncapped.unwrap_or_default(),
641        }
642    }
643}
644
645impl From<TimestampingConfig> for api::config::TimestampingConfig {
646    fn from(value: TimestampingConfig) -> Self {
647        Self {
648            mode: value.mode.map(Into::into),
649            uncapped: Some(value.uncapped),
650        }
651    }
652}
653
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for automatically deleting a stream when it becomes empty.
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the minimum age in seconds before an empty stream can be deleted.
    ///
    /// Only whole seconds are kept: any sub-second part of `min_age` is dropped, so a
    /// duration shorter than one second results in `0`, which disables automatic deletion.
    #[must_use = "builder methods return the updated configuration instead of mutating in place"]
    pub fn with_min_age(self, min_age: Duration) -> Self {
        Self {
            min_age_secs: min_age.as_secs(),
        }
    }
}
677
678impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
679    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
680        Self {
681            min_age_secs: value.min_age_secs,
682        }
683    }
684}
685
686impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
687    fn from(value: DeleteOnEmptyConfig) -> Self {
688        Self {
689            min_age_secs: value.min_age_secs,
690        }
691    }
692}
693
694/// Encryption configuration for a stream.
695#[derive(Debug, Clone, Default, PartialEq, Eq)]
696#[non_exhaustive]
697pub struct EncryptionConfig {
698    /// Allowed encryption modes for the stream.
699    ///
700    /// If empty, use defaults. If no default is configured, only plaintext is allowed.
701    pub allowed_modes: Vec<EncryptionMode>,
702}
703
704impl EncryptionConfig {
705    /// Create a new [`EncryptionConfig`] with default settings.
706    pub fn new() -> Self {
707        Self::default()
708    }
709
710    /// Set the allowed encryption modes.
711    pub fn with_allowed_modes(self, allowed_modes: Vec<EncryptionMode>) -> Self {
712        Self { allowed_modes }
713    }
714}
715
716impl From<api::config::EncryptionConfig> for EncryptionConfig {
717    fn from(value: api::config::EncryptionConfig) -> Self {
718        Self {
719            allowed_modes: value.allowed_modes.into_iter().map(Into::into).collect(),
720        }
721    }
722}
723
724impl From<EncryptionConfig> for api::config::EncryptionConfig {
725    fn from(value: EncryptionConfig) -> Self {
726        Self {
727            allowed_modes: value.allowed_modes.into_iter().map(Into::into).collect(),
728        }
729    }
730}
731
732#[derive(Debug, Clone, Default, PartialEq, Eq)]
733#[non_exhaustive]
734/// Configuration for a stream.
735pub struct StreamConfig {
736    /// Storage class for the stream.
737    ///
738    /// Defaults to [`Express`](StorageClass::Express).
739    pub storage_class: Option<StorageClass>,
740    /// Retention policy for records in the stream.
741    ///
742    /// Defaults to `7 days` of retention.
743    pub retention_policy: Option<RetentionPolicy>,
744    /// Configuration for timestamping behavior.
745    ///
746    /// See [`TimestampingConfig`] for defaults.
747    pub timestamping: Option<TimestampingConfig>,
748    /// Configuration for automatically deleting the stream when it becomes empty.
749    ///
750    /// See [`DeleteOnEmptyConfig`] for defaults.
751    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
752    /// Encryption configuration.
753    ///
754    /// See [`EncryptionConfig`] for defaults.
755    pub encryption: Option<EncryptionConfig>,
756}
757
758impl StreamConfig {
759    /// Create a new [`StreamConfig`] with default settings.
760    pub fn new() -> Self {
761        Self::default()
762    }
763
764    /// Set the storage class for the stream.
765    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
766        Self {
767            storage_class: Some(storage_class),
768            ..self
769        }
770    }
771
772    /// Set the retention policy for records in the stream.
773    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
774        Self {
775            retention_policy: Some(retention_policy),
776            ..self
777        }
778    }
779
780    /// Set the configuration for timestamping behavior.
781    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
782        Self {
783            timestamping: Some(timestamping),
784            ..self
785        }
786    }
787
788    /// Set the configuration for automatically deleting the stream when it becomes empty.
789    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
790        Self {
791            delete_on_empty: Some(delete_on_empty),
792            ..self
793        }
794    }
795
796    /// Set the encryption configuration.
797    pub fn with_encryption(self, encryption: EncryptionConfig) -> Self {
798        Self {
799            encryption: Some(encryption),
800            ..self
801        }
802    }
803}
804
805impl From<api::config::StreamConfig> for StreamConfig {
806    fn from(value: api::config::StreamConfig) -> Self {
807        Self {
808            storage_class: value.storage_class.map(Into::into),
809            retention_policy: value.retention_policy.map(Into::into),
810            timestamping: value.timestamping.map(Into::into),
811            delete_on_empty: value.delete_on_empty.map(Into::into),
812            encryption: value.encryption.map(Into::into),
813        }
814    }
815}
816
817impl From<StreamConfig> for api::config::StreamConfig {
818    fn from(value: StreamConfig) -> Self {
819        Self {
820            storage_class: value.storage_class.map(Into::into),
821            retention_policy: value.retention_policy.map(Into::into),
822            timestamping: value.timestamping.map(Into::into),
823            delete_on_empty: value.delete_on_empty.map(Into::into),
824            encryption: value.encryption.map(Into::into),
825        }
826    }
827}
828
829#[derive(Debug, Clone, Default)]
830#[non_exhaustive]
831/// Configuration for a basin.
832pub struct BasinConfig {
833    /// Default configuration for all streams in the basin.
834    ///
835    /// See [`StreamConfig`] for defaults.
836    pub default_stream_config: Option<StreamConfig>,
837    /// Whether to create stream on append if it doesn't exist using default stream configuration.
838    ///
839    /// Defaults to `false`.
840    pub create_stream_on_append: bool,
841    /// Whether to create stream on read if it doesn't exist using default stream configuration.
842    ///
843    /// Defaults to `false`.
844    pub create_stream_on_read: bool,
845}
846
847impl BasinConfig {
848    /// Create a new [`BasinConfig`] with default settings.
849    pub fn new() -> Self {
850        Self::default()
851    }
852
853    /// Set the default configuration for all streams in the basin.
854    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
855        Self {
856            default_stream_config: Some(config),
857            ..self
858        }
859    }
860
861    /// Set whether to create stream on append if it doesn't exist using default stream
862    /// configuration.
863    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
864        Self {
865            create_stream_on_append,
866            ..self
867        }
868    }
869
870    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
871    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
872        Self {
873            create_stream_on_read,
874            ..self
875        }
876    }
877}
878
879impl From<api::config::BasinConfig> for BasinConfig {
880    fn from(value: api::config::BasinConfig) -> Self {
881        Self {
882            default_stream_config: value.default_stream_config.map(Into::into),
883            create_stream_on_append: value.create_stream_on_append,
884            create_stream_on_read: value.create_stream_on_read,
885        }
886    }
887}
888
889impl From<BasinConfig> for api::config::BasinConfig {
890    fn from(value: BasinConfig) -> Self {
891        Self {
892            default_stream_config: value.default_stream_config.map(Into::into),
893            create_stream_on_append: value.create_stream_on_append,
894            create_stream_on_read: value.create_stream_on_read,
895        }
896    }
897}
898
899#[derive(Debug, Clone, PartialEq, Eq)]
900/// Scope of a basin.
901pub enum BasinScope {
902    /// AWS `us-east-1` region.
903    AwsUsEast1,
904}
905
906impl From<api::basin::BasinScope> for BasinScope {
907    fn from(value: api::basin::BasinScope) -> Self {
908        match value {
909            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
910        }
911    }
912}
913
914impl From<BasinScope> for api::basin::BasinScope {
915    fn from(value: BasinScope) -> Self {
916        match value {
917            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
918        }
919    }
920}
921
/// Result of a create-or-reconfigure operation.
///
/// Indicates whether the resource was newly created or already existed and was
/// reconfigured. Both variants hold the resource's current state, with `T` being the
/// type used to represent that state.
///
/// Only compiled when the `_hidden` feature is enabled, and hidden from rendered docs.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CreateOrReconfigured<T> {
    /// Resource was newly created.
    Created(T),
    /// Resource already existed and was reconfigured to match the spec.
    Reconfigured(T),
}
935
#[cfg(feature = "_hidden")]
impl<T> CreateOrReconfigured<T> {
    /// Whether this result is the [`Created`](Self::Created) variant.
    pub fn is_created(&self) -> bool {
        matches!(self, Self::Created(_))
    }

    /// Consume the result and return the wrapped value, whichever variant it is.
    pub fn into_inner(self) -> T {
        // Both variants carry a `T`, so the or-pattern is irrefutable.
        let (Self::Created(inner) | Self::Reconfigured(inner)) = self;
        inner
    }
}
950
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_basin`](crate::S2::create_basin) operation.
///
/// Construct with [`CreateBasinInput::new`] and refine with the `with_*` setters.
pub struct CreateBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Configuration for the basin.
    ///
    /// See [`BasinConfig`] for defaults.
    pub config: Option<BasinConfig>,
    /// Scope of the basin.
    ///
    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
    pub scope: Option<BasinScope>,
    // Generated once in `new` and handed out next to the API request when this input is
    // converted; clones of this input carry the same token.
    idempotency_token: String,
}
967
968impl CreateBasinInput {
969    /// Create a new [`CreateBasinInput`] with the given basin name.
970    pub fn new(name: BasinName) -> Self {
971        Self {
972            name,
973            config: None,
974            scope: None,
975            idempotency_token: idempotency_token(),
976        }
977    }
978
979    /// Set the configuration for the basin.
980    pub fn with_config(self, config: BasinConfig) -> Self {
981        Self {
982            config: Some(config),
983            ..self
984        }
985    }
986
987    /// Set the scope of the basin.
988    pub fn with_scope(self, scope: BasinScope) -> Self {
989        Self {
990            scope: Some(scope),
991            ..self
992        }
993    }
994}
995
996impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
997    fn from(value: CreateBasinInput) -> Self {
998        (
999            api::basin::CreateBasinRequest {
1000                basin: value.name,
1001                config: value.config.map(Into::into),
1002                scope: value.scope.map(Into::into),
1003            },
1004            value.idempotency_token,
1005        )
1006    }
1007}
1008
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_or_reconfigure_basin`](crate::S2::create_or_reconfigure_basin) operation.
///
/// When neither [`config`](CreateOrReconfigureBasinInput::config) nor
/// [`scope`](CreateOrReconfigureBasinInput::scope) is set, converting this input yields no
/// request body at all.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Reconfiguration for the basin.
    ///
    /// If `None`, the basin is created with default configuration or left unchanged if it exists.
    pub config: Option<BasinReconfiguration>,
    /// Scope of the basin.
    ///
    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1). Cannot be changed once set.
    pub scope: Option<BasinScope>,
}
1026
#[cfg(feature = "_hidden")]
impl CreateOrReconfigureBasinInput {
    /// Create a new [`CreateOrReconfigureBasinInput`] for the given basin name, with no
    /// reconfiguration and no scope set.
    pub fn new(name: BasinName) -> Self {
        let (config, scope) = (None, None);
        Self {
            name,
            config,
            scope,
        }
    }

    /// Set the reconfiguration for the basin.
    pub fn with_config(mut self, config: BasinReconfiguration) -> Self {
        self.config = Some(config);
        self
    }

    /// Set the scope of the basin.
    pub fn with_scope(mut self, scope: BasinScope) -> Self {
        self.scope = Some(scope);
        self
    }
}
1054
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureBasinInput>
    for (
        BasinName,
        Option<api::basin::CreateOrReconfigureBasinRequest>,
    )
{
    /// Split the input into the basin name and an optional request body.
    ///
    /// The body is omitted entirely when neither a reconfiguration nor a scope was provided.
    fn from(value: CreateOrReconfigureBasinInput) -> Self {
        let CreateOrReconfigureBasinInput {
            name,
            config,
            scope,
        } = value;
        let request = if config.is_none() && scope.is_none() {
            None
        } else {
            Some(api::basin::CreateOrReconfigureBasinRequest {
                config: config.map(Into::into),
                scope: scope.map(Into::into),
            })
        };
        (name, request)
    }
}
1074
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_basins`](crate::S2::list_basins) operation.
///
/// All three fields are passed through unchanged into the API-level list request.
pub struct ListBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`. A value of `None` is sent as an unset limit.
    pub limit: Option<usize>,
}
1094
1095impl ListBasinsInput {
1096    /// Create a new [`ListBasinsInput`] with default values.
1097    pub fn new() -> Self {
1098        Self::default()
1099    }
1100
1101    /// Set the prefix used to filter basins whose names begin with this value.
1102    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1103        Self { prefix, ..self }
1104    }
1105
1106    /// Set the value used to filter basins whose names are lexicographically greater than this
1107    /// value.
1108    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1109        Self {
1110            start_after,
1111            ..self
1112        }
1113    }
1114
1115    /// Set the limit on number of basins to return in a page.
1116    pub fn with_limit(self, limit: usize) -> Self {
1117        Self {
1118            limit: Some(limit),
1119            ..self
1120        }
1121    }
1122}
1123
1124impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1125    fn from(value: ListBasinsInput) -> Self {
1126        Self {
1127            prefix: Some(value.prefix),
1128            start_after: Some(value.start_after),
1129            limit: value.limit,
1130        }
1131    }
1132}
1133
#[derive(Debug, Clone, Default)]
/// Input for [`S2::list_all_basins`](crate::S2::list_all_basins).
///
/// Compared to [`ListBasinsInput`], there is no page `limit`; instead,
/// [`include_deleted`](ListAllBasinsInput::include_deleted) selects whether basins that are
/// being deleted are included.
pub struct ListAllBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Whether to include basins that are being deleted.
    ///
    /// Defaults to `false`.
    pub include_deleted: bool,
}
1152
1153impl ListAllBasinsInput {
1154    /// Create a new [`ListAllBasinsInput`] with default values.
1155    pub fn new() -> Self {
1156        Self::default()
1157    }
1158
1159    /// Set the prefix used to filter basins whose names begin with this value.
1160    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1161        Self { prefix, ..self }
1162    }
1163
1164    /// Set the value used to filter basins whose names are lexicographically greater than this
1165    /// value.
1166    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1167        Self {
1168            start_after,
1169            ..self
1170        }
1171    }
1172
1173    /// Set whether to include basins that are being deleted.
1174    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1175        Self {
1176            include_deleted,
1177            ..self
1178        }
1179    }
1180}
1181
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// Basin information.
///
/// Produced from the API-level basin info; that conversion fails with a
/// [`ValidationError`] if either timestamp cannot be converted.
pub struct BasinInfo {
    /// Basin name.
    pub name: BasinName,
    /// Scope of the basin.
    pub scope: Option<BasinScope>,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time if the basin is being deleted.
    pub deleted_at: Option<S2DateTime>,
}
1195
1196impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1197    type Error = ValidationError;
1198
1199    fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1200        Ok(Self {
1201            name: value.name,
1202            scope: value.scope.map(Into::into),
1203            created_at: value.created_at.try_into()?,
1204            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1205        })
1206    }
1207}
1208
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
pub struct DeleteBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Whether to ignore `Not Found` error if the basin doesn't exist.
    ///
    /// Defaults to `false` when created via [`DeleteBasinInput::new`].
    pub ignore_not_found: bool,
}
1218
1219impl DeleteBasinInput {
1220    /// Create a new [`DeleteBasinInput`] with the given basin name.
1221    pub fn new(name: BasinName) -> Self {
1222        Self {
1223            name,
1224            ignore_not_found: false,
1225        }
1226    }
1227
1228    /// Set whether to ignore `Not Found` error if the basin is not existing.
1229    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1230        Self {
1231            ignore_not_found,
1232            ..self
1233        }
1234    }
1235}
1236
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`TimestampingConfig`].
///
/// Each `with_*` setter marks its field as `Maybe::Specified`.
pub struct TimestampingReconfiguration {
    /// Override for the existing [`mode`](TimestampingConfig::mode).
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
    pub uncapped: Maybe<Option<bool>>,
}
1246
1247impl TimestampingReconfiguration {
1248    /// Create a new [`TimestampingReconfiguration`].
1249    pub fn new() -> Self {
1250        Self::default()
1251    }
1252
1253    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1254    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1255        Self {
1256            mode: Maybe::Specified(Some(mode)),
1257            ..self
1258        }
1259    }
1260
1261    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1262    pub fn with_uncapped(self, uncapped: bool) -> Self {
1263        Self {
1264            uncapped: Maybe::Specified(Some(uncapped)),
1265            ..self
1266        }
1267    }
1268}
1269
1270impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1271    fn from(value: TimestampingReconfiguration) -> Self {
1272        Self {
1273            mode: value.mode.map(|m| m.map(Into::into)),
1274            uncapped: value.uncapped,
1275        }
1276    }
1277}
1278
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`DeleteOnEmptyConfig`].
pub struct DeleteOnEmptyReconfiguration {
    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
    ///
    /// Expressed in whole seconds.
    pub min_age_secs: Maybe<Option<u64>>,
}
1286
1287impl DeleteOnEmptyReconfiguration {
1288    /// Create a new [`DeleteOnEmptyReconfiguration`].
1289    pub fn new() -> Self {
1290        Self::default()
1291    }
1292
1293    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1294    pub fn with_min_age(self, min_age: Duration) -> Self {
1295        Self {
1296            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1297        }
1298    }
1299}
1300
1301impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1302    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1303        Self {
1304            min_age_secs: value.min_age_secs,
1305        }
1306    }
1307}
1308
/// Encryption reconfiguration for a stream.
///
/// Used as the [`encryption`](StreamReconfiguration::encryption) override of a
/// [`StreamReconfiguration`].
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct EncryptionReconfiguration {
    /// Override for the existing [`allowed_modes`](EncryptionConfig::allowed_modes).
    ///
    /// Specify an empty list to reset to defaults.
    pub allowed_modes: Maybe<Vec<EncryptionMode>>,
}
1318
1319impl EncryptionReconfiguration {
1320    /// Create a new [`EncryptionReconfiguration`] with default settings.
1321    pub fn new() -> Self {
1322        Self::default()
1323    }
1324
1325    /// Set the allowed encryption modes.
1326    pub fn with_allowed_modes(self, allowed_modes: Vec<EncryptionMode>) -> Self {
1327        Self {
1328            allowed_modes: Maybe::Specified(allowed_modes),
1329        }
1330    }
1331}
1332
1333impl From<EncryptionReconfiguration> for api::config::EncryptionReconfiguration {
1334    fn from(value: EncryptionReconfiguration) -> Self {
1335        Self {
1336            allowed_modes: value
1337                .allowed_modes
1338                .map(|modes| modes.into_iter().map(Into::into).collect()),
1339        }
1340    }
1341}
1342
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`StreamConfig`].
///
/// Each `with_*` setter marks its field as `Maybe::Specified`.
pub struct StreamReconfiguration {
    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
    /// Override for the existing [`encryption`](StreamConfig::encryption).
    pub encryption: Maybe<Option<EncryptionReconfiguration>>,
}
1358
1359impl StreamReconfiguration {
1360    /// Create a new [`StreamReconfiguration`].
1361    pub fn new() -> Self {
1362        Self::default()
1363    }
1364
1365    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1366    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1367        Self {
1368            storage_class: Maybe::Specified(Some(storage_class)),
1369            ..self
1370        }
1371    }
1372
1373    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1374    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1375        Self {
1376            retention_policy: Maybe::Specified(Some(retention_policy)),
1377            ..self
1378        }
1379    }
1380
1381    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1382    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1383        Self {
1384            timestamping: Maybe::Specified(Some(timestamping)),
1385            ..self
1386        }
1387    }
1388
1389    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1390    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1391        Self {
1392            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1393            ..self
1394        }
1395    }
1396
1397    /// Set the encryption reconfiguration.
1398    pub fn with_encryption(self, encryption: EncryptionReconfiguration) -> Self {
1399        Self {
1400            encryption: Maybe::Specified(Some(encryption)),
1401            ..self
1402        }
1403    }
1404}
1405
1406impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1407    fn from(value: StreamReconfiguration) -> Self {
1408        Self {
1409            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1410            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1411            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1412            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1413            encryption: value.encryption.map(|m| m.map(Into::into)),
1414        }
1415    }
1416}
1417
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`BasinConfig`].
///
/// Each `with_*` setter marks its field as `Maybe::Specified`.
pub struct BasinReconfiguration {
    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Override for the existing
    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
    pub create_stream_on_append: Maybe<bool>,
    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
    pub create_stream_on_read: Maybe<bool>,
}
1430
1431impl BasinReconfiguration {
1432    /// Create a new [`BasinReconfiguration`].
1433    pub fn new() -> Self {
1434        Self::default()
1435    }
1436
1437    /// Set the override for the existing
1438    /// [`default_stream_config`](BasinConfig::default_stream_config).
1439    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1440        Self {
1441            default_stream_config: Maybe::Specified(Some(config)),
1442            ..self
1443        }
1444    }
1445
1446    /// Set the override for the existing
1447    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1448    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1449        Self {
1450            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1451            ..self
1452        }
1453    }
1454
1455    /// Set the override for the existing
1456    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1457    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1458        Self {
1459            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1460            ..self
1461        }
1462    }
1463}
1464
1465impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1466    fn from(value: BasinReconfiguration) -> Self {
1467        Self {
1468            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1469            create_stream_on_append: value.create_stream_on_append,
1470            create_stream_on_read: value.create_stream_on_read,
1471        }
1472    }
1473}
1474
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
///
/// Both fields are required; see [`ReconfigureBasinInput::new`].
pub struct ReconfigureBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Reconfiguration for [`BasinConfig`].
    pub config: BasinReconfiguration,
}
1484
1485impl ReconfigureBasinInput {
1486    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1487    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1488        Self { name, config }
1489    }
1490}
1491
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
///
/// All three fields are passed through unchanged into the API-level list request.
pub struct ListAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`. A value of `None` is sent as an unset limit.
    pub limit: Option<usize>,
}
1511
1512impl ListAccessTokensInput {
1513    /// Create a new [`ListAccessTokensInput`] with default values.
1514    pub fn new() -> Self {
1515        Self::default()
1516    }
1517
1518    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1519    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1520        Self { prefix, ..self }
1521    }
1522
1523    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1524    /// value.
1525    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1526        Self {
1527            start_after,
1528            ..self
1529        }
1530    }
1531
1532    /// Set the limit on number of access tokens to return in a page.
1533    pub fn with_limit(self, limit: usize) -> Self {
1534        Self {
1535            limit: Some(limit),
1536            ..self
1537        }
1538    }
1539}
1540
1541impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1542    fn from(value: ListAccessTokensInput) -> Self {
1543        Self {
1544            prefix: Some(value.prefix),
1545            start_after: Some(value.start_after),
1546            limit: value.limit,
1547        }
1548    }
1549}
1550
#[derive(Debug, Clone, Default)]
/// Input for [`S2::list_all_access_tokens`](crate::S2::list_all_access_tokens).
///
/// Compared to [`ListAccessTokensInput`], there is no page `limit`.
pub struct ListAllAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
}
1565
1566impl ListAllAccessTokensInput {
1567    /// Create a new [`ListAllAccessTokensInput`] with default values.
1568    pub fn new() -> Self {
1569        Self::default()
1570    }
1571
1572    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1573    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1574        Self { prefix, ..self }
1575    }
1576
1577    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1578    /// this value.
1579    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1580        Self {
1581            start_after,
1582            ..self
1583        }
1584    }
1585}
1586
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Access token information.
///
/// Produced from the API-level access token info; that conversion fails with a
/// [`ValidationError`] if the expiration time is missing or cannot be converted.
pub struct AccessTokenInfo {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    pub expires_at: S2DateTime,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    ///
    /// Treated as `false` when the API-level info does not carry a value.
    pub auto_prefix_streams: bool,
    /// Scope of the access token.
    pub scope: AccessTokenScope,
}
1601
1602impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1603    type Error = ValidationError;
1604
1605    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1606        let expires_at = value
1607            .expires_at
1608            .map(S2DateTime::try_from)
1609            .transpose()?
1610            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1611        Ok(Self {
1612            id: value.id,
1613            expires_at,
1614            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1615            scope: value.scope.into(),
1616        })
1617    }
1618}
1619
#[derive(Debug, Clone)]
/// Pattern for matching basins.
///
/// A matcher selects no basins, a single basin by exact name, or every basin sharing a name
/// prefix.
///
/// See [`AccessTokenScope::basins`].
pub enum BasinMatcher {
    /// Match no basins.
    None,
    /// Match exactly this basin.
    Exact(BasinName),
    /// Match all basins with this prefix.
    Prefix(BasinNamePrefix),
}
1632
#[derive(Debug, Clone)]
/// Pattern for matching streams.
///
/// A matcher selects no streams, a single stream by exact name, or every stream sharing a
/// name prefix.
///
/// See [`AccessTokenScope::streams`].
pub enum StreamMatcher {
    /// Match no streams.
    None,
    /// Match exactly this stream.
    Exact(StreamName),
    /// Match all streams with this prefix.
    Prefix(StreamNamePrefix),
}
1645
#[derive(Debug, Clone)]
/// Pattern for matching access tokens.
///
/// A matcher selects no access tokens, a single access token by exact ID, or every access
/// token sharing an ID prefix.
///
/// See [`AccessTokenScope::access_tokens`].
pub enum AccessTokenMatcher {
    /// Match no access tokens.
    None,
    /// Match exactly this access token.
    Exact(AccessTokenId),
    /// Match all access tokens with this prefix.
    Prefix(AccessTokenIdPrefix),
}
1658
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions indicating allowed operations.
///
/// Use [`read_only`](ReadWritePermissions::read_only),
/// [`write_only`](ReadWritePermissions::write_only) or
/// [`read_write`](ReadWritePermissions::read_write) for the common combinations.
pub struct ReadWritePermissions {
    /// Read permission.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Write permission.
    ///
    /// Defaults to `false`.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Create a new [`ReadWritePermissions`] with neither read nor write permitted.
    pub fn new() -> Self {
        Default::default()
    }

    /// Permit reads but not writes.
    pub fn read_only() -> Self {
        Self {
            read: true,
            ..Self::new()
        }
    }

    /// Permit writes but not reads.
    pub fn write_only() -> Self {
        Self {
            write: true,
            ..Self::new()
        }
    }

    /// Permit both reads and writes.
    pub fn read_write() -> Self {
        Self {
            write: true,
            ..Self::read_only()
        }
    }
}
1703
1704impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1705    fn from(value: ReadWritePermissions) -> Self {
1706        Self {
1707            read: Some(value.read),
1708            write: Some(value.write),
1709        }
1710    }
1711}
1712
1713impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1714    fn from(value: api::access::ReadWritePermissions) -> Self {
1715        Self {
1716            read: value.read.unwrap_or_default(),
1717            write: value.write.unwrap_or_default(),
1718        }
1719    }
1720}
1721
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions at the operation group level.
///
/// The `read_only_all`, `write_only_all` and `read_write_all` constructors set the same
/// permissions on all three groups at once.
///
/// See [`AccessTokenScope::op_group_perms`].
pub struct OperationGroupPermissions {
    /// Account-level access permissions.
    ///
    /// Defaults to `None`.
    pub account: Option<ReadWritePermissions>,
    /// Basin-level access permissions.
    ///
    /// Defaults to `None`.
    pub basin: Option<ReadWritePermissions>,
    /// Stream-level access permissions.
    ///
    /// Defaults to `None`.
    pub stream: Option<ReadWritePermissions>,
}
1741
1742impl OperationGroupPermissions {
1743    /// Create a new [`OperationGroupPermissions`] with default values.
1744    pub fn new() -> Self {
1745        Self::default()
1746    }
1747
1748    /// Create read-only permissions for all groups.
1749    pub fn read_only_all() -> Self {
1750        Self {
1751            account: Some(ReadWritePermissions::read_only()),
1752            basin: Some(ReadWritePermissions::read_only()),
1753            stream: Some(ReadWritePermissions::read_only()),
1754        }
1755    }
1756
1757    /// Create write-only permissions for all groups.
1758    pub fn write_only_all() -> Self {
1759        Self {
1760            account: Some(ReadWritePermissions::write_only()),
1761            basin: Some(ReadWritePermissions::write_only()),
1762            stream: Some(ReadWritePermissions::write_only()),
1763        }
1764    }
1765
1766    /// Create read-write permissions for all groups.
1767    pub fn read_write_all() -> Self {
1768        Self {
1769            account: Some(ReadWritePermissions::read_write()),
1770            basin: Some(ReadWritePermissions::read_write()),
1771            stream: Some(ReadWritePermissions::read_write()),
1772        }
1773    }
1774
1775    /// Set account-level access permissions.
1776    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1777        Self {
1778            account: Some(account),
1779            ..self
1780        }
1781    }
1782
1783    /// Set basin-level access permissions.
1784    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1785        Self {
1786            basin: Some(basin),
1787            ..self
1788        }
1789    }
1790
1791    /// Set stream-level access permissions.
1792    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1793        Self {
1794            stream: Some(stream),
1795            ..self
1796        }
1797    }
1798}
1799
1800impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1801    fn from(value: OperationGroupPermissions) -> Self {
1802        Self {
1803            account: value.account.map(Into::into),
1804            basin: value.basin.map(Into::into),
1805            stream: value.stream.map(Into::into),
1806        }
1807    }
1808}
1809
1810impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1811    fn from(value: api::access::PermittedOperationGroups) -> Self {
1812        Self {
1813            account: value.account.map(Into::into),
1814            basin: value.basin.map(Into::into),
1815            stream: value.stream.map(Into::into),
1816        }
1817    }
1818}
1819
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Individual operation that can be permitted.
///
/// Every variant maps onto exactly one API-level operation. The three metrics variants map
/// to the API-level `AccountMetrics`, `BasinMetrics` and `StreamMetrics` operations; all
/// other variants map to the API-level operation of the same name.
///
/// See [`AccessTokenScope::ops`].
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics.
    GetAccountMetrics,
    /// Get basin metrics.
    GetBasinMetrics,
    /// Get stream metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
}
1868
1869impl From<Operation> for api::access::Operation {
1870    fn from(value: Operation) -> Self {
1871        match value {
1872            Operation::ListBasins => api::access::Operation::ListBasins,
1873            Operation::CreateBasin => api::access::Operation::CreateBasin,
1874            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1875            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1876            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1877            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1878            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1879            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1880            Operation::ListStreams => api::access::Operation::ListStreams,
1881            Operation::CreateStream => api::access::Operation::CreateStream,
1882            Operation::DeleteStream => api::access::Operation::DeleteStream,
1883            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1884            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1885            Operation::CheckTail => api::access::Operation::CheckTail,
1886            Operation::Append => api::access::Operation::Append,
1887            Operation::Read => api::access::Operation::Read,
1888            Operation::Trim => api::access::Operation::Trim,
1889            Operation::Fence => api::access::Operation::Fence,
1890            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1891            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1892            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1893        }
1894    }
1895}
1896
1897impl From<api::access::Operation> for Operation {
1898    fn from(value: api::access::Operation) -> Self {
1899        match value {
1900            api::access::Operation::ListBasins => Operation::ListBasins,
1901            api::access::Operation::CreateBasin => Operation::CreateBasin,
1902            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1903            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1904            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1905            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1906            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1907            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1908            api::access::Operation::ListStreams => Operation::ListStreams,
1909            api::access::Operation::CreateStream => Operation::CreateStream,
1910            api::access::Operation::DeleteStream => Operation::DeleteStream,
1911            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1912            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1913            api::access::Operation::CheckTail => Operation::CheckTail,
1914            api::access::Operation::Append => Operation::Append,
1915            api::access::Operation::Read => Operation::Read,
1916            api::access::Operation::Trim => Operation::Trim,
1917            api::access::Operation::Fence => Operation::Fence,
1918            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1919            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1920            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1921        }
1922    }
1923}
1924
1925#[derive(Debug, Clone)]
1926#[non_exhaustive]
1927/// Scope of an access token.
1928///
1929/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
1930/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
1931/// final set must not be empty.
1932///
1933/// See [`IssueAccessTokenInput::scope`].
1934pub struct AccessTokenScopeInput {
1935    basins: Option<BasinMatcher>,
1936    streams: Option<StreamMatcher>,
1937    access_tokens: Option<AccessTokenMatcher>,
1938    op_group_perms: Option<OperationGroupPermissions>,
1939    ops: HashSet<Operation>,
1940}
1941
1942impl AccessTokenScopeInput {
1943    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1944    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1945        Self {
1946            basins: None,
1947            streams: None,
1948            access_tokens: None,
1949            op_group_perms: None,
1950            ops: ops.into_iter().collect(),
1951        }
1952    }
1953
1954    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1955    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1956        Self {
1957            basins: None,
1958            streams: None,
1959            access_tokens: None,
1960            op_group_perms: Some(op_group_perms),
1961            ops: HashSet::default(),
1962        }
1963    }
1964
1965    /// Set the permitted operations.
1966    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1967        Self {
1968            ops: ops.into_iter().collect(),
1969            ..self
1970        }
1971    }
1972
1973    /// Set the access permissions at the operation group level.
1974    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1975        Self {
1976            op_group_perms: Some(op_group_perms),
1977            ..self
1978        }
1979    }
1980
1981    /// Set the permitted basins.
1982    ///
1983    /// Defaults to no basins.
1984    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1985        Self {
1986            basins: Some(basins),
1987            ..self
1988        }
1989    }
1990
1991    /// Set the permitted streams.
1992    ///
1993    /// Defaults to no streams.
1994    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1995        Self {
1996            streams: Some(streams),
1997            ..self
1998        }
1999    }
2000
2001    /// Set the permitted access tokens.
2002    ///
2003    /// Defaults to no access tokens.
2004    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
2005        Self {
2006            access_tokens: Some(access_tokens),
2007            ..self
2008        }
2009    }
2010}
2011
2012#[derive(Debug, Clone)]
2013#[non_exhaustive]
2014/// Scope of an access token.
2015pub struct AccessTokenScope {
2016    /// Permitted basins.
2017    pub basins: Option<BasinMatcher>,
2018    /// Permitted streams.
2019    pub streams: Option<StreamMatcher>,
2020    /// Permitted access tokens.
2021    pub access_tokens: Option<AccessTokenMatcher>,
2022    /// Permissions at the operation group level.
2023    pub op_group_perms: Option<OperationGroupPermissions>,
2024    /// Permitted operations.
2025    pub ops: HashSet<Operation>,
2026}
2027
2028impl From<api::access::AccessTokenScope> for AccessTokenScope {
2029    fn from(value: api::access::AccessTokenScope) -> Self {
2030        Self {
2031            basins: value.basins.map(|rs| match rs {
2032                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2033                    BasinMatcher::Exact(e)
2034                }
2035                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2036                    BasinMatcher::None
2037                }
2038                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
2039            }),
2040            streams: value.streams.map(|rs| match rs {
2041                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2042                    StreamMatcher::Exact(e)
2043                }
2044                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2045                    StreamMatcher::None
2046                }
2047                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
2048            }),
2049            access_tokens: value.access_tokens.map(|rs| match rs {
2050                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
2051                    AccessTokenMatcher::Exact(e)
2052                }
2053                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
2054                    AccessTokenMatcher::None
2055                }
2056                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
2057            }),
2058            op_group_perms: value.op_groups.map(Into::into),
2059            ops: value
2060                .ops
2061                .map(|ops| ops.into_iter().map(Into::into).collect())
2062                .unwrap_or_default(),
2063        }
2064    }
2065}
2066
2067impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
2068    fn from(value: AccessTokenScopeInput) -> Self {
2069        Self {
2070            basins: value.basins.map(|rs| match rs {
2071                BasinMatcher::None => {
2072                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2073                }
2074                BasinMatcher::Exact(e) => {
2075                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2076                }
2077                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2078            }),
2079            streams: value.streams.map(|rs| match rs {
2080                StreamMatcher::None => {
2081                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2082                }
2083                StreamMatcher::Exact(e) => {
2084                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2085                }
2086                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2087            }),
2088            access_tokens: value.access_tokens.map(|rs| match rs {
2089                AccessTokenMatcher::None => {
2090                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2091                }
2092                AccessTokenMatcher::Exact(e) => {
2093                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2094                }
2095                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2096            }),
2097            op_groups: value.op_group_perms.map(Into::into),
2098            ops: if value.ops.is_empty() {
2099                None
2100            } else {
2101                Some(value.ops.into_iter().map(Into::into).collect())
2102            },
2103        }
2104    }
2105}
2106
2107#[derive(Debug, Clone)]
2108#[non_exhaustive]
2109/// Input for [`issue_access_token`](crate::S2::issue_access_token).
2110pub struct IssueAccessTokenInput {
2111    /// Access token ID.
2112    pub id: AccessTokenId,
2113    /// Expiration time.
2114    ///
2115    /// Defaults to the expiration time of requestor's access token passed via
2116    /// [`S2Config`](S2Config::new).
2117    pub expires_at: Option<S2DateTime>,
2118    /// Whether to automatically prefix stream names during creation and strip the prefix during
2119    /// listing.
2120    ///
2121    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
2122    /// prefix.
2123    ///
2124    /// Defaults to `false`.
2125    pub auto_prefix_streams: bool,
2126    /// Scope of the token.
2127    pub scope: AccessTokenScopeInput,
2128}
2129
2130impl IssueAccessTokenInput {
2131    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
2132    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2133        Self {
2134            id,
2135            expires_at: None,
2136            auto_prefix_streams: false,
2137            scope,
2138        }
2139    }
2140
2141    /// Set the expiration time.
2142    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2143        Self {
2144            expires_at: Some(expires_at),
2145            ..self
2146        }
2147    }
2148
2149    /// Set whether to automatically prefix stream names during creation and strip the prefix during
2150    /// listing.
2151    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2152        Self {
2153            auto_prefix_streams,
2154            ..self
2155        }
2156    }
2157}
2158
2159impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2160    fn from(value: IssueAccessTokenInput) -> Self {
2161        Self {
2162            id: value.id,
2163            expires_at: value.expires_at.map(Into::into),
2164            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2165            scope: value.scope.into(),
2166        }
2167    }
2168}
2169
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Interval over which values are accumulated in timeseries metric sets.
pub enum TimeseriesInterval {
    /// One minute.
    Minute,
    /// One hour.
    Hour,
    /// One day.
    Day,
}
2180
2181impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2182    fn from(value: TimeseriesInterval) -> Self {
2183        match value {
2184            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2185            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2186            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2187        }
2188    }
2189}
2190
2191impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2192    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2193        match value {
2194            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2195            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2196            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2197        }
2198    }
2199}
2200
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range expressed in Unix epoch seconds.
pub struct TimeRange {
    /// Timestamp at which the range starts (inclusive).
    pub start: u32,
    /// Timestamp at which the range ends (exclusive).
    pub end: u32,
}
2210
2211impl TimeRange {
2212    /// Create a new [`TimeRange`] with the given start and end timestamps.
2213    pub fn new(start: u32, end: u32) -> Self {
2214        Self { start, end }
2215    }
2216}
2217
2218#[derive(Debug, Clone, Copy)]
2219#[non_exhaustive]
2220/// Time range as Unix epoch seconds and accumulation interval.
2221pub struct TimeRangeAndInterval {
2222    /// Start timestamp (inclusive).
2223    pub start: u32,
2224    /// End timestamp (exclusive).
2225    pub end: u32,
2226    /// Interval to accumulate over for timeseries metric sets.
2227    ///
2228    /// Default is dependent on the requested metric set.
2229    pub interval: Option<TimeseriesInterval>,
2230}
2231
2232impl TimeRangeAndInterval {
2233    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2234    pub fn new(start: u32, end: u32) -> Self {
2235        Self {
2236            start,
2237            end,
2238            interval: None,
2239        }
2240    }
2241
2242    /// Set the interval to accumulate over for timeseries metric sets.
2243    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2244        Self {
2245            interval: Some(interval),
2246            ..self
2247        }
2248    }
2249}
2250
2251#[derive(Debug, Clone, Copy)]
2252/// Account metric set to return.
2253pub enum AccountMetricSet {
2254    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
2255    /// specified time range.
2256    ActiveBasins(TimeRange),
2257    /// Returns [`AccumulationMetric`]s, one per account operation type.
2258    ///
2259    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2260    /// per interval over the requested time range.
2261    ///
2262    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2263    AccountOps(TimeRangeAndInterval),
2264}
2265
2266#[derive(Debug, Clone)]
2267#[non_exhaustive]
2268/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
2269pub struct GetAccountMetricsInput {
2270    /// Metric set to return.
2271    pub set: AccountMetricSet,
2272}
2273
2274impl GetAccountMetricsInput {
2275    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2276    pub fn new(set: AccountMetricSet) -> Self {
2277        Self { set }
2278    }
2279}
2280
2281impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2282    fn from(value: GetAccountMetricsInput) -> Self {
2283        let (set, start, end, interval) = match value.set {
2284            AccountMetricSet::ActiveBasins(args) => (
2285                api::metrics::AccountMetricSet::ActiveBasins,
2286                args.start,
2287                args.end,
2288                None,
2289            ),
2290            AccountMetricSet::AccountOps(args) => (
2291                api::metrics::AccountMetricSet::AccountOps,
2292                args.start,
2293                args.end,
2294                args.interval,
2295            ),
2296        };
2297        Self {
2298            set,
2299            start: Some(start),
2300            end: Some(end),
2301            interval: interval.map(Into::into),
2302        }
2303    }
2304}
2305
2306#[derive(Debug, Clone, Copy)]
2307/// Basin metric set to return.
2308pub enum BasinMetricSet {
2309    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
2310    /// in the basin, with one observed value for each hour over the requested time range.
2311    Storage(TimeRange),
2312    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
2313    ///
2314    /// Each metric represents a timeseries of the number of append operations across all streams
2315    /// in the basin, with one accumulated value per interval over the requested time range.
2316    ///
2317    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2318    /// [`minute`](TimeseriesInterval::Minute).
2319    AppendOps(TimeRangeAndInterval),
2320    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
2321    ///
2322    /// Each metric represents a timeseries of the number of read operations across all streams
2323    /// in the basin, with one accumulated value per interval over the requested time range.
2324    ///
2325    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2326    /// [`minute`](TimeseriesInterval::Minute).
2327    ReadOps(TimeRangeAndInterval),
2328    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
2329    /// across all streams in the basin, with one accumulated value per interval
2330    /// over the requested time range.
2331    ///
2332    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2333    /// [`minute`](TimeseriesInterval::Minute).
2334    ReadThroughput(TimeRangeAndInterval),
2335    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
2336    /// across all streams in the basin, with one accumulated value per interval
2337    /// over the requested time range.
2338    ///
2339    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2340    /// [`minute`](TimeseriesInterval::Minute).
2341    AppendThroughput(TimeRangeAndInterval),
2342    /// Returns [`AccumulationMetric`]s, one per basin operation type.
2343    ///
2344    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2345    /// per interval over the requested time range.
2346    ///
2347    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2348    BasinOps(TimeRangeAndInterval),
2349}
2350
2351#[derive(Debug, Clone)]
2352#[non_exhaustive]
2353/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
2354pub struct GetBasinMetricsInput {
2355    /// Basin name.
2356    pub name: BasinName,
2357    /// Metric set to return.
2358    pub set: BasinMetricSet,
2359}
2360
2361impl GetBasinMetricsInput {
2362    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2363    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2364        Self { name, set }
2365    }
2366}
2367
2368impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2369    fn from(value: GetBasinMetricsInput) -> Self {
2370        let (set, start, end, interval) = match value.set {
2371            BasinMetricSet::Storage(args) => (
2372                api::metrics::BasinMetricSet::Storage,
2373                args.start,
2374                args.end,
2375                None,
2376            ),
2377            BasinMetricSet::AppendOps(args) => (
2378                api::metrics::BasinMetricSet::AppendOps,
2379                args.start,
2380                args.end,
2381                args.interval,
2382            ),
2383            BasinMetricSet::ReadOps(args) => (
2384                api::metrics::BasinMetricSet::ReadOps,
2385                args.start,
2386                args.end,
2387                args.interval,
2388            ),
2389            BasinMetricSet::ReadThroughput(args) => (
2390                api::metrics::BasinMetricSet::ReadThroughput,
2391                args.start,
2392                args.end,
2393                args.interval,
2394            ),
2395            BasinMetricSet::AppendThroughput(args) => (
2396                api::metrics::BasinMetricSet::AppendThroughput,
2397                args.start,
2398                args.end,
2399                args.interval,
2400            ),
2401            BasinMetricSet::BasinOps(args) => (
2402                api::metrics::BasinMetricSet::BasinOps,
2403                args.start,
2404                args.end,
2405                args.interval,
2406            ),
2407        };
2408        (
2409            value.name,
2410            api::metrics::BasinMetricSetRequest {
2411                set,
2412                start: Some(start),
2413                end: Some(end),
2414                interval: interval.map(Into::into),
2415            },
2416        )
2417    }
2418}
2419
2420#[derive(Debug, Clone, Copy)]
2421/// Stream metric set to return.
2422pub enum StreamMetricSet {
2423    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
2424    /// with one observed value for each minute over the requested time range.
2425    Storage(TimeRange),
2426}
2427
2428#[derive(Debug, Clone)]
2429#[non_exhaustive]
2430/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
2431pub struct GetStreamMetricsInput {
2432    /// Basin name.
2433    pub basin_name: BasinName,
2434    /// Stream name.
2435    pub stream_name: StreamName,
2436    /// Metric set to return.
2437    pub set: StreamMetricSet,
2438}
2439
2440impl GetStreamMetricsInput {
2441    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2442    /// set.
2443    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2444        Self {
2445            basin_name,
2446            stream_name,
2447            set,
2448        }
2449    }
2450}
2451
2452impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2453    fn from(value: GetStreamMetricsInput) -> Self {
2454        let (set, start, end, interval) = match value.set {
2455            StreamMetricSet::Storage(args) => (
2456                api::metrics::StreamMetricSet::Storage,
2457                args.start,
2458                args.end,
2459                None,
2460            ),
2461        };
2462        (
2463            value.basin_name,
2464            value.stream_name,
2465            api::metrics::StreamMetricSetRequest {
2466                set,
2467                start: Some(start),
2468                end: Some(end),
2469                interval,
2470            },
2471        )
2472    }
2473}
2474
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Unit of measurement for metric values.
pub enum MetricUnit {
    /// A size, in bytes.
    Bytes,
    /// A count of operations.
    Operations,
}
2483
2484impl From<api::metrics::MetricUnit> for MetricUnit {
2485    fn from(value: api::metrics::MetricUnit) -> Self {
2486        match value {
2487            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2488            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2489        }
2490    }
2491}
2492
2493#[derive(Debug, Clone)]
2494#[non_exhaustive]
2495/// Single named value.
2496pub struct ScalarMetric {
2497    /// Metric name.
2498    pub name: String,
2499    /// Unit for the metric value.
2500    pub unit: MetricUnit,
2501    /// Metric value.
2502    pub value: f64,
2503}
2504
2505#[derive(Debug, Clone)]
2506#[non_exhaustive]
2507/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2508/// specified interval.
2509pub struct AccumulationMetric {
2510    /// Timeseries name.
2511    pub name: String,
2512    /// Unit for the accumulated values.
2513    pub unit: MetricUnit,
2514    /// The interval at which datapoints are accumulated.
2515    pub interval: TimeseriesInterval,
2516    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
2517    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
2518    /// one `interval`.
2519    pub values: Vec<(u32, f64)>,
2520}
2521
2522#[derive(Debug, Clone)]
2523#[non_exhaustive]
2524/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2525pub struct GaugeMetric {
2526    /// Timeseries name.
2527    pub name: String,
2528    /// Unit for the instantaneous values.
2529    pub unit: MetricUnit,
2530    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
2531    /// instant of the `timestamp` (in Unix epoch seconds).
2532    pub values: Vec<(u32, f64)>,
2533}
2534
#[derive(Debug, Clone)]
#[non_exhaustive]
/// A set of string labels.
pub struct LabelMetric {
    /// Name of the label.
    pub name: String,
    /// Values of the label.
    pub values: Vec<String>,
}
2544
2545#[derive(Debug, Clone)]
2546/// Individual metric in a returned metric set.
2547pub enum Metric {
2548    /// Single named value.
2549    Scalar(ScalarMetric),
2550    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2551    /// specified interval.
2552    Accumulation(AccumulationMetric),
2553    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2554    Gauge(GaugeMetric),
2555    /// Set of string labels.
2556    Label(LabelMetric),
2557}
2558
2559impl From<api::metrics::Metric> for Metric {
2560    fn from(value: api::metrics::Metric) -> Self {
2561        match value {
2562            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2563                name: sm.name.into(),
2564                unit: sm.unit.into(),
2565                value: sm.value,
2566            }),
2567            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2568                name: am.name.into(),
2569                unit: am.unit.into(),
2570                interval: am.interval.into(),
2571                values: am.values,
2572            }),
2573            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2574                name: gm.name.into(),
2575                unit: gm.unit.into(),
2576                values: gm.values,
2577            }),
2578            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2579                name: lm.name.into(),
2580                values: lm.values,
2581            }),
2582        }
2583    }
2584}
2585
2586#[derive(Debug, Clone, Default)]
2587#[non_exhaustive]
2588/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
2589pub struct ListStreamsInput {
2590    /// Filter streams whose names begin with this value.
2591    ///
2592    /// Defaults to `""`.
2593    pub prefix: StreamNamePrefix,
2594    /// Filter streams whose names are lexicographically greater than this value.
2595    ///
2596    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
2597    ///
2598    /// Defaults to `""`.
2599    pub start_after: StreamNameStartAfter,
2600    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
2601    ///
2602    /// Defaults to `1000`.
2603    pub limit: Option<usize>,
2604}
2605
2606impl ListStreamsInput {
2607    /// Create a new [`ListStreamsInput`] with default values.
2608    pub fn new() -> Self {
2609        Self::default()
2610    }
2611
2612    /// Set the prefix used to filter streams whose names begin with this value.
2613    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2614        Self { prefix, ..self }
2615    }
2616
2617    /// Set the value used to filter streams whose names are lexicographically greater than this
2618    /// value.
2619    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2620        Self {
2621            start_after,
2622            ..self
2623        }
2624    }
2625
2626    /// Set the limit on number of streams to return in a page.
2627    pub fn with_limit(self, limit: usize) -> Self {
2628        Self {
2629            limit: Some(limit),
2630            ..self
2631        }
2632    }
2633}
2634
2635impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2636    fn from(value: ListStreamsInput) -> Self {
2637        Self {
2638            prefix: Some(value.prefix),
2639            start_after: Some(value.start_after),
2640            limit: value.limit,
2641        }
2642    }
2643}
2644
2645#[derive(Debug, Clone, Default)]
2646/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
2647pub struct ListAllStreamsInput {
2648    /// Filter streams whose names begin with this value.
2649    ///
2650    /// Defaults to `""`.
2651    pub prefix: StreamNamePrefix,
2652    /// Filter streams whose names are lexicographically greater than this value.
2653    ///
2654    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
2655    ///
2656    /// Defaults to `""`.
2657    pub start_after: StreamNameStartAfter,
2658    /// Whether to include streams that are being deleted.
2659    ///
2660    /// Defaults to `false`.
2661    pub include_deleted: bool,
2662}
2663
2664impl ListAllStreamsInput {
2665    /// Create a new [`ListAllStreamsInput`] with default values.
2666    pub fn new() -> Self {
2667        Self::default()
2668    }
2669
2670    /// Set the prefix used to filter streams whose names begin with this value.
2671    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2672        Self { prefix, ..self }
2673    }
2674
2675    /// Set the value used to filter streams whose names are lexicographically greater than this
2676    /// value.
2677    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2678        Self {
2679            start_after,
2680            ..self
2681        }
2682    }
2683
2684    /// Set whether to include streams that are being deleted.
2685    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2686        Self {
2687            include_deleted,
2688            ..self
2689        }
2690    }
2691}
2692
2693#[derive(Debug, Clone, PartialEq, Eq)]
2694#[non_exhaustive]
2695/// Stream information.
2696pub struct StreamInfo {
2697    /// Stream name.
2698    pub name: StreamName,
2699    /// Creation time.
2700    pub created_at: S2DateTime,
2701    /// Deletion time if the stream is being deleted.
2702    pub deleted_at: Option<S2DateTime>,
2703}
2704
2705impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2706    type Error = ValidationError;
2707
2708    fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2709        Ok(Self {
2710            name: value.name,
2711            created_at: value.created_at.try_into()?,
2712            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2713        })
2714    }
2715}
2716
2717#[derive(Debug, Clone)]
2718#[non_exhaustive]
2719/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
2720pub struct CreateStreamInput {
2721    /// Stream name.
2722    pub name: StreamName,
2723    /// Configuration for the stream.
2724    ///
2725    /// See [`StreamConfig`] for defaults.
2726    pub config: Option<StreamConfig>,
2727    idempotency_token: String,
2728}
2729
2730impl CreateStreamInput {
2731    /// Create a new [`CreateStreamInput`] with the given stream name.
2732    pub fn new(name: StreamName) -> Self {
2733        Self {
2734            name,
2735            config: None,
2736            idempotency_token: idempotency_token(),
2737        }
2738    }
2739
2740    /// Set the configuration for the stream.
2741    pub fn with_config(self, config: StreamConfig) -> Self {
2742        Self {
2743            config: Some(config),
2744            ..self
2745        }
2746    }
2747}
2748
2749impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2750    fn from(value: CreateStreamInput) -> Self {
2751        (
2752            api::stream::CreateStreamRequest {
2753                stream: value.name,
2754                config: value.config.map(Into::into),
2755            },
2756            value.idempotency_token,
2757        )
2758    }
2759}
2760
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_or_reconfigure_stream`](crate::S2Basin::create_or_reconfigure_stream)
/// operation.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureStreamInput {
    /// Name of the stream.
    pub name: StreamName,
    /// Reconfiguration to apply to the stream.
    ///
    /// If `None`, the stream is created with default configuration or left unchanged if it exists.
    pub config: Option<StreamReconfiguration>,
}
2775
#[cfg(feature = "_hidden")]
impl CreateOrReconfigureStreamInput {
    /// Create a new [`CreateOrReconfigureStreamInput`] for the given stream name, without any
    /// reconfiguration.
    pub fn new(name: StreamName) -> Self {
        let config = None;
        Self { name, config }
    }

    /// Set the reconfiguration for the stream.
    pub fn with_config(mut self, config: StreamReconfiguration) -> Self {
        self.config = Some(config);
        self
    }
}
2791
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureStreamInput>
    for (StreamName, Option<api::config::StreamReconfiguration>)
{
    /// Split the input into the stream name and the optional API-level reconfiguration.
    fn from(value: CreateOrReconfigureStreamInput) -> Self {
        let CreateOrReconfigureStreamInput { name, config } = value;
        (name, config.map(Into::into))
    }
}
2800
2801#[derive(Debug, Clone)]
2802#[non_exhaustive]
2803/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
2804pub struct DeleteStreamInput {
2805    /// Stream name.
2806    pub name: StreamName,
2807    /// Whether to ignore `Not Found` error if the stream doesn't exist.
2808    pub ignore_not_found: bool,
2809}
2810
2811impl DeleteStreamInput {
2812    /// Create a new [`DeleteStreamInput`] with the given stream name.
2813    pub fn new(name: StreamName) -> Self {
2814        Self {
2815            name,
2816            ignore_not_found: false,
2817        }
2818    }
2819
2820    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2821    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2822        Self {
2823            ignore_not_found,
2824            ..self
2825        }
2826    }
2827}
2828
2829#[derive(Debug, Clone)]
2830#[non_exhaustive]
2831/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
2832pub struct ReconfigureStreamInput {
2833    /// Stream name.
2834    pub name: StreamName,
2835    /// Reconfiguration for [`StreamConfig`].
2836    pub config: StreamReconfiguration,
2837}
2838
2839impl ReconfigureStreamInput {
2840    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2841    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2842        Self { name, config }
2843    }
2844}
2845
#[derive(Debug, Clone, PartialEq, Eq)]
/// Token used to fence appends to a stream.
///
/// **Note:** It must not exceed 36 bytes in length.
///
/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
pub struct FencingToken(String);
2853
2854impl FencingToken {
2855    /// Generate a random alphanumeric fencing token of `n` bytes.
2856    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2857        rand::rng()
2858            .sample_iter(&rand::distr::Alphanumeric)
2859            .take(n)
2860            .map(char::from)
2861            .collect::<String>()
2862            .parse()
2863    }
2864}
2865
2866impl FromStr for FencingToken {
2867    type Err = ValidationError;
2868
2869    fn from_str(s: &str) -> Result<Self, Self::Err> {
2870        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2871            return Err(ValidationError(format!(
2872                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2873            )));
2874        }
2875        Ok(FencingToken(s.to_string()))
2876    }
2877}
2878
2879impl std::fmt::Display for FencingToken {
2880    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2881        write!(f, "{}", self.0)
2882    }
2883}
2884
2885impl Deref for FencingToken {
2886    type Target = str;
2887
2888    fn deref(&self) -> &Self::Target {
2889        &self.0
2890    }
2891}
2892
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// Position of a record in a stream.
pub struct StreamPosition {
    /// Sequence number, as assigned by the service.
    pub seq_num: u64,
    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
    /// User-specified timestamps are passed through as-is.
    pub timestamp: u64,
}

impl std::fmt::Display for StreamPosition {
    /// Format as `seq_num=<seq_num>, timestamp=<timestamp>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = self;
        write!(f, "seq_num={seq_num}, timestamp={timestamp}")
    }
}
2909
2910impl From<api::stream::proto::StreamPosition> for StreamPosition {
2911    fn from(value: api::stream::proto::StreamPosition) -> Self {
2912        Self {
2913            seq_num: value.seq_num,
2914            timestamp: value.timestamp,
2915        }
2916    }
2917}
2918
2919impl From<api::stream::StreamPosition> for StreamPosition {
2920    fn from(value: api::stream::StreamPosition) -> Self {
2921        Self {
2922            seq_num: value.seq_num,
2923            timestamp: value.timestamp,
2924        }
2925    }
2926}
2927
2928#[derive(Debug, Clone, PartialEq)]
2929#[non_exhaustive]
2930/// A name-value pair.
2931pub struct Header {
2932    /// Name.
2933    pub name: Bytes,
2934    /// Value.
2935    pub value: Bytes,
2936}
2937
2938impl Header {
2939    /// Create a new [`Header`] with the given name and value.
2940    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2941        Self {
2942            name: name.into(),
2943            value: value.into(),
2944        }
2945    }
2946}
2947
2948impl From<Header> for api::stream::proto::Header {
2949    fn from(value: Header) -> Self {
2950        Self {
2951            name: value.name,
2952            value: value.value,
2953        }
2954    }
2955}
2956
2957impl From<api::stream::proto::Header> for Header {
2958    fn from(value: api::stream::proto::Header) -> Self {
2959        Self {
2960            name: value.name,
2961            value: value.value,
2962        }
2963    }
2964}
2965
2966#[derive(Debug, Clone, PartialEq)]
2967/// A record to append.
2968pub struct AppendRecord {
2969    body: Bytes,
2970    headers: Vec<Header>,
2971    timestamp: Option<u64>,
2972}
2973
2974impl AppendRecord {
2975    fn validate(self) -> Result<Self, ValidationError> {
2976        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2977            Err(ValidationError(format!(
2978                "metered_bytes: {} exceeds {}",
2979                self.metered_bytes(),
2980                RECORD_BATCH_MAX.bytes
2981            )))
2982        } else {
2983            Ok(self)
2984        }
2985    }
2986
2987    /// Create a new [`AppendRecord`] with the given record body.
2988    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2989        let record = Self {
2990            body: body.into(),
2991            headers: Vec::default(),
2992            timestamp: None,
2993        };
2994        record.validate()
2995    }
2996
2997    /// Set the headers for this record.
2998    pub fn with_headers(
2999        self,
3000        headers: impl IntoIterator<Item = Header>,
3001    ) -> Result<Self, ValidationError> {
3002        let record = Self {
3003            headers: headers.into_iter().collect(),
3004            ..self
3005        };
3006        record.validate()
3007    }
3008
3009    /// Set the timestamp for this record.
3010    ///
3011    /// Precise semantics depend on [`StreamConfig::timestamping`].
3012    pub fn with_timestamp(self, timestamp: u64) -> Self {
3013        Self {
3014            timestamp: Some(timestamp),
3015            ..self
3016        }
3017    }
3018
3019    /// Get the body of this record.
3020    pub fn body(&self) -> &[u8] {
3021        &self.body
3022    }
3023
3024    /// Get the headers of this record.
3025    pub fn headers(&self) -> &[Header] {
3026        &self.headers
3027    }
3028
3029    /// Get the timestamp of this record.
3030    pub fn timestamp(&self) -> Option<u64> {
3031        self.timestamp
3032    }
3033}
3034
3035impl From<AppendRecord> for api::stream::proto::AppendRecord {
3036    fn from(value: AppendRecord) -> Self {
3037        Self {
3038            timestamp: value.timestamp,
3039            headers: value.headers.into_iter().map(Into::into).collect(),
3040            body: value.body,
3041        }
3042    }
3043}
3044
/// Calculation of size in metered bytes.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
pub trait MeteredBytes {
    /// Returns the size in metered bytes.
    fn metered_bytes(&self) -> usize;
}

// Implements `MeteredBytes` for a record type that exposes `headers` (items with `name` and
// `value`) and `body` fields, following the formula documented on the trait.
macro_rules! metered_bytes_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> usize {
                // Each header contributes 2 bytes plus the lengths of its name and value.
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|h| 2 + h.name.len() + h.value.len())
                    .sum();
                8 + header_bytes + self.body.len()
            }
        }
    };
}
3071
3072metered_bytes_impl!(AppendRecord);
3073
3074#[derive(Debug, Clone)]
3075/// A batch of records to append atomically.
3076///
3077/// **Note:** It must contain at least `1` record and no more than `1000`.
3078/// The total size of the batch must not exceed `1MiB` in metered bytes.
3079///
3080/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
3081/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
3082/// that takes care of the abovementioned constraints.
3083pub struct AppendRecordBatch {
3084    records: Vec<AppendRecord>,
3085    metered_bytes: usize,
3086}
3087
3088impl AppendRecordBatch {
3089    pub(crate) fn with_capacity(capacity: usize) -> Self {
3090        Self {
3091            records: Vec::with_capacity(capacity),
3092            metered_bytes: 0,
3093        }
3094    }
3095
3096    pub(crate) fn push(&mut self, record: AppendRecord) {
3097        self.metered_bytes += record.metered_bytes();
3098        self.records.push(record);
3099    }
3100
3101    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
3102    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3103    where
3104        I: IntoIterator<Item = AppendRecord>,
3105    {
3106        let mut records = Vec::new();
3107        let mut metered_bytes = 0;
3108
3109        for record in iter {
3110            metered_bytes += record.metered_bytes();
3111            records.push(record);
3112
3113            if metered_bytes > RECORD_BATCH_MAX.bytes {
3114                return Err(ValidationError(format!(
3115                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
3116                    RECORD_BATCH_MAX.bytes
3117                )));
3118            }
3119
3120            if records.len() > RECORD_BATCH_MAX.count {
3121                return Err(ValidationError(format!(
3122                    "number of records in the batch exceeds {}",
3123                    RECORD_BATCH_MAX.count
3124                )));
3125            }
3126        }
3127
3128        if records.is_empty() {
3129            return Err(ValidationError("batch is empty".into()));
3130        }
3131
3132        Ok(Self {
3133            records,
3134            metered_bytes,
3135        })
3136    }
3137}
3138
3139impl Deref for AppendRecordBatch {
3140    type Target = [AppendRecord];
3141
3142    fn deref(&self) -> &Self::Target {
3143        &self.records
3144    }
3145}
3146
3147impl MeteredBytes for AppendRecordBatch {
3148    fn metered_bytes(&self) -> usize {
3149        self.metered_bytes
3150    }
3151}
3152
3153#[derive(Debug, Clone)]
3154/// Command to signal an operation.
3155pub enum Command {
3156    /// Fence operation.
3157    Fence {
3158        /// Fencing token.
3159        fencing_token: FencingToken,
3160    },
3161    /// Trim operation.
3162    Trim {
3163        /// Trim point.
3164        trim_point: u64,
3165    },
3166}
3167
3168#[derive(Debug, Clone)]
3169#[non_exhaustive]
3170/// Command record for signaling operations to the service.
3171///
3172/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
3173pub struct CommandRecord {
3174    /// Command to signal an operation.
3175    pub command: Command,
3176    /// Timestamp for this record.
3177    pub timestamp: Option<u64>,
3178}
3179
3180impl CommandRecord {
3181    const FENCE: &[u8] = b"fence";
3182    const TRIM: &[u8] = b"trim";
3183
3184    /// Create a fence command record with the given fencing token.
3185    ///
3186    /// Fencing is strongly consistent, and subsequent appends that specify a
3187    /// fencing token will fail if it does not match.
3188    pub fn fence(fencing_token: FencingToken) -> Self {
3189        Self {
3190            command: Command::Fence { fencing_token },
3191            timestamp: None,
3192        }
3193    }
3194
3195    /// Create a trim command record with the given trim point.
3196    ///
3197    /// Trim point is the desired earliest sequence number for the stream.
3198    ///
3199    /// Trimming is eventually consistent, and trimmed records may be visible
3200    /// for a brief period.
3201    pub fn trim(trim_point: u64) -> Self {
3202        Self {
3203            command: Command::Trim { trim_point },
3204            timestamp: None,
3205        }
3206    }
3207
3208    /// Set the timestamp for this record.
3209    pub fn with_timestamp(self, timestamp: u64) -> Self {
3210        Self {
3211            timestamp: Some(timestamp),
3212            ..self
3213        }
3214    }
3215}
3216
3217impl From<CommandRecord> for AppendRecord {
3218    fn from(value: CommandRecord) -> Self {
3219        let (header_value, body) = match value.command {
3220            Command::Fence { fencing_token } => (
3221                CommandRecord::FENCE,
3222                Bytes::copy_from_slice(fencing_token.as_bytes()),
3223            ),
3224            Command::Trim { trim_point } => (
3225                CommandRecord::TRIM,
3226                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3227            ),
3228        };
3229        Self {
3230            body,
3231            headers: vec![Header::new("", header_value)],
3232            timestamp: value.timestamp,
3233        }
3234    }
3235}
3236
3237#[derive(Debug, Clone)]
3238#[non_exhaustive]
3239/// Input for [`append`](crate::S2Stream::append) operation and
3240/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
3241pub struct AppendInput {
3242    /// Batch of records to append atomically.
3243    pub records: AppendRecordBatch,
3244    /// Expected sequence number for the first record in the batch.
3245    ///
3246    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
3247    pub match_seq_num: Option<u64>,
3248    /// Fencing token to match against the stream's current fencing token.
3249    ///
3250    /// If unspecified, no matching is performed. If specified and mismatched,
3251    /// the append fails. A stream defaults to `""` as its fencing token.
3252    pub fencing_token: Option<FencingToken>,
3253}
3254
3255impl AppendInput {
3256    /// Create a new [`AppendInput`] with the given batch of records.
3257    pub fn new(records: AppendRecordBatch) -> Self {
3258        Self {
3259            records,
3260            match_seq_num: None,
3261            fencing_token: None,
3262        }
3263    }
3264
3265    /// Set the expected sequence number for the first record in the batch.
3266    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3267        Self {
3268            match_seq_num: Some(match_seq_num),
3269            ..self
3270        }
3271    }
3272
3273    /// Set the fencing token to match against the stream's current fencing token.
3274    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3275        Self {
3276            fencing_token: Some(fencing_token),
3277            ..self
3278        }
3279    }
3280}
3281
3282impl From<AppendInput> for api::stream::proto::AppendInput {
3283    fn from(value: AppendInput) -> Self {
3284        Self {
3285            records: value.records.iter().cloned().map(Into::into).collect(),
3286            match_seq_num: value.match_seq_num,
3287            fencing_token: value.fencing_token.map(|t| t.to_string()),
3288        }
3289    }
3290}
3291
3292#[derive(Debug, Clone, PartialEq)]
3293#[non_exhaustive]
3294/// Acknowledgement for an [`AppendInput`].
3295pub struct AppendAck {
3296    /// Sequence number and timestamp of the first record that was appended.
3297    pub start: StreamPosition,
3298    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
3299    /// that was appended.
3300    ///
3301    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
3302    /// appended.
3303    pub end: StreamPosition,
3304    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3305    /// the last record on the stream.
3306    ///
3307    /// This can be greater than the `end` position in case of concurrent appends.
3308    pub tail: StreamPosition,
3309}
3310
3311impl From<api::stream::proto::AppendAck> for AppendAck {
3312    fn from(value: api::stream::proto::AppendAck) -> Self {
3313        Self {
3314            start: value.start.unwrap_or_default().into(),
3315            end: value.end.unwrap_or_default().into(),
3316            tail: value.tail.unwrap_or_default().into(),
3317        }
3318    }
3319}
3320
#[derive(Debug, Clone, Copy)]
/// Position from which to start reading a stream.
pub enum ReadFrom {
    /// Start at this sequence number.
    SeqNum(u64),
    /// Start at this timestamp.
    Timestamp(u64),
    /// Start N records before the tail.
    TailOffset(u64),
}

impl Default for ReadFrom {
    /// Defaults to sequence number `0`.
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3337
3338#[derive(Debug, Default, Clone)]
3339#[non_exhaustive]
3340/// Where to start reading.
3341pub struct ReadStart {
3342    /// Starting position.
3343    ///
3344    /// Defaults to reading from sequence number `0`.
3345    pub from: ReadFrom,
3346    /// Whether to start from tail if the requested starting position is beyond it.
3347    ///
3348    /// Defaults to `false` (errors if position is beyond tail).
3349    pub clamp_to_tail: bool,
3350}
3351
3352impl ReadStart {
3353    /// Create a new [`ReadStart`] with default values.
3354    pub fn new() -> Self {
3355        Self::default()
3356    }
3357
3358    /// Set the starting position.
3359    pub fn with_from(self, from: ReadFrom) -> Self {
3360        Self { from, ..self }
3361    }
3362
3363    /// Set whether to start from tail if the requested starting position is beyond it.
3364    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3365        Self {
3366            clamp_to_tail,
3367            ..self
3368        }
3369    }
3370}
3371
3372impl From<ReadStart> for api::stream::ReadStart {
3373    fn from(value: ReadStart) -> Self {
3374        let (seq_num, timestamp, tail_offset) = match value.from {
3375            ReadFrom::SeqNum(n) => (Some(n), None, None),
3376            ReadFrom::Timestamp(t) => (None, Some(t), None),
3377            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3378        };
3379        Self {
3380            seq_num,
3381            timestamp,
3382            tail_offset,
3383            clamp: if value.clamp_to_tail {
3384                Some(true)
3385            } else {
3386                None
3387            },
3388        }
3389    }
3390}
3391
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Limits on how much a read returns.
pub struct ReadLimits {
    /// Maximum number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Maximum total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Create a new [`ReadLimits`] with neither limit set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Set the limit on number of records.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Set the limit on total metered bytes of records.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3428
3429#[derive(Debug, Clone, Default)]
3430#[non_exhaustive]
3431/// When to stop reading.
3432pub struct ReadStop {
3433    /// Limits on how much to read.
3434    ///
3435    /// See [`ReadLimits`] for defaults.
3436    pub limits: ReadLimits,
3437    /// Timestamp at which to stop (exclusive).
3438    ///
3439    /// Defaults to `None`.
3440    pub until: Option<RangeTo<u64>>,
3441    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
3442    /// seconds for [`read`](crate::S2Stream::read).
3443    ///
3444    /// Defaults to:
3445    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
3446    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
3447    ///   is specified.
3448    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
3449    ///   `until` is specified.
3450    pub wait: Option<u32>,
3451}
3452
3453impl ReadStop {
3454    /// Create a new [`ReadStop`] with default values.
3455    pub fn new() -> Self {
3456        Self::default()
3457    }
3458
3459    /// Set the limits on how much to read.
3460    pub fn with_limits(self, limits: ReadLimits) -> Self {
3461        Self { limits, ..self }
3462    }
3463
3464    /// Set the timestamp at which to stop (exclusive).
3465    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3466        Self {
3467            until: Some(until),
3468            ..self
3469        }
3470    }
3471
3472    /// Set the duration in seconds to wait for new records before stopping.
3473    pub fn with_wait(self, wait: u32) -> Self {
3474        Self {
3475            wait: Some(wait),
3476            ..self
3477        }
3478    }
3479}
3480
3481impl From<ReadStop> for api::stream::ReadEnd {
3482    fn from(value: ReadStop) -> Self {
3483        Self {
3484            count: value.limits.count,
3485            bytes: value.limits.bytes,
3486            until: value.until.map(|r| r.end),
3487            wait: value.wait,
3488        }
3489    }
3490}
3491
3492#[derive(Debug, Clone, Default)]
3493#[non_exhaustive]
3494/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
3495/// operations.
3496pub struct ReadInput {
3497    /// Where to start reading.
3498    ///
3499    /// See [`ReadStart`] for defaults.
3500    pub start: ReadStart,
3501    /// When to stop reading.
3502    ///
3503    /// See [`ReadStop`] for defaults.
3504    pub stop: ReadStop,
3505    /// Whether to filter out command records from the stream when reading.
3506    ///
3507    /// Defaults to `false`.
3508    pub ignore_command_records: bool,
3509}
3510
3511impl ReadInput {
3512    /// Create a new [`ReadInput`] with default values.
3513    pub fn new() -> Self {
3514        Self::default()
3515    }
3516
3517    /// Set where to start reading.
3518    pub fn with_start(self, start: ReadStart) -> Self {
3519        Self { start, ..self }
3520    }
3521
3522    /// Set when to stop reading.
3523    pub fn with_stop(self, stop: ReadStop) -> Self {
3524        Self { stop, ..self }
3525    }
3526
3527    /// Set whether to filter out command records from the stream when reading.
3528    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3529        Self {
3530            ignore_command_records,
3531            ..self
3532        }
3533    }
3534}
3535
3536#[derive(Debug, Clone)]
3537#[non_exhaustive]
3538/// Record that is durably sequenced on a stream.
3539pub struct SequencedRecord {
3540    /// Sequence number assigned to this record.
3541    pub seq_num: u64,
3542    /// Body of this record.
3543    pub body: Bytes,
3544    /// Headers for this record.
3545    pub headers: Vec<Header>,
3546    /// Timestamp for this record.
3547    pub timestamp: u64,
3548}
3549
3550impl SequencedRecord {
3551    /// Whether this is a command record.
3552    pub fn is_command_record(&self) -> bool {
3553        self.headers.len() == 1 && *self.headers[0].name == *b""
3554    }
3555}
3556
3557impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3558    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3559        Self {
3560            seq_num: value.seq_num,
3561            body: value.body,
3562            headers: value.headers.into_iter().map(Into::into).collect(),
3563            timestamp: value.timestamp,
3564        }
3565    }
3566}
3567
3568metered_bytes_impl!(SequencedRecord);
3569
3570#[derive(Debug, Clone)]
3571#[non_exhaustive]
3572/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
3573/// [`read_session`](crate::S2Stream::read_session).
3574pub struct ReadBatch {
3575    /// Records that are durably sequenced on the stream.
3576    ///
3577    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
3578    /// - the [`stop condition`](ReadInput::stop) was already met, or
3579    /// - all records in the batch were command records and
3580    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
3581    pub records: Vec<SequencedRecord>,
3582    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3583    /// the last record.
3584    ///
3585    /// It will only be present when reading recent records.
3586    pub tail: Option<StreamPosition>,
3587}
3588
3589impl ReadBatch {
3590    pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3591        Self {
3592            records: batch.records.into_iter().map(Into::into).collect(),
3593            tail: batch.tail.map(Into::into),
3594        }
3595    }
3596}
3597
3598/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
3599pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3600
3601#[derive(Debug, Clone, thiserror::Error)]
3602/// Why an append condition check failed.
3603pub enum AppendConditionFailed {
3604    #[error("fencing token mismatch, expected: {0}")]
3605    /// Fencing token did not match. Contains the expected fencing token.
3606    FencingTokenMismatch(FencingToken),
3607    #[error("sequence number mismatch, expected: {0}")]
3608    /// Sequence number did not match. Contains the expected sequence number.
3609    SeqNumMismatch(u64),
3610}
3611
3612impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3613    fn from(value: api::stream::AppendConditionFailed) -> Self {
3614        match value {
3615            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3616                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3617            }
3618            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3619                AppendConditionFailed::SeqNumMismatch(seq)
3620            }
3621        }
3622    }
3623}
3624
3625#[derive(Debug, Clone, thiserror::Error)]
3626/// Errors from S2 operations.
3627pub enum S2Error {
3628    #[error("{0}")]
3629    /// Client-side error.
3630    Client(String),
3631    #[error(transparent)]
3632    /// Validation error.
3633    Validation(#[from] ValidationError),
3634    #[error("{0}")]
3635    /// Append condition check failed. Contains the failure reason.
3636    AppendConditionFailed(AppendConditionFailed),
3637    #[error("read from an unwritten position. current tail: {0}")]
3638    /// Read from an unwritten position. Contains the current tail.
3639    ReadUnwritten(StreamPosition),
3640    #[error("{0}")]
3641    /// Other server-side error.
3642    Server(ErrorResponse),
3643}
3644
3645impl From<ApiError> for S2Error {
3646    fn from(err: ApiError) -> Self {
3647        match err {
3648            ApiError::ReadUnwritten(tail_response) => {
3649                Self::ReadUnwritten(tail_response.tail.into())
3650            }
3651            ApiError::AppendConditionFailed(condition_failed) => {
3652                Self::AppendConditionFailed(condition_failed.into())
3653            }
3654            ApiError::Server(_, response) => Self::Server(response.into()),
3655            other => Self::Client(other.to_string()),
3656        }
3657    }
3658}
3659
3660#[derive(Debug, Clone, thiserror::Error)]
3661#[error("{code}: {message}")]
3662#[non_exhaustive]
3663/// Error response from S2 server.
3664pub struct ErrorResponse {
3665    /// Error code.
3666    pub code: String,
3667    /// Error message.
3668    pub message: String,
3669}
3670
3671impl From<ApiErrorResponse> for ErrorResponse {
3672    fn from(response: ApiErrorResponse) -> Self {
3673        Self {
3674            code: response.code,
3675            message: response.message,
3676        }
3677    }
3678}
3679
3680fn idempotency_token() -> String {
3681    uuid::Uuid::new_v4().simple().to_string()
3682}