// s2_sdk/types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::Rng;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Validation error.
23pub use s2_common::types::ValidationError;
24/// Access token ID.
25///
26/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
27pub use s2_common::types::access::AccessTokenId;
28/// See [`ListAccessTokensInput::prefix`].
29pub use s2_common::types::access::AccessTokenIdPrefix;
30/// See [`ListAccessTokensInput::start_after`].
31pub use s2_common::types::access::AccessTokenIdStartAfter;
32/// Basin name.
33///
34/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
35/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
36pub use s2_common::types::basin::BasinName;
37/// See [`ListBasinsInput::prefix`].
38pub use s2_common::types::basin::BasinNamePrefix;
39/// See [`ListBasinsInput::start_after`].
40pub use s2_common::types::basin::BasinNameStartAfter;
41/// Stream name.
42///
43/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
44pub use s2_common::types::stream::StreamName;
45/// See [`ListStreamsInput::prefix`].
46pub use s2_common::types::stream::StreamNamePrefix;
47/// See [`ListStreamsInput::start_after`].
48pub use s2_common::types::stream::StreamNameStartAfter;
49
/// One mebibyte (2^20 bytes), in bytes.
pub(crate) const ONE_MIB: u32 = 1024 * 1024;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57/// An RFC 3339 datetime.
58///
59/// It can be created in either of the following ways:
60/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
61/// - Convert from [`time::OffsetDateTime`] using [`From`]/[`Into`].
62#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl From<time::OffsetDateTime> for S2DateTime {
66    fn from(dt: time::OffsetDateTime) -> Self {
67        Self(dt)
68    }
69}
70
71impl From<S2DateTime> for time::OffsetDateTime {
72    fn from(dt: S2DateTime) -> Self {
73        dt.0
74    }
75}
76
77impl FromStr for S2DateTime {
78    type Err = ValidationError;
79
80    fn from_str(s: &str) -> Result<Self, Self::Err> {
81        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
82            .map(Self)
83            .map_err(|e| ValidationError(format!("invalid datetime: {e}")))
84    }
85}
86
87impl fmt::Display for S2DateTime {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        write!(
90            f,
91            "{}",
92            self.0
93                .format(&time::format_description::well_known::Rfc3339)
94                .expect("RFC3339 formatting should not fail for S2DateTime")
95        )
96    }
97}
98
/// Authority for connecting to an S2 basin.
///
/// Produced by basin-endpoint parsing: an endpoint whose authority begins with
/// the literal `{basin}.` placeholder yields
/// [`ParentZone`](BasinAuthority::ParentZone); any other authority yields
/// [`Direct`](BasinAuthority::Direct).
#[derive(Debug, Clone, PartialEq)]
pub enum BasinAuthority {
    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
    ParentZone(Authority),
    /// Direct cell authority. Basin is expected to be hosted by this cell.
    Direct(Authority),
}
107
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Endpoints for the S2 environment.
///
/// Construct via [`S2Endpoints::new`] or [`S2Endpoints::from_env`].
pub struct S2Endpoints {
    /// URI scheme.
    ///
    /// Defaults to `https`.
    pub scheme: Scheme,
    /// Account authority.
    pub account_authority: Authority,
    /// Basin authority.
    pub basin_authority: BasinAuthority,
}
121
122impl S2Endpoints {
123    /// Create a new [`S2Endpoints`] with the given account and basin authorities.
124    pub fn new(account_authority: Authority, basin_authority: BasinAuthority) -> Self {
125        Self {
126            scheme: Scheme::HTTPS,
127            account_authority,
128            basin_authority,
129        }
130    }
131
132    /// Create a new [`S2Endpoints`] from environment variables.
133    ///
134    /// The following environment variables are expected to be set:
135    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
136    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
137    pub fn from_env() -> Result<Self, ValidationError> {
138        let account_endpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
139            Ok(s) => s,
140            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
141            Err(VarError::NotUnicode(_)) => {
142                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
143            }
144        };
145
146        let basin_endpoint = match std::env::var("S2_BASIN_ENDPOINT") {
147            Ok(s) => s,
148            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
149            Err(VarError::NotUnicode(_)) => {
150                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
151            }
152        };
153
154        let (account_scheme, account_authority) = parse_account_endpoint(&account_endpoint)?;
155
156        let (basin_scheme, basin_authority) = parse_basin_endpoint(&basin_endpoint)?;
157
158        if account_scheme != basin_scheme {
159            return Err(
160                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
161            );
162        }
163
164        Ok(S2Endpoints::new(account_authority, basin_authority).with_scheme(account_scheme))
165    }
166
167    #[doc(hidden)]
168    #[cfg(feature = "_hidden")]
169    pub fn parse_from(
170        account_endpoint: &str,
171        basin_endpoint: &str,
172    ) -> Result<Self, ValidationError> {
173        let (account_scheme, account_authority) = parse_account_endpoint(account_endpoint)?;
174        let (basin_scheme, basin_authority) = parse_basin_endpoint(basin_endpoint)?;
175
176        if account_scheme != basin_scheme {
177            return Err("account and basin endpoints must have the same scheme".into());
178        }
179        Ok(S2Endpoints::new(account_authority, basin_authority).with_scheme(account_scheme))
180    }
181
182    /// Set the URI scheme.
183    pub fn with_scheme(self, scheme: Scheme) -> Self {
184        Self { scheme, ..self }
185    }
186
187    pub(crate) fn for_aws() -> Self {
188        Self {
189            scheme: Scheme::HTTPS,
190            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
191            basin_authority: BasinAuthority::ParentZone(
192                "b.aws.s2.dev".try_into().expect("valid authority"),
193            ),
194        }
195    }
196}
197
198fn parse_account_endpoint(s: &str) -> Result<(Scheme, Authority), ValidationError> {
199    let (scheme, authority) = match s.find("://") {
200        Some(idx) => {
201            let scheme: Scheme = s[..idx]
202                .parse()
203                .map_err(|_| "invalid account endpoint scheme".to_string())?;
204            (scheme, &s[idx + 3..])
205        }
206        None => (Scheme::HTTPS, s),
207    };
208    Ok((
209        scheme,
210        authority
211            .parse()
212            .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
213    ))
214}
215
216fn parse_basin_endpoint(s: &str) -> Result<(Scheme, BasinAuthority), ValidationError> {
217    let (scheme, authority) = match s.find("://") {
218        Some(idx) => {
219            let scheme: Scheme = s[..idx]
220                .parse()
221                .map_err(|_| "invalid basin endpoint scheme".to_string())?;
222            (scheme, &s[idx + 3..])
223        }
224        None => (Scheme::HTTPS, s),
225    };
226    let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
227        BasinAuthority::ParentZone(
228            authority
229                .parse()
230                .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
231        )
232    } else {
233        BasinAuthority::Direct(
234            authority
235                .parse()
236                .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
237        )
238    };
239    Ok((scheme, authority))
240}
241
242#[derive(Debug, Clone, Copy)]
243/// Compression algorithm for request and response bodies.
244pub enum Compression {
245    /// No compression.
246    None,
247    /// Gzip compression.
248    Gzip,
249    /// Zstd compression.
250    Zstd,
251}
252
253impl From<Compression> for CompressionAlgorithm {
254    fn from(value: Compression) -> Self {
255        match value {
256            Compression::None => CompressionAlgorithm::None,
257            Compression::Gzip => CompressionAlgorithm::Gzip,
258            Compression::Zstd => CompressionAlgorithm::Zstd,
259        }
260    }
261}
262
263#[derive(Debug, Clone, Copy, PartialEq)]
264#[non_exhaustive]
265/// Retry policy for [`append`](crate::S2Stream::append) and
266/// [`append_session`](crate::S2Stream::append_session) operations.
267pub enum AppendRetryPolicy {
268    /// Retry all appends. Use when duplicate records on the stream are acceptable.
269    All,
270    /// Only retry appends that include [`match_seq_num`](AppendInput::match_seq_num).
271    NoSideEffects,
272}
273
274impl AppendRetryPolicy {
275    pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
276        match self {
277            Self::All => true,
278            Self::NoSideEffects => input.match_seq_num.is_some(),
279        }
280    }
281}
282
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for retrying requests in case of transient failures.
///
/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
/// ```text
/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
///     jitter = rand[0, base_delay]
///     delay  = base_delay + jitter
/// ```
pub struct RetryConfig {
    /// Total number of attempts including the initial try. A value of `1` means no retries.
    ///
    /// Defaults to `3`.
    pub max_attempts: NonZeroU32,
    /// Minimum base delay for retries.
    ///
    /// Defaults to `100ms`.
    pub min_base_delay: Duration,
    /// Maximum base delay for retries.
    ///
    /// Defaults to `1s`.
    pub max_base_delay: Duration,
    /// Retry policy for [`append`](crate::S2Stream::append) and
    /// [`append_session`](crate::S2Stream::append_session) operations.
    ///
    /// Defaults to `All`.
    pub append_retry_policy: AppendRetryPolicy,
}
312
313impl Default for RetryConfig {
314    fn default() -> Self {
315        Self {
316            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
317            min_base_delay: Duration::from_millis(100),
318            max_base_delay: Duration::from_secs(1),
319            append_retry_policy: AppendRetryPolicy::All,
320        }
321    }
322}
323
324impl RetryConfig {
325    /// Create a new [`RetryConfig`] with default settings.
326    pub fn new() -> Self {
327        Self::default()
328    }
329
330    pub(crate) fn max_retries(&self) -> u32 {
331        self.max_attempts.get() - 1
332    }
333
334    /// Set the total number of attempts including the initial try.
335    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
336        Self {
337            max_attempts,
338            ..self
339        }
340    }
341
342    /// Set the minimum base delay for retries.
343    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
344        Self {
345            min_base_delay,
346            ..self
347        }
348    }
349
350    /// Set the maximum base delay for retries.
351    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
352        Self {
353            max_base_delay,
354            ..self
355        }
356    }
357
358    /// Set the retry policy for [`append`](crate::S2Stream::append) and
359    /// [`append_session`](crate::S2Stream::append_session) operations.
360    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
361        Self {
362            append_retry_policy,
363            ..self
364        }
365    }
366}
367
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for [`S2`](crate::S2).
pub struct S2Config {
    // Access token for the service; stored as `SecretString` to keep it out
    // of `Debug` output.
    pub(crate) access_token: SecretString,
    // Scheme + authorities to connect to. Defaults to the AWS environment
    // (see `S2Config::new`).
    pub(crate) endpoints: S2Endpoints,
    // Timeout for establishing a connection. Defaults to 3s.
    pub(crate) connection_timeout: Duration,
    // Timeout for individual requests. Defaults to 5s.
    pub(crate) request_timeout: Duration,
    // Retry behavior for transient failures.
    pub(crate) retry: RetryConfig,
    // Compression for request/response bodies. Defaults to `None`.
    pub(crate) compression: Compression,
    // Defaults to "s2-sdk-rust/<crate version>"; presumably sent as the
    // `User-Agent` header — confirm at the transport layer.
    pub(crate) user_agent: HeaderValue,
}
380
381impl S2Config {
382    /// Create a new [`S2Config`] with the given access token and default settings.
383    pub fn new(access_token: impl Into<String>) -> Self {
384        Self {
385            access_token: access_token.into().into(),
386            endpoints: S2Endpoints::for_aws(),
387            connection_timeout: Duration::from_secs(3),
388            request_timeout: Duration::from_secs(5),
389            retry: RetryConfig::new(),
390            compression: Compression::None,
391            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
392                .parse()
393                .expect("valid user agent"),
394        }
395    }
396
397    /// Set the S2 endpoints to connect to.
398    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
399        Self { endpoints, ..self }
400    }
401
402    /// Set the timeout for establishing a connection to the server.
403    ///
404    /// Defaults to `3s`.
405    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
406        Self {
407            connection_timeout,
408            ..self
409        }
410    }
411
412    /// Set the timeout for requests.
413    ///
414    /// Defaults to `5s`.
415    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
416        Self {
417            request_timeout,
418            ..self
419        }
420    }
421
422    /// Set the retry configuration for requests.
423    ///
424    /// See [`RetryConfig`] for defaults.
425    pub fn with_retry(self, retry: RetryConfig) -> Self {
426        Self { retry, ..self }
427    }
428
429    /// Set the compression algorithm for requests and responses.
430    ///
431    /// Defaults to no compression.
432    pub fn with_compression(self, compression: Compression) -> Self {
433        Self {
434            compression,
435            ..self
436        }
437    }
438
439    #[doc(hidden)]
440    #[cfg(feature = "_hidden")]
441    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
442        let user_agent = user_agent
443            .into()
444            .parse()
445            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
446        Ok(Self { user_agent, ..self })
447    }
448}
449
/// A page of values.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether there are more pages.
    pub has_more: bool,
}

