s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::Rng;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Validation error.
23pub use s2_common::types::ValidationError;
24/// Access token ID.
25///
26/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
27pub use s2_common::types::access::AccessTokenId;
28/// See [`ListAccessTokensInput::prefix`].
29pub use s2_common::types::access::AccessTokenIdPrefix;
30/// See [`ListAccessTokensInput::start_after`].
31pub use s2_common::types::access::AccessTokenIdStartAfter;
32/// Basin name.
33///
34/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
35/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
36pub use s2_common::types::basin::BasinName;
37/// See [`ListBasinsInput::prefix`].
38pub use s2_common::types::basin::BasinNamePrefix;
39/// See [`ListBasinsInput::start_after`].
40pub use s2_common::types::basin::BasinNameStartAfter;
41/// Stream name.
42///
43/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
44pub use s2_common::types::stream::StreamName;
45/// See [`ListStreamsInput::prefix`].
46pub use s2_common::types::stream::StreamNamePrefix;
47/// See [`ListStreamsInput::start_after`].
48pub use s2_common::types::stream::StreamNameStartAfter;
49
/// One mebibyte (1 MiB), in bytes.
pub(crate) const ONE_MIB: u32 = 1024 * 1024;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
/// An RFC 3339 datetime.
///
/// It can be created in either of the following ways:
/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
/// - Convert from [`time::OffsetDateTime`] using [`From`]/[`Into`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct S2DateTime(time::OffsetDateTime);

impl From<time::OffsetDateTime> for S2DateTime {
    // Wraps the datetime as-is; no normalization is performed.
    fn from(dt: time::OffsetDateTime) -> Self {
        Self(dt)
    }
}

