s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::Rng;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Validation error.
23pub use s2_common::types::ValidationError;
24/// Access token ID.
25///
26/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
27pub use s2_common::types::access::AccessTokenId;
28/// See [`ListAccessTokensInput::prefix`].
29pub use s2_common::types::access::AccessTokenIdPrefix;
30/// See [`ListAccessTokensInput::start_after`].
31pub use s2_common::types::access::AccessTokenIdStartAfter;
32/// Basin name.
33///
34/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
35/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
36pub use s2_common::types::basin::BasinName;
37/// See [`ListBasinsInput::prefix`].
38pub use s2_common::types::basin::BasinNamePrefix;
39/// See [`ListBasinsInput::start_after`].
40pub use s2_common::types::basin::BasinNameStartAfter;
41/// Stream name.
42///
43/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
44pub use s2_common::types::stream::StreamName;
45/// See [`ListStreamsInput::prefix`].
46pub use s2_common::types::stream::StreamNamePrefix;
47/// See [`ListStreamsInput::start_after`].
48pub use s2_common::types::stream::StreamNameStartAfter;
49
/// One mebibyte (MiB) in bytes; used for size limits expressed in MiB.
pub(crate) const ONE_MIB: u32 = 1024 * 1024;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57/// An RFC 3339 datetime.
58///
59/// It can be created in either of the following ways:
60/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
61/// - Convert from [`time::OffsetDateTime`] using [`From`]/[`Into`].
62#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl From<time::OffsetDateTime> for S2DateTime {
66    fn from(dt: time::OffsetDateTime) -> Self {
67        Self(dt)
68    }
69}
70
71impl From<S2DateTime> for time::OffsetDateTime {
72    fn from(dt: S2DateTime) -> Self {
73        dt.0
74    }
75}
76
77impl FromStr for S2DateTime {
78    type Err = ValidationError;
79
80    fn from_str(s: &str) -> Result<Self, Self::Err> {
81        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
82            .map(Self)
83            .map_err(|e| ValidationError(format!("invalid datetime: {e}")))
84    }
85}
86
87impl fmt::Display for S2DateTime {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        write!(
90            f,
91            "{}",
92            self.0
93                .format(&time::format_description::well_known::Rfc3339)
94                .expect("RFC3339 formatting should not fail for S2DateTime")
95        )
96    }
97}
98
/// Authority for connecting to an S2 basin.
///
/// Combined with a URI scheme in [`S2Endpoints`] to form basin endpoints.
#[derive(Debug, Clone, PartialEq)]
pub enum BasinAuthority {
    /// Parent zone for basins. DNS is used to route to the correct cell for the basin
    /// (endpoints of the form `{basin}.<parent-zone>`).
    ParentZone(Authority),
    /// Direct cell authority. Basin is expected to be hosted by this cell.
    Direct(Authority),
}
107
108#[derive(Debug, Clone)]
109#[non_exhaustive]
110/// Endpoints for the S2 environment.
111pub struct S2Endpoints {
112    /// URI scheme.
113    ///
114    /// Defaults to `https`.
115    pub scheme: Scheme,
116    /// Account authority.
117    pub account_authority: Authority,
118    /// Basin authority.
119    pub basin_authority: BasinAuthority,
120}
121
122impl S2Endpoints {
123    /// Create a new [`S2Endpoints`] with the given account and basin authorities.
124    pub fn new(account_authority: Authority, basin_authority: BasinAuthority) -> Self {
125        Self {
126            scheme: Scheme::HTTPS,
127            account_authority,
128            basin_authority,
129        }
130    }
131
132    /// Create a new [`S2Endpoints`] from environment variables.
133    ///
134    /// The following environment variables are expected to be set:
135    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
136    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
137    pub fn from_env() -> Result<Self, ValidationError> {
138        let account_endpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
139            Ok(s) => s,
140            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
141            Err(VarError::NotUnicode(_)) => {
142                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
143            }
144        };
145
146        let basin_endpoint = match std::env::var("S2_BASIN_ENDPOINT") {
147            Ok(s) => s,
148            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
149            Err(VarError::NotUnicode(_)) => {
150                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
151            }
152        };
153
154        let (account_scheme, account_authority) = parse_account_endpoint(&account_endpoint)?;
155
156        let (basin_scheme, basin_authority) = parse_basin_endpoint(&basin_endpoint)?;
157
158        if account_scheme != basin_scheme {
159            return Err(
160                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
161            );
162        }
163
164        Ok(S2Endpoints::new(account_authority, basin_authority).with_scheme(account_scheme))
165    }
166
167    #[doc(hidden)]
168    #[cfg(feature = "_hidden")]
169    pub fn parse_from(
170        account_endpoint: &str,
171        basin_endpoint: &str,
172    ) -> Result<Self, ValidationError> {
173        let (account_scheme, account_authority) = parse_account_endpoint(account_endpoint)?;
174        let (basin_scheme, basin_authority) = parse_basin_endpoint(basin_endpoint)?;
175
176        if account_scheme != basin_scheme {
177            return Err("account and basin endpoints must have the same scheme".into());
178        }
179        Ok(S2Endpoints::new(account_authority, basin_authority).with_scheme(account_scheme))
180    }
181
182    /// Set the URI scheme.
183    pub fn with_scheme(self, scheme: Scheme) -> Self {
184        Self { scheme, ..self }
185    }
186
187    pub(crate) fn for_aws() -> Self {
188        Self {
189            scheme: Scheme::HTTPS,
190            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
191            basin_authority: BasinAuthority::ParentZone(
192                "b.aws.s2.dev".try_into().expect("valid authority"),
193            ),
194        }
195    }
196}
197
198fn parse_account_endpoint(s: &str) -> Result<(Scheme, Authority), ValidationError> {
199    let (scheme, authority) = match s.find("://") {
200        Some(idx) => {
201            let scheme: Scheme = s[..idx]
202                .parse()
203                .map_err(|_| "invalid account endpoint scheme".to_string())?;
204            (scheme, &s[idx + 3..])
205        }
206        None => (Scheme::HTTPS, s),
207    };
208    Ok((
209        scheme,
210        authority
211            .parse()
212            .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
213    ))
214}
215
216fn parse_basin_endpoint(s: &str) -> Result<(Scheme, BasinAuthority), ValidationError> {
217    let (scheme, authority) = match s.find("://") {
218        Some(idx) => {
219            let scheme: Scheme = s[..idx]
220                .parse()
221                .map_err(|_| "invalid basin endpoint scheme".to_string())?;
222            (scheme, &s[idx + 3..])
223        }
224        None => (Scheme::HTTPS, s),
225    };
226    let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
227        BasinAuthority::ParentZone(
228            authority
229                .parse()
230                .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
231        )
232    } else {
233        BasinAuthority::Direct(
234            authority
235                .parse()
236                .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
237        )
238    };
239    Ok((scheme, authority))
240}
241
242#[derive(Debug, Clone, Copy)]
243/// Compression algorithm for request and response bodies.
244pub enum Compression {
245    /// No compression.
246    None,
247    /// Gzip compression.
248    Gzip,
249    /// Zstd compression.
250    Zstd,
251}
252
253impl From<Compression> for CompressionAlgorithm {
254    fn from(value: Compression) -> Self {
255        match value {
256            Compression::None => CompressionAlgorithm::None,
257            Compression::Gzip => CompressionAlgorithm::Gzip,
258            Compression::Zstd => CompressionAlgorithm::Zstd,
259        }
260    }
261}
262
263#[derive(Debug, Clone, Copy, PartialEq)]
264#[non_exhaustive]
265/// Retry policy for [`append`](crate::S2Stream::append) and
266/// [`append_session`](crate::S2Stream::append_session) operations.
267pub enum AppendRetryPolicy {
268    /// Retry all appends. Use when duplicate records on the stream are acceptable.
269    All,
270    /// Only retry appends that include [`match_seq_num`](AppendInput::match_seq_num).
271    NoSideEffects,
272}
273
274impl AppendRetryPolicy {
275    pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
276        match self {
277            Self::All => true,
278            Self::NoSideEffects => input.match_seq_num.is_some(),
279        }
280    }
281}
282
283#[derive(Debug, Clone)]
284#[non_exhaustive]
285/// Configuration for retrying requests in case of transient failures.
286///
287/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
288/// ```text
289/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
290///     jitter = rand[0, base_delay]
291///     delay  = base_delay + jitter
292/// ````
293pub struct RetryConfig {
294    /// Total number of attempts including the initial try. A value of `1` means no retries.
295    ///
296    /// Defaults to `3`.
297    pub max_attempts: NonZeroU32,
298    /// Minimum base delay for retries.
299    ///
300    /// Defaults to `100ms`.
301    pub min_base_delay: Duration,
302    /// Maximum base delay for retries.
303    ///
304    /// Defaults to `1s`.
305    pub max_base_delay: Duration,
306    /// Retry policy for [`append`](crate::S2Stream::append) and
307    /// [`append_session`](crate::S2Stream::append_session) operations.
308    ///
309    /// Defaults to `All`.
310    pub append_retry_policy: AppendRetryPolicy,
311}
312
313impl Default for RetryConfig {
314    fn default() -> Self {
315        Self {
316            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
317            min_base_delay: Duration::from_millis(100),
318            max_base_delay: Duration::from_secs(1),
319            append_retry_policy: AppendRetryPolicy::All,
320        }
321    }
322}
323
324impl RetryConfig {
325    /// Create a new [`RetryConfig`] with default settings.
326    pub fn new() -> Self {
327        Self::default()
328    }
329
330    pub(crate) fn max_retries(&self) -> u32 {
331        self.max_attempts.get() - 1
332    }
333
334    /// Set the total number of attempts including the initial try.
335    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
336        Self {
337            max_attempts,
338            ..self
339        }
340    }
341
342    /// Set the minimum base delay for retries.
343    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
344        Self {
345            min_base_delay,
346            ..self
347        }
348    }
349
350    /// Set the maximum base delay for retries.
351    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
352        Self {
353            max_base_delay,
354            ..self
355        }
356    }
357
358    /// Set the retry policy for [`append`](crate::S2Stream::append) and
359    /// [`append_session`](crate::S2Stream::append_session) operations.
360    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
361        Self {
362            append_retry_policy,
363            ..self
364        }
365    }
366}
367
368#[derive(Debug, Clone)]
369#[non_exhaustive]
370/// Configuration for [`S2`](crate::S2).
371pub struct S2Config {
372    pub(crate) access_token: SecretString,
373    pub(crate) endpoints: S2Endpoints,
374    pub(crate) connection_timeout: Duration,
375    pub(crate) request_timeout: Duration,
376    pub(crate) retry: RetryConfig,
377    pub(crate) compression: Compression,
378    pub(crate) user_agent: HeaderValue,
379}
380
381impl S2Config {
382    /// Create a new [`S2Config`] with the given access token and default settings.
383    pub fn new(access_token: impl Into<String>) -> Self {
384        Self {
385            access_token: access_token.into().into(),
386            endpoints: S2Endpoints::for_aws(),
387            connection_timeout: Duration::from_secs(3),
388            request_timeout: Duration::from_secs(5),
389            retry: RetryConfig::new(),
390            compression: Compression::None,
391            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
392                .parse()
393                .expect("valid user agent"),
394        }
395    }
396
397    /// Set the S2 endpoints to connect to.
398    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
399        Self { endpoints, ..self }
400    }
401
402    /// Set the timeout for establishing a connection to the server.
403    ///
404    /// Defaults to `3s`.
405    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
406        Self {
407            connection_timeout,
408            ..self
409        }
410    }
411
412    /// Set the timeout for requests.
413    ///
414    /// Defaults to `5s`.
415    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
416        Self {
417            request_timeout,
418            ..self
419        }
420    }
421
422    /// Set the retry configuration for requests.
423    ///
424    /// See [`RetryConfig`] for defaults.
425    pub fn with_retry(self, retry: RetryConfig) -> Self {
426        Self { retry, ..self }
427    }
428
429    /// Set the compression algorithm for requests and responses.
430    ///
431    /// Defaults to no compression.
432    pub fn with_compression(self, compression: Compression) -> Self {
433        Self {
434            compression,
435            ..self
436        }
437    }
438
439    #[doc(hidden)]
440    #[cfg(feature = "_hidden")]
441    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
442        let user_agent = user_agent
443            .into()
444            .parse()
445            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
446        Ok(Self { user_agent, ..self })
447    }
448}
449
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// A page of values.
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether there are more pages.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Build a page from anything convertible into a `Vec`, recording whether
    /// further pages are available.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values = values.into();
        Page { values, has_more }
    }
}
468
469#[derive(Debug, Clone, Copy, PartialEq, Eq)]
470/// Storage class for recent appends.
471pub enum StorageClass {
472    /// Standard storage class that offers append latencies under `500ms`.
473    Standard,
474    /// Express storage class that offers append latencies under `50ms`.
475    Express,
476}
477
478impl From<api::config::StorageClass> for StorageClass {
479    fn from(value: api::config::StorageClass) -> Self {
480        match value {
481            api::config::StorageClass::Standard => StorageClass::Standard,
482            api::config::StorageClass::Express => StorageClass::Express,
483        }
484    }
485}
486
487impl From<StorageClass> for api::config::StorageClass {
488    fn from(value: StorageClass) -> Self {
489        match value {
490            StorageClass::Standard => api::config::StorageClass::Standard,
491            StorageClass::Express => api::config::StorageClass::Express,
492        }
493    }
494}
495
496#[derive(Debug, Clone, Copy, PartialEq, Eq)]
497/// Retention policy for records in a stream.
498pub enum RetentionPolicy {
499    /// Age in seconds. Records older than this age are automatically trimmed.
500    Age(u64),
501    /// Records are retained indefinitely unless explicitly trimmed.
502    Infinite,
503}
504
505impl From<api::config::RetentionPolicy> for RetentionPolicy {
506    fn from(value: api::config::RetentionPolicy) -> Self {
507        match value {
508            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
509            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
510        }
511    }
512}
513
514impl From<RetentionPolicy> for api::config::RetentionPolicy {
515    fn from(value: RetentionPolicy) -> Self {
516        match value {
517            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
518            RetentionPolicy::Infinite => {
519                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
520            }
521        }
522    }
523}
524
525#[derive(Debug, Clone, Copy, PartialEq, Eq)]
526/// Timestamping mode for appends that influences how timestamps are handled.
527pub enum TimestampingMode {
528    /// Prefer client-specified timestamp if present otherwise use arrival time.
529    ClientPrefer,
530    /// Require a client-specified timestamp and reject the append if it is missing.
531    ClientRequire,
532    /// Use the arrival time and ignore any client-specified timestamp.
533    Arrival,
534}
535
536impl From<api::config::TimestampingMode> for TimestampingMode {
537    fn from(value: api::config::TimestampingMode) -> Self {
538        match value {
539            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
540            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
541            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
542        }
543    }
544}
545
546impl From<TimestampingMode> for api::config::TimestampingMode {
547    fn from(value: TimestampingMode) -> Self {
548        match value {
549            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
550            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
551            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
552        }
553    }
554}
555
556#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
557#[non_exhaustive]
558/// Configuration for timestamping behavior.
559pub struct TimestampingConfig {
560    /// Timestamping mode for appends that influences how timestamps are handled.
561    ///
562    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
563    pub mode: Option<TimestampingMode>,
564    /// Whether client-specified timestamps are allowed to exceed the arrival time.
565    ///
566    /// Defaults to `false` (client timestamps are capped at the arrival time).
567    pub uncapped: bool,
568}
569
570impl TimestampingConfig {
571    /// Create a new [`TimestampingConfig`] with default settings.
572    pub fn new() -> Self {
573        Self::default()
574    }
575
576    /// Set the timestamping mode for appends that influences how timestamps are handled.
577    pub fn with_mode(self, mode: TimestampingMode) -> Self {
578        Self {
579            mode: Some(mode),
580            ..self
581        }
582    }
583
584    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
585    pub fn with_uncapped(self, uncapped: bool) -> Self {
586        Self { uncapped, ..self }
587    }
588}
589
590impl From<api::config::TimestampingConfig> for TimestampingConfig {
591    fn from(value: api::config::TimestampingConfig) -> Self {
592        Self {
593            mode: value.mode.map(Into::into),
594            uncapped: value.uncapped.unwrap_or_default(),
595        }
596    }
597}
598
599impl From<TimestampingConfig> for api::config::TimestampingConfig {
600    fn from(value: TimestampingConfig) -> Self {
601        Self {
602            mode: value.mode.map(Into::into),
603            uncapped: Some(value.uncapped),
604        }
605    }
606}
607
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for automatically deleting a stream when it becomes empty.
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the minimum age before an empty stream can be deleted.
    ///
    /// Sub-second precision of `min_age` is discarded.
    pub fn with_min_age(self, min_age: Duration) -> Self {
        let min_age_secs = min_age.as_secs();
        DeleteOnEmptyConfig { min_age_secs }
    }
}
631
632impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
633    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
634        Self {
635            min_age_secs: value.min_age_secs,
636        }
637    }
638}
639
640impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
641    fn from(value: DeleteOnEmptyConfig) -> Self {
642        Self {
643            min_age_secs: value.min_age_secs,
644        }
645    }
646}
647
648#[derive(Debug, Clone, Default, PartialEq, Eq)]
649#[non_exhaustive]
650/// Configuration for a stream.
651pub struct StreamConfig {
652    /// Storage class for the stream.
653    ///
654    /// Defaults to [`Express`](StorageClass::Express).
655    pub storage_class: Option<StorageClass>,
656    /// Retention policy for records in the stream.
657    ///
658    /// Defaults to `7 days` of retention.
659    pub retention_policy: Option<RetentionPolicy>,
660    /// Configuration for timestamping behavior.
661    ///
662    /// See [`TimestampingConfig`] for defaults.
663    pub timestamping: Option<TimestampingConfig>,
664    /// Configuration for automatically deleting the stream when it becomes empty.
665    ///
666    /// See [`DeleteOnEmptyConfig`] for defaults.
667    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
668}
669
670impl StreamConfig {
671    /// Create a new [`StreamConfig`] with default settings.
672    pub fn new() -> Self {
673        Self::default()
674    }
675
676    /// Set the storage class for the stream.
677    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
678        Self {
679            storage_class: Some(storage_class),
680            ..self
681        }
682    }
683
684    /// Set the retention policy for records in the stream.
685    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
686        Self {
687            retention_policy: Some(retention_policy),
688            ..self
689        }
690    }
691
692    /// Set the configuration for timestamping behavior.
693    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
694        Self {
695            timestamping: Some(timestamping),
696            ..self
697        }
698    }
699
700    /// Set the configuration for automatically deleting the stream when it becomes empty.
701    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
702        Self {
703            delete_on_empty: Some(delete_on_empty),
704            ..self
705        }
706    }
707}
708
709impl From<api::config::StreamConfig> for StreamConfig {
710    fn from(value: api::config::StreamConfig) -> Self {
711        Self {
712            storage_class: value.storage_class.map(Into::into),
713            retention_policy: value.retention_policy.map(Into::into),
714            timestamping: value.timestamping.map(Into::into),
715            delete_on_empty: value.delete_on_empty.map(Into::into),
716        }
717    }
718}
719
720impl From<StreamConfig> for api::config::StreamConfig {
721    fn from(value: StreamConfig) -> Self {
722        Self {
723            storage_class: value.storage_class.map(Into::into),
724            retention_policy: value.retention_policy.map(Into::into),
725            timestamping: value.timestamping.map(Into::into),
726            delete_on_empty: value.delete_on_empty.map(Into::into),
727        }
728    }
729}
730
731#[derive(Debug, Clone, Default)]
732#[non_exhaustive]
733/// Configuration for a basin.
734pub struct BasinConfig {
735    /// Default configuration for all streams in the basin.
736    ///
737    /// See [`StreamConfig`] for defaults.
738    pub default_stream_config: Option<StreamConfig>,
739    /// Whether to create stream on append if it doesn't exist using default stream configuration.
740    ///
741    /// Defaults to `false`.
742    pub create_stream_on_append: bool,
743    /// Whether to create stream on read if it doesn't exist using default stream configuration.
744    ///
745    /// Defaults to `false`.
746    pub create_stream_on_read: bool,
747}
748
749impl BasinConfig {
750    /// Create a new [`BasinConfig`] with default settings.
751    pub fn new() -> Self {
752        Self::default()
753    }
754
755    /// Set the default configuration for all streams in the basin.
756    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
757        Self {
758            default_stream_config: Some(config),
759            ..self
760        }
761    }
762
763    /// Set whether to create stream on append if it doesn't exist using default stream
764    /// configuration.
765    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
766        Self {
767            create_stream_on_append,
768            ..self
769        }
770    }
771
772    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
773    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
774        Self {
775            create_stream_on_read,
776            ..self
777        }
778    }
779}
780
781impl From<api::config::BasinConfig> for BasinConfig {
782    fn from(value: api::config::BasinConfig) -> Self {
783        Self {
784            default_stream_config: value.default_stream_config.map(Into::into),
785            create_stream_on_append: value.create_stream_on_append,
786            create_stream_on_read: value.create_stream_on_read,
787        }
788    }
789}
790
791impl From<BasinConfig> for api::config::BasinConfig {
792    fn from(value: BasinConfig) -> Self {
793        Self {
794            default_stream_config: value.default_stream_config.map(Into::into),
795            create_stream_on_append: value.create_stream_on_append,
796            create_stream_on_read: value.create_stream_on_read,
797        }
798    }
799}
800
801#[derive(Debug, Clone, PartialEq, Eq)]
802/// Scope of a basin.
803pub enum BasinScope {
804    /// AWS `us-east-1` region.
805    AwsUsEast1,
806}
807
808impl From<api::basin::BasinScope> for BasinScope {
809    fn from(value: api::basin::BasinScope) -> Self {
810        match value {
811            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
812        }
813    }
814}
815
816impl From<BasinScope> for api::basin::BasinScope {
817    fn from(value: BasinScope) -> Self {
818        match value {
819            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
820        }
821    }
822}
823
824#[derive(Debug, Clone)]
825#[non_exhaustive]
826/// Input for [`create_basin`](crate::S2::create_basin) operation.
827pub struct CreateBasinInput {
828    /// Basin name.
829    pub name: BasinName,
830    /// Configuration for the basin.
831    ///
832    /// See [`BasinConfig`] for defaults.
833    pub config: Option<BasinConfig>,
834    /// Scope of the basin.
835    ///
836    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
837    pub scope: Option<BasinScope>,
838    /// Idempotency token for the operation.
839    ///
840    /// When a request is retried, the same token is reused,
841    /// causing the server to return the original response instead of
842    /// performing the operation again.
843    ///
844    /// Defaults to a random UUID.
845    pub idempotency_token: String,
846}
847
848impl CreateBasinInput {
849    /// Create a new [`CreateBasinInput`] with the given basin name.
850    pub fn new(name: BasinName) -> Self {
851        Self {
852            name,
853            config: None,
854            scope: None,
855            idempotency_token: idempotency_token(),
856        }
857    }
858
859    /// Set the configuration for the basin.
860    pub fn with_config(self, config: BasinConfig) -> Self {
861        Self {
862            config: Some(config),
863            ..self
864        }
865    }
866
867    /// Set the scope of the basin.
868    pub fn with_scope(self, scope: BasinScope) -> Self {
869        Self {
870            scope: Some(scope),
871            ..self
872        }
873    }
874
875    /// Set the idempotency token for the operation.
876    pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
877        Self {
878            idempotency_token: idempotency_token.into(),
879            ..self
880        }
881    }
882}
883
884impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
885    fn from(value: CreateBasinInput) -> Self {
886        (
887            api::basin::CreateBasinRequest {
888                basin: value.name,
889                config: value.config.map(Into::into),
890                scope: value.scope.map(Into::into),
891            },
892            value.idempotency_token,
893        )
894    }
895}
896
897#[derive(Debug, Clone, Default)]
898#[non_exhaustive]
899/// Input for [`list_basins`](crate::S2::list_basins) operation.
900pub struct ListBasinsInput {
901    /// Filter basins whose names begin with this value.
902    ///
903    /// Defaults to `""`.
904    pub prefix: BasinNamePrefix,
905    /// Filter basins whose names are lexicographically greater than this value.
906    ///
907    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
908    ///
909    /// Defaults to `""`.
910    pub start_after: BasinNameStartAfter,
911    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
912    ///
913    /// Defaults to `1000`.
914    pub limit: Option<usize>,
915}
916
917impl ListBasinsInput {
918    /// Create a new [`ListBasinsInput`] with default values.
919    pub fn new() -> Self {
920        Self::default()
921    }
922
923    /// Set the prefix used to filter basins whose names begin with this value.
924    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
925        Self { prefix, ..self }
926    }
927
928    /// Set the value used to filter basins whose names are lexicographically greater than this
929    /// value.
930    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
931        Self {
932            start_after,
933            ..self
934        }
935    }
936
937    /// Set the limit on number of basins to return in a page.
938    pub fn with_limit(self, limit: usize) -> Self {
939        Self {
940            limit: Some(limit),
941            ..self
942        }
943    }
944}
945
946impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
947    fn from(value: ListBasinsInput) -> Self {
948        Self {
949            prefix: Some(value.prefix),
950            start_after: Some(value.start_after),
951            limit: value.limit,
952        }
953    }
954}
955
#[derive(Debug, Clone)]
// NOTE(review): unlike `ListBasinsInput`, this struct is not `#[non_exhaustive]`;
// confirm whether that is intentional.
/// Input for [`S2::list_all_basins`](crate::S2::list_all_basins).
pub struct ListAllBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Whether to ignore basins that have been requested for deletion but not yet deleted.
    ///
    /// Defaults to `true`.
    pub ignore_pending_deletions: bool,
}
974
975impl Default for ListAllBasinsInput {
976    fn default() -> Self {
977        Self {
978            prefix: BasinNamePrefix::default(),
979            start_after: BasinNameStartAfter::default(),
980            ignore_pending_deletions: true,
981        }
982    }
983}
984
985impl ListAllBasinsInput {
986    /// Create a new [`ListAllBasinsInput`] with default values.
987    pub fn new() -> Self {
988        Self::default()
989    }
990
991    /// Set the prefix used to filter basins whose names begin with this value.
992    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
993        Self { prefix, ..self }
994    }
995
996    /// Set the value used to filter basins whose names are lexicographically greater than this
997    /// value.
998    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
999        Self {
1000            start_after,
1001            ..self
1002        }
1003    }
1004
1005    /// Set whether to ignore basins that have been requested for deletion but not yet deleted.
1006    pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
1007        Self {
1008            ignore_pending_deletions,
1009            ..self
1010        }
1011    }
1012}
1013
#[derive(Debug, Clone, PartialEq, Eq)]
/// Current state of a basin.
pub enum BasinState {
    /// The basin is active.
    Active,
    /// The basin is being created.
    Creating,
    /// The basin is being deleted.
    Deleting,
}
1024
1025impl From<api::basin::BasinState> for BasinState {
1026    fn from(value: api::basin::BasinState) -> Self {
1027        match value {
1028            api::basin::BasinState::Active => BasinState::Active,
1029            api::basin::BasinState::Creating => BasinState::Creating,
1030            api::basin::BasinState::Deleting => BasinState::Deleting,
1031        }
1032    }
1033}
1034
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// Basin information.
pub struct BasinInfo {
    /// Basin name.
    pub name: BasinName,
    /// Scope of the basin, if any.
    pub scope: Option<BasinScope>,
    /// Current state of the basin.
    pub state: BasinState,
}
1046
1047impl From<api::basin::BasinInfo> for BasinInfo {
1048    fn from(value: api::basin::BasinInfo) -> Self {
1049        Self {
1050            name: value.name,
1051            scope: value.scope.map(Into::into),
1052            state: value.state.into(),
1053        }
1054    }
1055}
1056
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
pub struct DeleteBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Whether to ignore the `Not Found` error if the basin doesn't exist.
    ///
    /// Defaults to `false` (see [`DeleteBasinInput::new`]).
    pub ignore_not_found: bool,
}
1066
1067impl DeleteBasinInput {
1068    /// Create a new [`DeleteBasinInput`] with the given basin name.
1069    pub fn new(name: BasinName) -> Self {
1070        Self {
1071            name,
1072            ignore_not_found: false,
1073        }
1074    }
1075
1076    /// Set whether to ignore `Not Found` error if the basin is not existing.
1077    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1078        Self {
1079            ignore_not_found,
1080            ..self
1081        }
1082    }
1083}
1084
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`TimestampingConfig`].
///
// NOTE(review): fields use `Maybe` so that unspecified fields presumably leave the
// existing configuration untouched — confirm against `s2_common::maybe::Maybe` docs.
pub struct TimestampingReconfiguration {
    /// Override for the existing [`mode`](TimestampingConfig::mode).
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
    pub uncapped: Maybe<Option<bool>>,
}
1094
1095impl TimestampingReconfiguration {
1096    /// Create a new [`TimestampingReconfiguration`].
1097    pub fn new() -> Self {
1098        Self::default()
1099    }
1100
1101    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1102    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1103        Self {
1104            mode: Maybe::Specified(Some(mode)),
1105            ..self
1106        }
1107    }
1108
1109    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1110    pub fn with_uncapped(self, uncapped: bool) -> Self {
1111        Self {
1112            uncapped: Maybe::Specified(Some(uncapped)),
1113            ..self
1114        }
1115    }
1116}
1117
1118impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1119    fn from(value: TimestampingReconfiguration) -> Self {
1120        Self {
1121            mode: value.mode.map(|m| m.map(Into::into)),
1122            uncapped: value.uncapped,
1123        }
1124    }
1125}
1126
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`DeleteOnEmptyConfig`].
pub struct DeleteOnEmptyReconfiguration {
    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs),
    /// expressed in whole seconds.
    pub min_age_secs: Maybe<Option<u64>>,
}
1134
1135impl DeleteOnEmptyReconfiguration {
1136    /// Create a new [`DeleteOnEmptyReconfiguration`].
1137    pub fn new() -> Self {
1138        Self::default()
1139    }
1140
1141    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1142    pub fn with_min_age(self, min_age: Duration) -> Self {
1143        Self {
1144            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1145        }
1146    }
1147}
1148
1149impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1150    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1151        Self {
1152            min_age_secs: value.min_age_secs,
1153        }
1154    }
1155}
1156
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`StreamConfig`].
pub struct StreamReconfiguration {
    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
}
1170
1171impl StreamReconfiguration {
1172    /// Create a new [`StreamReconfiguration`].
1173    pub fn new() -> Self {
1174        Self::default()
1175    }
1176
1177    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1178    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1179        Self {
1180            storage_class: Maybe::Specified(Some(storage_class)),
1181            ..self
1182        }
1183    }
1184
1185    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1186    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1187        Self {
1188            retention_policy: Maybe::Specified(Some(retention_policy)),
1189            ..self
1190        }
1191    }
1192
1193    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1194    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1195        Self {
1196            timestamping: Maybe::Specified(Some(timestamping)),
1197            ..self
1198        }
1199    }
1200
1201    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1202    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1203        Self {
1204            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1205            ..self
1206        }
1207    }
1208}
1209
1210impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1211    fn from(value: StreamReconfiguration) -> Self {
1212        Self {
1213            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1214            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1215            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1216            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1217        }
1218    }
1219}
1220
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`BasinConfig`].
pub struct BasinReconfiguration {
    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Override for the existing
    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
    pub create_stream_on_append: Maybe<bool>,
    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
    pub create_stream_on_read: Maybe<bool>,
}
1233
1234impl BasinReconfiguration {
1235    /// Create a new [`BasinReconfiguration`].
1236    pub fn new() -> Self {
1237        Self::default()
1238    }
1239
1240    /// Set the override for the existing
1241    /// [`default_stream_config`](BasinConfig::default_stream_config).
1242    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1243        Self {
1244            default_stream_config: Maybe::Specified(Some(config)),
1245            ..self
1246        }
1247    }
1248
1249    /// Set the override for the existing
1250    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1251    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1252        Self {
1253            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1254            ..self
1255        }
1256    }
1257
1258    /// Set the override for the existing
1259    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1260    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1261        Self {
1262            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1263            ..self
1264        }
1265    }
1266}
1267
1268impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1269    fn from(value: BasinReconfiguration) -> Self {
1270        Self {
1271            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1272            create_stream_on_append: value.create_stream_on_append,
1273            create_stream_on_read: value.create_stream_on_read,
1274        }
1275    }
1276}
1277
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
pub struct ReconfigureBasinInput {
    /// Name of the basin to reconfigure.
    pub name: BasinName,
    /// Reconfiguration for [`BasinConfig`].
    pub config: BasinReconfiguration,
}
1287
1288impl ReconfigureBasinInput {
1289    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1290    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1291        Self { name, config }
1292    }
1293}
1294
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
pub struct ListAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000` when `None`.
    pub limit: Option<usize>,
}
1314
1315impl ListAccessTokensInput {
1316    /// Create a new [`ListAccessTokensInput`] with default values.
1317    pub fn new() -> Self {
1318        Self::default()
1319    }
1320
1321    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1322    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1323        Self { prefix, ..self }
1324    }
1325
1326    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1327    /// value.
1328    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1329        Self {
1330            start_after,
1331            ..self
1332        }
1333    }
1334
1335    /// Set the limit on number of access tokens to return in a page.
1336    pub fn with_limit(self, limit: usize) -> Self {
1337        Self {
1338            limit: Some(limit),
1339            ..self
1340        }
1341    }
1342}
1343
1344impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1345    fn from(value: ListAccessTokensInput) -> Self {
1346        Self {
1347            prefix: Some(value.prefix),
1348            start_after: Some(value.start_after),
1349            limit: value.limit,
1350        }
1351    }
1352}
1353
#[derive(Debug, Clone, Default)]
// NOTE(review): unlike `ListAccessTokensInput`, this struct is not `#[non_exhaustive]`;
// confirm whether that is intentional.
/// Input for [`S2::list_all_access_tokens`](crate::S2::list_all_access_tokens).
pub struct ListAllAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
}
1368
1369impl ListAllAccessTokensInput {
1370    /// Create a new [`ListAllAccessTokensInput`] with default values.
1371    pub fn new() -> Self {
1372        Self::default()
1373    }
1374
1375    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1376    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1377        Self { prefix, ..self }
1378    }
1379
1380    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1381    /// this value.
1382    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1383        Self {
1384            start_after,
1385            ..self
1386        }
1387    }
1388}
1389
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Access token information.
pub struct AccessTokenInfo {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time of the token.
    pub expires_at: S2DateTime,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    ///
    /// Treated as `false` when absent in the API response.
    pub auto_prefix_streams: bool,
    /// Scope of the access token.
    pub scope: AccessTokenScope,
}
1404
1405impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1406    type Error = ValidationError;
1407
1408    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1409        let expires_at = value
1410            .expires_at
1411            .map(Into::into)
1412            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1413        Ok(Self {
1414            id: value.id,
1415            expires_at,
1416            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1417            scope: value.scope.into(),
1418        })
1419    }
1420}
1421
#[derive(Debug, Clone)]
/// Pattern for matching basins.
///
/// See [`AccessTokenScope::basins`].
pub enum BasinMatcher {
    /// Match no basins at all.
    None,
    /// Match exactly this basin.
    Exact(BasinName),
    /// Match all basins whose names begin with this prefix.
    Prefix(BasinNamePrefix),
}
1434
#[derive(Debug, Clone)]
/// Pattern for matching streams.
///
/// See [`AccessTokenScope::streams`].
pub enum StreamMatcher {
    /// Match no streams at all.
    None,
    /// Match exactly this stream.
    Exact(StreamName),
    /// Match all streams whose names begin with this prefix.
    Prefix(StreamNamePrefix),
}
1447
#[derive(Debug, Clone)]
/// Pattern for matching access tokens.
///
/// See [`AccessTokenScope::access_tokens`].
pub enum AccessTokenMatcher {
    /// Match no access tokens at all.
    None,
    /// Match exactly this access token.
    Exact(AccessTokenId),
    /// Match all access tokens whose IDs begin with this prefix.
    Prefix(AccessTokenIdPrefix),
}
1460
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions indicating allowed operations.
pub struct ReadWritePermissions {
    /// Read permission.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Write permission.
    ///
    /// Defaults to `false`.
    pub write: bool,
}
1474
1475impl ReadWritePermissions {
1476    /// Create a new [`ReadWritePermissions`] with default values.
1477    pub fn new() -> Self {
1478        Self::default()
1479    }
1480
1481    /// Create read-only permissions.
1482    pub fn read_only() -> Self {
1483        Self {
1484            read: true,
1485            write: false,
1486        }
1487    }
1488
1489    /// Create write-only permissions.
1490    pub fn write_only() -> Self {
1491        Self {
1492            read: false,
1493            write: true,
1494        }
1495    }
1496
1497    /// Create read-write permissions.
1498    pub fn read_write() -> Self {
1499        Self {
1500            read: true,
1501            write: true,
1502        }
1503    }
1504}
1505
1506impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1507    fn from(value: ReadWritePermissions) -> Self {
1508        Self {
1509            read: Some(value.read),
1510            write: Some(value.write),
1511        }
1512    }
1513}
1514
1515impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1516    fn from(value: api::access::ReadWritePermissions) -> Self {
1517        Self {
1518            read: value.read.unwrap_or_default(),
1519            write: value.write.unwrap_or_default(),
1520        }
1521    }
1522}
1523
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions at the operation group level.
///
/// See [`AccessTokenScope::op_group_perms`].
pub struct OperationGroupPermissions {
    /// Account-level access permissions.
    ///
    /// Defaults to `None` (no account-level grant).
    pub account: Option<ReadWritePermissions>,
    /// Basin-level access permissions.
    ///
    /// Defaults to `None` (no basin-level grant).
    pub basin: Option<ReadWritePermissions>,
    /// Stream-level access permissions.
    ///
    /// Defaults to `None` (no stream-level grant).
    pub stream: Option<ReadWritePermissions>,
}
1543
1544impl OperationGroupPermissions {
1545    /// Create a new [`OperationGroupPermissions`] with default values.
1546    pub fn new() -> Self {
1547        Self::default()
1548    }
1549
1550    /// Create read-only permissions for all groups.
1551    pub fn read_only_all() -> Self {
1552        Self {
1553            account: Some(ReadWritePermissions::read_only()),
1554            basin: Some(ReadWritePermissions::read_only()),
1555            stream: Some(ReadWritePermissions::read_only()),
1556        }
1557    }
1558
1559    /// Create write-only permissions for all groups.
1560    pub fn write_only_all() -> Self {
1561        Self {
1562            account: Some(ReadWritePermissions::write_only()),
1563            basin: Some(ReadWritePermissions::write_only()),
1564            stream: Some(ReadWritePermissions::write_only()),
1565        }
1566    }
1567
1568    /// Create read-write permissions for all groups.
1569    pub fn read_write_all() -> Self {
1570        Self {
1571            account: Some(ReadWritePermissions::read_write()),
1572            basin: Some(ReadWritePermissions::read_write()),
1573            stream: Some(ReadWritePermissions::read_write()),
1574        }
1575    }
1576
1577    /// Set account-level access permissions.
1578    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1579        Self {
1580            account: Some(account),
1581            ..self
1582        }
1583    }
1584
1585    /// Set basin-level access permissions.
1586    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1587        Self {
1588            basin: Some(basin),
1589            ..self
1590        }
1591    }
1592
1593    /// Set stream-level access permissions.
1594    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1595        Self {
1596            stream: Some(stream),
1597            ..self
1598        }
1599    }
1600}
1601
1602impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1603    fn from(value: OperationGroupPermissions) -> Self {
1604        Self {
1605            account: value.account.map(Into::into),
1606            basin: value.basin.map(Into::into),
1607            stream: value.stream.map(Into::into),
1608        }
1609    }
1610}
1611
1612impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1613    fn from(value: api::access::PermittedOperationGroups) -> Self {
1614        Self {
1615            account: value.account.map(Into::into),
1616            basin: value.basin.map(Into::into),
1617            stream: value.stream.map(Into::into),
1618        }
1619    }
1620}
1621
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Individual operation that can be permitted.
///
/// See [`AccessTokenScope::ops`].
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics (API name: `AccountMetrics`).
    GetAccountMetrics,
    /// Get basin metrics (API name: `BasinMetrics`).
    GetBasinMetrics,
    /// Get stream metrics (API name: `StreamMetrics`).
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
}
1670
1671impl From<Operation> for api::access::Operation {
1672    fn from(value: Operation) -> Self {
1673        match value {
1674            Operation::ListBasins => api::access::Operation::ListBasins,
1675            Operation::CreateBasin => api::access::Operation::CreateBasin,
1676            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1677            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1678            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1679            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1680            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1681            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1682            Operation::ListStreams => api::access::Operation::ListStreams,
1683            Operation::CreateStream => api::access::Operation::CreateStream,
1684            Operation::DeleteStream => api::access::Operation::DeleteStream,
1685            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1686            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1687            Operation::CheckTail => api::access::Operation::CheckTail,
1688            Operation::Append => api::access::Operation::Append,
1689            Operation::Read => api::access::Operation::Read,
1690            Operation::Trim => api::access::Operation::Trim,
1691            Operation::Fence => api::access::Operation::Fence,
1692            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1693            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1694            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1695        }
1696    }
1697}
1698
1699impl From<api::access::Operation> for Operation {
1700    fn from(value: api::access::Operation) -> Self {
1701        match value {
1702            api::access::Operation::ListBasins => Operation::ListBasins,
1703            api::access::Operation::CreateBasin => Operation::CreateBasin,
1704            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1705            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1706            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1707            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1708            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1709            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1710            api::access::Operation::ListStreams => Operation::ListStreams,
1711            api::access::Operation::CreateStream => Operation::CreateStream,
1712            api::access::Operation::DeleteStream => Operation::DeleteStream,
1713            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1714            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1715            api::access::Operation::CheckTail => Operation::CheckTail,
1716            api::access::Operation::Append => Operation::Append,
1717            api::access::Operation::Read => Operation::Read,
1718            api::access::Operation::Trim => Operation::Trim,
1719            api::access::Operation::Fence => Operation::Fence,
1720            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1721            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1722            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1723        }
1724    }
1725}
1726
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token.
///
/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
/// final set must not be empty.
///
/// See [`IssueAccessTokenInput::scope`].
pub struct AccessTokenScopeInput {
    // Permitted basins; `None` means no basins.
    basins: Option<BasinMatcher>,
    // Permitted streams; `None` means no streams.
    streams: Option<StreamMatcher>,
    // Permitted access tokens; `None` means no access tokens.
    access_tokens: Option<AccessTokenMatcher>,
    // Permissions at the operation group level.
    op_group_perms: Option<OperationGroupPermissions>,
    // Individually permitted operations.
    ops: HashSet<Operation>,
}
1743
1744impl AccessTokenScopeInput {
1745    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1746    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1747        Self {
1748            basins: None,
1749            streams: None,
1750            access_tokens: None,
1751            op_group_perms: None,
1752            ops: ops.into_iter().collect(),
1753        }
1754    }
1755
1756    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1757    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1758        Self {
1759            basins: None,
1760            streams: None,
1761            access_tokens: None,
1762            op_group_perms: Some(op_group_perms),
1763            ops: HashSet::default(),
1764        }
1765    }
1766
1767    /// Set the permitted operations.
1768    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1769        Self {
1770            ops: ops.into_iter().collect(),
1771            ..self
1772        }
1773    }
1774
1775    /// Set the access permissions at the operation group level.
1776    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1777        Self {
1778            op_group_perms: Some(op_group_perms),
1779            ..self
1780        }
1781    }
1782
1783    /// Set the permitted basins.
1784    ///
1785    /// Defaults to no basins.
1786    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1787        Self {
1788            basins: Some(basins),
1789            ..self
1790        }
1791    }
1792
1793    /// Set the permitted streams.
1794    ///
1795    /// Defaults to no streams.
1796    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1797        Self {
1798            streams: Some(streams),
1799            ..self
1800        }
1801    }
1802
1803    /// Set the permitted access tokens.
1804    ///
1805    /// Defaults to no access tokens.
1806    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1807        Self {
1808            access_tokens: Some(access_tokens),
1809            ..self
1810        }
1811    }
1812}
1813
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token, as reported by the service.
pub struct AccessTokenScope {
    /// Permitted basins.
    pub basins: Option<BasinMatcher>,
    /// Permitted streams.
    pub streams: Option<StreamMatcher>,
    /// Permitted access tokens.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions at the operation group level.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Individually permitted operations.
    pub ops: HashSet<Operation>,
}
1829
1830impl From<api::access::AccessTokenScope> for AccessTokenScope {
1831    fn from(value: api::access::AccessTokenScope) -> Self {
1832        Self {
1833            basins: value.basins.map(|rs| match rs {
1834                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1835                    BasinMatcher::Exact(e)
1836                }
1837                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1838                    BasinMatcher::None
1839                }
1840                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1841            }),
1842            streams: value.streams.map(|rs| match rs {
1843                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1844                    StreamMatcher::Exact(e)
1845                }
1846                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1847                    StreamMatcher::None
1848                }
1849                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1850            }),
1851            access_tokens: value.access_tokens.map(|rs| match rs {
1852                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1853                    AccessTokenMatcher::Exact(e)
1854                }
1855                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1856                    AccessTokenMatcher::None
1857                }
1858                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1859            }),
1860            op_group_perms: value.op_groups.map(Into::into),
1861            ops: value
1862                .ops
1863                .map(|ops| ops.into_iter().map(Into::into).collect())
1864                .unwrap_or_default(),
1865        }
1866    }
1867}
1868
1869impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1870    fn from(value: AccessTokenScopeInput) -> Self {
1871        Self {
1872            basins: value.basins.map(|rs| match rs {
1873                BasinMatcher::None => {
1874                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1875                }
1876                BasinMatcher::Exact(e) => {
1877                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1878                }
1879                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1880            }),
1881            streams: value.streams.map(|rs| match rs {
1882                StreamMatcher::None => {
1883                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1884                }
1885                StreamMatcher::Exact(e) => {
1886                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1887                }
1888                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1889            }),
1890            access_tokens: value.access_tokens.map(|rs| match rs {
1891                AccessTokenMatcher::None => {
1892                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1893                }
1894                AccessTokenMatcher::Exact(e) => {
1895                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1896                }
1897                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1898            }),
1899            op_groups: value.op_group_perms.map(Into::into),
1900            ops: if value.ops.is_empty() {
1901                None
1902            } else {
1903                Some(value.ops.into_iter().map(Into::into).collect())
1904            },
1905        }
1906    }
1907}
1908
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`issue_access_token`](crate::S2::issue_access_token).
///
/// Construct via [`IssueAccessTokenInput::new`] and customize with the `with_*` builders.
pub struct IssueAccessTokenInput {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    ///
    /// Defaults to the expiration time of requestor's access token passed via
    /// [`S2Config`](S2Config::new).
    pub expires_at: Option<S2DateTime>,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    ///
    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
    /// prefix.
    ///
    /// Defaults to `false`.
    pub auto_prefix_streams: bool,
    /// Scope of the token.
    pub scope: AccessTokenScopeInput,
}
1931
1932impl IssueAccessTokenInput {
1933    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
1934    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1935        Self {
1936            id,
1937            expires_at: None,
1938            auto_prefix_streams: false,
1939            scope,
1940        }
1941    }
1942
1943    /// Set the expiration time.
1944    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1945        Self {
1946            expires_at: Some(expires_at),
1947            ..self
1948        }
1949    }
1950
1951    /// Set whether to automatically prefix stream names during creation and strip the prefix during
1952    /// listing.
1953    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1954        Self {
1955            auto_prefix_streams,
1956            ..self
1957        }
1958    }
1959}
1960
1961impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1962    fn from(value: IssueAccessTokenInput) -> Self {
1963        Self {
1964            id: value.id,
1965            expires_at: value.expires_at.map(Into::into),
1966            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1967            scope: value.scope.into(),
1968        }
1969    }
1970}
1971
#[derive(Debug, Clone, Copy)]
/// Interval to accumulate over for timeseries metric sets.
///
/// See [`TimeRangeAndInterval::interval`].
pub enum TimeseriesInterval {
    /// Minute.
    Minute,
    /// Hour.
    Hour,
    /// Day.
    Day,
}
1982
1983impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
1984    fn from(value: TimeseriesInterval) -> Self {
1985        match value {
1986            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
1987            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
1988            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
1989        }
1990    }
1991}
1992
1993impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
1994    fn from(value: api::metrics::TimeseriesInterval) -> Self {
1995        match value {
1996            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
1997            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
1998            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
1999        }
2000    }
2001}
2002
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds.
///
/// Used by metric sets that take no accumulation interval, e.g.
/// [`AccountMetricSet::ActiveBasins`] and [`StreamMetricSet::Storage`].
pub struct TimeRange {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
}
2012
2013impl TimeRange {
2014    /// Create a new [`TimeRange`] with the given start and end timestamps.
2015    pub fn new(start: u32, end: u32) -> Self {
2016        Self { start, end }
2017    }
2018}
2019
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds and accumulation interval.
///
/// See [`TimeRange`] for metric sets that take no interval.
pub struct TimeRangeAndInterval {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
    /// Interval to accumulate over for timeseries metric sets.
    ///
    /// Default is dependent on the requested metric set.
    pub interval: Option<TimeseriesInterval>,
}
2033
2034impl TimeRangeAndInterval {
2035    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2036    pub fn new(start: u32, end: u32) -> Self {
2037        Self {
2038            start,
2039            end,
2040            interval: None,
2041        }
2042    }
2043
2044    /// Set the interval to accumulate over for timeseries metric sets.
2045    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2046        Self {
2047            interval: Some(interval),
2048            ..self
2049        }
2050    }
2051}
2052
#[derive(Debug, Clone, Copy)]
/// Account metric set to return.
///
/// See [`GetAccountMetricsInput`].
pub enum AccountMetricSet {
    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
    /// specified time range.
    ActiveBasins(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per account operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    AccountOps(TimeRangeAndInterval),
}
2067
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
///
/// Construct with [`GetAccountMetricsInput::new`].
pub struct GetAccountMetricsInput {
    /// Metric set to return.
    pub set: AccountMetricSet,
}
2075
2076impl GetAccountMetricsInput {
2077    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2078    pub fn new(set: AccountMetricSet) -> Self {
2079        Self { set }
2080    }
2081}
2082
2083impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2084    fn from(value: GetAccountMetricsInput) -> Self {
2085        let (set, start, end, interval) = match value.set {
2086            AccountMetricSet::ActiveBasins(args) => (
2087                api::metrics::AccountMetricSet::ActiveBasins,
2088                args.start,
2089                args.end,
2090                None,
2091            ),
2092            AccountMetricSet::AccountOps(args) => (
2093                api::metrics::AccountMetricSet::AccountOps,
2094                args.start,
2095                args.end,
2096                args.interval,
2097            ),
2098        };
2099        Self {
2100            set,
2101            start: Some(start),
2102            end: Some(end),
2103            interval: interval.map(Into::into),
2104        }
2105    }
2106}
2107
#[derive(Debug, Clone, Copy)]
/// Basin metric set to return.
///
/// See [`GetBasinMetricsInput`].
pub enum BasinMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
    /// in the basin, with one observed value for each hour over the requested time range.
    Storage(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
    ///
    /// Each metric represents a timeseries of the number of append operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendOps(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
    ///
    /// Each metric represents a timeseries of the number of read operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadOps(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadThroughput(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendThroughput(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per basin operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    BasinOps(TimeRangeAndInterval),
}
2152
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
///
/// Construct with [`GetBasinMetricsInput::new`].
pub struct GetBasinMetricsInput {
    /// Basin name.
    pub name: BasinName,
    /// Metric set to return.
    pub set: BasinMetricSet,
}
2162
2163impl GetBasinMetricsInput {
2164    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2165    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2166        Self { name, set }
2167    }
2168}
2169
2170impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2171    fn from(value: GetBasinMetricsInput) -> Self {
2172        let (set, start, end, interval) = match value.set {
2173            BasinMetricSet::Storage(args) => (
2174                api::metrics::BasinMetricSet::Storage,
2175                args.start,
2176                args.end,
2177                None,
2178            ),
2179            BasinMetricSet::AppendOps(args) => (
2180                api::metrics::BasinMetricSet::AppendOps,
2181                args.start,
2182                args.end,
2183                args.interval,
2184            ),
2185            BasinMetricSet::ReadOps(args) => (
2186                api::metrics::BasinMetricSet::ReadOps,
2187                args.start,
2188                args.end,
2189                args.interval,
2190            ),
2191            BasinMetricSet::ReadThroughput(args) => (
2192                api::metrics::BasinMetricSet::ReadThroughput,
2193                args.start,
2194                args.end,
2195                args.interval,
2196            ),
2197            BasinMetricSet::AppendThroughput(args) => (
2198                api::metrics::BasinMetricSet::AppendThroughput,
2199                args.start,
2200                args.end,
2201                args.interval,
2202            ),
2203            BasinMetricSet::BasinOps(args) => (
2204                api::metrics::BasinMetricSet::BasinOps,
2205                args.start,
2206                args.end,
2207                args.interval,
2208            ),
2209        };
2210        (
2211            value.name,
2212            api::metrics::BasinMetricSetRequest {
2213                set,
2214                start: Some(start),
2215                end: Some(end),
2216                interval: interval.map(Into::into),
2217            },
2218        )
2219    }
2220}
2221
#[derive(Debug, Clone, Copy)]
/// Stream metric set to return.
///
/// See [`GetStreamMetricsInput`].
pub enum StreamMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
    /// with one observed value for each minute over the requested time range.
    Storage(TimeRange),
}
2229
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
///
/// Construct with [`GetStreamMetricsInput::new`].
pub struct GetStreamMetricsInput {
    /// Basin name.
    pub basin_name: BasinName,
    /// Stream name.
    pub stream_name: StreamName,
    /// Metric set to return.
    pub set: StreamMetricSet,
}
2241
2242impl GetStreamMetricsInput {
2243    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2244    /// set.
2245    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2246        Self {
2247            basin_name,
2248            stream_name,
2249            set,
2250        }
2251    }
2252}
2253
2254impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2255    fn from(value: GetStreamMetricsInput) -> Self {
2256        let (set, start, end, interval) = match value.set {
2257            StreamMetricSet::Storage(args) => (
2258                api::metrics::StreamMetricSet::Storage,
2259                args.start,
2260                args.end,
2261                None,
2262            ),
2263        };
2264        (
2265            value.basin_name,
2266            value.stream_name,
2267            api::metrics::StreamMetricSetRequest {
2268                set,
2269                start: Some(start),
2270                end: Some(end),
2271                interval,
2272            },
2273        )
2274    }
2275}
2276
#[derive(Debug, Clone, Copy)]
/// Unit in which metric values are measured.
///
/// Carried alongside metric values, e.g. [`ScalarMetric::unit`].
pub enum MetricUnit {
    /// Size in bytes.
    Bytes,
    /// Number of operations.
    Operations,
}
2285
2286impl From<api::metrics::MetricUnit> for MetricUnit {
2287    fn from(value: api::metrics::MetricUnit) -> Self {
2288        match value {
2289            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2290            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2291        }
2292    }
2293}
2294
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Single named value.
///
/// See [`Metric::Scalar`].
pub struct ScalarMetric {
    /// Metric name.
    pub name: String,
    /// Unit for the metric value.
    pub unit: MetricUnit,
    /// Metric value.
    pub value: f64,
}
2306
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
/// specified interval.
///
/// See [`Metric::Accumulation`].
pub struct AccumulationMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the accumulated values.
    pub unit: MetricUnit,
    /// The interval at which datapoints are accumulated.
    pub interval: TimeseriesInterval,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
    /// one `interval`.
    pub values: Vec<(u32, f64)>,
}
2323
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
///
/// See [`Metric::Gauge`].
pub struct GaugeMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the instantaneous values.
    pub unit: MetricUnit,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
    /// instant of the `timestamp` (in Unix epoch seconds).
    pub values: Vec<(u32, f64)>,
}
2336
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Set of string labels.
///
/// See [`Metric::Label`].
pub struct LabelMetric {
    /// Label name.
    pub name: String,
    /// Label values.
    pub values: Vec<String>,
}
2346
#[derive(Debug, Clone)]
/// Individual metric in a returned metric set.
///
/// Converted from the wire representation via `From<api::metrics::Metric>`.
pub enum Metric {
    /// Single named value.
    Scalar(ScalarMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
    /// specified interval.
    Accumulation(AccumulationMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
    Gauge(GaugeMetric),
    /// Set of string labels.
    Label(LabelMetric),
}
2360
2361impl From<api::metrics::Metric> for Metric {
2362    fn from(value: api::metrics::Metric) -> Self {
2363        match value {
2364            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2365                name: sm.name.into(),
2366                unit: sm.unit.into(),
2367                value: sm.value,
2368            }),
2369            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2370                name: am.name.into(),
2371                unit: am.unit.into(),
2372                interval: am.interval.into(),
2373                values: am.values,
2374            }),
2375            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2376                name: gm.name.into(),
2377                unit: gm.unit.into(),
2378                values: gm.values,
2379            }),
2380            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2381                name: lm.name.into(),
2382                values: lm.values,
2383            }),
2384        }
2385    }
2386}
2387
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
///
/// Construct with [`ListStreamsInput::new`] and customize with the `with_*` builders.
pub struct ListStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    ///  Number of streams to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
2407
2408impl ListStreamsInput {
2409    /// Create a new [`ListStreamsInput`] with default values.
2410    pub fn new() -> Self {
2411        Self::default()
2412    }
2413
2414    /// Set the prefix used to filter streams whose names begin with this value.
2415    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2416        Self { prefix, ..self }
2417    }
2418
2419    /// Set the value used to filter streams whose names are lexicographically greater than this
2420    /// value.
2421    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2422        Self {
2423            start_after,
2424            ..self
2425        }
2426    }
2427
2428    /// Set the limit on number of streams to return in a page.
2429    pub fn with_limit(self, limit: usize) -> Self {
2430        Self {
2431            limit: Some(limit),
2432            ..self
2433        }
2434    }
2435}
2436
2437impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2438    fn from(value: ListStreamsInput) -> Self {
2439        Self {
2440            prefix: Some(value.prefix),
2441            start_after: Some(value.start_after),
2442            limit: value.limit,
2443        }
2444    }
2445}
2446
#[derive(Debug, Clone)]
/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
///
/// Construct with [`ListAllStreamsInput::new`] and customize with the `with_*` builders.
pub struct ListAllStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    /// Whether to ignore streams that have been requested for deletion but not yet deleted.
    ///
    /// Defaults to `true`.
    pub ignore_pending_deletions: bool,
}
2465
2466impl Default for ListAllStreamsInput {
2467    fn default() -> Self {
2468        Self {
2469            prefix: StreamNamePrefix::default(),
2470            start_after: StreamNameStartAfter::default(),
2471            ignore_pending_deletions: true,
2472        }
2473    }
2474}
2475
2476impl ListAllStreamsInput {
2477    /// Create a new [`ListAllStreamsInput`] with default values.
2478    pub fn new() -> Self {
2479        Self::default()
2480    }
2481
2482    /// Set the prefix used to filter streams whose names begin with this value.
2483    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2484        Self { prefix, ..self }
2485    }
2486
2487    /// Set the value used to filter streams whose names are lexicographically greater than this
2488    /// value.
2489    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2490        Self {
2491            start_after,
2492            ..self
2493        }
2494    }
2495
2496    /// Set whether to ignore streams that have been requested for deletion but not yet deleted.
2497    pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
2498        Self {
2499            ignore_pending_deletions,
2500            ..self
2501        }
2502    }
2503}
2504
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Stream information.
///
/// Converted from the wire type via `From<api::stream::StreamInfo>`.
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time if the stream is being deleted.
    pub deleted_at: Option<S2DateTime>,
}
2516
2517impl From<api::stream::StreamInfo> for StreamInfo {
2518    fn from(value: api::stream::StreamInfo) -> Self {
2519        Self {
2520            name: value.name,
2521            created_at: value.created_at.into(),
2522            deleted_at: value.deleted_at.map(Into::into),
2523        }
2524    }
2525}
2526
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
///
/// Construct with [`CreateStreamInput::new`] and customize with the `with_*` builders.
pub struct CreateStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Configuration for the stream.
    ///
    /// See [`StreamConfig`] for defaults.
    pub config: Option<StreamConfig>,
    /// Idempotency token for the operation.
    ///
    /// When a request is retried, the same token is reused,
    /// causing the server to return the original response instead of
    /// performing the operation again.
    ///
    /// Defaults to a random UUID.
    pub idempotency_token: String,
}
2546
2547impl CreateStreamInput {
2548    /// Create a new [`CreateStreamInput`] with the given stream name.
2549    pub fn new(name: StreamName) -> Self {
2550        Self {
2551            name,
2552            config: None,
2553            idempotency_token: idempotency_token(),
2554        }
2555    }
2556
2557    /// Set the configuration for the stream.
2558    pub fn with_config(self, config: StreamConfig) -> Self {
2559        Self {
2560            config: Some(config),
2561            ..self
2562        }
2563    }
2564
2565    /// Set the idempotency token for the operation.
2566    pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
2567        Self {
2568            idempotency_token: idempotency_token.into(),
2569            ..self
2570        }
2571    }
2572}
2573
2574impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2575    fn from(value: CreateStreamInput) -> Self {
2576        (
2577            api::stream::CreateStreamRequest {
2578                stream: value.name,
2579                config: value.config.map(Into::into),
2580            },
2581            value.idempotency_token,
2582        )
2583    }
2584}
2585
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
///
/// Construct with [`DeleteStreamInput::new`].
pub struct DeleteStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Whether to ignore `Not Found` error if the stream doesn't exist.
    pub ignore_not_found: bool,
}
2595
2596impl DeleteStreamInput {
2597    /// Create a new [`DeleteStreamInput`] with the given stream name.
2598    pub fn new(name: StreamName) -> Self {
2599        Self {
2600            name,
2601            ignore_not_found: false,
2602        }
2603    }
2604
2605    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2606    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2607        Self {
2608            ignore_not_found,
2609            ..self
2610        }
2611    }
2612}
2613
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
///
/// Construct with [`ReconfigureStreamInput::new`].
pub struct ReconfigureStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Reconfiguration for [`StreamConfig`].
    pub config: StreamReconfiguration,
}
2623
2624impl ReconfigureStreamInput {
2625    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2626    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2627        Self { name, config }
2628    }
2629}
2630
#[derive(Debug, Clone, PartialEq, Eq)]
/// Token for fencing appends to a stream.
///
/// **Note:** It must not exceed 36 bytes in length.
///
/// Construct via [`FromStr`] (length-validated) or [`FencingToken::generate`].
///
/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
pub struct FencingToken(String);
2638
2639impl FencingToken {
2640    /// Generate a random alphanumeric fencing token of `n` bytes.
2641    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2642        rand::rng()
2643            .sample_iter(&rand::distr::Alphanumeric)
2644            .take(n)
2645            .map(char::from)
2646            .collect::<String>()
2647            .parse()
2648    }
2649}
2650
2651impl FromStr for FencingToken {
2652    type Err = ValidationError;
2653
2654    fn from_str(s: &str) -> Result<Self, Self::Err> {
2655        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2656            return Err(ValidationError(format!(
2657                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2658            )));
2659        }
2660        Ok(FencingToken(s.to_string()))
2661    }
2662}
2663
2664impl std::fmt::Display for FencingToken {
2665    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2666        write!(f, "{}", self.0)
2667    }
2668}
2669
2670impl Deref for FencingToken {
2671    type Target = str;
2672
2673    fn deref(&self) -> &Self::Target {
2674        &self.0
2675    }
2676}
2677
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// A position in a stream.
///
/// Convertible from both wire representations via `From`.
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
    /// User-specified timestamps are passed through as-is.
    pub timestamp: u64,
}
2688
2689impl std::fmt::Display for StreamPosition {
2690    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2691        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2692    }
2693}
2694
2695impl From<api::stream::proto::StreamPosition> for StreamPosition {
2696    fn from(value: api::stream::proto::StreamPosition) -> Self {
2697        Self {
2698            seq_num: value.seq_num,
2699            timestamp: value.timestamp,
2700        }
2701    }
2702}
2703
2704impl From<api::stream::StreamPosition> for StreamPosition {
2705    fn from(value: api::stream::StreamPosition) -> Self {
2706        Self {
2707            seq_num: value.seq_num,
2708            timestamp: value.timestamp,
2709        }
2710    }
2711}
2712
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// A name-value pair.
///
/// Headers are attached to [`AppendRecord`]s.
pub struct Header {
    /// Name.
    pub name: Bytes,
    /// Value.
    pub value: Bytes,
}
2722
2723impl Header {
2724    /// Create a new [`Header`] with the given name and value.
2725    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2726        Self {
2727            name: name.into(),
2728            value: value.into(),
2729        }
2730    }
2731}
2732
2733impl From<Header> for api::stream::proto::Header {
2734    fn from(value: Header) -> Self {
2735        Self {
2736            name: value.name,
2737            value: value.value,
2738        }
2739    }
2740}
2741
2742impl From<api::stream::proto::Header> for Header {
2743    fn from(value: api::stream::proto::Header) -> Self {
2744        Self {
2745            name: value.name,
2746            value: value.value,
2747        }
2748    }
2749}
2750
#[derive(Debug, Clone, PartialEq)]
/// A record to append.
///
/// Constructed via [`AppendRecord::new`]; its metered size is validated
/// against [`RECORD_BATCH_MAX`] on construction and when headers are set.
pub struct AppendRecord {
    // Record payload.
    body: Bytes,
    // Name-value header pairs attached to the record.
    headers: Vec<Header>,
    // User-specified timestamp; `None` means the service assigns one
    // (see `StreamPosition::timestamp`).
    timestamp: Option<u64>,
}
2758
2759impl AppendRecord {
2760    fn validate(self) -> Result<Self, ValidationError> {
2761        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2762            Err(ValidationError(format!(
2763                "metered_bytes: {} exceeds {}",
2764                self.metered_bytes(),
2765                RECORD_BATCH_MAX.bytes
2766            )))
2767        } else {
2768            Ok(self)
2769        }
2770    }
2771
2772    /// Create a new [`AppendRecord`] with the given record body.
2773    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2774        let record = Self {
2775            body: body.into(),
2776            headers: Vec::default(),
2777            timestamp: None,
2778        };
2779        record.validate()
2780    }
2781
2782    /// Set the headers for this record.
2783    pub fn with_headers(
2784        self,
2785        headers: impl IntoIterator<Item = Header>,
2786    ) -> Result<Self, ValidationError> {
2787        let record = Self {
2788            headers: headers.into_iter().collect(),
2789            ..self
2790        };
2791        record.validate()
2792    }
2793
2794    /// Set the timestamp for this record.
2795    ///
2796    /// Precise semantics depend on [`StreamConfig::timestamping`].
2797    pub fn with_timestamp(self, timestamp: u64) -> Self {
2798        Self {
2799            timestamp: Some(timestamp),
2800            ..self
2801        }
2802    }
2803}
2804
2805impl From<AppendRecord> for api::stream::proto::AppendRecord {
2806    fn from(value: AppendRecord) -> Self {
2807        Self {
2808            timestamp: value.timestamp,
2809            headers: value.headers.into_iter().map(Into::into).collect(),
2810            body: value.body,
2811        }
2812    }
2813}
2814
/// Metered byte size calculation.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
///
/// Implemented for [`AppendRecord`], [`SequencedRecord`], and
/// [`AppendRecordBatch`] (which caches the sum over its records).
pub trait MeteredBytes {
    /// Returns the metered byte size.
    fn metered_bytes(&self) -> usize;
}
2825
/// Implements [`MeteredBytes`] for a record type exposing `headers` and
/// `body` fields, following the formula documented on the trait.
macro_rules! metered_bytes_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> usize {
                // Fixed 8-byte overhead plus the body, then 2 bytes of
                // overhead per header plus the header's name and value.
                self.headers
                    .iter()
                    .fold(8 + self.body.len(), |total, h| {
                        total + 2 + h.name.len() + h.value.len()
                    })
            }
        }
    };
}
2841
// Metered size of a record pending append (formula on `MeteredBytes`).
metered_bytes_impl!(AppendRecord);
2843
#[derive(Debug, Clone)]
/// A batch of records to append atomically.
///
/// **Note:** It must contain at least `1` record and no more than `1000`.
/// The total size of the batch must not exceed `1MiB` in metered bytes.
///
/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
/// that takes care of the abovementioned constraints.
pub struct AppendRecordBatch {
    // Records in append order.
    records: Vec<AppendRecord>,
    // Running total of the batch's metered size, kept in sync by `push` and
    // `try_from_iter` so `metered_bytes()` is O(1).
    metered_bytes: usize,
}
2857
2858impl AppendRecordBatch {
2859    pub(crate) fn with_capacity(capacity: usize) -> Self {
2860        Self {
2861            records: Vec::with_capacity(capacity),
2862            metered_bytes: 0,
2863        }
2864    }
2865
2866    pub(crate) fn push(&mut self, record: AppendRecord) {
2867        self.metered_bytes += record.metered_bytes();
2868        self.records.push(record);
2869    }
2870
2871    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
2872    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2873    where
2874        I: IntoIterator<Item = AppendRecord>,
2875    {
2876        let mut records = Vec::new();
2877        let mut metered_bytes = 0;
2878
2879        for record in iter {
2880            metered_bytes += record.metered_bytes();
2881            records.push(record);
2882
2883            if metered_bytes > RECORD_BATCH_MAX.bytes {
2884                return Err(ValidationError(format!(
2885                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
2886                    RECORD_BATCH_MAX.bytes
2887                )));
2888            }
2889
2890            if records.len() > RECORD_BATCH_MAX.count {
2891                return Err(ValidationError(format!(
2892                    "number of records in the batch exceeds {}",
2893                    RECORD_BATCH_MAX.count
2894                )));
2895            }
2896        }
2897
2898        if records.is_empty() {
2899            return Err(ValidationError("batch is empty".into()));
2900        }
2901
2902        Ok(Self {
2903            records,
2904            metered_bytes,
2905        })
2906    }
2907}
2908
2909impl Deref for AppendRecordBatch {
2910    type Target = [AppendRecord];
2911
2912    fn deref(&self) -> &Self::Target {
2913        &self.records
2914    }
2915}
2916
impl MeteredBytes for AppendRecordBatch {
    // Returns the cached running total (maintained by `push`) rather than
    // recomputing over every record.
    fn metered_bytes(&self) -> usize {
        self.metered_bytes
    }
}
2922
#[derive(Debug, Clone)]
/// Command to signal an operation.
///
/// Carried by a [`CommandRecord`]; see [`CommandRecord::fence`] and
/// [`CommandRecord::trim`] for semantics.
pub enum Command {
    /// Fence operation.
    Fence {
        /// Fencing token that subsequent appends must match.
        fencing_token: FencingToken,
    },
    /// Trim operation.
    Trim {
        /// Trim point: the desired earliest sequence number for the stream.
        trim_point: u64,
    },
}
2937
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Command record for signaling operations to the service.
///
/// Convert into an [`AppendRecord`] (via `From`) to submit it on a stream.
///
/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
pub struct CommandRecord {
    /// Command to signal an operation.
    pub command: Command,
    /// Timestamp for this record.
    pub timestamp: Option<u64>,
}
2949
2950impl CommandRecord {
2951    const FENCE: &[u8] = b"fence";
2952    const TRIM: &[u8] = b"trim";
2953
2954    /// Create a fence command record with the given fencing token.
2955    ///
2956    /// Fencing is strongly consistent, and subsequent appends that specify a
2957    /// fencing token will fail if it does not match.
2958    pub fn fence(fencing_token: FencingToken) -> Self {
2959        Self {
2960            command: Command::Fence { fencing_token },
2961            timestamp: None,
2962        }
2963    }
2964
2965    /// Create a trim command record with the given trim point.
2966    ///
2967    /// Trim point is the desired earliest sequence number for the stream.
2968    ///
2969    /// Trimming is eventually consistent, and trimmed records may be visible
2970    /// for a brief period.
2971    pub fn trim(trim_point: u64) -> Self {
2972        Self {
2973            command: Command::Trim { trim_point },
2974            timestamp: None,
2975        }
2976    }
2977
2978    /// Set the timestamp for this record.
2979    pub fn with_timestamp(self, timestamp: u64) -> Self {
2980        Self {
2981            timestamp: Some(timestamp),
2982            ..self
2983        }
2984    }
2985}
2986
2987impl From<CommandRecord> for AppendRecord {
2988    fn from(value: CommandRecord) -> Self {
2989        let (header_value, body) = match value.command {
2990            Command::Fence { fencing_token } => (
2991                CommandRecord::FENCE,
2992                Bytes::copy_from_slice(fencing_token.as_bytes()),
2993            ),
2994            Command::Trim { trim_point } => (
2995                CommandRecord::TRIM,
2996                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
2997            ),
2998        };
2999        Self {
3000            body,
3001            headers: vec![Header::new("", header_value)],
3002            timestamp: value.timestamp,
3003        }
3004    }
3005}
3006
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`append`](crate::S2Stream::append) operation and
/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
///
/// Construct via [`AppendInput::new`] and the `with_*` builder methods.
pub struct AppendInput {
    /// Batch of records to append atomically.
    pub records: AppendRecordBatch,
    /// Expected sequence number for the first record in the batch.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
    pub match_seq_num: Option<u64>,
    /// Fencing token to match against the stream's current fencing token.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched,
    /// the append fails. A stream defaults to `""` as its fencing token.
    pub fencing_token: Option<FencingToken>,
}
3024
3025impl AppendInput {
3026    /// Create a new [`AppendInput`] with the given batch of records.
3027    pub fn new(records: AppendRecordBatch) -> Self {
3028        Self {
3029            records,
3030            match_seq_num: None,
3031            fencing_token: None,
3032        }
3033    }
3034
3035    /// Set the expected sequence number for the first record in the batch.
3036    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3037        Self {
3038            match_seq_num: Some(match_seq_num),
3039            ..self
3040        }
3041    }
3042
3043    /// Set the fencing token to match against the stream's current fencing token.
3044    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3045        Self {
3046            fencing_token: Some(fencing_token),
3047            ..self
3048        }
3049    }
3050}
3051
3052impl From<AppendInput> for api::stream::proto::AppendInput {
3053    fn from(value: AppendInput) -> Self {
3054        Self {
3055            records: value.records.iter().cloned().map(Into::into).collect(),
3056            match_seq_num: value.match_seq_num,
3057            fencing_token: value.fencing_token.map(|t| t.to_string()),
3058        }
3059    }
3060}
3061
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Acknowledgement for an [`AppendInput`], reporting where the batch landed
/// on the stream.
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
    /// that was appended.
    ///
    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
    /// appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record on the stream.
    ///
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
3080
3081impl From<api::stream::proto::AppendAck> for AppendAck {
3082    fn from(value: api::stream::proto::AppendAck) -> Self {
3083        Self {
3084            start: value.start.unwrap_or_default().into(),
3085            end: value.end.unwrap_or_default().into(),
3086            tail: value.tail.unwrap_or_default().into(),
3087        }
3088    }
3089}
3090
#[derive(Debug, Clone, Copy)]
/// Starting position for reading from a stream.
///
/// Exactly one of the three positions is sent to the service.
pub enum ReadFrom {
    /// Read from this sequence number.
    SeqNum(u64),
    /// Read from this timestamp (same units as record timestamps).
    Timestamp(u64),
    /// Read from N records before the tail.
    TailOffset(u64),
}
3101
3102impl Default for ReadFrom {
3103    fn default() -> Self {
3104        Self::SeqNum(0)
3105    }
3106}
3107
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
/// Where to start reading.
///
/// Construct via [`ReadStart::new`] and the `with_*` builder methods.
pub struct ReadStart {
    /// Starting position.
    ///
    /// Defaults to reading from sequence number `0`.
    pub from: ReadFrom,
    /// Whether to start from tail if the requested starting position is beyond it.
    ///
    /// Defaults to `false` (errors if position is beyond tail).
    pub clamp_to_tail: bool,
}
3121
3122impl ReadStart {
3123    /// Create a new [`ReadStart`] with default values.
3124    pub fn new() -> Self {
3125        Self::default()
3126    }
3127
3128    /// Set the starting position.
3129    pub fn with_from(self, from: ReadFrom) -> Self {
3130        Self { from, ..self }
3131    }
3132
3133    /// Set whether to start from tail if the requested starting position is beyond it.
3134    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3135        Self {
3136            clamp_to_tail,
3137            ..self
3138        }
3139    }
3140}
3141
3142impl From<ReadStart> for api::stream::ReadStart {
3143    fn from(value: ReadStart) -> Self {
3144        let (seq_num, timestamp, tail_offset) = match value.from {
3145            ReadFrom::SeqNum(n) => (Some(n), None, None),
3146            ReadFrom::Timestamp(t) => (None, Some(t), None),
3147            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3148        };
3149        Self {
3150            seq_num,
3151            timestamp,
3152            tail_offset,
3153            clamp: if value.clamp_to_tail {
3154                Some(true)
3155            } else {
3156                None
3157            },
3158        }
3159    }
3160}
3161
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Limits on how much to read.
///
/// Construct via [`ReadLimits::new`] and the `with_*` builder methods.
pub struct ReadLimits {
    /// Limit on number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Limit on total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}
3175
3176impl ReadLimits {
3177    /// Create a new [`ReadLimits`] with default values.
3178    pub fn new() -> Self {
3179        Self::default()
3180    }
3181
3182    /// Set the limit on number of records.
3183    pub fn with_count(self, count: usize) -> Self {
3184        Self {
3185            count: Some(count),
3186            ..self
3187        }
3188    }
3189
3190    /// Set the limit on total metered bytes of records.
3191    pub fn with_bytes(self, bytes: usize) -> Self {
3192        Self {
3193            bytes: Some(bytes),
3194            ..self
3195        }
3196    }
3197}
3198
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// When to stop reading.
///
/// Construct via [`ReadStop::new`] and the `with_*` builder methods.
pub struct ReadStop {
    /// Limits on how much to read.
    pub limits: ReadLimits,
    /// Timestamp at which to stop (exclusive).
    pub until: Option<RangeTo<u64>>,
    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
    /// seconds for [`read`](crate::S2Stream::read).
    ///
    /// Defaults to:
    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
    ///   is specified.
    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
    ///   `until` is specified.
    pub wait: Option<u32>,
}
3218
3219impl ReadStop {
3220    /// Create a new [`ReadStop`] with default values.
3221    pub fn new() -> Self {
3222        Self::default()
3223    }
3224
3225    /// Set the limits on how much to read.
3226    pub fn with_limits(self, limits: ReadLimits) -> Self {
3227        Self { limits, ..self }
3228    }
3229
3230    /// Set the timestamp at which to stop (exclusive).
3231    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3232        Self {
3233            until: Some(until),
3234            ..self
3235        }
3236    }
3237
3238    /// Set the duration in seconds to wait for new records before stopping.
3239    pub fn with_wait(self, wait: u32) -> Self {
3240        Self {
3241            wait: Some(wait),
3242            ..self
3243        }
3244    }
3245}
3246
3247impl From<ReadStop> for api::stream::ReadEnd {
3248    fn from(value: ReadStop) -> Self {
3249        Self {
3250            count: value.limits.count,
3251            bytes: value.limits.bytes,
3252            until: value.until.map(|r| r.end),
3253            wait: value.wait,
3254        }
3255    }
3256}
3257
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
/// operations.
///
/// Construct via [`ReadInput::new`] and the `with_*` builder methods.
pub struct ReadInput {
    /// Where to start reading.
    pub start: ReadStart,
    /// When to stop reading.
    pub stop: ReadStop,
    /// Whether to filter out command records from the stream when reading.
    ///
    /// Defaults to `false`.
    pub ignore_command_records: bool,
}
3272
3273impl ReadInput {
3274    /// Create a new [`ReadInput`] with default values.
3275    pub fn new() -> Self {
3276        Self::default()
3277    }
3278
3279    /// Set where to start reading.
3280    pub fn with_start(self, start: ReadStart) -> Self {
3281        Self { start, ..self }
3282    }
3283
3284    /// Set when to stop reading.
3285    pub fn with_stop(self, stop: ReadStop) -> Self {
3286        Self { stop, ..self }
3287    }
3288
3289    /// Set whether to filter out command records from the stream when reading.
3290    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3291        Self {
3292            ignore_command_records,
3293            ..self
3294        }
3295    }
3296}
3297
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Record that is durably sequenced on a stream.
pub struct SequencedRecord {
    /// Sequence number assigned to this record.
    pub seq_num: u64,
    /// Body of this record.
    pub body: Bytes,
    /// Headers for this record.
    pub headers: Vec<Header>,
    /// Timestamp for this record. When assigned by the service, represents
    /// milliseconds since Unix epoch; user-specified timestamps are passed
    /// through as-is.
    pub timestamp: u64,
}
3311
3312impl SequencedRecord {
3313    /// Whether this is a command record.
3314    pub fn is_command_record(&self) -> bool {
3315        self.headers.len() == 1 && *self.headers[0].name == *b""
3316    }
3317}
3318
3319impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3320    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3321        Self {
3322            seq_num: value.seq_num,
3323            body: value.body,
3324            headers: value.headers.into_iter().map(Into::into).collect(),
3325            timestamp: value.timestamp,
3326        }
3327    }
3328}
3329
// Metered size of a durably sequenced record (same formula as `AppendRecord`).
metered_bytes_impl!(SequencedRecord);
3331
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
/// [`read_session`](crate::S2Stream::read_session).
///
/// Use [`SequencedRecord::is_command_record`] to identify command records
/// when [`ReadInput::ignore_command_records`] is not set.
pub struct ReadBatch {
    /// Records that are durably sequenced on the stream.
    ///
    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
    /// - the [`stop condition`](ReadInput::stop) was already met, or
    /// - all records in the batch were command records and
    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
    pub records: Vec<SequencedRecord>,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record.
    ///
    /// It will only be present when reading recent records.
    pub tail: Option<StreamPosition>,
}
3350
3351impl ReadBatch {
3352    pub(crate) fn from_api(
3353        batch: api::stream::proto::ReadBatch,
3354        ignore_command_records: bool,
3355    ) -> Self {
3356        Self {
3357            records: batch
3358                .records
3359                .into_iter()
3360                .map(Into::into)
3361                .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3362                .collect(),
3363            tail: batch.tail.map(Into::into),
3364        }
3365    }
3366}
3367
/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
///
/// Boxed and pinned so the concrete stream type is erased; the stream is `Send`.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3370
#[derive(Debug, Clone, thiserror::Error)]
/// Why an append condition check failed.
///
/// See [`AppendInput::fencing_token`] and [`AppendInput::match_seq_num`] for
/// the corresponding conditions.
pub enum AppendConditionFailed {
    #[error("fencing token mismatch, expected: {0}")]
    /// Fencing token did not match. Contains the expected fencing token.
    FencingTokenMismatch(FencingToken),
    #[error("sequence number mismatch, expected: {0}")]
    /// Sequence number did not match. Contains the expected sequence number.
    SeqNumMismatch(u64),
}
3381
3382impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3383    fn from(value: api::stream::AppendConditionFailed) -> Self {
3384        match value {
3385            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3386                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3387            }
3388            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3389                AppendConditionFailed::SeqNumMismatch(seq)
3390            }
3391        }
3392    }
3393}
3394
#[derive(Debug, Clone, thiserror::Error)]
/// Errors from S2 operations.
///
/// Append-condition failures and reads past the tail get dedicated variants;
/// other server responses are reported via [`S2Error::Server`].
pub enum S2Error {
    #[error("{0}")]
    /// Client-side error.
    Client(String),
    #[error(transparent)]
    /// Validation error.
    Validation(#[from] ValidationError),
    #[error("{0}")]
    /// Append condition check failed. Contains the failure reason.
    AppendConditionFailed(AppendConditionFailed),
    #[error("read from an unwritten position. current tail: {0}")]
    /// Read from an unwritten position. Contains the current tail.
    ReadUnwritten(StreamPosition),
    #[error("{0}")]
    /// Other server-side error.
    Server(ErrorResponse),
}
3414
3415impl From<ApiError> for S2Error {
3416    fn from(err: ApiError) -> Self {
3417        match err {
3418            ApiError::ReadUnwritten(tail_response) => {
3419                Self::ReadUnwritten(tail_response.tail.into())
3420            }
3421            ApiError::AppendConditionFailed(condition_failed) => {
3422                Self::AppendConditionFailed(condition_failed.into())
3423            }
3424            ApiError::Server(_, response) => Self::Server(response.into()),
3425            other => Self::Client(other.to_string()),
3426        }
3427    }
3428}
3429
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
/// Error response from S2 server.
///
/// Displays as `{code}: {message}`.
pub struct ErrorResponse {
    /// Error code.
    pub code: String,
    /// Error message.
    pub message: String,
}
3440
3441impl From<ApiErrorResponse> for ErrorResponse {
3442    fn from(response: ApiErrorResponse) -> Self {
3443        Self {
3444            code: response.code,
3445            message: response.message,
3446        }
3447    }
3448}
3449
3450fn idempotency_token() -> String {
3451    uuid::Uuid::new_v4().simple().to_string()
3452}