impl<T> Page<T> {
    // Build a page from anything convertible into a `Vec` plus the
    // has-more-pages flag.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values = values.into();
        Self { values, has_more }
    }
}
468
469#[derive(Debug, Clone, Copy, PartialEq, Eq)]
470/// Storage class for recent appends.
471pub enum StorageClass {
472    /// Standard storage class that offers append latencies under `500ms`.
473    Standard,
474    /// Express storage class that offers append latencies under `50ms`.
475    Express,
476}
477
478impl From<api::config::StorageClass> for StorageClass {
479    fn from(value: api::config::StorageClass) -> Self {
480        match value {
481            api::config::StorageClass::Standard => StorageClass::Standard,
482            api::config::StorageClass::Express => StorageClass::Express,
483        }
484    }
485}
486
487impl From<StorageClass> for api::config::StorageClass {
488    fn from(value: StorageClass) -> Self {
489        match value {
490            StorageClass::Standard => api::config::StorageClass::Standard,
491            StorageClass::Express => api::config::StorageClass::Express,
492        }
493    }
494}
495
496#[derive(Debug, Clone, Copy, PartialEq, Eq)]
497/// Retention policy for records in a stream.
498pub enum RetentionPolicy {
499    /// Age in seconds. Records older than this age are automatically trimmed.
500    Age(u64),
501    /// Records are retained indefinitely unless explicitly trimmed.
502    Infinite,
503}
504
505impl From<api::config::RetentionPolicy> for RetentionPolicy {
506    fn from(value: api::config::RetentionPolicy) -> Self {
507        match value {
508            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
509            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
510        }
511    }
512}
513
514impl From<RetentionPolicy> for api::config::RetentionPolicy {
515    fn from(value: RetentionPolicy) -> Self {
516        match value {
517            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
518            RetentionPolicy::Infinite => {
519                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
520            }
521        }
522    }
523}
524
525#[derive(Debug, Clone, Copy, PartialEq, Eq)]
526/// Timestamping mode for appends that influences how timestamps are handled.
527pub enum TimestampingMode {
528    /// Prefer client-specified timestamp if present otherwise use arrival time.
529    ClientPrefer,
530    /// Require a client-specified timestamp and reject the append if it is missing.
531    ClientRequire,
532    /// Use the arrival time and ignore any client-specified timestamp.
533    Arrival,
534}
535
536impl From<api::config::TimestampingMode> for TimestampingMode {
537    fn from(value: api::config::TimestampingMode) -> Self {
538        match value {
539            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
540            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
541            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
542        }
543    }
544}
545
546impl From<TimestampingMode> for api::config::TimestampingMode {
547    fn from(value: TimestampingMode) -> Self {
548        match value {
549            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
550            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
551            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
552        }
553    }
554}
555
556#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
557#[non_exhaustive]
558/// Configuration for timestamping behavior.
559pub struct TimestampingConfig {
560    /// Timestamping mode for appends that influences how timestamps are handled.
561    ///
562    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
563    pub mode: Option<TimestampingMode>,
564    /// Whether client-specified timestamps are allowed to exceed the arrival time.
565    ///
566    /// Defaults to `false` (client timestamps are capped at the arrival time).
567    pub uncapped: bool,
568}
569
570impl TimestampingConfig {
571    /// Create a new [`TimestampingConfig`] with default settings.
572    pub fn new() -> Self {
573        Self::default()
574    }
575
576    /// Set the timestamping mode for appends that influences how timestamps are handled.
577    pub fn with_mode(self, mode: TimestampingMode) -> Self {
578        Self {
579            mode: Some(mode),
580            ..self
581        }
582    }
583
584    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
585    pub fn with_uncapped(self, uncapped: bool) -> Self {
586        Self { uncapped, ..self }
587    }
588}
589
590impl From<api::config::TimestampingConfig> for TimestampingConfig {
591    fn from(value: api::config::TimestampingConfig) -> Self {
592        Self {
593            mode: value.mode.map(Into::into),
594            uncapped: value.uncapped.unwrap_or_default(),
595        }
596    }
597}
598
599impl From<TimestampingConfig> for api::config::TimestampingConfig {
600    fn from(value: TimestampingConfig) -> Self {
601        Self {
602            mode: value.mode.map(Into::into),
603            uncapped: Some(value.uncapped),
604        }
605    }
606}
607
/// Configuration for automatically deleting a stream when it becomes empty.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the minimum age in seconds before an empty stream can be deleted.
    ///
    /// Sub-second precision of `min_age` is discarded.
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
631
632impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
633    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
634        Self {
635            min_age_secs: value.min_age_secs,
636        }
637    }
638}
639
640impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
641    fn from(value: DeleteOnEmptyConfig) -> Self {
642        Self {
643            min_age_secs: value.min_age_secs,
644        }
645    }
646}
647
648#[derive(Debug, Clone, Default, PartialEq, Eq)]
649#[non_exhaustive]
650/// Configuration for a stream.
651pub struct StreamConfig {
652    /// Storage class for the stream.
653    ///
654    /// Defaults to [`Express`](StorageClass::Express).
655    pub storage_class: Option<StorageClass>,
656    /// Retention policy for records in the stream.
657    ///
658    /// Defaults to `7 days` of retention.
659    pub retention_policy: Option<RetentionPolicy>,
660    /// Configuration for timestamping behavior.
661    ///
662    /// See [`TimestampingConfig`] for defaults.
663    pub timestamping: Option<TimestampingConfig>,
664    /// Configuration for automatically deleting the stream when it becomes empty.
665    ///
666    /// See [`DeleteOnEmptyConfig`] for defaults.
667    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
668}
669
670impl StreamConfig {
671    /// Create a new [`StreamConfig`] with default settings.
672    pub fn new() -> Self {
673        Self::default()
674    }
675
676    /// Set the storage class for the stream.
677    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
678        Self {
679            storage_class: Some(storage_class),
680            ..self
681        }
682    }
683
684    /// Set the retention policy for records in the stream.
685    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
686        Self {
687            retention_policy: Some(retention_policy),
688            ..self
689        }
690    }
691
692    /// Set the configuration for timestamping behavior.
693    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
694        Self {
695            timestamping: Some(timestamping),
696            ..self
697        }
698    }
699
700    /// Set the configuration for automatically deleting the stream when it becomes empty.
701    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
702        Self {
703            delete_on_empty: Some(delete_on_empty),
704            ..self
705        }
706    }
707}
708
709impl From<api::config::StreamConfig> for StreamConfig {
710    fn from(value: api::config::StreamConfig) -> Self {
711        Self {
712            storage_class: value.storage_class.map(Into::into),
713            retention_policy: value.retention_policy.map(Into::into),
714            timestamping: value.timestamping.map(Into::into),
715            delete_on_empty: value.delete_on_empty.map(Into::into),
716        }
717    }
718}
719
720impl From<StreamConfig> for api::config::StreamConfig {
721    fn from(value: StreamConfig) -> Self {
722        Self {
723            storage_class: value.storage_class.map(Into::into),
724            retention_policy: value.retention_policy.map(Into::into),
725            timestamping: value.timestamping.map(Into::into),
726            delete_on_empty: value.delete_on_empty.map(Into::into),
727        }
728    }
729}
730
731#[derive(Debug, Clone, Default)]
732#[non_exhaustive]
733/// Configuration for a basin.
734pub struct BasinConfig {
735    /// Default configuration for all streams in the basin.
736    ///
737    /// See [`StreamConfig`] for defaults.
738    pub default_stream_config: Option<StreamConfig>,
739    /// Whether to create stream on append if it doesn't exist using default stream configuration.
740    ///
741    /// Defaults to `false`.
742    pub create_stream_on_append: bool,
743    /// Whether to create stream on read if it doesn't exist using default stream configuration.
744    ///
745    /// Defaults to `false`.
746    pub create_stream_on_read: bool,
747}
748
749impl BasinConfig {
750    /// Create a new [`BasinConfig`] with default settings.
751    pub fn new() -> Self {
752        Self::default()
753    }
754
755    /// Set the default configuration for all streams in the basin.
756    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
757        Self {
758            default_stream_config: Some(config),
759            ..self
760        }
761    }
762
763    /// Set whether to create stream on append if it doesn't exist using default stream
764    /// configuration.
765    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
766        Self {
767            create_stream_on_append,
768            ..self
769        }
770    }
771
772    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
773    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
774        Self {
775            create_stream_on_read,
776            ..self
777        }
778    }
779}
780
781impl From<api::config::BasinConfig> for BasinConfig {
782    fn from(value: api::config::BasinConfig) -> Self {
783        Self {
784            default_stream_config: value.default_stream_config.map(Into::into),
785            create_stream_on_append: value.create_stream_on_append,
786            create_stream_on_read: value.create_stream_on_read,
787        }
788    }
789}
790
791impl From<BasinConfig> for api::config::BasinConfig {
792    fn from(value: BasinConfig) -> Self {
793        Self {
794            default_stream_config: value.default_stream_config.map(Into::into),
795            create_stream_on_append: value.create_stream_on_append,
796            create_stream_on_read: value.create_stream_on_read,
797        }
798    }
799}
800
801#[derive(Debug, Clone, PartialEq, Eq)]
802/// Scope of a basin.
803pub enum BasinScope {
804    /// AWS `us-east-1` region.
805    AwsUsEast1,
806}
807
808impl From<api::basin::BasinScope> for BasinScope {
809    fn from(value: api::basin::BasinScope) -> Self {
810        match value {
811            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
812        }
813    }
814}
815
816impl From<BasinScope> for api::basin::BasinScope {
817    fn from(value: BasinScope) -> Self {
818        match value {
819            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
820        }
821    }
822}
823
824#[derive(Debug, Clone)]
825#[non_exhaustive]
826/// Input for [`create_basin`](crate::S2::create_basin) operation.
827pub struct CreateBasinInput {
828    /// Basin name.
829    pub name: BasinName,
830    /// Configuration for the basin.
831    ///
832    /// See [`BasinConfig`] for defaults.
833    pub config: Option<BasinConfig>,
834    /// Scope of the basin.
835    ///
836    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
837    pub scope: Option<BasinScope>,
838    /// Idempotency token for the operation.
839    ///
840    /// When a request is retried, the same token is reused,
841    /// causing the server to return the original response instead of
842    /// performing the operation again.
843    ///
844    /// Defaults to a random UUID.
845    pub idempotency_token: String,
846}
847
848impl CreateBasinInput {
849    /// Create a new [`CreateBasinInput`] with the given basin name.
850    pub fn new(name: BasinName) -> Self {
851        Self {
852            name,
853            config: None,
854            scope: None,
855            idempotency_token: idempotency_token(),
856        }
857    }
858
859    /// Set the configuration for the basin.
860    pub fn with_config(self, config: BasinConfig) -> Self {
861        Self {
862            config: Some(config),
863            ..self
864        }
865    }
866
867    /// Set the scope of the basin.
868    pub fn with_scope(self, scope: BasinScope) -> Self {
869        Self {
870            scope: Some(scope),
871            ..self
872        }
873    }
874
875    /// Set the idempotency token for the operation.
876    pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
877        Self {
878            idempotency_token: idempotency_token.into(),
879            ..self
880        }
881    }
882}
883
884impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
885    fn from(value: CreateBasinInput) -> Self {
886        (
887            api::basin::CreateBasinRequest {
888                basin: value.name,
889                config: value.config.map(Into::into),
890                scope: value.scope.map(Into::into),
891            },
892            value.idempotency_token,
893        )
894    }
895}
896
897#[derive(Debug, Clone, Default)]
898#[non_exhaustive]
899/// Input for [`list_basins`](crate::S2::list_basins) operation.
900pub struct ListBasinsInput {
901    /// Filter basins whose names begin with this value.
902    ///
903    /// Defaults to `""`.
904    pub prefix: BasinNamePrefix,
905    /// Filter basins whose names are lexicographically greater than this value.
906    ///
907    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
908    ///
909    /// Defaults to `""`.
910    pub start_after: BasinNameStartAfter,
911    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
912    ///
913    /// Defaults to `1000`.
914    pub limit: Option<usize>,
915}
916
917impl ListBasinsInput {
918    /// Create a new [`ListBasinsInput`] with default values.
919    pub fn new() -> Self {
920        Self::default()
921    }
922
923    /// Set the prefix used to filter basins whose names begin with this value.
924    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
925        Self { prefix, ..self }
926    }
927
928    /// Set the value used to filter basins whose names are lexicographically greater than this
929    /// value.
930    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
931        Self {
932            start_after,
933            ..self
934        }
935    }
936
937    /// Set the limit on number of basins to return in a page.
938    pub fn with_limit(self, limit: usize) -> Self {
939        Self {
940            limit: Some(limit),
941            ..self
942        }
943    }
944}
945
946impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
947    fn from(value: ListBasinsInput) -> Self {
948        Self {
949            prefix: Some(value.prefix),
950            start_after: Some(value.start_after),
951            limit: value.limit,
952        }
953    }
954}
955
#[derive(Debug, Clone, Default)]
// NOTE(review): unlike `ListBasinsInput`, this struct is not `#[non_exhaustive]` —
// confirm whether that asymmetry is intentional.
/// Input for [`S2::list_all_basins`](crate::S2::list_all_basins).
///
/// Unlike [`ListBasinsInput`], there is no page `limit` field.
pub struct ListAllBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Whether to ignore basins that have been requested for deletion but not yet deleted.
    ///
    /// Defaults to `false`.
    pub ignore_pending_deletions: bool,
}
974
975impl ListAllBasinsInput {
976    /// Create a new [`ListAllBasinsInput`] with default values.
977    pub fn new() -> Self {
978        Self::default()
979    }
980
981    /// Set the prefix used to filter basins whose names begin with this value.
982    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
983        Self { prefix, ..self }
984    }
985
986    /// Set the value used to filter basins whose names are lexicographically greater than this
987    /// value.
988    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
989        Self {
990            start_after,
991            ..self
992        }
993    }
994
995    /// Set whether to ignore basins that have been requested for deletion but not yet deleted.
996    pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
997        Self {
998            ignore_pending_deletions,
999            ..self
1000        }
1001    }
1002}
1003
#[derive(Debug, Clone, PartialEq, Eq)]
/// Current state of a basin.
pub enum BasinState {
    /// The basin is active.
    Active,
    /// The basin is being created.
    Creating,
    /// The basin is being deleted.
    Deleting,
}
1014
1015impl From<api::basin::BasinState> for BasinState {
1016    fn from(value: api::basin::BasinState) -> Self {
1017        match value {
1018            api::basin::BasinState::Active => BasinState::Active,
1019            api::basin::BasinState::Creating => BasinState::Creating,
1020            api::basin::BasinState::Deleting => BasinState::Deleting,
1021        }
1022    }
1023}
1024
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// Basin information.
pub struct BasinInfo {
    /// Basin name.
    pub name: BasinName,
    /// Scope of the basin, if one was assigned at creation.
    pub scope: Option<BasinScope>,
    /// Current state of the basin.
    pub state: BasinState,
}
1036
1037impl From<api::basin::BasinInfo> for BasinInfo {
1038    fn from(value: api::basin::BasinInfo) -> Self {
1039        Self {
1040            name: value.name,
1041            scope: value.scope.map(Into::into),
1042            state: value.state.into(),
1043        }
1044    }
1045}
1046
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
pub struct DeleteBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Whether to ignore a `Not Found` error if the basin doesn't exist.
    pub ignore_not_found: bool,
}
1056
1057impl DeleteBasinInput {
1058    /// Create a new [`DeleteBasinInput`] with the given basin name.
1059    pub fn new(name: BasinName) -> Self {
1060        Self {
1061            name,
1062            ignore_not_found: false,
1063        }
1064    }
1065
1066    /// Set whether to ignore `Not Found` error if the basin is not existing.
1067    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1068        Self {
1069            ignore_not_found,
1070            ..self
1071        }
1072    }
1073}
1074
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`TimestampingConfig`].
///
/// Fields that are not `Specified` presumably leave the current setting
/// unchanged — see [`Maybe`] for the exact semantics.
pub struct TimestampingReconfiguration {
    /// Override for the existing [`mode`](TimestampingConfig::mode).
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
    pub uncapped: Maybe<Option<bool>>,
}
1084
1085impl TimestampingReconfiguration {
1086    /// Create a new [`TimestampingReconfiguration`].
1087    pub fn new() -> Self {
1088        Self::default()
1089    }
1090
1091    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1092    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1093        Self {
1094            mode: Maybe::Specified(Some(mode)),
1095            ..self
1096        }
1097    }
1098
1099    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1100    pub fn with_uncapped(self, uncapped: bool) -> Self {
1101        Self {
1102            uncapped: Maybe::Specified(Some(uncapped)),
1103            ..self
1104        }
1105    }
1106}
1107
1108impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1109    fn from(value: TimestampingReconfiguration) -> Self {
1110        Self {
1111            mode: value.mode.map(|m| m.map(Into::into)),
1112            uncapped: value.uncapped,
1113        }
1114    }
1115}
1116
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`DeleteOnEmptyConfig`].
pub struct DeleteOnEmptyReconfiguration {
    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
    ///
    /// Expressed in whole seconds.
    pub min_age_secs: Maybe<Option<u64>>,
}
1124
1125impl DeleteOnEmptyReconfiguration {
1126    /// Create a new [`DeleteOnEmptyReconfiguration`].
1127    pub fn new() -> Self {
1128        Self::default()
1129    }
1130
1131    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1132    pub fn with_min_age(self, min_age: Duration) -> Self {
1133        Self {
1134            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1135        }
1136    }
1137}
1138
1139impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1140    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1141        Self {
1142            min_age_secs: value.min_age_secs,
1143        }
1144    }
1145}
1146
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`StreamConfig`].
///
/// Fields that are not `Specified` presumably leave the current setting
/// unchanged — see [`Maybe`] for the exact semantics.
pub struct StreamReconfiguration {
    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
}
1160
1161impl StreamReconfiguration {
1162    /// Create a new [`StreamReconfiguration`].
1163    pub fn new() -> Self {
1164        Self::default()
1165    }
1166
1167    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1168    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1169        Self {
1170            storage_class: Maybe::Specified(Some(storage_class)),
1171            ..self
1172        }
1173    }
1174
1175    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1176    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1177        Self {
1178            retention_policy: Maybe::Specified(Some(retention_policy)),
1179            ..self
1180        }
1181    }
1182
1183    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1184    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1185        Self {
1186            timestamping: Maybe::Specified(Some(timestamping)),
1187            ..self
1188        }
1189    }
1190
1191    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1192    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1193        Self {
1194            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1195            ..self
1196        }
1197    }
1198}
1199
1200impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1201    fn from(value: StreamReconfiguration) -> Self {
1202        Self {
1203            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1204            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1205            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1206            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1207        }
1208    }
1209}
1210
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`BasinConfig`].
///
/// Fields that are not `Specified` presumably leave the current setting
/// unchanged — see [`Maybe`] for the exact semantics.
pub struct BasinReconfiguration {
    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Override for the existing
    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
    pub create_stream_on_append: Maybe<bool>,
    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
    pub create_stream_on_read: Maybe<bool>,
}
1223
1224impl BasinReconfiguration {
1225    /// Create a new [`BasinReconfiguration`].
1226    pub fn new() -> Self {
1227        Self::default()
1228    }
1229
1230    /// Set the override for the existing
1231    /// [`default_stream_config`](BasinConfig::default_stream_config).
1232    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1233        Self {
1234            default_stream_config: Maybe::Specified(Some(config)),
1235            ..self
1236        }
1237    }
1238
1239    /// Set the override for the existing
1240    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1241    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1242        Self {
1243            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1244            ..self
1245        }
1246    }
1247
1248    /// Set the override for the existing
1249    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1250    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1251        Self {
1252            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1253            ..self
1254        }
1255    }
1256}
1257
1258impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1259    fn from(value: BasinReconfiguration) -> Self {
1260        Self {
1261            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1262            create_stream_on_append: value.create_stream_on_append,
1263            create_stream_on_read: value.create_stream_on_read,
1264        }
1265    }
1266}
1267
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
pub struct ReconfigureBasinInput {
    /// Name of the basin to reconfigure.
    pub name: BasinName,
    /// Reconfiguration for [`BasinConfig`].
    pub config: BasinReconfiguration,
}
1277
1278impl ReconfigureBasinInput {
1279    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1280    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1281        Self { name, config }
1282    }
1283}
1284
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
///
/// Yields a single page of results; see [`ListAllAccessTokensInput`] for
/// listing every token.
pub struct ListAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
1304
1305impl ListAccessTokensInput {
1306    /// Create a new [`ListAccessTokensInput`] with default values.
1307    pub fn new() -> Self {
1308        Self::default()
1309    }
1310
1311    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1312    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1313        Self { prefix, ..self }
1314    }
1315
1316    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1317    /// value.
1318    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1319        Self {
1320            start_after,
1321            ..self
1322        }
1323    }
1324
1325    /// Set the limit on number of access tokens to return in a page.
1326    pub fn with_limit(self, limit: usize) -> Self {
1327        Self {
1328            limit: Some(limit),
1329            ..self
1330        }
1331    }
1332}
1333
1334impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1335    fn from(value: ListAccessTokensInput) -> Self {
1336        Self {
1337            prefix: Some(value.prefix),
1338            start_after: Some(value.start_after),
1339            limit: value.limit,
1340        }
1341    }
1342}
1343
#[derive(Debug, Clone, Default)]
// NOTE(review): unlike `ListAccessTokensInput`, this struct is not
// `#[non_exhaustive]` — confirm whether that asymmetry is intentional.
/// Input for [`S2::list_all_access_tokens`](crate::S2::list_all_access_tokens).
///
/// Unlike [`ListAccessTokensInput`], there is no page `limit` field.
pub struct ListAllAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
}
1358
1359impl ListAllAccessTokensInput {
1360    /// Create a new [`ListAllAccessTokensInput`] with default values.
1361    pub fn new() -> Self {
1362        Self::default()
1363    }
1364
1365    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1366    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1367        Self { prefix, ..self }
1368    }
1369
1370    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1371    /// this value.
1372    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1373        Self {
1374            start_after,
1375            ..self
1376        }
1377    }
1378}
1379
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Access token information.
///
/// Produced from the wire type via `TryFrom`; conversion fails if the wire
/// value lacks an expiration time.
pub struct AccessTokenInfo {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    pub expires_at: S2DateTime,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    pub auto_prefix_streams: bool,
    /// Scope of the access token.
    pub scope: AccessTokenScope,
}
1394
1395impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1396    type Error = ValidationError;
1397
1398    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1399        let expires_at = value
1400            .expires_at
1401            .map(Into::into)
1402            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1403        Ok(Self {
1404            id: value.id,
1405            expires_at,
1406            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1407            scope: value.scope.into(),
1408        })
1409    }
1410}
1411
#[derive(Debug, Clone)]
/// Pattern for matching basins.
///
/// See [`AccessTokenScope::basins`].
pub enum BasinMatcher {
    /// Match no basins at all.
    None,
    /// Match exactly this one basin.
    Exact(BasinName),
    /// Match every basin whose name starts with this prefix.
    Prefix(BasinNamePrefix),
}
1424
#[derive(Debug, Clone)]
/// Pattern for matching streams.
///
/// See [`AccessTokenScope::streams`].
pub enum StreamMatcher {
    /// Match no streams at all.
    None,
    /// Match exactly this one stream.
    Exact(StreamName),
    /// Match every stream whose name starts with this prefix.
    Prefix(StreamNamePrefix),
}
1437
#[derive(Debug, Clone)]
/// Pattern for matching access tokens.
///
/// See [`AccessTokenScope::access_tokens`].
pub enum AccessTokenMatcher {
    /// Match no access tokens at all.
    None,
    /// Match exactly this one access token.
    Exact(AccessTokenId),
    /// Match every access token whose ID starts with this prefix.
    Prefix(AccessTokenIdPrefix),
}
1450
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions indicating allowed operations.
///
/// The default grants neither permission.
pub struct ReadWritePermissions {
    /// Read permission.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Write permission.
    ///
    /// Defaults to `false`.
    pub write: bool,
}
1464
1465impl ReadWritePermissions {
1466    /// Create a new [`ReadWritePermissions`] with default values.
1467    pub fn new() -> Self {
1468        Self::default()
1469    }
1470
1471    /// Create read-only permissions.
1472    pub fn read_only() -> Self {
1473        Self {
1474            read: true,
1475            write: false,
1476        }
1477    }
1478
1479    /// Create write-only permissions.
1480    pub fn write_only() -> Self {
1481        Self {
1482            read: false,
1483            write: true,
1484        }
1485    }
1486
1487    /// Create read-write permissions.
1488    pub fn read_write() -> Self {
1489        Self {
1490            read: true,
1491            write: true,
1492        }
1493    }
1494}
1495
1496impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1497    fn from(value: ReadWritePermissions) -> Self {
1498        Self {
1499            read: Some(value.read),
1500            write: Some(value.write),
1501        }
1502    }
1503}
1504
1505impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1506    fn from(value: api::access::ReadWritePermissions) -> Self {
1507        Self {
1508            read: value.read.unwrap_or_default(),
1509            write: value.write.unwrap_or_default(),
1510        }
1511    }
1512}
1513
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions at the operation group level.
///
/// The default grants no permissions for any group.
///
/// See [`AccessTokenScope::op_group_perms`].
pub struct OperationGroupPermissions {
    /// Account-level access permissions.
    ///
    /// Defaults to `None`.
    pub account: Option<ReadWritePermissions>,
    /// Basin-level access permissions.
    ///
    /// Defaults to `None`.
    pub basin: Option<ReadWritePermissions>,
    /// Stream-level access permissions.
    ///
    /// Defaults to `None`.
    pub stream: Option<ReadWritePermissions>,
}
1533
1534impl OperationGroupPermissions {
1535    /// Create a new [`OperationGroupPermissions`] with default values.
1536    pub fn new() -> Self {
1537        Self::default()
1538    }
1539
1540    /// Create read-only permissions for all groups.
1541    pub fn read_only_all() -> Self {
1542        Self {
1543            account: Some(ReadWritePermissions::read_only()),
1544            basin: Some(ReadWritePermissions::read_only()),
1545            stream: Some(ReadWritePermissions::read_only()),
1546        }
1547    }
1548
1549    /// Create write-only permissions for all groups.
1550    pub fn write_only_all() -> Self {
1551        Self {
1552            account: Some(ReadWritePermissions::write_only()),
1553            basin: Some(ReadWritePermissions::write_only()),
1554            stream: Some(ReadWritePermissions::write_only()),
1555        }
1556    }
1557
1558    /// Create read-write permissions for all groups.
1559    pub fn read_write_all() -> Self {
1560        Self {
1561            account: Some(ReadWritePermissions::read_write()),
1562            basin: Some(ReadWritePermissions::read_write()),
1563            stream: Some(ReadWritePermissions::read_write()),
1564        }
1565    }
1566
1567    /// Set account-level access permissions.
1568    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1569        Self {
1570            account: Some(account),
1571            ..self
1572        }
1573    }
1574
1575    /// Set basin-level access permissions.
1576    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1577        Self {
1578            basin: Some(basin),
1579            ..self
1580        }
1581    }
1582
1583    /// Set stream-level access permissions.
1584    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1585        Self {
1586            stream: Some(stream),
1587            ..self
1588        }
1589    }
1590}
1591
1592impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1593    fn from(value: OperationGroupPermissions) -> Self {
1594        Self {
1595            account: value.account.map(Into::into),
1596            basin: value.basin.map(Into::into),
1597            stream: value.stream.map(Into::into),
1598        }
1599    }
1600}
1601
1602impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1603    fn from(value: api::access::PermittedOperationGroups) -> Self {
1604        Self {
1605            account: value.account.map(Into::into),
1606            basin: value.basin.map(Into::into),
1607            stream: value.stream.map(Into::into),
1608        }
1609    }
1610}
1611
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Individual operation that can be permitted.
///
/// Converts to/from `api::access::Operation`; the metrics variants map to the
/// shorter wire names (`AccountMetrics`, `BasinMetrics`, `StreamMetrics`).
///
/// See [`AccessTokenScope::ops`].
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics.
    GetAccountMetrics,
    /// Get basin metrics.
    GetBasinMetrics,
    /// Get stream metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
}
1660
1661impl From<Operation> for api::access::Operation {
1662    fn from(value: Operation) -> Self {
1663        match value {
1664            Operation::ListBasins => api::access::Operation::ListBasins,
1665            Operation::CreateBasin => api::access::Operation::CreateBasin,
1666            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1667            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1668            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1669            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1670            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1671            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1672            Operation::ListStreams => api::access::Operation::ListStreams,
1673            Operation::CreateStream => api::access::Operation::CreateStream,
1674            Operation::DeleteStream => api::access::Operation::DeleteStream,
1675            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1676            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1677            Operation::CheckTail => api::access::Operation::CheckTail,
1678            Operation::Append => api::access::Operation::Append,
1679            Operation::Read => api::access::Operation::Read,
1680            Operation::Trim => api::access::Operation::Trim,
1681            Operation::Fence => api::access::Operation::Fence,
1682            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1683            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1684            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1685        }
1686    }
1687}
1688
1689impl From<api::access::Operation> for Operation {
1690    fn from(value: api::access::Operation) -> Self {
1691        match value {
1692            api::access::Operation::ListBasins => Operation::ListBasins,
1693            api::access::Operation::CreateBasin => Operation::CreateBasin,
1694            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1695            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1696            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1697            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1698            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1699            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1700            api::access::Operation::ListStreams => Operation::ListStreams,
1701            api::access::Operation::CreateStream => Operation::CreateStream,
1702            api::access::Operation::DeleteStream => Operation::DeleteStream,
1703            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1704            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1705            api::access::Operation::CheckTail => Operation::CheckTail,
1706            api::access::Operation::Append => Operation::Append,
1707            api::access::Operation::Read => Operation::Read,
1708            api::access::Operation::Trim => Operation::Trim,
1709            api::access::Operation::Fence => Operation::Fence,
1710            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1711            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1712            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1713        }
1714    }
1715}
1716
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Builder-style input describing the scope of an access token to issue.
///
/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
/// final set must not be empty.
///
/// See [`IssueAccessTokenInput::scope`].
pub struct AccessTokenScopeInput {
    // Permitted basins; `None` means no basins.
    basins: Option<BasinMatcher>,
    // Permitted streams; `None` means no streams.
    streams: Option<StreamMatcher>,
    // Permitted access tokens; `None` means no access tokens.
    access_tokens: Option<AccessTokenMatcher>,
    // Permissions at the operation group level.
    op_group_perms: Option<OperationGroupPermissions>,
    // Individually permitted operations.
    ops: HashSet<Operation>,
}
1733
1734impl AccessTokenScopeInput {
1735    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1736    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1737        Self {
1738            basins: None,
1739            streams: None,
1740            access_tokens: None,
1741            op_group_perms: None,
1742            ops: ops.into_iter().collect(),
1743        }
1744    }
1745
1746    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1747    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1748        Self {
1749            basins: None,
1750            streams: None,
1751            access_tokens: None,
1752            op_group_perms: Some(op_group_perms),
1753            ops: HashSet::default(),
1754        }
1755    }
1756
1757    /// Set the permitted operations.
1758    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1759        Self {
1760            ops: ops.into_iter().collect(),
1761            ..self
1762        }
1763    }
1764
1765    /// Set the access permissions at the operation group level.
1766    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1767        Self {
1768            op_group_perms: Some(op_group_perms),
1769            ..self
1770        }
1771    }
1772
1773    /// Set the permitted basins.
1774    ///
1775    /// Defaults to no basins.
1776    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1777        Self {
1778            basins: Some(basins),
1779            ..self
1780        }
1781    }
1782
1783    /// Set the permitted streams.
1784    ///
1785    /// Defaults to no streams.
1786    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1787        Self {
1788            streams: Some(streams),
1789            ..self
1790        }
1791    }
1792
1793    /// Set the permitted access tokens.
1794    ///
1795    /// Defaults to no access tokens.
1796    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1797        Self {
1798            access_tokens: Some(access_tokens),
1799            ..self
1800        }
1801    }
1802}
1803
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token, as reported by the service.
///
/// The write-side counterpart is [`AccessTokenScopeInput`].
pub struct AccessTokenScope {
    /// Permitted basins.
    pub basins: Option<BasinMatcher>,
    /// Permitted streams.
    pub streams: Option<StreamMatcher>,
    /// Permitted access tokens.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions at the operation group level.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Permitted operations.
    pub ops: HashSet<Operation>,
}
1819
impl From<api::access::AccessTokenScope> for AccessTokenScope {
    // Each wire-level `ResourceSet` collapses to a matcher the same way:
    //   Exact(Empty)    -> `*Matcher::None`   (matches nothing)
    //   Exact(NonEmpty) -> `*Matcher::Exact`  (matches one resource)
    //   Prefix          -> `*Matcher::Prefix` (matches by prefix)
    fn from(value: api::access::AccessTokenScope) -> Self {
        Self {
            basins: value.basins.map(|rs| match rs {
                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
                    BasinMatcher::Exact(e)
                }
                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
                    BasinMatcher::None
                }
                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
            }),
            streams: value.streams.map(|rs| match rs {
                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
                    StreamMatcher::Exact(e)
                }
                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
                    StreamMatcher::None
                }
                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
            }),
            access_tokens: value.access_tokens.map(|rs| match rs {
                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
                    AccessTokenMatcher::Exact(e)
                }
                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
                    AccessTokenMatcher::None
                }
                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
            }),
            op_group_perms: value.op_groups.map(Into::into),
            // An absent operation list on the wire maps to an empty set.
            ops: value
                .ops
                .map(|ops| ops.into_iter().map(Into::into).collect())
                .unwrap_or_default(),
        }
    }
}
1858
1859impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1860    fn from(value: AccessTokenScopeInput) -> Self {
1861        Self {
1862            basins: value.basins.map(|rs| match rs {
1863                BasinMatcher::None => {
1864                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1865                }
1866                BasinMatcher::Exact(e) => {
1867                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1868                }
1869                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1870            }),
1871            streams: value.streams.map(|rs| match rs {
1872                StreamMatcher::None => {
1873                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1874                }
1875                StreamMatcher::Exact(e) => {
1876                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1877                }
1878                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1879            }),
1880            access_tokens: value.access_tokens.map(|rs| match rs {
1881                AccessTokenMatcher::None => {
1882                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1883                }
1884                AccessTokenMatcher::Exact(e) => {
1885                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1886                }
1887                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1888            }),
1889            op_groups: value.op_group_perms.map(Into::into),
1890            ops: if value.ops.is_empty() {
1891                None
1892            } else {
1893                Some(value.ops.into_iter().map(Into::into).collect())
1894            },
1895        }
1896    }
1897}
1898
1899#[derive(Debug, Clone)]
1900#[non_exhaustive]
1901/// Input for [`issue_access_token`](crate::S2::issue_access_token).
1902pub struct IssueAccessTokenInput {
1903    /// Access token ID.
1904    pub id: AccessTokenId,
1905    /// Expiration time.
1906    ///
1907    /// Defaults to the expiration time of requestor's access token passed via
1908    /// [`S2Config`](S2Config::new).
1909    pub expires_at: Option<S2DateTime>,
1910    /// Whether to automatically prefix stream names during creation and strip the prefix during
1911    /// listing.
1912    ///
1913    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
1914    /// prefix.
1915    ///
1916    /// Defaults to `false`.
1917    pub auto_prefix_streams: bool,
1918    /// Scope of the token.
1919    pub scope: AccessTokenScopeInput,
1920}
1921
1922impl IssueAccessTokenInput {
1923    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
1924    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1925        Self {
1926            id,
1927            expires_at: None,
1928            auto_prefix_streams: false,
1929            scope,
1930        }
1931    }
1932
1933    /// Set the expiration time.
1934    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1935        Self {
1936            expires_at: Some(expires_at),
1937            ..self
1938        }
1939    }
1940
1941    /// Set whether to automatically prefix stream names during creation and strip the prefix during
1942    /// listing.
1943    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1944        Self {
1945            auto_prefix_streams,
1946            ..self
1947        }
1948    }
1949}
1950
1951impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1952    fn from(value: IssueAccessTokenInput) -> Self {
1953        Self {
1954            id: value.id,
1955            expires_at: value.expires_at.map(Into::into),
1956            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1957            scope: value.scope.into(),
1958        }
1959    }
1960}
1961
/// Interval to accumulate over for timeseries metric sets.
#[derive(Debug, Clone, Copy)]
pub enum TimeseriesInterval {
    /// One-minute interval.
    Minute,
    /// One-hour interval.
    Hour,
    /// One-day interval.
    Day,
}
1972
1973impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
1974    fn from(value: TimeseriesInterval) -> Self {
1975        match value {
1976            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
1977            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
1978            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
1979        }
1980    }
1981}
1982
1983impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
1984    fn from(value: api::metrics::TimeseriesInterval) -> Self {
1985        match value {
1986            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
1987            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
1988            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
1989        }
1990    }
1991}
1992
/// Time range as Unix epoch seconds.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct TimeRange {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
}
2002
2003impl TimeRange {
2004    /// Create a new [`TimeRange`] with the given start and end timestamps.
2005    pub fn new(start: u32, end: u32) -> Self {
2006        Self { start, end }
2007    }
2008}
2009
2010#[derive(Debug, Clone, Copy)]
2011#[non_exhaustive]
2012/// Time range as Unix epoch seconds and accumulation interval.
2013pub struct TimeRangeAndInterval {
2014    /// Start timestamp (inclusive).
2015    pub start: u32,
2016    /// End timestamp (exclusive).
2017    pub end: u32,
2018    /// Interval to accumulate over for timeseries metric sets.
2019    ///
2020    /// Default is dependent on the requested metric set.
2021    pub interval: Option<TimeseriesInterval>,
2022}
2023
2024impl TimeRangeAndInterval {
2025    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2026    pub fn new(start: u32, end: u32) -> Self {
2027        Self {
2028            start,
2029            end,
2030            interval: None,
2031        }
2032    }
2033
2034    /// Set the interval to accumulate over for timeseries metric sets.
2035    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2036        Self {
2037            interval: Some(interval),
2038            ..self
2039        }
2040    }
2041}
2042
2043#[derive(Debug, Clone, Copy)]
2044/// Account metric set to return.
2045pub enum AccountMetricSet {
2046    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
2047    /// specified time range.
2048    ActiveBasins(TimeRange),
2049    /// Returns [`AccumulationMetric`]s, one per account operation type.
2050    ///
2051    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2052    /// per interval over the requested time range.
2053    ///
2054    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2055    AccountOps(TimeRangeAndInterval),
2056}
2057
2058#[derive(Debug, Clone)]
2059#[non_exhaustive]
2060/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
2061pub struct GetAccountMetricsInput {
2062    /// Metric set to return.
2063    pub set: AccountMetricSet,
2064}
2065
2066impl GetAccountMetricsInput {
2067    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2068    pub fn new(set: AccountMetricSet) -> Self {
2069        Self { set }
2070    }
2071}
2072
2073impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2074    fn from(value: GetAccountMetricsInput) -> Self {
2075        let (set, start, end, interval) = match value.set {
2076            AccountMetricSet::ActiveBasins(args) => (
2077                api::metrics::AccountMetricSet::ActiveBasins,
2078                args.start,
2079                args.end,
2080                None,
2081            ),
2082            AccountMetricSet::AccountOps(args) => (
2083                api::metrics::AccountMetricSet::AccountOps,
2084                args.start,
2085                args.end,
2086                args.interval,
2087            ),
2088        };
2089        Self {
2090            set,
2091            start: Some(start),
2092            end: Some(end),
2093            interval: interval.map(Into::into),
2094        }
2095    }
2096}
2097
2098#[derive(Debug, Clone, Copy)]
2099/// Basin metric set to return.
2100pub enum BasinMetricSet {
2101    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
2102    /// in the basin, with one observed value for each hour over the requested time range.
2103    Storage(TimeRange),
2104    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
2105    ///
2106    /// Each metric represents a timeseries of the number of append operations across all streams
2107    /// in the basin, with one accumulated value per interval over the requested time range.
2108    ///
2109    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2110    /// [`minute`](TimeseriesInterval::Minute).
2111    AppendOps(TimeRangeAndInterval),
2112    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
2113    ///
2114    /// Each metric represents a timeseries of the number of read operations across all streams
2115    /// in the basin, with one accumulated value per interval over the requested time range.
2116    ///
2117    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2118    /// [`minute`](TimeseriesInterval::Minute).
2119    ReadOps(TimeRangeAndInterval),
2120    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
2121    /// across all streams in the basin, with one accumulated value per interval
2122    /// over the requested time range.
2123    ///
2124    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2125    /// [`minute`](TimeseriesInterval::Minute).
2126    ReadThroughput(TimeRangeAndInterval),
2127    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
2128    /// across all streams in the basin, with one accumulated value per interval
2129    /// over the requested time range.
2130    ///
2131    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2132    /// [`minute`](TimeseriesInterval::Minute).
2133    AppendThroughput(TimeRangeAndInterval),
2134    /// Returns [`AccumulationMetric`]s, one per basin operation type.
2135    ///
2136    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2137    /// per interval over the requested time range.
2138    ///
2139    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2140    BasinOps(TimeRangeAndInterval),
2141}
2142
2143#[derive(Debug, Clone)]
2144#[non_exhaustive]
2145/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
2146pub struct GetBasinMetricsInput {
2147    /// Basin name.
2148    pub name: BasinName,
2149    /// Metric set to return.
2150    pub set: BasinMetricSet,
2151}
2152
2153impl GetBasinMetricsInput {
2154    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2155    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2156        Self { name, set }
2157    }
2158}
2159
2160impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2161    fn from(value: GetBasinMetricsInput) -> Self {
2162        let (set, start, end, interval) = match value.set {
2163            BasinMetricSet::Storage(args) => (
2164                api::metrics::BasinMetricSet::Storage,
2165                args.start,
2166                args.end,
2167                None,
2168            ),
2169            BasinMetricSet::AppendOps(args) => (
2170                api::metrics::BasinMetricSet::AppendOps,
2171                args.start,
2172                args.end,
2173                args.interval,
2174            ),
2175            BasinMetricSet::ReadOps(args) => (
2176                api::metrics::BasinMetricSet::ReadOps,
2177                args.start,
2178                args.end,
2179                args.interval,
2180            ),
2181            BasinMetricSet::ReadThroughput(args) => (
2182                api::metrics::BasinMetricSet::ReadThroughput,
2183                args.start,
2184                args.end,
2185                args.interval,
2186            ),
2187            BasinMetricSet::AppendThroughput(args) => (
2188                api::metrics::BasinMetricSet::AppendThroughput,
2189                args.start,
2190                args.end,
2191                args.interval,
2192            ),
2193            BasinMetricSet::BasinOps(args) => (
2194                api::metrics::BasinMetricSet::BasinOps,
2195                args.start,
2196                args.end,
2197                args.interval,
2198            ),
2199        };
2200        (
2201            value.name,
2202            api::metrics::BasinMetricSetRequest {
2203                set,
2204                start: Some(start),
2205                end: Some(end),
2206                interval: interval.map(Into::into),
2207            },
2208        )
2209    }
2210}
2211
2212#[derive(Debug, Clone, Copy)]
2213/// Stream metric set to return.
2214pub enum StreamMetricSet {
2215    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
2216    /// with one observed value for each minute over the requested time range.
2217    Storage(TimeRange),
2218}
2219
2220#[derive(Debug, Clone)]
2221#[non_exhaustive]
2222/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
2223pub struct GetStreamMetricsInput {
2224    /// Basin name.
2225    pub basin_name: BasinName,
2226    /// Stream name.
2227    pub stream_name: StreamName,
2228    /// Metric set to return.
2229    pub set: StreamMetricSet,
2230}
2231
2232impl GetStreamMetricsInput {
2233    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2234    /// set.
2235    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2236        Self {
2237            basin_name,
2238            stream_name,
2239            set,
2240        }
2241    }
2242}
2243
2244impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2245    fn from(value: GetStreamMetricsInput) -> Self {
2246        let (set, start, end, interval) = match value.set {
2247            StreamMetricSet::Storage(args) => (
2248                api::metrics::StreamMetricSet::Storage,
2249                args.start,
2250                args.end,
2251                None,
2252            ),
2253        };
2254        (
2255            value.basin_name,
2256            value.stream_name,
2257            api::metrics::StreamMetricSetRequest {
2258                set,
2259                start: Some(start),
2260                end: Some(end),
2261                interval,
2262            },
2263        )
2264    }
2265}
2266
/// Unit in which metric values are measured.
#[derive(Debug, Clone, Copy)]
pub enum MetricUnit {
    /// Size in bytes.
    Bytes,
    /// Number of operations.
    Operations,
}
2275
2276impl From<api::metrics::MetricUnit> for MetricUnit {
2277    fn from(value: api::metrics::MetricUnit) -> Self {
2278        match value {
2279            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2280            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2281        }
2282    }
2283}
2284
2285#[derive(Debug, Clone)]
2286#[non_exhaustive]
2287/// Single named value.
2288pub struct ScalarMetric {
2289    /// Metric name.
2290    pub name: String,
2291    /// Unit for the metric value.
2292    pub unit: MetricUnit,
2293    /// Metric value.
2294    pub value: f64,
2295}
2296
2297#[derive(Debug, Clone)]
2298#[non_exhaustive]
2299/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2300/// specified interval.
2301pub struct AccumulationMetric {
2302    /// Timeseries name.
2303    pub name: String,
2304    /// Unit for the accumulated values.
2305    pub unit: MetricUnit,
2306    /// The interval at which datapoints are accumulated.
2307    pub interval: TimeseriesInterval,
2308    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
2309    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
2310    /// one `interval`.
2311    pub values: Vec<(u32, f64)>,
2312}
2313
2314#[derive(Debug, Clone)]
2315#[non_exhaustive]
2316/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2317pub struct GaugeMetric {
2318    /// Timeseries name.
2319    pub name: String,
2320    /// Unit for the instantaneous values.
2321    pub unit: MetricUnit,
2322    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
2323    /// instant of the `timestamp` (in Unix epoch seconds).
2324    pub values: Vec<(u32, f64)>,
2325}
2326
/// Set of string labels.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LabelMetric {
    /// Label name.
    pub name: String,
    /// Label values.
    pub values: Vec<String>,
}
2336
2337#[derive(Debug, Clone)]
2338/// Individual metric in a returned metric set.
2339pub enum Metric {
2340    /// Single named value.
2341    Scalar(ScalarMetric),
2342    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2343    /// specified interval.
2344    Accumulation(AccumulationMetric),
2345    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2346    Gauge(GaugeMetric),
2347    /// Set of string labels.
2348    Label(LabelMetric),
2349}
2350
2351impl From<api::metrics::Metric> for Metric {
2352    fn from(value: api::metrics::Metric) -> Self {
2353        match value {
2354            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2355                name: sm.name.into(),
2356                unit: sm.unit.into(),
2357                value: sm.value,
2358            }),
2359            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2360                name: am.name.into(),
2361                unit: am.unit.into(),
2362                interval: am.interval.into(),
2363                values: am.values,
2364            }),
2365            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2366                name: gm.name.into(),
2367                unit: gm.unit.into(),
2368                values: gm.values,
2369            }),
2370            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2371                name: lm.name.into(),
2372                values: lm.values,
2373            }),
2374        }
2375    }
2376}
2377
2378#[derive(Debug, Clone, Default)]
2379#[non_exhaustive]
2380/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
2381pub struct ListStreamsInput {
2382    /// Filter streams whose names begin with this value.
2383    ///
2384    /// Defaults to `""`.
2385    pub prefix: StreamNamePrefix,
2386    /// Filter streams whose names are lexicographically greater than this value.
2387    ///
2388    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
2389    ///
2390    /// Defaults to `""`.
2391    pub start_after: StreamNameStartAfter,
2392    ///  Number of streams to return in a page. Will be clamped to a maximum of `1000`.
2393    ///
2394    /// Defaults to `1000`.
2395    pub limit: Option<usize>,
2396}
2397
2398impl ListStreamsInput {
2399    /// Create a new [`ListStreamsInput`] with default values.
2400    pub fn new() -> Self {
2401        Self::default()
2402    }
2403
2404    /// Set the prefix used to filter streams whose names begin with this value.
2405    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2406        Self { prefix, ..self }
2407    }
2408
2409    /// Set the value used to filter streams whose names are lexicographically greater than this
2410    /// value.
2411    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2412        Self {
2413            start_after,
2414            ..self
2415        }
2416    }
2417
2418    /// Set the limit on number of streams to return in a page.
2419    pub fn with_limit(self, limit: usize) -> Self {
2420        Self {
2421            limit: Some(limit),
2422            ..self
2423        }
2424    }
2425}
2426
2427impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2428    fn from(value: ListStreamsInput) -> Self {
2429        Self {
2430            prefix: Some(value.prefix),
2431            start_after: Some(value.start_after),
2432            limit: value.limit,
2433        }
2434    }
2435}
2436
2437#[derive(Debug, Clone, Default)]
2438/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
2439pub struct ListAllStreamsInput {
2440    /// Filter streams whose names begin with this value.
2441    ///
2442    /// Defaults to `""`.
2443    pub prefix: StreamNamePrefix,
2444    /// Filter streams whose names are lexicographically greater than this value.
2445    ///
2446    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
2447    ///
2448    /// Defaults to `""`.
2449    pub start_after: StreamNameStartAfter,
2450    /// Whether to ignore streams that have been requested for deletion but not yet deleted.
2451    ///
2452    /// Defaults to `false`.
2453    pub ignore_pending_deletions: bool,
2454}
2455
2456impl ListAllStreamsInput {
2457    /// Create a new [`ListAllStreamsInput`] with default values.
2458    pub fn new() -> Self {
2459        Self::default()
2460    }
2461
2462    /// Set the prefix used to filter streams whose names begin with this value.
2463    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2464        Self { prefix, ..self }
2465    }
2466
2467    /// Set the value used to filter streams whose names are lexicographically greater than this
2468    /// value.
2469    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2470        Self {
2471            start_after,
2472            ..self
2473        }
2474    }
2475
2476    /// Set whether to ignore streams that have been requested for deletion but not yet deleted.
2477    pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
2478        Self {
2479            ignore_pending_deletions,
2480            ..self
2481        }
2482    }
2483}
2484
2485#[derive(Debug, Clone, PartialEq)]
2486#[non_exhaustive]
2487/// Stream information.
2488pub struct StreamInfo {
2489    /// Stream name.
2490    pub name: StreamName,
2491    /// Creation time.
2492    pub created_at: S2DateTime,
2493    /// Deletion time if the stream is being deleted.
2494    pub deleted_at: Option<S2DateTime>,
2495}
2496
2497impl From<api::stream::StreamInfo> for StreamInfo {
2498    fn from(value: api::stream::StreamInfo) -> Self {
2499        Self {
2500            name: value.name,
2501            created_at: value.created_at.into(),
2502            deleted_at: value.deleted_at.map(Into::into),
2503        }
2504    }
2505}
2506
2507#[derive(Debug, Clone)]
2508#[non_exhaustive]
2509/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
2510pub struct CreateStreamInput {
2511    /// Stream name.
2512    pub name: StreamName,
2513    /// Configuration for the stream.
2514    ///
2515    /// See [`StreamConfig`] for defaults.
2516    pub config: Option<StreamConfig>,
2517    /// Idempotency token for the operation.
2518    ///
2519    /// When a request is retried, the same token is reused,
2520    /// causing the server to return the original response instead of
2521    /// performing the operation again.
2522    ///
2523    /// Defaults to a random UUID.
2524    pub idempotency_token: String,
2525}
2526
2527impl CreateStreamInput {
2528    /// Create a new [`CreateStreamInput`] with the given stream name.
2529    pub fn new(name: StreamName) -> Self {
2530        Self {
2531            name,
2532            config: None,
2533            idempotency_token: idempotency_token(),
2534        }
2535    }
2536
2537    /// Set the configuration for the stream.
2538    pub fn with_config(self, config: StreamConfig) -> Self {
2539        Self {
2540            config: Some(config),
2541            ..self
2542        }
2543    }
2544
2545    /// Set the idempotency token for the operation.
2546    pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
2547        Self {
2548            idempotency_token: idempotency_token.into(),
2549            ..self
2550        }
2551    }
2552}
2553
2554impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2555    fn from(value: CreateStreamInput) -> Self {
2556        (
2557            api::stream::CreateStreamRequest {
2558                stream: value.name,
2559                config: value.config.map(Into::into),
2560            },
2561            value.idempotency_token,
2562        )
2563    }
2564}
2565
2566#[derive(Debug, Clone)]
2567#[non_exhaustive]
2568/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
2569pub struct DeleteStreamInput {
2570    /// Stream name.
2571    pub name: StreamName,
2572    /// Whether to ignore `Not Found` error if the stream doesn't exist.
2573    pub ignore_not_found: bool,
2574}
2575
2576impl DeleteStreamInput {
2577    /// Create a new [`DeleteStreamInput`] with the given stream name.
2578    pub fn new(name: StreamName) -> Self {
2579        Self {
2580            name,
2581            ignore_not_found: false,
2582        }
2583    }
2584
2585    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2586    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2587        Self {
2588            ignore_not_found,
2589            ..self
2590        }
2591    }
2592}
2593
2594#[derive(Debug, Clone)]
2595#[non_exhaustive]
2596/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
2597pub struct ReconfigureStreamInput {
2598    /// Stream name.
2599    pub name: StreamName,
2600    /// Reconfiguration for [`StreamConfig`].
2601    pub config: StreamReconfiguration,
2602}
2603
2604impl ReconfigureStreamInput {
2605    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2606    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2607        Self { name, config }
2608    }
2609}
2610
/// Token for fencing appends to a stream.
///
/// **Note:** It must not exceed 36 bytes in length.
///
/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FencingToken(String);
2618
2619impl FencingToken {
2620    /// Generate a random alphanumeric fencing token of `n` bytes.
2621    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2622        rand::rng()
2623            .sample_iter(&rand::distr::Alphanumeric)
2624            .take(n)
2625            .map(char::from)
2626            .collect::<String>()
2627            .parse()
2628    }
2629}
2630
2631impl FromStr for FencingToken {
2632    type Err = ValidationError;
2633
2634    fn from_str(s: &str) -> Result<Self, Self::Err> {
2635        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2636            return Err(ValidationError(format!(
2637                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2638            )));
2639        }
2640        Ok(FencingToken(s.to_string()))
2641    }
2642}
2643
2644impl std::fmt::Display for FencingToken {
2645    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2646        write!(f, "{}", self.0)
2647    }
2648}
2649
2650impl Deref for FencingToken {
2651    type Target = str;
2652
2653    fn deref(&self) -> &Self::Target {
2654        &self.0
2655    }
2656}
2657
/// A position in a stream.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
    /// User-specified timestamps are passed through as-is.
    pub timestamp: u64,
}
2668
2669impl std::fmt::Display for StreamPosition {
2670    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2671        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2672    }
2673}
2674
2675impl From<api::stream::proto::StreamPosition> for StreamPosition {
2676    fn from(value: api::stream::proto::StreamPosition) -> Self {
2677        Self {
2678            seq_num: value.seq_num,
2679            timestamp: value.timestamp,
2680        }
2681    }
2682}
2683
2684impl From<api::stream::StreamPosition> for StreamPosition {
2685    fn from(value: api::stream::StreamPosition) -> Self {
2686        Self {
2687            seq_num: value.seq_num,
2688            timestamp: value.timestamp,
2689        }
2690    }
2691}
2692
2693#[derive(Debug, Clone, PartialEq)]
2694#[non_exhaustive]
2695/// A name-value pair.
2696pub struct Header {
2697    /// Name.
2698    pub name: Bytes,
2699    /// Value.
2700    pub value: Bytes,
2701}
2702
2703impl Header {
2704    /// Create a new [`Header`] with the given name and value.
2705    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2706        Self {
2707            name: name.into(),
2708            value: value.into(),
2709        }
2710    }
2711}
2712
2713impl From<Header> for api::stream::proto::Header {
2714    fn from(value: Header) -> Self {
2715        Self {
2716            name: value.name,
2717            value: value.value,
2718        }
2719    }
2720}
2721
2722impl From<api::stream::proto::Header> for Header {
2723    fn from(value: api::stream::proto::Header) -> Self {
2724        Self {
2725            name: value.name,
2726            value: value.value,
2727        }
2728    }
2729}
2730
#[derive(Debug, Clone, PartialEq)]
/// A record to append.
///
/// Constructed via [`AppendRecord::new`], which enforces the metered-size limit.
pub struct AppendRecord {
    // Record payload.
    body: Bytes,
    // Name-value pairs attached to the record; empty by default.
    headers: Vec<Header>,
    // Optional user-specified timestamp; semantics depend on stream timestamping config.
    timestamp: Option<u64>,
}
2738
2739impl AppendRecord {
2740    fn validate(self) -> Result<Self, ValidationError> {
2741        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2742            Err(ValidationError(format!(
2743                "metered_bytes: {} exceeds {}",
2744                self.metered_bytes(),
2745                RECORD_BATCH_MAX.bytes
2746            )))
2747        } else {
2748            Ok(self)
2749        }
2750    }
2751
2752    /// Create a new [`AppendRecord`] with the given record body.
2753    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2754        let record = Self {
2755            body: body.into(),
2756            headers: Vec::default(),
2757            timestamp: None,
2758        };
2759        record.validate()
2760    }
2761
2762    /// Set the headers for this record.
2763    pub fn with_headers(
2764        self,
2765        headers: impl IntoIterator<Item = Header>,
2766    ) -> Result<Self, ValidationError> {
2767        let record = Self {
2768            headers: headers.into_iter().collect(),
2769            ..self
2770        };
2771        record.validate()
2772    }
2773
2774    /// Set the timestamp for this record.
2775    ///
2776    /// Precise semantics depend on [`StreamConfig::timestamping`].
2777    pub fn with_timestamp(self, timestamp: u64) -> Self {
2778        Self {
2779            timestamp: Some(timestamp),
2780            ..self
2781        }
2782    }
2783}
2784
2785impl From<AppendRecord> for api::stream::proto::AppendRecord {
2786    fn from(value: AppendRecord) -> Self {
2787        Self {
2788            timestamp: value.timestamp,
2789            headers: value.headers.into_iter().map(Into::into).collect(),
2790            body: value.body,
2791        }
2792    }
2793}
2794
/// Metered byte size calculation.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
pub trait MeteredBytes {
    /// Returns the metered byte size.
    fn metered_bytes(&self) -> usize;
}
2805
2806macro_rules! metered_bytes_impl {
2807    ($ty:ty) => {
2808        impl MeteredBytes for $ty {
2809            fn metered_bytes(&self) -> usize {
2810                8 + (2 * self.headers.len())
2811                    + self
2812                        .headers
2813                        .iter()
2814                        .map(|h| h.name.len() + h.value.len())
2815                        .sum::<usize>()
2816                    + self.body.len()
2817            }
2818        }
2819    };
2820}
2821
2822metered_bytes_impl!(AppendRecord);
2823
#[derive(Debug, Clone)]
/// A batch of records to append atomically.
///
/// **Note:** It must contain at least `1` record and no more than `1000`.
/// The total size of the batch must not exceed `1MiB` in metered bytes.
///
/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
/// that takes care of the abovementioned constraints.
pub struct AppendRecordBatch {
    // Records in the batch.
    records: Vec<AppendRecord>,
    // Cached total metered size of all records, kept in sync by `push`.
    metered_bytes: usize,
}
2837
2838impl AppendRecordBatch {
2839    pub(crate) fn with_capacity(capacity: usize) -> Self {
2840        Self {
2841            records: Vec::with_capacity(capacity),
2842            metered_bytes: 0,
2843        }
2844    }
2845
2846    pub(crate) fn push(&mut self, record: AppendRecord) {
2847        self.metered_bytes += record.metered_bytes();
2848        self.records.push(record);
2849    }
2850
2851    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
2852    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2853    where
2854        I: IntoIterator<Item = AppendRecord>,
2855    {
2856        let mut records = Vec::new();
2857        let mut metered_bytes = 0;
2858
2859        for record in iter {
2860            metered_bytes += record.metered_bytes();
2861            records.push(record);
2862
2863            if metered_bytes > RECORD_BATCH_MAX.bytes {
2864                return Err(ValidationError(format!(
2865                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
2866                    RECORD_BATCH_MAX.bytes
2867                )));
2868            }
2869
2870            if records.len() > RECORD_BATCH_MAX.count {
2871                return Err(ValidationError(format!(
2872                    "number of records in the batch exceeds {}",
2873                    RECORD_BATCH_MAX.count
2874                )));
2875            }
2876        }
2877
2878        if records.is_empty() {
2879            return Err(ValidationError("batch is empty".into()));
2880        }
2881
2882        Ok(Self {
2883            records,
2884            metered_bytes,
2885        })
2886    }
2887}
2888
2889impl Deref for AppendRecordBatch {
2890    type Target = [AppendRecord];
2891
2892    fn deref(&self) -> &Self::Target {
2893        &self.records
2894    }
2895}
2896
impl MeteredBytes for AppendRecordBatch {
    fn metered_bytes(&self) -> usize {
        // Returns the running total maintained by `push`/`try_from_iter`,
        // avoiding a re-sum over every record.
        self.metered_bytes
    }
}
2902
#[derive(Debug, Clone)]
/// Command to signal an operation.
///
/// Carried by a [`CommandRecord`], which converts into an [`AppendRecord`].
pub enum Command {
    /// Fence operation.
    Fence {
        /// Fencing token.
        fencing_token: FencingToken,
    },
    /// Trim operation.
    Trim {
        /// Trim point.
        trim_point: u64,
    },
}
2917
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Command record for signaling operations to the service.
///
/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
pub struct CommandRecord {
    /// Command to signal an operation.
    pub command: Command,
    /// Timestamp for this record.
    pub timestamp: Option<u64>,
}
2929
2930impl CommandRecord {
2931    const FENCE: &[u8] = b"fence";
2932    const TRIM: &[u8] = b"trim";
2933
2934    /// Create a fence command record with the given fencing token.
2935    ///
2936    /// Fencing is strongly consistent, and subsequent appends that specify a
2937    /// fencing token will fail if it does not match.
2938    pub fn fence(fencing_token: FencingToken) -> Self {
2939        Self {
2940            command: Command::Fence { fencing_token },
2941            timestamp: None,
2942        }
2943    }
2944
2945    /// Create a trim command record with the given trim point.
2946    ///
2947    /// Trim point is the desired earliest sequence number for the stream.
2948    ///
2949    /// Trimming is eventually consistent, and trimmed records may be visible
2950    /// for a brief period.
2951    pub fn trim(trim_point: u64) -> Self {
2952        Self {
2953            command: Command::Trim { trim_point },
2954            timestamp: None,
2955        }
2956    }
2957
2958    /// Set the timestamp for this record.
2959    pub fn with_timestamp(self, timestamp: u64) -> Self {
2960        Self {
2961            timestamp: Some(timestamp),
2962            ..self
2963        }
2964    }
2965}
2966
2967impl From<CommandRecord> for AppendRecord {
2968    fn from(value: CommandRecord) -> Self {
2969        let (header_value, body) = match value.command {
2970            Command::Fence { fencing_token } => (
2971                CommandRecord::FENCE,
2972                Bytes::copy_from_slice(fencing_token.as_bytes()),
2973            ),
2974            Command::Trim { trim_point } => (
2975                CommandRecord::TRIM,
2976                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
2977            ),
2978        };
2979        Self {
2980            body,
2981            headers: vec![Header::new("", header_value)],
2982            timestamp: value.timestamp,
2983        }
2984    }
2985}
2986
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`append`](crate::S2Stream::append) operation and
/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
pub struct AppendInput {
    /// Batch of records to append atomically.
    pub records: AppendRecordBatch,
    /// Expected sequence number for the first record in the batch.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
    pub match_seq_num: Option<u64>,
    /// Fencing token to match against the stream's current fencing token.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched,
    /// the append fails. A stream defaults to `""` as its fencing token.
    pub fencing_token: Option<FencingToken>,
}
3004
3005impl AppendInput {
3006    /// Create a new [`AppendInput`] with the given batch of records.
3007    pub fn new(records: AppendRecordBatch) -> Self {
3008        Self {
3009            records,
3010            match_seq_num: None,
3011            fencing_token: None,
3012        }
3013    }
3014
3015    /// Set the expected sequence number for the first record in the batch.
3016    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3017        Self {
3018            match_seq_num: Some(match_seq_num),
3019            ..self
3020        }
3021    }
3022
3023    /// Set the fencing token to match against the stream's current fencing token.
3024    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3025        Self {
3026            fencing_token: Some(fencing_token),
3027            ..self
3028        }
3029    }
3030}
3031
3032impl From<AppendInput> for api::stream::proto::AppendInput {
3033    fn from(value: AppendInput) -> Self {
3034        Self {
3035            records: value.records.iter().cloned().map(Into::into).collect(),
3036            match_seq_num: value.match_seq_num,
3037            fencing_token: value.fencing_token.map(|t| t.to_string()),
3038        }
3039    }
3040}
3041
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Acknowledgement for an [`AppendInput`].
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
    /// that was appended.
    ///
    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
    /// appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record on the stream.
    ///
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
3060
3061impl From<api::stream::proto::AppendAck> for AppendAck {
3062    fn from(value: api::stream::proto::AppendAck) -> Self {
3063        Self {
3064            start: value.start.unwrap_or_default().into(),
3065            end: value.end.unwrap_or_default().into(),
3066            tail: value.tail.unwrap_or_default().into(),
3067        }
3068    }
3069}
3070
#[derive(Debug, Clone, Copy)]
/// Starting position for reading from a stream.
///
/// Defaults to [`ReadFrom::SeqNum`] with `0`, i.e. the beginning of the stream.
pub enum ReadFrom {
    /// Read from this sequence number.
    SeqNum(u64),
    /// Read from this timestamp.
    Timestamp(u64),
    /// Read from N records before the tail.
    TailOffset(u64),
}
3081
3082impl Default for ReadFrom {
3083    fn default() -> Self {
3084        Self::SeqNum(0)
3085    }
3086}
3087
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
/// Where to start reading.
pub struct ReadStart {
    /// Starting position.
    ///
    /// Defaults to reading from sequence number `0`.
    pub from: ReadFrom,
    /// Whether to start from tail if the requested starting position is beyond it.
    ///
    /// Defaults to `false` (errors if position is beyond tail).
    pub clamp_to_tail: bool,
}
3101
3102impl ReadStart {
3103    /// Create a new [`ReadStart`] with default values.
3104    pub fn new() -> Self {
3105        Self::default()
3106    }
3107
3108    /// Set the starting position.
3109    pub fn with_from(self, from: ReadFrom) -> Self {
3110        Self { from, ..self }
3111    }
3112
3113    /// Set whether to start from tail if the requested starting position is beyond it.
3114    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3115        Self {
3116            clamp_to_tail,
3117            ..self
3118        }
3119    }
3120}
3121
3122impl From<ReadStart> for api::stream::ReadStart {
3123    fn from(value: ReadStart) -> Self {
3124        let (seq_num, timestamp, tail_offset) = match value.from {
3125            ReadFrom::SeqNum(n) => (Some(n), None, None),
3126            ReadFrom::Timestamp(t) => (None, Some(t), None),
3127            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3128        };
3129        Self {
3130            seq_num,
3131            timestamp,
3132            tail_offset,
3133            clamp: if value.clamp_to_tail {
3134                Some(true)
3135            } else {
3136                None
3137            },
3138        }
3139    }
3140}
3141
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Limits on how much to read.
///
/// Both limits are optional and independent; whichever is hit first applies.
pub struct ReadLimits {
    /// Limit on number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Limit on total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}
3155
3156impl ReadLimits {
3157    /// Create a new [`ReadLimits`] with default values.
3158    pub fn new() -> Self {
3159        Self::default()
3160    }
3161
3162    /// Set the limit on number of records.
3163    pub fn with_count(self, count: usize) -> Self {
3164        Self {
3165            count: Some(count),
3166            ..self
3167        }
3168    }
3169
3170    /// Set the limit on total metered bytes of records.
3171    pub fn with_bytes(self, bytes: usize) -> Self {
3172        Self {
3173            bytes: Some(bytes),
3174            ..self
3175        }
3176    }
3177}
3178
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// When to stop reading.
pub struct ReadStop {
    /// Limits on how much to read.
    pub limits: ReadLimits,
    /// Timestamp at which to stop (exclusive).
    pub until: Option<RangeTo<u64>>,
    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
    /// seconds for [`read`](crate::S2Stream::read).
    ///
    /// Defaults to:
    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
    ///   is specified.
    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
    ///   `until` is specified.
    pub wait: Option<u32>,
}
3198
3199impl ReadStop {
3200    /// Create a new [`ReadStop`] with default values.
3201    pub fn new() -> Self {
3202        Self::default()
3203    }
3204
3205    /// Set the limits on how much to read.
3206    pub fn with_limits(self, limits: ReadLimits) -> Self {
3207        Self { limits, ..self }
3208    }
3209
3210    /// Set the timestamp at which to stop (exclusive).
3211    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3212        Self {
3213            until: Some(until),
3214            ..self
3215        }
3216    }
3217
3218    /// Set the duration in seconds to wait for new records before stopping.
3219    pub fn with_wait(self, wait: u32) -> Self {
3220        Self {
3221            wait: Some(wait),
3222            ..self
3223        }
3224    }
3225}
3226
3227impl From<ReadStop> for api::stream::ReadEnd {
3228    fn from(value: ReadStop) -> Self {
3229        Self {
3230            count: value.limits.count,
3231            bytes: value.limits.bytes,
3232            until: value.until.map(|r| r.end),
3233            wait: value.wait,
3234        }
3235    }
3236}
3237
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
/// operations.
pub struct ReadInput {
    /// Where to start reading.
    pub start: ReadStart,
    /// When to stop reading.
    pub stop: ReadStop,
    /// Whether to filter out command records from the stream when reading.
    ///
    /// Defaults to `false`.
    pub ignore_command_records: bool,
}
3252
3253impl ReadInput {
3254    /// Create a new [`ReadInput`] with default values.
3255    pub fn new() -> Self {
3256        Self::default()
3257    }
3258
3259    /// Set where to start reading.
3260    pub fn with_start(self, start: ReadStart) -> Self {
3261        Self { start, ..self }
3262    }
3263
3264    /// Set when to stop reading.
3265    pub fn with_stop(self, stop: ReadStop) -> Self {
3266        Self { stop, ..self }
3267    }
3268
3269    /// Set whether to filter out command records from the stream when reading.
3270    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3271        Self {
3272            ignore_command_records,
3273            ..self
3274        }
3275    }
3276}
3277
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Record that is durably sequenced on a stream.
pub struct SequencedRecord {
    /// Sequence number assigned to this record.
    pub seq_num: u64,
    /// Body of this record.
    pub body: Bytes,
    /// Headers for this record.
    pub headers: Vec<Header>,
    /// Timestamp for this record.
    pub timestamp: u64,
}
3291
3292impl SequencedRecord {
3293    /// Whether this is a command record.
3294    pub fn is_command_record(&self) -> bool {
3295        self.headers.len() == 1 && *self.headers[0].name == *b""
3296    }
3297}
3298
3299impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3300    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3301        Self {
3302            seq_num: value.seq_num,
3303            body: value.body,
3304            headers: value.headers.into_iter().map(Into::into).collect(),
3305            timestamp: value.timestamp,
3306        }
3307    }
3308}
3309
// `SequencedRecord` exposes `headers` and `body` fields, so the shared macro
// supplies its `MeteredBytes` implementation.
metered_bytes_impl!(SequencedRecord);
3311
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
/// [`read_session`](crate::S2Stream::read_session).
pub struct ReadBatch {
    /// Records that are durably sequenced on the stream.
    ///
    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
    /// - the [`stop condition`](ReadInput::stop) was already met, or
    /// - all records in the batch were command records and
    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
    pub records: Vec<SequencedRecord>,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record.
    ///
    /// It will only be present when reading recent records.
    pub tail: Option<StreamPosition>,
}
3330
3331impl ReadBatch {
3332    pub(crate) fn from_api(
3333        batch: api::stream::proto::ReadBatch,
3334        ignore_command_records: bool,
3335    ) -> Self {
3336        Self {
3337            records: batch
3338                .records
3339                .into_iter()
3340                .map(Into::into)
3341                .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3342                .collect(),
3343            tail: batch.tail.map(Into::into),
3344        }
3345    }
3346}
3347
/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
///
/// Boxed and pinned so session APIs can return it without naming the concrete
/// stream type.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3350
#[derive(Debug, Clone, thiserror::Error)]
/// Why an append condition check failed.
///
/// Carried by [`S2Error::AppendConditionFailed`].
pub enum AppendConditionFailed {
    #[error("fencing token mismatch, expected: {0}")]
    /// Fencing token did not match. Contains the expected fencing token.
    FencingTokenMismatch(FencingToken),
    #[error("sequence number mismatch, expected: {0}")]
    /// Sequence number did not match. Contains the expected sequence number.
    SeqNumMismatch(u64),
}
3361
3362impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3363    fn from(value: api::stream::AppendConditionFailed) -> Self {
3364        match value {
3365            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3366                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3367            }
3368            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3369                AppendConditionFailed::SeqNumMismatch(seq)
3370            }
3371        }
3372    }
3373}
3374
#[derive(Debug, Clone, thiserror::Error)]
/// Errors from S2 operations.
pub enum S2Error {
    #[error("{0}")]
    /// Client-side error.
    Client(String),
    #[error(transparent)]
    /// Validation error.
    Validation(#[from] ValidationError),
    #[error("{0}")]
    /// Append condition check failed. Contains the failure reason.
    AppendConditionFailed(AppendConditionFailed),
    #[error("read from an unwritten position. current tail: {0}")]
    /// Read from an unwritten position. Contains the current tail.
    ReadUnwritten(StreamPosition),
    #[error("{0}")]
    /// Other server-side error.
    Server(ErrorResponse),
}
3394
3395impl From<ApiError> for S2Error {
3396    fn from(err: ApiError) -> Self {
3397        match err {
3398            ApiError::ReadUnwritten(tail_response) => {
3399                Self::ReadUnwritten(tail_response.tail.into())
3400            }
3401            ApiError::AppendConditionFailed(condition_failed) => {
3402                Self::AppendConditionFailed(condition_failed.into())
3403            }
3404            ApiError::Server(_, response) => Self::Server(response.into()),
3405            other => Self::Client(other.to_string()),
3406        }
3407    }
3408}
3409
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
/// Error response from S2 server.
///
/// Displays as `<code>: <message>`.
pub struct ErrorResponse {
    /// Error code.
    pub code: String,
    /// Error message.
    pub message: String,
}
3420
3421impl From<ApiErrorResponse> for ErrorResponse {
3422    fn from(response: ApiErrorResponse) -> Self {
3423        Self {
3424            code: response.code,
3425            message: response.message,
3426        }
3427    }
3428}
3429
3430fn idempotency_token() -> String {
3431    uuid::Uuid::new_v4().simple().to_string()
3432}