impl From<S2DateTime> for time::OffsetDateTime {
    // Unwraps the inner datetime unchanged.
    fn from(dt: S2DateTime) -> Self {
        dt.0
    }
}
76
77impl FromStr for S2DateTime {
78    type Err = ValidationError;
79
80    fn from_str(s: &str) -> Result<Self, Self::Err> {
81        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
82            .map(Self)
83            .map_err(|e| ValidationError(format!("invalid datetime: {e}")))
84    }
85}
86
87impl fmt::Display for S2DateTime {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        write!(
90            f,
91            "{}",
92            self.0
93                .format(&time::format_description::well_known::Rfc3339)
94                .expect("RFC3339 formatting should not fail for S2DateTime")
95        )
96    }
97}
98
/// Authority for connecting to an S2 basin.
///
/// Produced when parsing a [`BasinEndpoint`]: an authority prefixed with
/// `{basin}.` parses as [`ParentZone`](Self::ParentZone); anything else parses
/// as [`Direct`](Self::Direct).
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
    ParentZone(Authority),
    /// Direct cell authority. Basin is expected to be hosted by this cell.
    Direct(Authority),
}
107
/// Account endpoint.
#[derive(Debug, Clone)]
pub struct AccountEndpoint {
    // Scheme of the endpoint; HTTPS is assumed when parsed from a string
    // without an explicit scheme.
    scheme: Scheme,
    // Host (and optional port) of the account-level service.
    authority: Authority,
}
114
115impl AccountEndpoint {
116    /// Create a new [`AccountEndpoint`] with the given endpoint.
117    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
118        endpoint.parse()
119    }
120}
121
122impl FromStr for AccountEndpoint {
123    type Err = ValidationError;
124
125    fn from_str(s: &str) -> Result<Self, Self::Err> {
126        let (scheme, authority) = match s.find("://") {
127            Some(idx) => {
128                let scheme: Scheme = s[..idx]
129                    .parse()
130                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
131                (scheme, &s[idx + 3..])
132            }
133            None => (Scheme::HTTPS, s),
134        };
135        Ok(Self {
136            scheme,
137            authority: authority
138                .parse()
139                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
140        })
141    }
142}
143
/// Basin endpoint.
#[derive(Debug, Clone)]
pub struct BasinEndpoint {
    // Scheme of the endpoint; HTTPS is assumed when parsed from a string
    // without an explicit scheme.
    scheme: Scheme,
    // Either a `{basin}.`-style parent zone or a direct cell authority.
    authority: BasinAuthority,
}
150
151impl BasinEndpoint {
152    /// Create a new [`BasinEndpoint`] with the given endpoint.
153    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
154        endpoint.parse()
155    }
156}
157
158impl FromStr for BasinEndpoint {
159    type Err = ValidationError;
160
161    fn from_str(s: &str) -> Result<Self, Self::Err> {
162        let (scheme, authority) = match s.find("://") {
163            Some(idx) => {
164                let scheme: Scheme = s[..idx]
165                    .parse()
166                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
167                (scheme, &s[idx + 3..])
168            }
169            None => (Scheme::HTTPS, s),
170        };
171        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
172            BasinAuthority::ParentZone(
173                authority
174                    .parse()
175                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
176            )
177        } else {
178            BasinAuthority::Direct(
179                authority
180                    .parse()
181                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
182            )
183        };
184        Ok(Self { scheme, authority })
185    }
186}
187
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Endpoints for the S2 environment.
///
/// Construct via [`S2Endpoints::new`] or [`S2Endpoints::from_env`]; both
/// require the account and basin endpoints to share the same scheme.
pub struct S2Endpoints {
    pub(crate) scheme: Scheme,
    pub(crate) account_authority: Authority,
    pub(crate) basin_authority: BasinAuthority,
}
196
197impl S2Endpoints {
198    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
199    pub fn new(
200        account_endpoint: AccountEndpoint,
201        basin_endpoint: BasinEndpoint,
202    ) -> Result<Self, ValidationError> {
203        if account_endpoint.scheme != basin_endpoint.scheme {
204            return Err("account and basin endpoints must have the same scheme".into());
205        }
206        Ok(Self {
207            scheme: account_endpoint.scheme,
208            account_authority: account_endpoint.authority,
209            basin_authority: basin_endpoint.authority,
210        })
211    }
212
213    /// Create a new [`S2Endpoints`] from environment variables.
214    ///
215    /// The following environment variables are expected to be set:
216    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
217    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
218    pub fn from_env() -> Result<Self, ValidationError> {
219        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
220            Ok(endpoint) => endpoint.parse()?,
221            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
222            Err(VarError::NotUnicode(_)) => {
223                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
224            }
225        };
226
227        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
228            Ok(endpoint) => endpoint.parse()?,
229            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
230            Err(VarError::NotUnicode(_)) => {
231                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
232            }
233        };
234
235        if account_endpoint.scheme != basin_endpoint.scheme {
236            return Err(
237                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
238            );
239        }
240
241        Ok(Self {
242            scheme: account_endpoint.scheme,
243            account_authority: account_endpoint.authority,
244            basin_authority: basin_endpoint.authority,
245        })
246    }
247
248    pub(crate) fn for_aws() -> Self {
249        Self {
250            scheme: Scheme::HTTPS,
251            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
252            basin_authority: BasinAuthority::ParentZone(
253                "b.aws.s2.dev".try_into().expect("valid authority"),
254            ),
255        }
256    }
257}
258
#[derive(Debug, Clone, Copy)]
/// Compression algorithm for request and response bodies.
///
/// Converted to the wire-level [`CompressionAlgorithm`] via [`From`].
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
269
270impl From<Compression> for CompressionAlgorithm {
271    fn from(value: Compression) -> Self {
272        match value {
273            Compression::None => CompressionAlgorithm::None,
274            Compression::Gzip => CompressionAlgorithm::Gzip,
275            Compression::Zstd => CompressionAlgorithm::Zstd,
276        }
277    }
278}
279
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// Retry policy for [`append`](crate::S2Stream::append) and
/// [`append_session`](crate::S2Stream::append_session) operations.
///
/// See [`RetryConfig::append_retry_policy`].
pub enum AppendRetryPolicy {
    /// Retry all appends. Use when duplicate records on the stream are acceptable.
    All,
    /// Only retry appends that include [`match_seq_num`](AppendInput::match_seq_num).
    NoSideEffects,
}
290
291impl AppendRetryPolicy {
292    pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
293        match self {
294            Self::All => true,
295            Self::NoSideEffects => input.match_seq_num.is_some(),
296        }
297    }
298}
299
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for retrying requests in case of transient failures.
///
/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
/// ```text
/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
///     jitter = rand[0, base_delay]
///     delay  = base_delay + jitter
/// ```
pub struct RetryConfig {
    /// Total number of attempts including the initial try. A value of `1` means no retries.
    ///
    /// Defaults to `3`.
    pub max_attempts: NonZeroU32,
    /// Minimum base delay for retries.
    ///
    /// Defaults to `100ms`.
    pub min_base_delay: Duration,
    /// Maximum base delay for retries.
    ///
    /// Defaults to `1s`.
    pub max_base_delay: Duration,
    /// Retry policy for [`append`](crate::S2Stream::append) and
    /// [`append_session`](crate::S2Stream::append_session) operations.
    ///
    /// Defaults to `All`.
    pub append_retry_policy: AppendRetryPolicy,
}
329
330impl Default for RetryConfig {
331    fn default() -> Self {
332        Self {
333            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
334            min_base_delay: Duration::from_millis(100),
335            max_base_delay: Duration::from_secs(1),
336            append_retry_policy: AppendRetryPolicy::All,
337        }
338    }
339}
340
341impl RetryConfig {
342    /// Create a new [`RetryConfig`] with default settings.
343    pub fn new() -> Self {
344        Self::default()
345    }
346
347    pub(crate) fn max_retries(&self) -> u32 {
348        self.max_attempts.get() - 1
349    }
350
351    /// Set the total number of attempts including the initial try.
352    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
353        Self {
354            max_attempts,
355            ..self
356        }
357    }
358
359    /// Set the minimum base delay for retries.
360    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
361        Self {
362            min_base_delay,
363            ..self
364        }
365    }
366
367    /// Set the maximum base delay for retries.
368    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
369        Self {
370            max_base_delay,
371            ..self
372        }
373    }
374
375    /// Set the retry policy for [`append`](crate::S2Stream::append) and
376    /// [`append_session`](crate::S2Stream::append_session) operations.
377    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
378        Self {
379            append_retry_policy,
380            ..self
381        }
382    }
383}
384
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for [`S2`](crate::S2).
pub struct S2Config {
    // Access token for the account; held in a `SecretString` so it is not
    // exposed via `Debug` output.
    pub(crate) access_token: SecretString,
    // Account/basin endpoints; defaults to the hosted AWS environment.
    pub(crate) endpoints: S2Endpoints,
    pub(crate) connection_timeout: Duration,
    pub(crate) request_timeout: Duration,
    pub(crate) retry: RetryConfig,
    pub(crate) compression: Compression,
    // Sent as the `User-Agent` header value; defaults to `s2-sdk-rust/<version>`.
    pub(crate) user_agent: HeaderValue,
}
397
398impl S2Config {
399    /// Create a new [`S2Config`] with the given access token and default settings.
400    pub fn new(access_token: impl Into<String>) -> Self {
401        Self {
402            access_token: access_token.into().into(),
403            endpoints: S2Endpoints::for_aws(),
404            connection_timeout: Duration::from_secs(3),
405            request_timeout: Duration::from_secs(5),
406            retry: RetryConfig::new(),
407            compression: Compression::None,
408            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
409                .parse()
410                .expect("valid user agent"),
411        }
412    }
413
414    /// Set the S2 endpoints to connect to.
415    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
416        Self { endpoints, ..self }
417    }
418
419    /// Set the timeout for establishing a connection to the server.
420    ///
421    /// Defaults to `3s`.
422    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
423        Self {
424            connection_timeout,
425            ..self
426        }
427    }
428
429    /// Set the timeout for requests.
430    ///
431    /// Defaults to `5s`.
432    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
433        Self {
434            request_timeout,
435            ..self
436        }
437    }
438
439    /// Set the retry configuration for requests.
440    ///
441    /// See [`RetryConfig`] for defaults.
442    pub fn with_retry(self, retry: RetryConfig) -> Self {
443        Self { retry, ..self }
444    }
445
446    /// Set the compression algorithm for requests and responses.
447    ///
448    /// Defaults to no compression.
449    pub fn with_compression(self, compression: Compression) -> Self {
450        Self {
451            compression,
452            ..self
453        }
454    }
455
456    #[doc(hidden)]
457    #[cfg(feature = "_hidden")]
458    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
459        let user_agent = user_agent
460            .into()
461            .parse()
462            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
463        Ok(Self { user_agent, ..self })
464    }
465}
466
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// A page of values.
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether there are more pages.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Build a page from anything convertible into a `Vec<T>`, along with the
    /// pagination flag.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values = values.into();
        Self { values, has_more }
    }
}
485
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Storage class for recent appends.
///
/// Converted to/from the wire-level enum via [`From`].
pub enum StorageClass {
    /// Standard storage class that offers append latencies under `500ms`.
    Standard,
    /// Express storage class that offers append latencies under `50ms`.
    Express,
}
494
495impl From<api::config::StorageClass> for StorageClass {
496    fn from(value: api::config::StorageClass) -> Self {
497        match value {
498            api::config::StorageClass::Standard => StorageClass::Standard,
499            api::config::StorageClass::Express => StorageClass::Express,
500        }
501    }
502}
503
504impl From<StorageClass> for api::config::StorageClass {
505    fn from(value: StorageClass) -> Self {
506        match value {
507            StorageClass::Standard => api::config::StorageClass::Standard,
508            StorageClass::Express => api::config::StorageClass::Express,
509        }
510    }
511}
512
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Retention policy for records in a stream.
///
/// Converted to/from the wire-level enum via [`From`].
pub enum RetentionPolicy {
    /// Age in seconds. Records older than this age are automatically trimmed.
    Age(u64),
    /// Records are retained indefinitely unless explicitly trimmed.
    Infinite,
}
521
522impl From<api::config::RetentionPolicy> for RetentionPolicy {
523    fn from(value: api::config::RetentionPolicy) -> Self {
524        match value {
525            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
526            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
527        }
528    }
529}
530
531impl From<RetentionPolicy> for api::config::RetentionPolicy {
532    fn from(value: RetentionPolicy) -> Self {
533        match value {
534            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
535            RetentionPolicy::Infinite => {
536                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
537            }
538        }
539    }
540}
541
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Timestamping mode for appends that influences how timestamps are handled.
///
/// Converted to/from the wire-level enum via [`From`].
pub enum TimestampingMode {
    /// Prefer client-specified timestamp if present otherwise use arrival time.
    ClientPrefer,
    /// Require a client-specified timestamp and reject the append if it is missing.
    ClientRequire,
    /// Use the arrival time and ignore any client-specified timestamp.
    Arrival,
}
552
553impl From<api::config::TimestampingMode> for TimestampingMode {
554    fn from(value: api::config::TimestampingMode) -> Self {
555        match value {
556            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
557            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
558            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
559        }
560    }
561}
562
563impl From<TimestampingMode> for api::config::TimestampingMode {
564    fn from(value: TimestampingMode) -> Self {
565        match value {
566            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
567            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
568            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
569        }
570    }
571}
572
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for timestamping behavior.
pub struct TimestampingConfig {
    /// Timestamping mode for appends that influences how timestamps are handled.
    ///
    /// `None` leaves the choice to the service.
    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
    pub mode: Option<TimestampingMode>,
    /// Whether client-specified timestamps are allowed to exceed the arrival time.
    ///
    /// Defaults to `false` (client timestamps are capped at the arrival time).
    pub uncapped: bool,
}
586
587impl TimestampingConfig {
588    /// Create a new [`TimestampingConfig`] with default settings.
589    pub fn new() -> Self {
590        Self::default()
591    }
592
593    /// Set the timestamping mode for appends that influences how timestamps are handled.
594    pub fn with_mode(self, mode: TimestampingMode) -> Self {
595        Self {
596            mode: Some(mode),
597            ..self
598        }
599    }
600
601    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
602    pub fn with_uncapped(self, uncapped: bool) -> Self {
603        Self { uncapped, ..self }
604    }
605}
606
607impl From<api::config::TimestampingConfig> for TimestampingConfig {
608    fn from(value: api::config::TimestampingConfig) -> Self {
609        Self {
610            mode: value.mode.map(Into::into),
611            uncapped: value.uncapped.unwrap_or_default(),
612        }
613    }
614}
615
616impl From<TimestampingConfig> for api::config::TimestampingConfig {
617    fn from(value: TimestampingConfig) -> Self {
618        Self {
619            mode: value.mode.map(Into::into),
620            uncapped: Some(value.uncapped),
621        }
622    }
623}
624
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for automatically deleting a stream when it becomes empty.
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}
634
635impl DeleteOnEmptyConfig {
636    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
637    pub fn new() -> Self {
638        Self::default()
639    }
640
641    /// Set the minimum age in seconds before an empty stream can be deleted.
642    pub fn with_min_age(self, min_age: Duration) -> Self {
643        Self {
644            min_age_secs: min_age.as_secs(),
645        }
646    }
647}
648
649impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
650    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
651        Self {
652            min_age_secs: value.min_age_secs,
653        }
654    }
655}
656
657impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
658    fn from(value: DeleteOnEmptyConfig) -> Self {
659        Self {
660            min_age_secs: value.min_age_secs,
661        }
662    }
663}
664
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for a stream.
///
/// All fields are optional; `None` leaves the corresponding setting to the
/// service default.
pub struct StreamConfig {
    /// Storage class for the stream.
    ///
    /// Defaults to [`Express`](StorageClass::Express).
    pub storage_class: Option<StorageClass>,
    /// Retention policy for records in the stream.
    ///
    /// Defaults to `7 days` of retention.
    pub retention_policy: Option<RetentionPolicy>,
    /// Configuration for timestamping behavior.
    ///
    /// See [`TimestampingConfig`] for defaults.
    pub timestamping: Option<TimestampingConfig>,
    /// Configuration for automatically deleting the stream when it becomes empty.
    ///
    /// See [`DeleteOnEmptyConfig`] for defaults.
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
}
686
687impl StreamConfig {
688    /// Create a new [`StreamConfig`] with default settings.
689    pub fn new() -> Self {
690        Self::default()
691    }
692
693    /// Set the storage class for the stream.
694    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
695        Self {
696            storage_class: Some(storage_class),
697            ..self
698        }
699    }
700
701    /// Set the retention policy for records in the stream.
702    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
703        Self {
704            retention_policy: Some(retention_policy),
705            ..self
706        }
707    }
708
709    /// Set the configuration for timestamping behavior.
710    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
711        Self {
712            timestamping: Some(timestamping),
713            ..self
714        }
715    }
716
717    /// Set the configuration for automatically deleting the stream when it becomes empty.
718    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
719        Self {
720            delete_on_empty: Some(delete_on_empty),
721            ..self
722        }
723    }
724}
725
726impl From<api::config::StreamConfig> for StreamConfig {
727    fn from(value: api::config::StreamConfig) -> Self {
728        Self {
729            storage_class: value.storage_class.map(Into::into),
730            retention_policy: value.retention_policy.map(Into::into),
731            timestamping: value.timestamping.map(Into::into),
732            delete_on_empty: value.delete_on_empty.map(Into::into),
733        }
734    }
735}
736
737impl From<StreamConfig> for api::config::StreamConfig {
738    fn from(value: StreamConfig) -> Self {
739        Self {
740            storage_class: value.storage_class.map(Into::into),
741            retention_policy: value.retention_policy.map(Into::into),
742            timestamping: value.timestamping.map(Into::into),
743            delete_on_empty: value.delete_on_empty.map(Into::into),
744        }
745    }
746}
747
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Configuration for a basin.
pub struct BasinConfig {
    /// Default configuration for all streams in the basin.
    ///
    /// See [`StreamConfig`] for defaults.
    pub default_stream_config: Option<StreamConfig>,
    /// Whether to create stream on append if it doesn't exist using default stream configuration.
    ///
    /// Defaults to `false`.
    pub create_stream_on_append: bool,
    /// Whether to create stream on read if it doesn't exist using default stream configuration.
    ///
    /// Defaults to `false`.
    pub create_stream_on_read: bool,
}
765
766impl BasinConfig {
767    /// Create a new [`BasinConfig`] with default settings.
768    pub fn new() -> Self {
769        Self::default()
770    }
771
772    /// Set the default configuration for all streams in the basin.
773    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
774        Self {
775            default_stream_config: Some(config),
776            ..self
777        }
778    }
779
780    /// Set whether to create stream on append if it doesn't exist using default stream
781    /// configuration.
782    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
783        Self {
784            create_stream_on_append,
785            ..self
786        }
787    }
788
789    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
790    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
791        Self {
792            create_stream_on_read,
793            ..self
794        }
795    }
796}
797
798impl From<api::config::BasinConfig> for BasinConfig {
799    fn from(value: api::config::BasinConfig) -> Self {
800        Self {
801            default_stream_config: value.default_stream_config.map(Into::into),
802            create_stream_on_append: value.create_stream_on_append,
803            create_stream_on_read: value.create_stream_on_read,
804        }
805    }
806}
807
808impl From<BasinConfig> for api::config::BasinConfig {
809    fn from(value: BasinConfig) -> Self {
810        Self {
811            default_stream_config: value.default_stream_config.map(Into::into),
812            create_stream_on_append: value.create_stream_on_append,
813            create_stream_on_read: value.create_stream_on_read,
814        }
815    }
816}
817
#[derive(Debug, Clone, PartialEq, Eq)]
/// Scope of a basin.
///
/// Converted to/from the wire-level enum via [`From`].
pub enum BasinScope {
    /// AWS `us-east-1` region.
    AwsUsEast1,
}
824
825impl From<api::basin::BasinScope> for BasinScope {
826    fn from(value: api::basin::BasinScope) -> Self {
827        match value {
828            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
829        }
830    }
831}
832
833impl From<BasinScope> for api::basin::BasinScope {
834    fn from(value: BasinScope) -> Self {
835        match value {
836            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
837        }
838    }
839}
840
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_basin`](crate::S2::create_basin) operation.
///
/// Construct via [`CreateBasinInput::new`] and customize with the `with_*` builders.
pub struct CreateBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Configuration for the basin.
    ///
    /// See [`BasinConfig`] for defaults.
    pub config: Option<BasinConfig>,
    /// Scope of the basin.
    ///
    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
    pub scope: Option<BasinScope>,
    /// Idempotency token for the operation.
    ///
    /// When a request is retried, the same token is reused,
    /// causing the server to return the original response instead of
    /// performing the operation again.
    ///
    /// Defaults to a random UUID.
    pub idempotency_token: String,
}
864
865impl CreateBasinInput {
866    /// Create a new [`CreateBasinInput`] with the given basin name.
867    pub fn new(name: BasinName) -> Self {
868        Self {
869            name,
870            config: None,
871            scope: None,
872            idempotency_token: idempotency_token(),
873        }
874    }
875
876    /// Set the configuration for the basin.
877    pub fn with_config(self, config: BasinConfig) -> Self {
878        Self {
879            config: Some(config),
880            ..self
881        }
882    }
883
884    /// Set the scope of the basin.
885    pub fn with_scope(self, scope: BasinScope) -> Self {
886        Self {
887            scope: Some(scope),
888            ..self
889        }
890    }
891
892    /// Set the idempotency token for the operation.
893    pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
894        Self {
895            idempotency_token: idempotency_token.into(),
896            ..self
897        }
898    }
899}
900
901impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
902    fn from(value: CreateBasinInput) -> Self {
903        (
904            api::basin::CreateBasinRequest {
905                basin: value.name,
906                config: value.config.map(Into::into),
907                scope: value.scope.map(Into::into),
908            },
909            value.idempotency_token,
910        )
911    }
912}
913
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_basins`](crate::S2::list_basins) operation.
pub struct ListBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// `None` defaults to `1000`.
    pub limit: Option<usize>,
}
933
934impl ListBasinsInput {
935    /// Create a new [`ListBasinsInput`] with default values.
936    pub fn new() -> Self {
937        Self::default()
938    }
939
940    /// Set the prefix used to filter basins whose names begin with this value.
941    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
942        Self { prefix, ..self }
943    }
944
945    /// Set the value used to filter basins whose names are lexicographically greater than this
946    /// value.
947    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
948        Self {
949            start_after,
950            ..self
951        }
952    }
953
954    /// Set the limit on number of basins to return in a page.
955    pub fn with_limit(self, limit: usize) -> Self {
956        Self {
957            limit: Some(limit),
958            ..self
959        }
960    }
961}
962
963impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
964    fn from(value: ListBasinsInput) -> Self {
965        Self {
966            prefix: Some(value.prefix),
967            start_after: Some(value.start_after),
968            limit: value.limit,
969        }
970    }
971}
972
#[derive(Debug, Clone, Default)]
// NOTE(review): unlike `ListBasinsInput`, this struct is not `#[non_exhaustive]` —
// confirm whether that is intentional.
/// Input for [`S2::list_all_basins`](crate::S2::list_all_basins).
pub struct ListAllBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""` (matches all basins).
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Whether to include basins that are being deleted.
    ///
    /// Defaults to `false`.
    pub include_deleted: bool,
}
991
992impl ListAllBasinsInput {
993    /// Create a new [`ListAllBasinsInput`] with default values.
994    pub fn new() -> Self {
995        Self::default()
996    }
997
998    /// Set the prefix used to filter basins whose names begin with this value.
999    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1000        Self { prefix, ..self }
1001    }
1002
1003    /// Set the value used to filter basins whose names are lexicographically greater than this
1004    /// value.
1005    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1006        Self {
1007            start_after,
1008            ..self
1009        }
1010    }
1011
1012    /// Set whether to include basins that are being deleted.
1013    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1014        Self {
1015            include_deleted,
1016            ..self
1017        }
1018    }
1019}
1020
#[derive(Debug, Clone, PartialEq, Eq)]
/// Current state of a basin.
///
/// Mirrors the wire-level `api::basin::BasinState`.
pub enum BasinState {
    /// The basin is active.
    Active,
    /// The basin is being created.
    Creating,
    /// The basin is being deleted.
    Deleting,
}
1031
1032impl From<api::basin::BasinState> for BasinState {
1033    fn from(value: api::basin::BasinState) -> Self {
1034        match value {
1035            api::basin::BasinState::Active => BasinState::Active,
1036            api::basin::BasinState::Creating => BasinState::Creating,
1037            api::basin::BasinState::Deleting => BasinState::Deleting,
1038        }
1039    }
1040}
1041
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// Basin information.
pub struct BasinInfo {
    /// Basin name.
    pub name: BasinName,
    /// Scope of the basin, if one was assigned.
    pub scope: Option<BasinScope>,
    /// Current state of the basin.
    pub state: BasinState,
}
1053
1054impl From<api::basin::BasinInfo> for BasinInfo {
1055    fn from(value: api::basin::BasinInfo) -> Self {
1056        Self {
1057            name: value.name,
1058            scope: value.scope.map(Into::into),
1059            state: value.state.into(),
1060        }
1061    }
1062}
1063
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
///
/// Construct via [`DeleteBasinInput::new`].
pub struct DeleteBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Whether to ignore `Not Found` error if the basin doesn't exist.
    ///
    /// Defaults to `false` (see [`DeleteBasinInput::new`]).
    pub ignore_not_found: bool,
}
1073
1074impl DeleteBasinInput {
1075    /// Create a new [`DeleteBasinInput`] with the given basin name.
1076    pub fn new(name: BasinName) -> Self {
1077        Self {
1078            name,
1079            ignore_not_found: false,
1080        }
1081    }
1082
1083    /// Set whether to ignore `Not Found` error if the basin is not existing.
1084    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1085        Self {
1086            ignore_not_found,
1087            ..self
1088        }
1089    }
1090}
1091
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`TimestampingConfig`].
///
/// Construct via [`TimestampingReconfiguration::new`] and the `with_*` methods.
pub struct TimestampingReconfiguration {
    /// Override for the existing [`mode`](TimestampingConfig::mode).
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
    pub uncapped: Maybe<Option<bool>>,
}
1101
1102impl TimestampingReconfiguration {
1103    /// Create a new [`TimestampingReconfiguration`].
1104    pub fn new() -> Self {
1105        Self::default()
1106    }
1107
1108    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1109    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1110        Self {
1111            mode: Maybe::Specified(Some(mode)),
1112            ..self
1113        }
1114    }
1115
1116    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1117    pub fn with_uncapped(self, uncapped: bool) -> Self {
1118        Self {
1119            uncapped: Maybe::Specified(Some(uncapped)),
1120            ..self
1121        }
1122    }
1123}
1124
1125impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1126    fn from(value: TimestampingReconfiguration) -> Self {
1127        Self {
1128            mode: value.mode.map(|m| m.map(Into::into)),
1129            uncapped: value.uncapped,
1130        }
1131    }
1132}
1133
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`DeleteOnEmptyConfig`].
///
/// Construct via [`DeleteOnEmptyReconfiguration::new`] and
/// [`DeleteOnEmptyReconfiguration::with_min_age`].
pub struct DeleteOnEmptyReconfiguration {
    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
    pub min_age_secs: Maybe<Option<u64>>,
}
1141
1142impl DeleteOnEmptyReconfiguration {
1143    /// Create a new [`DeleteOnEmptyReconfiguration`].
1144    pub fn new() -> Self {
1145        Self::default()
1146    }
1147
1148    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1149    pub fn with_min_age(self, min_age: Duration) -> Self {
1150        Self {
1151            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1152        }
1153    }
1154}
1155
1156impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1157    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1158        Self {
1159            min_age_secs: value.min_age_secs,
1160        }
1161    }
1162}
1163
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`StreamConfig`].
///
/// Construct via [`StreamReconfiguration::new`] and the `with_*` methods.
pub struct StreamReconfiguration {
    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
}
1177
1178impl StreamReconfiguration {
1179    /// Create a new [`StreamReconfiguration`].
1180    pub fn new() -> Self {
1181        Self::default()
1182    }
1183
1184    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1185    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1186        Self {
1187            storage_class: Maybe::Specified(Some(storage_class)),
1188            ..self
1189        }
1190    }
1191
1192    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1193    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1194        Self {
1195            retention_policy: Maybe::Specified(Some(retention_policy)),
1196            ..self
1197        }
1198    }
1199
1200    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1201    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1202        Self {
1203            timestamping: Maybe::Specified(Some(timestamping)),
1204            ..self
1205        }
1206    }
1207
1208    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1209    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1210        Self {
1211            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1212            ..self
1213        }
1214    }
1215}
1216
1217impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1218    fn from(value: StreamReconfiguration) -> Self {
1219        Self {
1220            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1221            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1222            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1223            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1224        }
1225    }
1226}
1227
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`BasinConfig`].
///
/// Construct via [`BasinReconfiguration::new`] and the `with_*` methods.
pub struct BasinReconfiguration {
    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Override for the existing
    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
    pub create_stream_on_append: Maybe<bool>,
    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
    pub create_stream_on_read: Maybe<bool>,
}
1240
1241impl BasinReconfiguration {
1242    /// Create a new [`BasinReconfiguration`].
1243    pub fn new() -> Self {
1244        Self::default()
1245    }
1246
1247    /// Set the override for the existing
1248    /// [`default_stream_config`](BasinConfig::default_stream_config).
1249    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1250        Self {
1251            default_stream_config: Maybe::Specified(Some(config)),
1252            ..self
1253        }
1254    }
1255
1256    /// Set the override for the existing
1257    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1258    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1259        Self {
1260            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1261            ..self
1262        }
1263    }
1264
1265    /// Set the override for the existing
1266    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1267    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1268        Self {
1269            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1270            ..self
1271        }
1272    }
1273}
1274
1275impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1276    fn from(value: BasinReconfiguration) -> Self {
1277        Self {
1278            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1279            create_stream_on_append: value.create_stream_on_append,
1280            create_stream_on_read: value.create_stream_on_read,
1281        }
1282    }
1283}
1284
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
///
/// Construct via [`ReconfigureBasinInput::new`].
pub struct ReconfigureBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Reconfiguration for [`BasinConfig`].
    pub config: BasinReconfiguration,
}
1294
1295impl ReconfigureBasinInput {
1296    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1297    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1298        Self { name, config }
1299    }
1300}
1301
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
///
/// Construct via [`ListAccessTokensInput::new`] and the `with_*` builder methods.
pub struct ListAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""` (matches all access tokens).
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
1321
1322impl ListAccessTokensInput {
1323    /// Create a new [`ListAccessTokensInput`] with default values.
1324    pub fn new() -> Self {
1325        Self::default()
1326    }
1327
1328    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1329    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1330        Self { prefix, ..self }
1331    }
1332
1333    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1334    /// value.
1335    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1336        Self {
1337            start_after,
1338            ..self
1339        }
1340    }
1341
1342    /// Set the limit on number of access tokens to return in a page.
1343    pub fn with_limit(self, limit: usize) -> Self {
1344        Self {
1345            limit: Some(limit),
1346            ..self
1347        }
1348    }
1349}
1350
1351impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1352    fn from(value: ListAccessTokensInput) -> Self {
1353        Self {
1354            prefix: Some(value.prefix),
1355            start_after: Some(value.start_after),
1356            limit: value.limit,
1357        }
1358    }
1359}
1360
#[derive(Debug, Clone, Default)]
// NOTE(review): unlike `ListAccessTokensInput`, this struct is not
// `#[non_exhaustive]` — confirm whether that is intentional.
/// Input for [`S2::list_all_access_tokens`](crate::S2::list_all_access_tokens).
pub struct ListAllAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""` (matches all access tokens).
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
}
1375
1376impl ListAllAccessTokensInput {
1377    /// Create a new [`ListAllAccessTokensInput`] with default values.
1378    pub fn new() -> Self {
1379        Self::default()
1380    }
1381
1382    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1383    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1384        Self { prefix, ..self }
1385    }
1386
1387    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1388    /// this value.
1389    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1390        Self {
1391            start_after,
1392            ..self
1393        }
1394    }
1395}
1396
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Access token information.
///
/// Converted from the wire-level `api::access::AccessTokenInfo` via `TryFrom`;
/// `expires_at` is required there.
pub struct AccessTokenInfo {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    pub expires_at: S2DateTime,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    pub auto_prefix_streams: bool,
    /// Scope of the access token.
    pub scope: AccessTokenScope,
}
1411
1412impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1413    type Error = ValidationError;
1414
1415    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1416        let expires_at = value
1417            .expires_at
1418            .map(Into::into)
1419            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1420        Ok(Self {
1421            id: value.id,
1422            expires_at,
1423            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1424            scope: value.scope.into(),
1425        })
1426    }
1427}
1428
#[derive(Debug, Clone)]
/// Pattern for matching basins.
///
/// Corresponds to the wire-level `api::access::ResourceSet` for basins.
///
/// See [`AccessTokenScope::basins`].
pub enum BasinMatcher {
    /// Match no basins.
    None,
    /// Match exactly this basin.
    Exact(BasinName),
    /// Match all basins with this prefix.
    Prefix(BasinNamePrefix),
}
1441
#[derive(Debug, Clone)]
/// Pattern for matching streams.
///
/// Corresponds to the wire-level `api::access::ResourceSet` for streams.
///
/// See [`AccessTokenScope::streams`].
pub enum StreamMatcher {
    /// Match no streams.
    None,
    /// Match exactly this stream.
    Exact(StreamName),
    /// Match all streams with this prefix.
    Prefix(StreamNamePrefix),
}
1454
#[derive(Debug, Clone)]
/// Pattern for matching access tokens.
///
/// Corresponds to the wire-level `api::access::ResourceSet` for access tokens.
///
/// See [`AccessTokenScope::access_tokens`].
pub enum AccessTokenMatcher {
    /// Match no access tokens.
    None,
    /// Match exactly this access token.
    Exact(AccessTokenId),
    /// Match all access tokens with this prefix.
    Prefix(AccessTokenIdPrefix),
}
1467
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions indicating allowed operations.
///
/// Both flags default to `false` (no access); see the
/// [`read_only`](ReadWritePermissions::read_only),
/// [`write_only`](ReadWritePermissions::write_only), and
/// [`read_write`](ReadWritePermissions::read_write) constructors.
pub struct ReadWritePermissions {
    /// Read permission.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Write permission.
    ///
    /// Defaults to `false`.
    pub write: bool,
}
1481
1482impl ReadWritePermissions {
1483    /// Create a new [`ReadWritePermissions`] with default values.
1484    pub fn new() -> Self {
1485        Self::default()
1486    }
1487
1488    /// Create read-only permissions.
1489    pub fn read_only() -> Self {
1490        Self {
1491            read: true,
1492            write: false,
1493        }
1494    }
1495
1496    /// Create write-only permissions.
1497    pub fn write_only() -> Self {
1498        Self {
1499            read: false,
1500            write: true,
1501        }
1502    }
1503
1504    /// Create read-write permissions.
1505    pub fn read_write() -> Self {
1506        Self {
1507            read: true,
1508            write: true,
1509        }
1510    }
1511}
1512
1513impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1514    fn from(value: ReadWritePermissions) -> Self {
1515        Self {
1516            read: Some(value.read),
1517            write: Some(value.write),
1518        }
1519    }
1520}
1521
1522impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1523    fn from(value: api::access::ReadWritePermissions) -> Self {
1524        Self {
1525            read: value.read.unwrap_or_default(),
1526            write: value.write.unwrap_or_default(),
1527        }
1528    }
1529}
1530
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions at the operation group level.
///
/// A `None` group leaves that group's permissions unset.
///
/// See [`AccessTokenScope::op_group_perms`].
pub struct OperationGroupPermissions {
    /// Account-level access permissions.
    ///
    /// Defaults to `None`.
    pub account: Option<ReadWritePermissions>,
    /// Basin-level access permissions.
    ///
    /// Defaults to `None`.
    pub basin: Option<ReadWritePermissions>,
    /// Stream-level access permissions.
    ///
    /// Defaults to `None`.
    pub stream: Option<ReadWritePermissions>,
}
1550
1551impl OperationGroupPermissions {
1552    /// Create a new [`OperationGroupPermissions`] with default values.
1553    pub fn new() -> Self {
1554        Self::default()
1555    }
1556
1557    /// Create read-only permissions for all groups.
1558    pub fn read_only_all() -> Self {
1559        Self {
1560            account: Some(ReadWritePermissions::read_only()),
1561            basin: Some(ReadWritePermissions::read_only()),
1562            stream: Some(ReadWritePermissions::read_only()),
1563        }
1564    }
1565
1566    /// Create write-only permissions for all groups.
1567    pub fn write_only_all() -> Self {
1568        Self {
1569            account: Some(ReadWritePermissions::write_only()),
1570            basin: Some(ReadWritePermissions::write_only()),
1571            stream: Some(ReadWritePermissions::write_only()),
1572        }
1573    }
1574
1575    /// Create read-write permissions for all groups.
1576    pub fn read_write_all() -> Self {
1577        Self {
1578            account: Some(ReadWritePermissions::read_write()),
1579            basin: Some(ReadWritePermissions::read_write()),
1580            stream: Some(ReadWritePermissions::read_write()),
1581        }
1582    }
1583
1584    /// Set account-level access permissions.
1585    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1586        Self {
1587            account: Some(account),
1588            ..self
1589        }
1590    }
1591
1592    /// Set basin-level access permissions.
1593    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1594        Self {
1595            basin: Some(basin),
1596            ..self
1597        }
1598    }
1599
1600    /// Set stream-level access permissions.
1601    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1602        Self {
1603            stream: Some(stream),
1604            ..self
1605        }
1606    }
1607}
1608
1609impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1610    fn from(value: OperationGroupPermissions) -> Self {
1611        Self {
1612            account: value.account.map(Into::into),
1613            basin: value.basin.map(Into::into),
1614            stream: value.stream.map(Into::into),
1615        }
1616    }
1617}
1618
1619impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1620    fn from(value: api::access::PermittedOperationGroups) -> Self {
1621        Self {
1622            account: value.account.map(Into::into),
1623            basin: value.basin.map(Into::into),
1624            stream: value.stream.map(Into::into),
1625        }
1626    }
1627}
1628
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Individual operation that can be permitted.
///
/// Converted to and from the wire-level `api::access::Operation`; note that the
/// `Get*Metrics` variants map to the wire `AccountMetrics`/`BasinMetrics`/
/// `StreamMetrics` variants.
///
/// See [`AccessTokenScope::ops`].
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics.
    GetAccountMetrics,
    /// Get basin metrics.
    GetBasinMetrics,
    /// Get stream metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
}
1677
1678impl From<Operation> for api::access::Operation {
1679    fn from(value: Operation) -> Self {
1680        match value {
1681            Operation::ListBasins => api::access::Operation::ListBasins,
1682            Operation::CreateBasin => api::access::Operation::CreateBasin,
1683            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1684            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1685            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1686            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1687            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1688            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1689            Operation::ListStreams => api::access::Operation::ListStreams,
1690            Operation::CreateStream => api::access::Operation::CreateStream,
1691            Operation::DeleteStream => api::access::Operation::DeleteStream,
1692            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1693            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1694            Operation::CheckTail => api::access::Operation::CheckTail,
1695            Operation::Append => api::access::Operation::Append,
1696            Operation::Read => api::access::Operation::Read,
1697            Operation::Trim => api::access::Operation::Trim,
1698            Operation::Fence => api::access::Operation::Fence,
1699            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1700            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1701            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1702        }
1703    }
1704}
1705
1706impl From<api::access::Operation> for Operation {
1707    fn from(value: api::access::Operation) -> Self {
1708        match value {
1709            api::access::Operation::ListBasins => Operation::ListBasins,
1710            api::access::Operation::CreateBasin => Operation::CreateBasin,
1711            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1712            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1713            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1714            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1715            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1716            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1717            api::access::Operation::ListStreams => Operation::ListStreams,
1718            api::access::Operation::CreateStream => Operation::CreateStream,
1719            api::access::Operation::DeleteStream => Operation::DeleteStream,
1720            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1721            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1722            api::access::Operation::CheckTail => Operation::CheckTail,
1723            api::access::Operation::Append => Operation::Append,
1724            api::access::Operation::Read => Operation::Read,
1725            api::access::Operation::Trim => Operation::Trim,
1726            api::access::Operation::Fence => Operation::Fence,
1727            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1728            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1729            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1730        }
1731    }
1732}
1733
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token.
///
/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
/// final set must not be empty.
///
/// See [`IssueAccessTokenInput::scope`].
pub struct AccessTokenScopeInput {
    // Permitted basins; `None` means no basins.
    basins: Option<BasinMatcher>,
    // Permitted streams; `None` means no streams.
    streams: Option<StreamMatcher>,
    // Permitted access tokens; `None` means no access tokens.
    access_tokens: Option<AccessTokenMatcher>,
    // Permissions at the operation group level.
    op_group_perms: Option<OperationGroupPermissions>,
    // Individually permitted operations.
    ops: HashSet<Operation>,
}
1750
1751impl AccessTokenScopeInput {
1752    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1753    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1754        Self {
1755            basins: None,
1756            streams: None,
1757            access_tokens: None,
1758            op_group_perms: None,
1759            ops: ops.into_iter().collect(),
1760        }
1761    }
1762
1763    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1764    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1765        Self {
1766            basins: None,
1767            streams: None,
1768            access_tokens: None,
1769            op_group_perms: Some(op_group_perms),
1770            ops: HashSet::default(),
1771        }
1772    }
1773
1774    /// Set the permitted operations.
1775    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1776        Self {
1777            ops: ops.into_iter().collect(),
1778            ..self
1779        }
1780    }
1781
1782    /// Set the access permissions at the operation group level.
1783    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1784        Self {
1785            op_group_perms: Some(op_group_perms),
1786            ..self
1787        }
1788    }
1789
1790    /// Set the permitted basins.
1791    ///
1792    /// Defaults to no basins.
1793    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1794        Self {
1795            basins: Some(basins),
1796            ..self
1797        }
1798    }
1799
1800    /// Set the permitted streams.
1801    ///
1802    /// Defaults to no streams.
1803    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1804        Self {
1805            streams: Some(streams),
1806            ..self
1807        }
1808    }
1809
1810    /// Set the permitted access tokens.
1811    ///
1812    /// Defaults to no access tokens.
1813    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1814        Self {
1815            access_tokens: Some(access_tokens),
1816            ..self
1817        }
1818    }
1819}
1820
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token.
///
/// The read-side counterpart of [`AccessTokenScopeInput`], converted from the
/// wire-level `api::access::AccessTokenScope`.
pub struct AccessTokenScope {
    /// Permitted basins.
    pub basins: Option<BasinMatcher>,
    /// Permitted streams.
    pub streams: Option<StreamMatcher>,
    /// Permitted access tokens.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions at the operation group level.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Permitted operations.
    ///
    /// Empty when the wire response carries no operation list.
    pub ops: HashSet<Operation>,
}
1836
1837impl From<api::access::AccessTokenScope> for AccessTokenScope {
1838    fn from(value: api::access::AccessTokenScope) -> Self {
1839        Self {
1840            basins: value.basins.map(|rs| match rs {
1841                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1842                    BasinMatcher::Exact(e)
1843                }
1844                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1845                    BasinMatcher::None
1846                }
1847                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1848            }),
1849            streams: value.streams.map(|rs| match rs {
1850                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1851                    StreamMatcher::Exact(e)
1852                }
1853                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1854                    StreamMatcher::None
1855                }
1856                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1857            }),
1858            access_tokens: value.access_tokens.map(|rs| match rs {
1859                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1860                    AccessTokenMatcher::Exact(e)
1861                }
1862                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1863                    AccessTokenMatcher::None
1864                }
1865                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1866            }),
1867            op_group_perms: value.op_groups.map(Into::into),
1868            ops: value
1869                .ops
1870                .map(|ops| ops.into_iter().map(Into::into).collect())
1871                .unwrap_or_default(),
1872        }
1873    }
1874}
1875
1876impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1877    fn from(value: AccessTokenScopeInput) -> Self {
1878        Self {
1879            basins: value.basins.map(|rs| match rs {
1880                BasinMatcher::None => {
1881                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1882                }
1883                BasinMatcher::Exact(e) => {
1884                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1885                }
1886                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1887            }),
1888            streams: value.streams.map(|rs| match rs {
1889                StreamMatcher::None => {
1890                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1891                }
1892                StreamMatcher::Exact(e) => {
1893                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1894                }
1895                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1896            }),
1897            access_tokens: value.access_tokens.map(|rs| match rs {
1898                AccessTokenMatcher::None => {
1899                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1900                }
1901                AccessTokenMatcher::Exact(e) => {
1902                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1903                }
1904                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1905            }),
1906            op_groups: value.op_group_perms.map(Into::into),
1907            ops: if value.ops.is_empty() {
1908                None
1909            } else {
1910                Some(value.ops.into_iter().map(Into::into).collect())
1911            },
1912        }
1913    }
1914}
1915
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`issue_access_token`](crate::S2::issue_access_token).
pub struct IssueAccessTokenInput {
    /// Access token ID.
    ///
    /// See [`AccessTokenId`] for the constraints on valid IDs.
    pub id: AccessTokenId,
    /// Expiration time.
    ///
    /// Defaults to the expiration time of requestor's access token passed via
    /// [`S2Config`](S2Config::new).
    pub expires_at: Option<S2DateTime>,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    ///
    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
    /// prefix.
    ///
    /// Defaults to `false`.
    pub auto_prefix_streams: bool,
    /// Scope of the token.
    pub scope: AccessTokenScopeInput,
}
1938
1939impl IssueAccessTokenInput {
1940    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
1941    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1942        Self {
1943            id,
1944            expires_at: None,
1945            auto_prefix_streams: false,
1946            scope,
1947        }
1948    }
1949
1950    /// Set the expiration time.
1951    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1952        Self {
1953            expires_at: Some(expires_at),
1954            ..self
1955        }
1956    }
1957
1958    /// Set whether to automatically prefix stream names during creation and strip the prefix during
1959    /// listing.
1960    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1961        Self {
1962            auto_prefix_streams,
1963            ..self
1964        }
1965    }
1966}
1967
1968impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1969    fn from(value: IssueAccessTokenInput) -> Self {
1970        Self {
1971            id: value.id,
1972            expires_at: value.expires_at.map(Into::into),
1973            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1974            scope: value.scope.into(),
1975        }
1976    }
1977}
1978
#[derive(Debug, Clone, Copy)]
/// Interval to accumulate over for timeseries metric sets.
pub enum TimeseriesInterval {
    /// One-minute accumulation interval.
    Minute,
    /// One-hour accumulation interval.
    Hour,
    /// One-day accumulation interval.
    Day,
}
1989
1990impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
1991    fn from(value: TimeseriesInterval) -> Self {
1992        match value {
1993            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
1994            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
1995            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
1996        }
1997    }
1998}
1999
2000impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2001    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2002        match value {
2003            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2004            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2005            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2006        }
2007    }
2008}
2009
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds.
pub struct TimeRange {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
}

impl TimeRange {
    /// Create a new [`TimeRange`] spanning `start..end` (Unix epoch seconds).
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
2026
2027#[derive(Debug, Clone, Copy)]
2028#[non_exhaustive]
2029/// Time range as Unix epoch seconds and accumulation interval.
2030pub struct TimeRangeAndInterval {
2031    /// Start timestamp (inclusive).
2032    pub start: u32,
2033    /// End timestamp (exclusive).
2034    pub end: u32,
2035    /// Interval to accumulate over for timeseries metric sets.
2036    ///
2037    /// Default is dependent on the requested metric set.
2038    pub interval: Option<TimeseriesInterval>,
2039}
2040
2041impl TimeRangeAndInterval {
2042    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2043    pub fn new(start: u32, end: u32) -> Self {
2044        Self {
2045            start,
2046            end,
2047            interval: None,
2048        }
2049    }
2050
2051    /// Set the interval to accumulate over for timeseries metric sets.
2052    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2053        Self {
2054            interval: Some(interval),
2055            ..self
2056        }
2057    }
2058}
2059
#[derive(Debug, Clone, Copy)]
/// Account metric set to return.
///
/// Used with [`GetAccountMetricsInput`].
pub enum AccountMetricSet {
    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
    /// specified time range.
    ActiveBasins(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per account operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    AccountOps(TimeRangeAndInterval),
}
2074
2075#[derive(Debug, Clone)]
2076#[non_exhaustive]
2077/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
2078pub struct GetAccountMetricsInput {
2079    /// Metric set to return.
2080    pub set: AccountMetricSet,
2081}
2082
2083impl GetAccountMetricsInput {
2084    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2085    pub fn new(set: AccountMetricSet) -> Self {
2086        Self { set }
2087    }
2088}
2089
2090impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2091    fn from(value: GetAccountMetricsInput) -> Self {
2092        let (set, start, end, interval) = match value.set {
2093            AccountMetricSet::ActiveBasins(args) => (
2094                api::metrics::AccountMetricSet::ActiveBasins,
2095                args.start,
2096                args.end,
2097                None,
2098            ),
2099            AccountMetricSet::AccountOps(args) => (
2100                api::metrics::AccountMetricSet::AccountOps,
2101                args.start,
2102                args.end,
2103                args.interval,
2104            ),
2105        };
2106        Self {
2107            set,
2108            start: Some(start),
2109            end: Some(end),
2110            interval: interval.map(Into::into),
2111        }
2112    }
2113}
2114
#[derive(Debug, Clone, Copy)]
/// Basin metric set to return.
///
/// Used with [`GetBasinMetricsInput`].
pub enum BasinMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
    /// in the basin, with one observed value for each hour over the requested time range.
    Storage(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
    ///
    /// Each metric represents a timeseries of the number of append operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendOps(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
    ///
    /// Each metric represents a timeseries of the number of read operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadOps(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadThroughput(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendThroughput(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per basin operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    BasinOps(TimeRangeAndInterval),
}
2159
2160#[derive(Debug, Clone)]
2161#[non_exhaustive]
2162/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
2163pub struct GetBasinMetricsInput {
2164    /// Basin name.
2165    pub name: BasinName,
2166    /// Metric set to return.
2167    pub set: BasinMetricSet,
2168}
2169
2170impl GetBasinMetricsInput {
2171    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2172    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2173        Self { name, set }
2174    }
2175}
2176
2177impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2178    fn from(value: GetBasinMetricsInput) -> Self {
2179        let (set, start, end, interval) = match value.set {
2180            BasinMetricSet::Storage(args) => (
2181                api::metrics::BasinMetricSet::Storage,
2182                args.start,
2183                args.end,
2184                None,
2185            ),
2186            BasinMetricSet::AppendOps(args) => (
2187                api::metrics::BasinMetricSet::AppendOps,
2188                args.start,
2189                args.end,
2190                args.interval,
2191            ),
2192            BasinMetricSet::ReadOps(args) => (
2193                api::metrics::BasinMetricSet::ReadOps,
2194                args.start,
2195                args.end,
2196                args.interval,
2197            ),
2198            BasinMetricSet::ReadThroughput(args) => (
2199                api::metrics::BasinMetricSet::ReadThroughput,
2200                args.start,
2201                args.end,
2202                args.interval,
2203            ),
2204            BasinMetricSet::AppendThroughput(args) => (
2205                api::metrics::BasinMetricSet::AppendThroughput,
2206                args.start,
2207                args.end,
2208                args.interval,
2209            ),
2210            BasinMetricSet::BasinOps(args) => (
2211                api::metrics::BasinMetricSet::BasinOps,
2212                args.start,
2213                args.end,
2214                args.interval,
2215            ),
2216        };
2217        (
2218            value.name,
2219            api::metrics::BasinMetricSetRequest {
2220                set,
2221                start: Some(start),
2222                end: Some(end),
2223                interval: interval.map(Into::into),
2224            },
2225        )
2226    }
2227}
2228
#[derive(Debug, Clone, Copy)]
/// Stream metric set to return.
///
/// Used with [`GetStreamMetricsInput`].
pub enum StreamMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
    /// with one observed value for each minute over the requested time range.
    Storage(TimeRange),
}
2236
2237#[derive(Debug, Clone)]
2238#[non_exhaustive]
2239/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
2240pub struct GetStreamMetricsInput {
2241    /// Basin name.
2242    pub basin_name: BasinName,
2243    /// Stream name.
2244    pub stream_name: StreamName,
2245    /// Metric set to return.
2246    pub set: StreamMetricSet,
2247}
2248
2249impl GetStreamMetricsInput {
2250    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2251    /// set.
2252    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2253        Self {
2254            basin_name,
2255            stream_name,
2256            set,
2257        }
2258    }
2259}
2260
2261impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2262    fn from(value: GetStreamMetricsInput) -> Self {
2263        let (set, start, end, interval) = match value.set {
2264            StreamMetricSet::Storage(args) => (
2265                api::metrics::StreamMetricSet::Storage,
2266                args.start,
2267                args.end,
2268                None,
2269            ),
2270        };
2271        (
2272            value.basin_name,
2273            value.stream_name,
2274            api::metrics::StreamMetricSetRequest {
2275                set,
2276                start: Some(start),
2277                end: Some(end),
2278                interval,
2279            },
2280        )
2281    }
2282}
2283
#[derive(Debug, Clone, Copy)]
/// Unit in which metric values are measured.
///
/// Attached to every returned [`Metric`] value.
pub enum MetricUnit {
    /// Size in bytes.
    Bytes,
    /// Number of operations.
    Operations,
}
2292
2293impl From<api::metrics::MetricUnit> for MetricUnit {
2294    fn from(value: api::metrics::MetricUnit) -> Self {
2295        match value {
2296            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2297            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2298        }
2299    }
2300}
2301
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Single named value.
pub struct ScalarMetric {
    /// Metric name.
    pub name: String,
    /// Unit for the metric value.
    pub unit: MetricUnit,
    /// Metric value, measured in [`unit`](ScalarMetric::unit).
    pub value: f64,
}
2313
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
/// specified interval.
pub struct AccumulationMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the accumulated values.
    pub unit: MetricUnit,
    /// The interval at which datapoints are accumulated.
    pub interval: TimeseriesInterval,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
    /// one `interval`.
    pub values: Vec<(u32, f64)>,
}
2330
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
///
/// Unlike [`AccumulationMetric`], values are point-in-time observations, not sums.
pub struct GaugeMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the instantaneous values.
    pub unit: MetricUnit,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
    /// instant of the `timestamp` (in Unix epoch seconds).
    pub values: Vec<(u32, f64)>,
}
2343
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Set of string labels.
///
/// Carries no numeric values — only the label name and its string values.
pub struct LabelMetric {
    /// Label name.
    pub name: String,
    /// Label values.
    pub values: Vec<String>,
}
2353
#[derive(Debug, Clone)]
/// Individual metric in a returned metric set.
///
/// Which variant is returned depends on the requested metric set; see [`AccountMetricSet`],
/// [`BasinMetricSet`], and [`StreamMetricSet`].
pub enum Metric {
    /// Single named value.
    Scalar(ScalarMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
    /// specified interval.
    Accumulation(AccumulationMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
    Gauge(GaugeMetric),
    /// Set of string labels.
    Label(LabelMetric),
}
2367
2368impl From<api::metrics::Metric> for Metric {
2369    fn from(value: api::metrics::Metric) -> Self {
2370        match value {
2371            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2372                name: sm.name.into(),
2373                unit: sm.unit.into(),
2374                value: sm.value,
2375            }),
2376            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2377                name: am.name.into(),
2378                unit: am.unit.into(),
2379                interval: am.interval.into(),
2380                values: am.values,
2381            }),
2382            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2383                name: gm.name.into(),
2384                unit: gm.unit.into(),
2385                values: gm.values,
2386            }),
2387            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2388                name: lm.name.into(),
2389                values: lm.values,
2390            }),
2391        }
2392    }
2393}
2394
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
pub struct ListStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
2414
2415impl ListStreamsInput {
2416    /// Create a new [`ListStreamsInput`] with default values.
2417    pub fn new() -> Self {
2418        Self::default()
2419    }
2420
2421    /// Set the prefix used to filter streams whose names begin with this value.
2422    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2423        Self { prefix, ..self }
2424    }
2425
2426    /// Set the value used to filter streams whose names are lexicographically greater than this
2427    /// value.
2428    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2429        Self {
2430            start_after,
2431            ..self
2432        }
2433    }
2434
2435    /// Set the limit on number of streams to return in a page.
2436    pub fn with_limit(self, limit: usize) -> Self {
2437        Self {
2438            limit: Some(limit),
2439            ..self
2440        }
2441    }
2442}
2443
2444impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2445    fn from(value: ListStreamsInput) -> Self {
2446        Self {
2447            prefix: Some(value.prefix),
2448            start_after: Some(value.start_after),
2449            limit: value.limit,
2450        }
2451    }
2452}
2453
#[derive(Debug, Clone, Default)]
/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
///
/// Unlike [`ListStreamsInput`], there is no page limit field.
pub struct ListAllStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    /// Whether to include streams that are being deleted.
    ///
    /// Defaults to `false`.
    pub include_deleted: bool,
}
2472
2473impl ListAllStreamsInput {
2474    /// Create a new [`ListAllStreamsInput`] with default values.
2475    pub fn new() -> Self {
2476        Self::default()
2477    }
2478
2479    /// Set the prefix used to filter streams whose names begin with this value.
2480    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2481        Self { prefix, ..self }
2482    }
2483
2484    /// Set the value used to filter streams whose names are lexicographically greater than this
2485    /// value.
2486    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2487        Self {
2488            start_after,
2489            ..self
2490        }
2491    }
2492
2493    /// Set whether to include streams that are being deleted.
2494    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2495        Self {
2496            include_deleted,
2497            ..self
2498        }
2499    }
2500}
2501
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Stream information.
///
/// Returned by stream listing and creation operations.
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time if the stream is being deleted.
    pub deleted_at: Option<S2DateTime>,
}
2513
2514impl From<api::stream::StreamInfo> for StreamInfo {
2515    fn from(value: api::stream::StreamInfo) -> Self {
2516        Self {
2517            name: value.name,
2518            created_at: value.created_at.into(),
2519            deleted_at: value.deleted_at.map(Into::into),
2520        }
2521    }
2522}
2523
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
pub struct CreateStreamInput {
    /// Stream name.
    ///
    /// See [`StreamName`] for the constraints on valid names.
    pub name: StreamName,
    /// Configuration for the stream.
    ///
    /// See [`StreamConfig`] for defaults.
    pub config: Option<StreamConfig>,
    /// Idempotency token for the operation.
    ///
    /// When a request is retried, the same token is reused,
    /// causing the server to return the original response instead of
    /// performing the operation again.
    ///
    /// Defaults to a random UUID.
    pub idempotency_token: String,
}
2543
2544impl CreateStreamInput {
2545    /// Create a new [`CreateStreamInput`] with the given stream name.
2546    pub fn new(name: StreamName) -> Self {
2547        Self {
2548            name,
2549            config: None,
2550            idempotency_token: idempotency_token(),
2551        }
2552    }
2553
2554    /// Set the configuration for the stream.
2555    pub fn with_config(self, config: StreamConfig) -> Self {
2556        Self {
2557            config: Some(config),
2558            ..self
2559        }
2560    }
2561
2562    /// Set the idempotency token for the operation.
2563    pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
2564        Self {
2565            idempotency_token: idempotency_token.into(),
2566            ..self
2567        }
2568    }
2569}
2570
2571impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2572    fn from(value: CreateStreamInput) -> Self {
2573        (
2574            api::stream::CreateStreamRequest {
2575                stream: value.name,
2576                config: value.config.map(Into::into),
2577            },
2578            value.idempotency_token,
2579        )
2580    }
2581}
2582
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`delete_stream`](crate::S2Basin::delete_stream) operation.
pub struct DeleteStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Whether to ignore `Not Found` error if the stream doesn't exist.
    ///
    /// Defaults to `false`.
    pub ignore_not_found: bool,
}
2592
2593impl DeleteStreamInput {
2594    /// Create a new [`DeleteStreamInput`] with the given stream name.
2595    pub fn new(name: StreamName) -> Self {
2596        Self {
2597            name,
2598            ignore_not_found: false,
2599        }
2600    }
2601
2602    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2603    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2604        Self {
2605            ignore_not_found,
2606            ..self
2607        }
2608    }
2609}
2610
2611#[derive(Debug, Clone)]
2612#[non_exhaustive]
2613/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
2614pub struct ReconfigureStreamInput {
2615    /// Stream name.
2616    pub name: StreamName,
2617    /// Reconfiguration for [`StreamConfig`].
2618    pub config: StreamReconfiguration,
2619}
2620
2621impl ReconfigureStreamInput {
2622    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2623    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2624        Self { name, config }
2625    }
2626}
2627
#[derive(Debug, Clone, PartialEq, Eq)]
/// Token for fencing appends to a stream.
///
/// **Note:** It must not exceed 36 bytes in length.
///
/// Construct via [`FromStr`] (which enforces the length limit) or
/// [`FencingToken::generate`].
///
/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
pub struct FencingToken(String);
2635
2636impl FencingToken {
2637    /// Generate a random alphanumeric fencing token of `n` bytes.
2638    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2639        rand::rng()
2640            .sample_iter(&rand::distr::Alphanumeric)
2641            .take(n)
2642            .map(char::from)
2643            .collect::<String>()
2644            .parse()
2645    }
2646}
2647
2648impl FromStr for FencingToken {
2649    type Err = ValidationError;
2650
2651    fn from_str(s: &str) -> Result<Self, Self::Err> {
2652        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2653            return Err(ValidationError(format!(
2654                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2655            )));
2656        }
2657        Ok(FencingToken(s.to_string()))
2658    }
2659}
2660
2661impl std::fmt::Display for FencingToken {
2662    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2663        write!(f, "{}", self.0)
2664    }
2665}
2666
2667impl Deref for FencingToken {
2668    type Target = str;
2669
2670    fn deref(&self) -> &Self::Target {
2671        &self.0
2672    }
2673}
2674
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// A position in a stream.
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
    /// User-specified timestamps are passed through as-is.
    pub timestamp: u64,
}

impl std::fmt::Display for StreamPosition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = self;
        write!(f, "seq_num={seq_num}, timestamp={timestamp}")
    }
}
2691
2692impl From<api::stream::proto::StreamPosition> for StreamPosition {
2693    fn from(value: api::stream::proto::StreamPosition) -> Self {
2694        Self {
2695            seq_num: value.seq_num,
2696            timestamp: value.timestamp,
2697        }
2698    }
2699}
2700
2701impl From<api::stream::StreamPosition> for StreamPosition {
2702    fn from(value: api::stream::StreamPosition) -> Self {
2703        Self {
2704            seq_num: value.seq_num,
2705            timestamp: value.timestamp,
2706        }
2707    }
2708}
2709
2710#[derive(Debug, Clone, PartialEq)]
2711#[non_exhaustive]
2712/// A name-value pair.
2713pub struct Header {
2714    /// Name.
2715    pub name: Bytes,
2716    /// Value.
2717    pub value: Bytes,
2718}
2719
2720impl Header {
2721    /// Create a new [`Header`] with the given name and value.
2722    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2723        Self {
2724            name: name.into(),
2725            value: value.into(),
2726        }
2727    }
2728}
2729
2730impl From<Header> for api::stream::proto::Header {
2731    fn from(value: Header) -> Self {
2732        Self {
2733            name: value.name,
2734            value: value.value,
2735        }
2736    }
2737}
2738
2739impl From<api::stream::proto::Header> for Header {
2740    fn from(value: api::stream::proto::Header) -> Self {
2741        Self {
2742            name: value.name,
2743            value: value.value,
2744        }
2745    }
2746}
2747
#[derive(Debug, Clone, PartialEq)]
/// A record to append.
///
/// Fields are private so the metered-size limit can be enforced at
/// construction time; build via [`AppendRecord::new`] and the `with_*` methods.
pub struct AppendRecord {
    // Record payload.
    body: Bytes,
    // Name-value pairs attached to the record; empty by default.
    headers: Vec<Header>,
    // Optional user-specified timestamp; semantics depend on the stream's
    // timestamping configuration (see `with_timestamp`).
    timestamp: Option<u64>,
}
2755
2756impl AppendRecord {
2757    fn validate(self) -> Result<Self, ValidationError> {
2758        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2759            Err(ValidationError(format!(
2760                "metered_bytes: {} exceeds {}",
2761                self.metered_bytes(),
2762                RECORD_BATCH_MAX.bytes
2763            )))
2764        } else {
2765            Ok(self)
2766        }
2767    }
2768
2769    /// Create a new [`AppendRecord`] with the given record body.
2770    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2771        let record = Self {
2772            body: body.into(),
2773            headers: Vec::default(),
2774            timestamp: None,
2775        };
2776        record.validate()
2777    }
2778
2779    /// Set the headers for this record.
2780    pub fn with_headers(
2781        self,
2782        headers: impl IntoIterator<Item = Header>,
2783    ) -> Result<Self, ValidationError> {
2784        let record = Self {
2785            headers: headers.into_iter().collect(),
2786            ..self
2787        };
2788        record.validate()
2789    }
2790
2791    /// Set the timestamp for this record.
2792    ///
2793    /// Precise semantics depend on [`StreamConfig::timestamping`].
2794    pub fn with_timestamp(self, timestamp: u64) -> Self {
2795        Self {
2796            timestamp: Some(timestamp),
2797            ..self
2798        }
2799    }
2800}
2801
2802impl From<AppendRecord> for api::stream::proto::AppendRecord {
2803    fn from(value: AppendRecord) -> Self {
2804        Self {
2805            timestamp: value.timestamp,
2806            headers: value.headers.into_iter().map(Into::into).collect(),
2807            body: value.body,
2808        }
2809    }
2810}
2811
/// Metered byte size calculation.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
///
/// Implemented for individual records (via `metered_bytes_impl!`) and for
/// [`AppendRecordBatch`], which caches the running total.
pub trait MeteredBytes {
    /// Returns the metered byte size.
    fn metered_bytes(&self) -> usize;
}
2822
2823macro_rules! metered_bytes_impl {
2824    ($ty:ty) => {
2825        impl MeteredBytes for $ty {
2826            fn metered_bytes(&self) -> usize {
2827                8 + (2 * self.headers.len())
2828                    + self
2829                        .headers
2830                        .iter()
2831                        .map(|h| h.name.len() + h.value.len())
2832                        .sum::<usize>()
2833                    + self.body.len()
2834            }
2835        }
2836    };
2837}
2838
2839metered_bytes_impl!(AppendRecord);
2840
#[derive(Debug, Clone)]
/// A batch of records to append atomically.
///
/// **Note:** It must contain at least `1` record and no more than `1000`.
/// The total size of the batch must not exceed `1MiB` in metered bytes.
///
/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
/// that takes care of the abovementioned constraints.
pub struct AppendRecordBatch {
    // Records in append order.
    records: Vec<AppendRecord>,
    // Cached total metered size, kept in sync by `push`.
    metered_bytes: usize,
}
2854
2855impl AppendRecordBatch {
2856    pub(crate) fn with_capacity(capacity: usize) -> Self {
2857        Self {
2858            records: Vec::with_capacity(capacity),
2859            metered_bytes: 0,
2860        }
2861    }
2862
2863    pub(crate) fn push(&mut self, record: AppendRecord) {
2864        self.metered_bytes += record.metered_bytes();
2865        self.records.push(record);
2866    }
2867
2868    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
2869    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2870    where
2871        I: IntoIterator<Item = AppendRecord>,
2872    {
2873        let mut records = Vec::new();
2874        let mut metered_bytes = 0;
2875
2876        for record in iter {
2877            metered_bytes += record.metered_bytes();
2878            records.push(record);
2879
2880            if metered_bytes > RECORD_BATCH_MAX.bytes {
2881                return Err(ValidationError(format!(
2882                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
2883                    RECORD_BATCH_MAX.bytes
2884                )));
2885            }
2886
2887            if records.len() > RECORD_BATCH_MAX.count {
2888                return Err(ValidationError(format!(
2889                    "number of records in the batch exceeds {}",
2890                    RECORD_BATCH_MAX.count
2891                )));
2892            }
2893        }
2894
2895        if records.is_empty() {
2896            return Err(ValidationError("batch is empty".into()));
2897        }
2898
2899        Ok(Self {
2900            records,
2901            metered_bytes,
2902        })
2903    }
2904}
2905
2906impl Deref for AppendRecordBatch {
2907    type Target = [AppendRecord];
2908
2909    fn deref(&self) -> &Self::Target {
2910        &self.records
2911    }
2912}
2913
2914impl MeteredBytes for AppendRecordBatch {
2915    fn metered_bytes(&self) -> usize {
2916        self.metered_bytes
2917    }
2918}
2919
#[derive(Debug, Clone)]
/// Command to signal an operation.
///
/// Encoded on the stream as a record with a single empty-named header; see
/// [`CommandRecord`].
pub enum Command {
    /// Fence operation.
    Fence {
        /// Fencing token that subsequent appends must match.
        fencing_token: FencingToken,
    },
    /// Trim operation.
    Trim {
        /// Trim point: the desired earliest sequence number for the stream.
        trim_point: u64,
    },
}
2934
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Command record for signaling operations to the service.
///
/// Construct via [`CommandRecord::fence`] or [`CommandRecord::trim`]; it
/// converts into an [`AppendRecord`] for submission.
///
/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
pub struct CommandRecord {
    /// Command to signal an operation.
    pub command: Command,
    /// Timestamp for this record. `None` if not explicitly set.
    pub timestamp: Option<u64>,
}
2946
2947impl CommandRecord {
2948    const FENCE: &[u8] = b"fence";
2949    const TRIM: &[u8] = b"trim";
2950
2951    /// Create a fence command record with the given fencing token.
2952    ///
2953    /// Fencing is strongly consistent, and subsequent appends that specify a
2954    /// fencing token will fail if it does not match.
2955    pub fn fence(fencing_token: FencingToken) -> Self {
2956        Self {
2957            command: Command::Fence { fencing_token },
2958            timestamp: None,
2959        }
2960    }
2961
2962    /// Create a trim command record with the given trim point.
2963    ///
2964    /// Trim point is the desired earliest sequence number for the stream.
2965    ///
2966    /// Trimming is eventually consistent, and trimmed records may be visible
2967    /// for a brief period.
2968    pub fn trim(trim_point: u64) -> Self {
2969        Self {
2970            command: Command::Trim { trim_point },
2971            timestamp: None,
2972        }
2973    }
2974
2975    /// Set the timestamp for this record.
2976    pub fn with_timestamp(self, timestamp: u64) -> Self {
2977        Self {
2978            timestamp: Some(timestamp),
2979            ..self
2980        }
2981    }
2982}
2983
2984impl From<CommandRecord> for AppendRecord {
2985    fn from(value: CommandRecord) -> Self {
2986        let (header_value, body) = match value.command {
2987            Command::Fence { fencing_token } => (
2988                CommandRecord::FENCE,
2989                Bytes::copy_from_slice(fencing_token.as_bytes()),
2990            ),
2991            Command::Trim { trim_point } => (
2992                CommandRecord::TRIM,
2993                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
2994            ),
2995        };
2996        Self {
2997            body,
2998            headers: vec![Header::new("", header_value)],
2999            timestamp: value.timestamp,
3000        }
3001    }
3002}
3003
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`append`](crate::S2Stream::append) operation and
/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
///
/// Construct via [`AppendInput::new`] plus the optional `with_*` methods.
pub struct AppendInput {
    /// Batch of records to append atomically.
    pub records: AppendRecordBatch,
    /// Expected sequence number for the first record in the batch.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
    pub match_seq_num: Option<u64>,
    /// Fencing token to match against the stream's current fencing token.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched,
    /// the append fails. A stream defaults to `""` as its fencing token.
    pub fencing_token: Option<FencingToken>,
}
3021
3022impl AppendInput {
3023    /// Create a new [`AppendInput`] with the given batch of records.
3024    pub fn new(records: AppendRecordBatch) -> Self {
3025        Self {
3026            records,
3027            match_seq_num: None,
3028            fencing_token: None,
3029        }
3030    }
3031
3032    /// Set the expected sequence number for the first record in the batch.
3033    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3034        Self {
3035            match_seq_num: Some(match_seq_num),
3036            ..self
3037        }
3038    }
3039
3040    /// Set the fencing token to match against the stream's current fencing token.
3041    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3042        Self {
3043            fencing_token: Some(fencing_token),
3044            ..self
3045        }
3046    }
3047}
3048
3049impl From<AppendInput> for api::stream::proto::AppendInput {
3050    fn from(value: AppendInput) -> Self {
3051        Self {
3052            records: value.records.iter().cloned().map(Into::into).collect(),
3053            match_seq_num: value.match_seq_num,
3054            fencing_token: value.fencing_token.map(|t| t.to_string()),
3055        }
3056    }
3057}
3058
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Acknowledgement for an [`AppendInput`].
///
/// Positions satisfy `start <= end <= tail` in sequence-number terms.
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
    /// that was appended.
    ///
    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
    /// appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record on the stream.
    ///
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
3077
3078impl From<api::stream::proto::AppendAck> for AppendAck {
3079    fn from(value: api::stream::proto::AppendAck) -> Self {
3080        Self {
3081            start: value.start.unwrap_or_default().into(),
3082            end: value.end.unwrap_or_default().into(),
3083            tail: value.tail.unwrap_or_default().into(),
3084        }
3085    }
3086}
3087
#[derive(Debug, Clone, Copy)]
/// Starting position for reading from a stream.
///
/// Defaults to `SeqNum(0)`, i.e. the beginning of the stream.
pub enum ReadFrom {
    /// Read from this sequence number.
    SeqNum(u64),
    /// Read from this timestamp.
    Timestamp(u64),
    /// Read from N records before the tail.
    TailOffset(u64),
}
3098
3099impl Default for ReadFrom {
3100    fn default() -> Self {
3101        Self::SeqNum(0)
3102    }
3103}
3104
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
/// Where to start reading.
///
/// Build with [`ReadStart::new`] and the `with_*` methods, or via `Default`.
pub struct ReadStart {
    /// Starting position.
    ///
    /// Defaults to reading from sequence number `0`.
    pub from: ReadFrom,
    /// Whether to start from tail if the requested starting position is beyond it.
    ///
    /// Defaults to `false` (errors if position is beyond tail).
    pub clamp_to_tail: bool,
}
3118
3119impl ReadStart {
3120    /// Create a new [`ReadStart`] with default values.
3121    pub fn new() -> Self {
3122        Self::default()
3123    }
3124
3125    /// Set the starting position.
3126    pub fn with_from(self, from: ReadFrom) -> Self {
3127        Self { from, ..self }
3128    }
3129
3130    /// Set whether to start from tail if the requested starting position is beyond it.
3131    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3132        Self {
3133            clamp_to_tail,
3134            ..self
3135        }
3136    }
3137}
3138
3139impl From<ReadStart> for api::stream::ReadStart {
3140    fn from(value: ReadStart) -> Self {
3141        let (seq_num, timestamp, tail_offset) = match value.from {
3142            ReadFrom::SeqNum(n) => (Some(n), None, None),
3143            ReadFrom::Timestamp(t) => (None, Some(t), None),
3144            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3145        };
3146        Self {
3147            seq_num,
3148            timestamp,
3149            tail_offset,
3150            clamp: if value.clamp_to_tail {
3151                Some(true)
3152            } else {
3153                None
3154            },
3155        }
3156    }
3157}
3158
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Limits on how much to read.
///
/// `None` in either field means no explicit limit was requested.
pub struct ReadLimits {
    /// Limit on number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Limit on total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}
3172
3173impl ReadLimits {
3174    /// Create a new [`ReadLimits`] with default values.
3175    pub fn new() -> Self {
3176        Self::default()
3177    }
3178
3179    /// Set the limit on number of records.
3180    pub fn with_count(self, count: usize) -> Self {
3181        Self {
3182            count: Some(count),
3183            ..self
3184        }
3185    }
3186
3187    /// Set the limit on total metered bytes of records.
3188    pub fn with_bytes(self, bytes: usize) -> Self {
3189        Self {
3190            bytes: Some(bytes),
3191            ..self
3192        }
3193    }
3194}
3195
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// When to stop reading.
///
/// Build with [`ReadStop::new`] and the `with_*` methods, or via `Default`.
pub struct ReadStop {
    /// Limits on how much to read.
    pub limits: ReadLimits,
    /// Timestamp at which to stop (exclusive).
    pub until: Option<RangeTo<u64>>,
    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
    /// seconds for [`read`](crate::S2Stream::read).
    ///
    /// Defaults to:
    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
    ///   is specified.
    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
    ///   `until` is specified.
    pub wait: Option<u32>,
}
3215
3216impl ReadStop {
3217    /// Create a new [`ReadStop`] with default values.
3218    pub fn new() -> Self {
3219        Self::default()
3220    }
3221
3222    /// Set the limits on how much to read.
3223    pub fn with_limits(self, limits: ReadLimits) -> Self {
3224        Self { limits, ..self }
3225    }
3226
3227    /// Set the timestamp at which to stop (exclusive).
3228    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3229        Self {
3230            until: Some(until),
3231            ..self
3232        }
3233    }
3234
3235    /// Set the duration in seconds to wait for new records before stopping.
3236    pub fn with_wait(self, wait: u32) -> Self {
3237        Self {
3238            wait: Some(wait),
3239            ..self
3240        }
3241    }
3242}
3243
3244impl From<ReadStop> for api::stream::ReadEnd {
3245    fn from(value: ReadStop) -> Self {
3246        Self {
3247            count: value.limits.count,
3248            bytes: value.limits.bytes,
3249            until: value.until.map(|r| r.end),
3250            wait: value.wait,
3251        }
3252    }
3253}
3254
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
/// operations.
///
/// Build with [`ReadInput::new`] and the `with_*` methods, or via `Default`.
pub struct ReadInput {
    /// Where to start reading.
    pub start: ReadStart,
    /// When to stop reading.
    pub stop: ReadStop,
    /// Whether to filter out command records from the stream when reading.
    ///
    /// Defaults to `false`.
    pub ignore_command_records: bool,
}
3269
3270impl ReadInput {
3271    /// Create a new [`ReadInput`] with default values.
3272    pub fn new() -> Self {
3273        Self::default()
3274    }
3275
3276    /// Set where to start reading.
3277    pub fn with_start(self, start: ReadStart) -> Self {
3278        Self { start, ..self }
3279    }
3280
3281    /// Set when to stop reading.
3282    pub fn with_stop(self, stop: ReadStop) -> Self {
3283        Self { stop, ..self }
3284    }
3285
3286    /// Set whether to filter out command records from the stream when reading.
3287    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3288        Self {
3289            ignore_command_records,
3290            ..self
3291        }
3292    }
3293}
3294
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Record that is durably sequenced on a stream.
///
/// Produced by read operations; see [`ReadBatch`].
pub struct SequencedRecord {
    /// Sequence number assigned to this record.
    pub seq_num: u64,
    /// Body of this record.
    pub body: Bytes,
    /// Headers for this record.
    pub headers: Vec<Header>,
    /// Timestamp for this record.
    pub timestamp: u64,
}
3308
3309impl SequencedRecord {
3310    /// Whether this is a command record.
3311    pub fn is_command_record(&self) -> bool {
3312        self.headers.len() == 1 && *self.headers[0].name == *b""
3313    }
3314}
3315
3316impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3317    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3318        Self {
3319            seq_num: value.seq_num,
3320            body: value.body,
3321            headers: value.headers.into_iter().map(Into::into).collect(),
3322            timestamp: value.timestamp,
3323        }
3324    }
3325}
3326
// Sequenced records use the same metered-size formula as append records.
metered_bytes_impl!(SequencedRecord);
3328
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
/// [`read_session`](crate::S2Stream::read_session).
///
/// Constructed internally from the wire type; command-record filtering is
/// applied during conversion.
pub struct ReadBatch {
    /// Records that are durably sequenced on the stream.
    ///
    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
    /// - the [`stop condition`](ReadInput::stop) was already met, or
    /// - all records in the batch were command records and
    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
    pub records: Vec<SequencedRecord>,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record.
    ///
    /// It will only be present when reading recent records.
    pub tail: Option<StreamPosition>,
}
3347
3348impl ReadBatch {
3349    pub(crate) fn from_api(
3350        batch: api::stream::proto::ReadBatch,
3351        ignore_command_records: bool,
3352    ) -> Self {
3353        Self {
3354            records: batch
3355                .records
3356                .into_iter()
3357                .map(Into::into)
3358                .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3359                .collect(),
3360            tail: batch.tail.map(Into::into),
3361        }
3362    }
3363}
3364
/// A pinned, boxed [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3367
#[derive(Debug, Clone, thiserror::Error)]
/// Why an append condition check failed.
///
/// Carried by [`S2Error::AppendConditionFailed`].
pub enum AppendConditionFailed {
    #[error("fencing token mismatch, expected: {0}")]
    /// Fencing token did not match. Contains the expected fencing token.
    FencingTokenMismatch(FencingToken),
    #[error("sequence number mismatch, expected: {0}")]
    /// Sequence number did not match. Contains the expected sequence number.
    SeqNumMismatch(u64),
}
3378
3379impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3380    fn from(value: api::stream::AppendConditionFailed) -> Self {
3381        match value {
3382            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3383                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3384            }
3385            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3386                AppendConditionFailed::SeqNumMismatch(seq)
3387            }
3388        }
3389    }
3390}
3391
#[derive(Debug, Clone, thiserror::Error)]
/// Errors from S2 operations.
///
/// Server- and transport-level failures are translated from [`ApiError`] via
/// the [`From`] impl below.
pub enum S2Error {
    #[error("{0}")]
    /// Client-side error.
    Client(String),
    #[error(transparent)]
    /// Validation error.
    Validation(#[from] ValidationError),
    #[error("{0}")]
    /// Append condition check failed. Contains the failure reason.
    AppendConditionFailed(AppendConditionFailed),
    #[error("read from an unwritten position. current tail: {0}")]
    /// Read from an unwritten position. Contains the current tail.
    ReadUnwritten(StreamPosition),
    #[error("{0}")]
    /// Other server-side error.
    Server(ErrorResponse),
}
3411
3412impl From<ApiError> for S2Error {
3413    fn from(err: ApiError) -> Self {
3414        match err {
3415            ApiError::ReadUnwritten(tail_response) => {
3416                Self::ReadUnwritten(tail_response.tail.into())
3417            }
3418            ApiError::AppendConditionFailed(condition_failed) => {
3419                Self::AppendConditionFailed(condition_failed.into())
3420            }
3421            ApiError::Server(_, response) => Self::Server(response.into()),
3422            other => Self::Client(other.to_string()),
3423        }
3424    }
3425}
3426
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
/// Error response from S2 server.
///
/// Displayed as `code: message`.
pub struct ErrorResponse {
    /// Error code.
    pub code: String,
    /// Error message.
    pub message: String,
}
3437
3438impl From<ApiErrorResponse> for ErrorResponse {
3439    fn from(response: ApiErrorResponse) -> Self {
3440        Self {
3441            code: response.code,
3442            message: response.message,
3443        }
3444    }
3445}
3446
3447fn idempotency_token() -> String {
3448    uuid::Uuid::new_v4().simple().to_string()
3449}