// s2_sdk — types.rs
1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::Rng;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Validation error.
23pub use s2_common::types::ValidationError;
24/// Access token ID.
25///
26/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
27pub use s2_common::types::access::AccessTokenId;
28/// See [`ListAccessTokensInput::prefix`].
29pub use s2_common::types::access::AccessTokenIdPrefix;
30/// See [`ListAccessTokensInput::start_after`].
31pub use s2_common::types::access::AccessTokenIdStartAfter;
32/// Basin name.
33///
34/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
35/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
36pub use s2_common::types::basin::BasinName;
37/// See [`ListBasinsInput::prefix`].
38pub use s2_common::types::basin::BasinNamePrefix;
39/// See [`ListBasinsInput::start_after`].
40pub use s2_common::types::basin::BasinNameStartAfter;
41/// Stream name.
42///
43/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
44pub use s2_common::types::stream::StreamName;
45/// See [`ListStreamsInput::prefix`].
46pub use s2_common::types::stream::StreamNamePrefix;
47/// See [`ListStreamsInput::start_after`].
48pub use s2_common::types::stream::StreamNameStartAfter;
49
50pub(crate) const ONE_MIB: u32 = 1024 * 1024;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
/// An RFC 3339 datetime.
///
/// It can be created in either of the following ways:
/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
/// - Convert from [`time::OffsetDateTime`] using [`From`]/[`Into`].
///
/// Newtype over [`time::OffsetDateTime`]; parsing and display are fixed to RFC 3339.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct S2DateTime(time::OffsetDateTime);
64
65impl From<time::OffsetDateTime> for S2DateTime {
66    fn from(dt: time::OffsetDateTime) -> Self {
67        Self(dt)
68    }
69}
70
71impl From<S2DateTime> for time::OffsetDateTime {
72    fn from(dt: S2DateTime) -> Self {
73        dt.0
74    }
75}
76
77impl FromStr for S2DateTime {
78    type Err = ValidationError;
79
80    fn from_str(s: &str) -> Result<Self, Self::Err> {
81        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
82            .map(Self)
83            .map_err(|e| ValidationError(format!("invalid datetime: {e}")))
84    }
85}
86
87impl fmt::Display for S2DateTime {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        write!(
90            f,
91            "{}",
92            self.0
93                .format(&time::format_description::well_known::Rfc3339)
94                .expect("RFC3339 formatting should not fail for S2DateTime")
95        )
96    }
97}
98
/// Authority for connecting to an S2 basin.
///
/// Selected by the `{basin}.` placeholder when parsing a [`BasinEndpoint`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
    ParentZone(Authority),
    /// Direct cell authority. Basin is expected to be hosted by this cell.
    Direct(Authority),
}
107
/// Account endpoint.
#[derive(Debug, Clone)]
pub struct AccountEndpoint {
    // URI scheme; defaults to HTTPS when the endpoint string carries no `scheme://` prefix.
    scheme: Scheme,
    // Host (and optional port) of the account-level service.
    authority: Authority,
}

impl AccountEndpoint {
    /// Create a new [`AccountEndpoint`] with the given endpoint.
    ///
    /// Convenience wrapper over the [`FromStr`] impl; accepts `host[:port]`
    /// or `scheme://host[:port]`.
    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
        endpoint.parse()
    }
}
121
122impl FromStr for AccountEndpoint {
123    type Err = ValidationError;
124
125    fn from_str(s: &str) -> Result<Self, Self::Err> {
126        let (scheme, authority) = match s.find("://") {
127            Some(idx) => {
128                let scheme: Scheme = s[..idx]
129                    .parse()
130                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
131                (scheme, &s[idx + 3..])
132            }
133            None => (Scheme::HTTPS, s),
134        };
135        Ok(Self {
136            scheme,
137            authority: authority
138                .parse()
139                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
140        })
141    }
142}
143
/// Basin endpoint.
#[derive(Debug, Clone)]
pub struct BasinEndpoint {
    // URI scheme; defaults to HTTPS when the endpoint string carries no `scheme://` prefix.
    scheme: Scheme,
    // Parent-zone vs direct-cell addressing; see [`BasinAuthority`].
    authority: BasinAuthority,
}

impl BasinEndpoint {
    /// Create a new [`BasinEndpoint`] with the given endpoint.
    ///
    /// Convenience wrapper over the [`FromStr`] impl; a leading `{basin}.`
    /// in the authority selects parent-zone addressing.
    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
        endpoint.parse()
    }
}
157
158impl FromStr for BasinEndpoint {
159    type Err = ValidationError;
160
161    fn from_str(s: &str) -> Result<Self, Self::Err> {
162        let (scheme, authority) = match s.find("://") {
163            Some(idx) => {
164                let scheme: Scheme = s[..idx]
165                    .parse()
166                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
167                (scheme, &s[idx + 3..])
168            }
169            None => (Scheme::HTTPS, s),
170        };
171        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
172            BasinAuthority::ParentZone(
173                authority
174                    .parse()
175                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
176            )
177        } else {
178            BasinAuthority::Direct(
179                authority
180                    .parse()
181                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
182            )
183        };
184        Ok(Self { scheme, authority })
185    }
186}
187
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Endpoints for the S2 environment.
///
/// Both account and basin traffic share a single scheme; construction
/// enforces that the two endpoints agree.
pub struct S2Endpoints {
    pub(crate) scheme: Scheme,
    pub(crate) account_authority: Authority,
    pub(crate) basin_authority: BasinAuthority,
}
196
197impl S2Endpoints {
198    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
199    pub fn new(
200        account_endpoint: AccountEndpoint,
201        basin_endpoint: BasinEndpoint,
202    ) -> Result<Self, ValidationError> {
203        if account_endpoint.scheme != basin_endpoint.scheme {
204            return Err("account and basin endpoints must have the same scheme".into());
205        }
206        Ok(Self {
207            scheme: account_endpoint.scheme,
208            account_authority: account_endpoint.authority,
209            basin_authority: basin_endpoint.authority,
210        })
211    }
212
213    /// Create a new [`S2Endpoints`] from environment variables.
214    ///
215    /// The following environment variables are expected to be set:
216    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
217    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
218    pub fn from_env() -> Result<Self, ValidationError> {
219        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
220            Ok(endpoint) => endpoint.parse()?,
221            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
222            Err(VarError::NotUnicode(_)) => {
223                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
224            }
225        };
226
227        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
228            Ok(endpoint) => endpoint.parse()?,
229            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
230            Err(VarError::NotUnicode(_)) => {
231                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
232            }
233        };
234
235        if account_endpoint.scheme != basin_endpoint.scheme {
236            return Err(
237                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
238            );
239        }
240
241        Ok(Self {
242            scheme: account_endpoint.scheme,
243            account_authority: account_endpoint.authority,
244            basin_authority: basin_endpoint.authority,
245        })
246    }
247
248    pub(crate) fn for_aws() -> Self {
249        Self {
250            scheme: Scheme::HTTPS,
251            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
252            basin_authority: BasinAuthority::ParentZone(
253                "b.aws.s2.dev".try_into().expect("valid authority"),
254            ),
255        }
256    }
257}
258
#[derive(Debug, Clone, Copy)]
/// Compression algorithm for request and response bodies.
///
/// Converted to the wire-level [`CompressionAlgorithm`] via [`From`].
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
269
270impl From<Compression> for CompressionAlgorithm {
271    fn from(value: Compression) -> Self {
272        match value {
273            Compression::None => CompressionAlgorithm::None,
274            Compression::Gzip => CompressionAlgorithm::Gzip,
275            Compression::Zstd => CompressionAlgorithm::Zstd,
276        }
277    }
278}
279
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// Retry policy for [`append`](crate::S2Stream::append) and
/// [`append_session`](crate::S2Stream::append_session) operations.
pub enum AppendRetryPolicy {
    /// Retry all appends. Use when duplicate records on the stream are acceptable.
    All,
    /// Only retry appends that include [`match_seq_num`](AppendInput::match_seq_num).
    NoSideEffects,
}
290
291impl AppendRetryPolicy {
292    pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
293        match self {
294            Self::All => true,
295            Self::NoSideEffects => input.match_seq_num.is_some(),
296        }
297    }
298}
299
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for retrying requests in case of transient failures.
///
/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
/// ```text
/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
///     jitter = rand[0, base_delay]
///     delay  = base_delay + jitter
/// ```
pub struct RetryConfig {
    /// Total number of attempts including the initial try. A value of `1` means no retries.
    ///
    /// Defaults to `3`.
    pub max_attempts: NonZeroU32,
    /// Minimum base delay for retries.
    ///
    /// Defaults to `100ms`.
    pub min_base_delay: Duration,
    /// Maximum base delay for retries.
    ///
    /// Defaults to `1s`.
    pub max_base_delay: Duration,
    /// Retry policy for [`append`](crate::S2Stream::append) and
    /// [`append_session`](crate::S2Stream::append_session) operations.
    ///
    /// Defaults to `All`.
    pub append_retry_policy: AppendRetryPolicy,
}
329
330impl Default for RetryConfig {
331    fn default() -> Self {
332        Self {
333            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
334            min_base_delay: Duration::from_millis(100),
335            max_base_delay: Duration::from_secs(1),
336            append_retry_policy: AppendRetryPolicy::All,
337        }
338    }
339}
340
341impl RetryConfig {
342    /// Create a new [`RetryConfig`] with default settings.
343    pub fn new() -> Self {
344        Self::default()
345    }
346
347    pub(crate) fn max_retries(&self) -> u32 {
348        self.max_attempts.get() - 1
349    }
350
351    /// Set the total number of attempts including the initial try.
352    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
353        Self {
354            max_attempts,
355            ..self
356        }
357    }
358
359    /// Set the minimum base delay for retries.
360    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
361        Self {
362            min_base_delay,
363            ..self
364        }
365    }
366
367    /// Set the maximum base delay for retries.
368    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
369        Self {
370            max_base_delay,
371            ..self
372        }
373    }
374
375    /// Set the retry policy for [`append`](crate::S2Stream::append) and
376    /// [`append_session`](crate::S2Stream::append_session) operations.
377    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
378        Self {
379            append_retry_policy,
380            ..self
381        }
382    }
383}
384
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for [`S2`](crate::S2).
pub struct S2Config {
    // Bearer token; wrapped in SecretString so it is redacted from Debug output.
    pub(crate) access_token: SecretString,
    // Account/basin endpoints; defaults to the AWS environment.
    pub(crate) endpoints: S2Endpoints,
    // Timeout for establishing a connection (default 3s).
    pub(crate) connection_timeout: Duration,
    // Per-request timeout (default 5s).
    pub(crate) request_timeout: Duration,
    // Retry/backoff configuration.
    pub(crate) retry: RetryConfig,
    // Body compression for requests and responses (default none).
    pub(crate) compression: Compression,
    // `User-Agent` header; defaults to `s2-sdk-rust/<crate version>`.
    pub(crate) user_agent: HeaderValue,
    // Disables TLS certificate verification — testing/development only.
    pub(crate) insecure_skip_cert_verification: bool,
}
398
399impl S2Config {
400    /// Create a new [`S2Config`] with the given access token and default settings.
401    pub fn new(access_token: impl Into<String>) -> Self {
402        Self {
403            access_token: access_token.into().into(),
404            endpoints: S2Endpoints::for_aws(),
405            connection_timeout: Duration::from_secs(3),
406            request_timeout: Duration::from_secs(5),
407            retry: RetryConfig::new(),
408            compression: Compression::None,
409            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
410                .parse()
411                .expect("valid user agent"),
412            insecure_skip_cert_verification: false,
413        }
414    }
415
416    /// Set the S2 endpoints to connect to.
417    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
418        Self { endpoints, ..self }
419    }
420
421    /// Set the timeout for establishing a connection to the server.
422    ///
423    /// Defaults to `3s`.
424    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
425        Self {
426            connection_timeout,
427            ..self
428        }
429    }
430
431    /// Set the timeout for requests.
432    ///
433    /// Defaults to `5s`.
434    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
435        Self {
436            request_timeout,
437            ..self
438        }
439    }
440
441    /// Set the retry configuration for requests.
442    ///
443    /// See [`RetryConfig`] for defaults.
444    pub fn with_retry(self, retry: RetryConfig) -> Self {
445        Self { retry, ..self }
446    }
447
448    /// Set the compression algorithm for requests and responses.
449    ///
450    /// Defaults to no compression.
451    pub fn with_compression(self, compression: Compression) -> Self {
452        Self {
453            compression,
454            ..self
455        }
456    }
457
458    /// Skip TLS certificate verification (insecure).
459    ///
460    /// This is useful for connecting to endpoints with self-signed certificates
461    /// or certificates that don't match the hostname (similar to `curl -k`).
462    ///
463    /// # Warning
464    ///
465    /// This disables certificate verification and should only be used for
466    /// testing or development purposes. **Never use this in production.**
467    ///
468    /// Defaults to `false`.
469    pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
470        Self {
471            insecure_skip_cert_verification: skip,
472            ..self
473        }
474    }
475
476    #[doc(hidden)]
477    #[cfg(feature = "_hidden")]
478    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
479        let user_agent = user_agent
480            .into()
481            .parse()
482            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
483        Ok(Self { user_agent, ..self })
484    }
485}
486
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// A page of values.
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether there are more pages.
    pub has_more: bool,
}
496
497impl<T> Page<T> {
498    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
499        Self {
500            values: values.into(),
501            has_more,
502        }
503    }
504}
505
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Storage class for recent appends.
pub enum StorageClass {
    /// Standard storage class that offers append latencies under `500ms`.
    Standard,
    /// Express storage class that offers append latencies under `50ms`.
    Express,
}

// Lossless 1:1 mapping from the wire-level representation.
impl From<api::config::StorageClass> for StorageClass {
    fn from(value: api::config::StorageClass) -> Self {
        match value {
            api::config::StorageClass::Standard => StorageClass::Standard,
            api::config::StorageClass::Express => StorageClass::Express,
        }
    }
}

// Lossless 1:1 mapping to the wire-level representation.
impl From<StorageClass> for api::config::StorageClass {
    fn from(value: StorageClass) -> Self {
        match value {
            StorageClass::Standard => api::config::StorageClass::Standard,
            StorageClass::Express => api::config::StorageClass::Express,
        }
    }
}
532
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Retention policy for records in a stream.
pub enum RetentionPolicy {
    /// Age in seconds. Records older than this age are automatically trimmed.
    Age(u64),
    /// Records are retained indefinitely unless explicitly trimmed.
    Infinite,
}

// Lossless 1:1 mapping from the wire-level representation; the API's
// `Infinite` payload carries no data and is dropped.
impl From<api::config::RetentionPolicy> for RetentionPolicy {
    fn from(value: api::config::RetentionPolicy) -> Self {
        match value {
            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
        }
    }
}

// Mapping to the wire-level representation; `Infinite` is materialized as
// the API's empty `InfiniteRetention` marker struct.
impl From<RetentionPolicy> for api::config::RetentionPolicy {
    fn from(value: RetentionPolicy) -> Self {
        match value {
            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
            RetentionPolicy::Infinite => {
                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
            }
        }
    }
}
561
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Timestamping mode for appends that influences how timestamps are handled.
pub enum TimestampingMode {
    /// Prefer client-specified timestamp if present otherwise use arrival time.
    ClientPrefer,
    /// Require a client-specified timestamp and reject the append if it is missing.
    ClientRequire,
    /// Use the arrival time and ignore any client-specified timestamp.
    Arrival,
}

// Lossless 1:1 mapping from the wire-level representation.
impl From<api::config::TimestampingMode> for TimestampingMode {
    fn from(value: api::config::TimestampingMode) -> Self {
        match value {
            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
        }
    }
}

// Lossless 1:1 mapping to the wire-level representation.
impl From<TimestampingMode> for api::config::TimestampingMode {
    fn from(value: TimestampingMode) -> Self {
        match value {
            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
        }
    }
}
592
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for timestamping behavior.
pub struct TimestampingConfig {
    /// Timestamping mode for appends that influences how timestamps are handled.
    ///
    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
    /// `None` means "unset" — the server-side default applies.
    pub mode: Option<TimestampingMode>,
    /// Whether client-specified timestamps are allowed to exceed the arrival time.
    ///
    /// Defaults to `false` (client timestamps are capped at the arrival time).
    pub uncapped: bool,
}
606
607impl TimestampingConfig {
608    /// Create a new [`TimestampingConfig`] with default settings.
609    pub fn new() -> Self {
610        Self::default()
611    }
612
613    /// Set the timestamping mode for appends that influences how timestamps are handled.
614    pub fn with_mode(self, mode: TimestampingMode) -> Self {
615        Self {
616            mode: Some(mode),
617            ..self
618        }
619    }
620
621    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
622    pub fn with_uncapped(self, uncapped: bool) -> Self {
623        Self { uncapped, ..self }
624    }
625}
626
// From the wire-level representation; an absent `uncapped` collapses to `false`.
impl From<api::config::TimestampingConfig> for TimestampingConfig {
    fn from(value: api::config::TimestampingConfig) -> Self {
        Self {
            mode: value.mode.map(Into::into),
            uncapped: value.uncapped.unwrap_or_default(),
        }
    }
}

// To the wire-level representation; `uncapped` is always sent explicitly.
impl From<TimestampingConfig> for api::config::TimestampingConfig {
    fn from(value: TimestampingConfig) -> Self {
        Self {
            mode: value.mode.map(Into::into),
            uncapped: Some(value.uncapped),
        }
    }
}
644
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for automatically deleting a stream when it becomes empty.
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}
654
655impl DeleteOnEmptyConfig {
656    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
657    pub fn new() -> Self {
658        Self::default()
659    }
660
661    /// Set the minimum age in seconds before an empty stream can be deleted.
662    pub fn with_min_age(self, min_age: Duration) -> Self {
663        Self {
664            min_age_secs: min_age.as_secs(),
665        }
666    }
667}
668
// Lossless 1:1 mapping from the wire-level representation.
impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
        Self {
            min_age_secs: value.min_age_secs,
        }
    }
}

// Lossless 1:1 mapping to the wire-level representation.
impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
    fn from(value: DeleteOnEmptyConfig) -> Self {
        Self {
            min_age_secs: value.min_age_secs,
        }
    }
}
684
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for a stream.
///
/// All fields are optional; `None` means "unset" and the server-side default applies.
pub struct StreamConfig {
    /// Storage class for the stream.
    ///
    /// Defaults to [`Express`](StorageClass::Express).
    pub storage_class: Option<StorageClass>,
    /// Retention policy for records in the stream.
    ///
    /// Defaults to `7 days` of retention.
    pub retention_policy: Option<RetentionPolicy>,
    /// Configuration for timestamping behavior.
    ///
    /// See [`TimestampingConfig`] for defaults.
    pub timestamping: Option<TimestampingConfig>,
    /// Configuration for automatically deleting the stream when it becomes empty.
    ///
    /// See [`DeleteOnEmptyConfig`] for defaults.
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
}
706
707impl StreamConfig {
708    /// Create a new [`StreamConfig`] with default settings.
709    pub fn new() -> Self {
710        Self::default()
711    }
712
713    /// Set the storage class for the stream.
714    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
715        Self {
716            storage_class: Some(storage_class),
717            ..self
718        }
719    }
720
721    /// Set the retention policy for records in the stream.
722    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
723        Self {
724            retention_policy: Some(retention_policy),
725            ..self
726        }
727    }
728
729    /// Set the configuration for timestamping behavior.
730    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
731        Self {
732            timestamping: Some(timestamping),
733            ..self
734        }
735    }
736
737    /// Set the configuration for automatically deleting the stream when it becomes empty.
738    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
739        Self {
740            delete_on_empty: Some(delete_on_empty),
741            ..self
742        }
743    }
744}
745
// Field-by-field mapping from the wire-level representation; each optional
// sub-config converts via its own `From` impl.
impl From<api::config::StreamConfig> for StreamConfig {
    fn from(value: api::config::StreamConfig) -> Self {
        Self {
            storage_class: value.storage_class.map(Into::into),
            retention_policy: value.retention_policy.map(Into::into),
            timestamping: value.timestamping.map(Into::into),
            delete_on_empty: value.delete_on_empty.map(Into::into),
        }
    }
}

// Field-by-field mapping to the wire-level representation.
impl From<StreamConfig> for api::config::StreamConfig {
    fn from(value: StreamConfig) -> Self {
        Self {
            storage_class: value.storage_class.map(Into::into),
            retention_policy: value.retention_policy.map(Into::into),
            timestamping: value.timestamping.map(Into::into),
            delete_on_empty: value.delete_on_empty.map(Into::into),
        }
    }
}
767
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Configuration for a basin.
pub struct BasinConfig {
    /// Default configuration for all streams in the basin.
    ///
    /// See [`StreamConfig`] for defaults.
    pub default_stream_config: Option<StreamConfig>,
    /// Whether to create stream on append if it doesn't exist using default stream configuration.
    ///
    /// Defaults to `false`.
    pub create_stream_on_append: bool,
    /// Whether to create stream on read if it doesn't exist using default stream configuration.
    ///
    /// Defaults to `false`.
    pub create_stream_on_read: bool,
}
785
786impl BasinConfig {
787    /// Create a new [`BasinConfig`] with default settings.
788    pub fn new() -> Self {
789        Self::default()
790    }
791
792    /// Set the default configuration for all streams in the basin.
793    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
794        Self {
795            default_stream_config: Some(config),
796            ..self
797        }
798    }
799
800    /// Set whether to create stream on append if it doesn't exist using default stream
801    /// configuration.
802    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
803        Self {
804            create_stream_on_append,
805            ..self
806        }
807    }
808
809    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
810    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
811        Self {
812            create_stream_on_read,
813            ..self
814        }
815    }
816}
817
// Field-by-field mapping from the wire-level representation.
impl From<api::config::BasinConfig> for BasinConfig {
    fn from(value: api::config::BasinConfig) -> Self {
        Self {
            default_stream_config: value.default_stream_config.map(Into::into),
            create_stream_on_append: value.create_stream_on_append,
            create_stream_on_read: value.create_stream_on_read,
        }
    }
}

// Field-by-field mapping to the wire-level representation.
impl From<BasinConfig> for api::config::BasinConfig {
    fn from(value: BasinConfig) -> Self {
        Self {
            default_stream_config: value.default_stream_config.map(Into::into),
            create_stream_on_append: value.create_stream_on_append,
            create_stream_on_read: value.create_stream_on_read,
        }
    }
}
837
#[derive(Debug, Clone, PartialEq, Eq)]
/// Scope of a basin.
pub enum BasinScope {
    /// AWS `us-east-1` region.
    AwsUsEast1,
}

// Lossless 1:1 mapping from the wire-level representation.
impl From<api::basin::BasinScope> for BasinScope {
    fn from(value: api::basin::BasinScope) -> Self {
        match value {
            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
        }
    }
}

// Lossless 1:1 mapping to the wire-level representation.
impl From<BasinScope> for api::basin::BasinScope {
    fn from(value: BasinScope) -> Self {
        match value {
            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
        }
    }
}
860
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_basin`](crate::S2::create_basin) operation.
pub struct CreateBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Configuration for the basin.
    ///
    /// See [`BasinConfig`] for defaults.
    pub config: Option<BasinConfig>,
    /// Scope of the basin.
    ///
    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
    pub scope: Option<BasinScope>,
    /// Idempotency token for the operation.
    ///
    /// When a request is retried, the same token is reused,
    /// causing the server to return the original response instead of
    /// performing the operation again.
    ///
    /// Defaults to a random UUID.
    pub idempotency_token: String,
}
884
885impl CreateBasinInput {
886    /// Create a new [`CreateBasinInput`] with the given basin name.
887    pub fn new(name: BasinName) -> Self {
888        Self {
889            name,
890            config: None,
891            scope: None,
892            idempotency_token: idempotency_token(),
893        }
894    }
895
896    /// Set the configuration for the basin.
897    pub fn with_config(self, config: BasinConfig) -> Self {
898        Self {
899            config: Some(config),
900            ..self
901        }
902    }
903
904    /// Set the scope of the basin.
905    pub fn with_scope(self, scope: BasinScope) -> Self {
906        Self {
907            scope: Some(scope),
908            ..self
909        }
910    }
911
912    /// Set the idempotency token for the operation.
913    pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
914        Self {
915            idempotency_token: idempotency_token.into(),
916            ..self
917        }
918    }
919}
920
// Split into the wire-level request plus the idempotency token, which is
// carried out-of-band (not part of the request body).
impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
    fn from(value: CreateBasinInput) -> Self {
        (
            api::basin::CreateBasinRequest {
                basin: value.name,
                config: value.config.map(Into::into),
                scope: value.scope.map(Into::into),
            },
            value.idempotency_token,
        )
    }
}
933
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_basins`](crate::S2::list_basins) operation.
pub struct ListBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
953
954impl ListBasinsInput {
955    /// Create a new [`ListBasinsInput`] with default values.
956    pub fn new() -> Self {
957        Self::default()
958    }
959
960    /// Set the prefix used to filter basins whose names begin with this value.
961    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
962        Self { prefix, ..self }
963    }
964
965    /// Set the value used to filter basins whose names are lexicographically greater than this
966    /// value.
967    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
968        Self {
969            start_after,
970            ..self
971        }
972    }
973
974    /// Set the limit on number of basins to return in a page.
975    pub fn with_limit(self, limit: usize) -> Self {
976        Self {
977            limit: Some(limit),
978            ..self
979        }
980    }
981}
982
983impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
984    fn from(value: ListBasinsInput) -> Self {
985        Self {
986            prefix: Some(value.prefix),
987            start_after: Some(value.start_after),
988            limit: value.limit,
989        }
990    }
991}
992
#[derive(Debug, Clone, Default)]
/// Input for [`list_all_basins`](crate::S2::list_all_basins) operation.
// NOTE(review): unlike `ListBasinsInput`, this struct is not `#[non_exhaustive]`
// — confirm that is intentional.
pub struct ListAllBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Whether to include basins that are being deleted.
    ///
    /// Defaults to `false`.
    pub include_deleted: bool,
}
1011
1012impl ListAllBasinsInput {
1013    /// Create a new [`ListAllBasinsInput`] with default values.
1014    pub fn new() -> Self {
1015        Self::default()
1016    }
1017
1018    /// Set the prefix used to filter basins whose names begin with this value.
1019    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1020        Self { prefix, ..self }
1021    }
1022
1023    /// Set the value used to filter basins whose names are lexicographically greater than this
1024    /// value.
1025    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1026        Self {
1027            start_after,
1028            ..self
1029        }
1030    }
1031
1032    /// Set whether to include basins that are being deleted.
1033    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1034        Self {
1035            include_deleted,
1036            ..self
1037        }
1038    }
1039}
1040
#[derive(Debug, Clone, PartialEq, Eq)]
/// Current state of a basin.
pub enum BasinState {
    /// The basin is active.
    Active,
    /// The basin is being created.
    Creating,
    /// The basin is being deleted.
    Deleting,
}
1051
1052impl From<api::basin::BasinState> for BasinState {
1053    fn from(value: api::basin::BasinState) -> Self {
1054        match value {
1055            api::basin::BasinState::Active => BasinState::Active,
1056            api::basin::BasinState::Creating => BasinState::Creating,
1057            api::basin::BasinState::Deleting => BasinState::Deleting,
1058        }
1059    }
1060}
1061
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// Basin information.
pub struct BasinInfo {
    /// Basin name.
    pub name: BasinName,
    /// Scope of the basin, if any was reported.
    pub scope: Option<BasinScope>,
    /// Current state of the basin.
    pub state: BasinState,
}
1073
1074impl From<api::basin::BasinInfo> for BasinInfo {
1075    fn from(value: api::basin::BasinInfo) -> Self {
1076        Self {
1077            name: value.name,
1078            scope: value.scope.map(Into::into),
1079            state: value.state.into(),
1080        }
1081    }
1082}
1083
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
pub struct DeleteBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Whether to ignore `Not Found` error if the basin doesn't exist.
    ///
    /// Defaults to `false` when constructed via [`DeleteBasinInput::new`].
    pub ignore_not_found: bool,
}
1093
1094impl DeleteBasinInput {
1095    /// Create a new [`DeleteBasinInput`] with the given basin name.
1096    pub fn new(name: BasinName) -> Self {
1097        Self {
1098            name,
1099            ignore_not_found: false,
1100        }
1101    }
1102
1103    /// Set whether to ignore `Not Found` error if the basin is not existing.
1104    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1105        Self {
1106            ignore_not_found,
1107            ..self
1108        }
1109    }
1110}
1111
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`TimestampingConfig`].
// NOTE(review): fields left at `Maybe`'s default presumably leave the existing
// setting untouched — confirm against `Maybe` semantics.
pub struct TimestampingReconfiguration {
    /// Override for the existing [`mode`](TimestampingConfig::mode).
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
    pub uncapped: Maybe<Option<bool>>,
}
1121
1122impl TimestampingReconfiguration {
1123    /// Create a new [`TimestampingReconfiguration`].
1124    pub fn new() -> Self {
1125        Self::default()
1126    }
1127
1128    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1129    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1130        Self {
1131            mode: Maybe::Specified(Some(mode)),
1132            ..self
1133        }
1134    }
1135
1136    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1137    pub fn with_uncapped(self, uncapped: bool) -> Self {
1138        Self {
1139            uncapped: Maybe::Specified(Some(uncapped)),
1140            ..self
1141        }
1142    }
1143}
1144
1145impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1146    fn from(value: TimestampingReconfiguration) -> Self {
1147        Self {
1148            mode: value.mode.map(|m| m.map(Into::into)),
1149            uncapped: value.uncapped,
1150        }
1151    }
1152}
1153
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`DeleteOnEmptyConfig`].
pub struct DeleteOnEmptyReconfiguration {
    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
    pub min_age_secs: Maybe<Option<u64>>,
}
1161
1162impl DeleteOnEmptyReconfiguration {
1163    /// Create a new [`DeleteOnEmptyReconfiguration`].
1164    pub fn new() -> Self {
1165        Self::default()
1166    }
1167
1168    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1169    pub fn with_min_age(self, min_age: Duration) -> Self {
1170        Self {
1171            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1172        }
1173    }
1174}
1175
1176impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1177    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1178        Self {
1179            min_age_secs: value.min_age_secs,
1180        }
1181    }
1182}
1183
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`StreamConfig`].
///
/// Construct via [`StreamReconfiguration::new`] and the `with_*` builder methods.
pub struct StreamReconfiguration {
    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
}
1197
1198impl StreamReconfiguration {
1199    /// Create a new [`StreamReconfiguration`].
1200    pub fn new() -> Self {
1201        Self::default()
1202    }
1203
1204    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1205    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1206        Self {
1207            storage_class: Maybe::Specified(Some(storage_class)),
1208            ..self
1209        }
1210    }
1211
1212    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1213    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1214        Self {
1215            retention_policy: Maybe::Specified(Some(retention_policy)),
1216            ..self
1217        }
1218    }
1219
1220    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1221    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1222        Self {
1223            timestamping: Maybe::Specified(Some(timestamping)),
1224            ..self
1225        }
1226    }
1227
1228    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1229    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1230        Self {
1231            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1232            ..self
1233        }
1234    }
1235}
1236
1237impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1238    fn from(value: StreamReconfiguration) -> Self {
1239        Self {
1240            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1241            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1242            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1243            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1244        }
1245    }
1246}
1247
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`BasinConfig`].
///
/// Construct via [`BasinReconfiguration::new`] and the `with_*` builder methods.
pub struct BasinReconfiguration {
    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Override for the existing
    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
    pub create_stream_on_append: Maybe<bool>,
    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
    pub create_stream_on_read: Maybe<bool>,
}
1260
1261impl BasinReconfiguration {
1262    /// Create a new [`BasinReconfiguration`].
1263    pub fn new() -> Self {
1264        Self::default()
1265    }
1266
1267    /// Set the override for the existing
1268    /// [`default_stream_config`](BasinConfig::default_stream_config).
1269    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1270        Self {
1271            default_stream_config: Maybe::Specified(Some(config)),
1272            ..self
1273        }
1274    }
1275
1276    /// Set the override for the existing
1277    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1278    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1279        Self {
1280            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1281            ..self
1282        }
1283    }
1284
1285    /// Set the override for the existing
1286    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1287    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1288        Self {
1289            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1290            ..self
1291        }
1292    }
1293}
1294
1295impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1296    fn from(value: BasinReconfiguration) -> Self {
1297        Self {
1298            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1299            create_stream_on_append: value.create_stream_on_append,
1300            create_stream_on_read: value.create_stream_on_read,
1301        }
1302    }
1303}
1304
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
pub struct ReconfigureBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Reconfiguration for [`BasinConfig`].
    pub config: BasinReconfiguration,
}
1314
1315impl ReconfigureBasinInput {
1316    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1317    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1318        Self { name, config }
1319    }
1320}
1321
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
///
/// Construct via [`ListAccessTokensInput::new`] and the `with_*` builder methods.
pub struct ListAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
1341
1342impl ListAccessTokensInput {
1343    /// Create a new [`ListAccessTokensInput`] with default values.
1344    pub fn new() -> Self {
1345        Self::default()
1346    }
1347
1348    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1349    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1350        Self { prefix, ..self }
1351    }
1352
1353    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1354    /// value.
1355    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1356        Self {
1357            start_after,
1358            ..self
1359        }
1360    }
1361
1362    /// Set the limit on number of access tokens to return in a page.
1363    pub fn with_limit(self, limit: usize) -> Self {
1364        Self {
1365            limit: Some(limit),
1366            ..self
1367        }
1368    }
1369}
1370
1371impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1372    fn from(value: ListAccessTokensInput) -> Self {
1373        Self {
1374            prefix: Some(value.prefix),
1375            start_after: Some(value.start_after),
1376            limit: value.limit,
1377        }
1378    }
1379}
1380
#[derive(Debug, Clone, Default)]
/// Input for [`list_all_access_tokens`](crate::S2::list_all_access_tokens) operation.
// NOTE(review): unlike `ListAccessTokensInput`, this struct is not
// `#[non_exhaustive]` — confirm that is intentional.
pub struct ListAllAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
}
1395
1396impl ListAllAccessTokensInput {
1397    /// Create a new [`ListAllAccessTokensInput`] with default values.
1398    pub fn new() -> Self {
1399        Self::default()
1400    }
1401
1402    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1403    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1404        Self { prefix, ..self }
1405    }
1406
1407    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1408    /// this value.
1409    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1410        Self {
1411            start_after,
1412            ..self
1413        }
1414    }
1415}
1416
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Access token information.
pub struct AccessTokenInfo {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    ///
    /// Always present; conversion from the wire type fails when it is missing.
    pub expires_at: S2DateTime,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    pub auto_prefix_streams: bool,
    /// Scope of the access token.
    pub scope: AccessTokenScope,
}
1431
1432impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1433    type Error = ValidationError;
1434
1435    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1436        let expires_at = value
1437            .expires_at
1438            .map(Into::into)
1439            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1440        Ok(Self {
1441            id: value.id,
1442            expires_at,
1443            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1444            scope: value.scope.into(),
1445        })
1446    }
1447}
1448
#[derive(Debug, Clone)]
/// Pattern for matching basins.
///
/// See [`AccessTokenScope::basins`].
pub enum BasinMatcher {
    /// Match no basins.
    None,
    /// Match exactly this basin.
    Exact(BasinName),
    /// Match all basins whose names begin with this prefix.
    Prefix(BasinNamePrefix),
}
1461
#[derive(Debug, Clone)]
/// Pattern for matching streams.
///
/// See [`AccessTokenScope::streams`].
pub enum StreamMatcher {
    /// Match no streams.
    None,
    /// Match exactly this stream.
    Exact(StreamName),
    /// Match all streams whose names begin with this prefix.
    Prefix(StreamNamePrefix),
}
1474
#[derive(Debug, Clone)]
/// Pattern for matching access tokens.
///
/// See [`AccessTokenScope::access_tokens`].
pub enum AccessTokenMatcher {
    /// Match no access tokens.
    None,
    /// Match exactly this access token.
    Exact(AccessTokenId),
    /// Match all access tokens whose IDs begin with this prefix.
    Prefix(AccessTokenIdPrefix),
}
1487
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions indicating allowed operations.
///
/// Construct via helpers such as [`ReadWritePermissions::read_only`] or
/// [`ReadWritePermissions::read_write`].
pub struct ReadWritePermissions {
    /// Read permission.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Write permission.
    ///
    /// Defaults to `false`.
    pub write: bool,
}
1501
1502impl ReadWritePermissions {
1503    /// Create a new [`ReadWritePermissions`] with default values.
1504    pub fn new() -> Self {
1505        Self::default()
1506    }
1507
1508    /// Create read-only permissions.
1509    pub fn read_only() -> Self {
1510        Self {
1511            read: true,
1512            write: false,
1513        }
1514    }
1515
1516    /// Create write-only permissions.
1517    pub fn write_only() -> Self {
1518        Self {
1519            read: false,
1520            write: true,
1521        }
1522    }
1523
1524    /// Create read-write permissions.
1525    pub fn read_write() -> Self {
1526        Self {
1527            read: true,
1528            write: true,
1529        }
1530    }
1531}
1532
1533impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1534    fn from(value: ReadWritePermissions) -> Self {
1535        Self {
1536            read: Some(value.read),
1537            write: Some(value.write),
1538        }
1539    }
1540}
1541
1542impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1543    fn from(value: api::access::ReadWritePermissions) -> Self {
1544        Self {
1545            read: value.read.unwrap_or_default(),
1546            write: value.write.unwrap_or_default(),
1547        }
1548    }
1549}
1550
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions at the operation group level.
///
/// See [`AccessTokenScope::op_group_perms`].
///
/// Construct via helpers such as [`OperationGroupPermissions::read_write_all`]
/// or the `with_*` builder methods.
pub struct OperationGroupPermissions {
    /// Account-level access permissions.
    ///
    /// Defaults to `None`.
    pub account: Option<ReadWritePermissions>,
    /// Basin-level access permissions.
    ///
    /// Defaults to `None`.
    pub basin: Option<ReadWritePermissions>,
    /// Stream-level access permissions.
    ///
    /// Defaults to `None`.
    pub stream: Option<ReadWritePermissions>,
}
1570
1571impl OperationGroupPermissions {
1572    /// Create a new [`OperationGroupPermissions`] with default values.
1573    pub fn new() -> Self {
1574        Self::default()
1575    }
1576
1577    /// Create read-only permissions for all groups.
1578    pub fn read_only_all() -> Self {
1579        Self {
1580            account: Some(ReadWritePermissions::read_only()),
1581            basin: Some(ReadWritePermissions::read_only()),
1582            stream: Some(ReadWritePermissions::read_only()),
1583        }
1584    }
1585
1586    /// Create write-only permissions for all groups.
1587    pub fn write_only_all() -> Self {
1588        Self {
1589            account: Some(ReadWritePermissions::write_only()),
1590            basin: Some(ReadWritePermissions::write_only()),
1591            stream: Some(ReadWritePermissions::write_only()),
1592        }
1593    }
1594
1595    /// Create read-write permissions for all groups.
1596    pub fn read_write_all() -> Self {
1597        Self {
1598            account: Some(ReadWritePermissions::read_write()),
1599            basin: Some(ReadWritePermissions::read_write()),
1600            stream: Some(ReadWritePermissions::read_write()),
1601        }
1602    }
1603
1604    /// Set account-level access permissions.
1605    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1606        Self {
1607            account: Some(account),
1608            ..self
1609        }
1610    }
1611
1612    /// Set basin-level access permissions.
1613    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1614        Self {
1615            basin: Some(basin),
1616            ..self
1617        }
1618    }
1619
1620    /// Set stream-level access permissions.
1621    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1622        Self {
1623            stream: Some(stream),
1624            ..self
1625        }
1626    }
1627}
1628
1629impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1630    fn from(value: OperationGroupPermissions) -> Self {
1631        Self {
1632            account: value.account.map(Into::into),
1633            basin: value.basin.map(Into::into),
1634            stream: value.stream.map(Into::into),
1635        }
1636    }
1637}
1638
1639impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1640    fn from(value: api::access::PermittedOperationGroups) -> Self {
1641        Self {
1642            account: value.account.map(Into::into),
1643            basin: value.basin.map(Into::into),
1644            stream: value.stream.map(Into::into),
1645        }
1646    }
1647}
1648
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Individual operation that can be permitted.
///
/// See [`AccessTokenScope::ops`].
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics (wire name: `AccountMetrics`).
    GetAccountMetrics,
    /// Get basin metrics (wire name: `BasinMetrics`).
    GetBasinMetrics,
    /// Get stream metrics (wire name: `StreamMetrics`).
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
}
1697
1698impl From<Operation> for api::access::Operation {
1699    fn from(value: Operation) -> Self {
1700        match value {
1701            Operation::ListBasins => api::access::Operation::ListBasins,
1702            Operation::CreateBasin => api::access::Operation::CreateBasin,
1703            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1704            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1705            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1706            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1707            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1708            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1709            Operation::ListStreams => api::access::Operation::ListStreams,
1710            Operation::CreateStream => api::access::Operation::CreateStream,
1711            Operation::DeleteStream => api::access::Operation::DeleteStream,
1712            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1713            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1714            Operation::CheckTail => api::access::Operation::CheckTail,
1715            Operation::Append => api::access::Operation::Append,
1716            Operation::Read => api::access::Operation::Read,
1717            Operation::Trim => api::access::Operation::Trim,
1718            Operation::Fence => api::access::Operation::Fence,
1719            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1720            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1721            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1722        }
1723    }
1724}
1725
1726impl From<api::access::Operation> for Operation {
1727    fn from(value: api::access::Operation) -> Self {
1728        match value {
1729            api::access::Operation::ListBasins => Operation::ListBasins,
1730            api::access::Operation::CreateBasin => Operation::CreateBasin,
1731            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1732            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1733            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1734            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1735            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1736            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1737            api::access::Operation::ListStreams => Operation::ListStreams,
1738            api::access::Operation::CreateStream => Operation::CreateStream,
1739            api::access::Operation::DeleteStream => Operation::DeleteStream,
1740            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1741            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1742            api::access::Operation::CheckTail => Operation::CheckTail,
1743            api::access::Operation::Append => Operation::Append,
1744            api::access::Operation::Read => Operation::Read,
1745            api::access::Operation::Trim => Operation::Trim,
1746            api::access::Operation::Fence => Operation::Fence,
1747            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1748            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1749            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1750        }
1751    }
1752}
1753
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token, as specified when issuing a token.
///
/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
/// final set must not be empty.
///
/// See [`IssueAccessTokenInput::scope`].
pub struct AccessTokenScopeInput {
    // Permitted basins; `None` means no basins.
    basins: Option<BasinMatcher>,
    // Permitted streams; `None` means no streams.
    streams: Option<StreamMatcher>,
    // Permitted access tokens; `None` means no access tokens.
    access_tokens: Option<AccessTokenMatcher>,
    // Permissions at the operation group level.
    op_group_perms: Option<OperationGroupPermissions>,
    // Individually permitted operations.
    ops: HashSet<Operation>,
}
1770
1771impl AccessTokenScopeInput {
1772    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1773    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1774        Self {
1775            basins: None,
1776            streams: None,
1777            access_tokens: None,
1778            op_group_perms: None,
1779            ops: ops.into_iter().collect(),
1780        }
1781    }
1782
1783    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1784    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1785        Self {
1786            basins: None,
1787            streams: None,
1788            access_tokens: None,
1789            op_group_perms: Some(op_group_perms),
1790            ops: HashSet::default(),
1791        }
1792    }
1793
1794    /// Set the permitted operations.
1795    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1796        Self {
1797            ops: ops.into_iter().collect(),
1798            ..self
1799        }
1800    }
1801
1802    /// Set the access permissions at the operation group level.
1803    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1804        Self {
1805            op_group_perms: Some(op_group_perms),
1806            ..self
1807        }
1808    }
1809
1810    /// Set the permitted basins.
1811    ///
1812    /// Defaults to no basins.
1813    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1814        Self {
1815            basins: Some(basins),
1816            ..self
1817        }
1818    }
1819
1820    /// Set the permitted streams.
1821    ///
1822    /// Defaults to no streams.
1823    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1824        Self {
1825            streams: Some(streams),
1826            ..self
1827        }
1828    }
1829
1830    /// Set the permitted access tokens.
1831    ///
1832    /// Defaults to no access tokens.
1833    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1834        Self {
1835            access_tokens: Some(access_tokens),
1836            ..self
1837        }
1838    }
1839}
1840
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token.
///
/// Output counterpart of [`AccessTokenScopeInput`], converted from the service's wire
/// representation.
pub struct AccessTokenScope {
    /// Permitted basins, if any matcher was set.
    pub basins: Option<BasinMatcher>,
    /// Permitted streams, if any matcher was set.
    pub streams: Option<StreamMatcher>,
    /// Permitted access tokens, if any matcher was set.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions at the operation group level.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Permitted operations.
    ///
    /// Empty when the wire representation carried no operation list.
    pub ops: HashSet<Operation>,
}
1856
1857impl From<api::access::AccessTokenScope> for AccessTokenScope {
1858    fn from(value: api::access::AccessTokenScope) -> Self {
1859        Self {
1860            basins: value.basins.map(|rs| match rs {
1861                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1862                    BasinMatcher::Exact(e)
1863                }
1864                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1865                    BasinMatcher::None
1866                }
1867                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1868            }),
1869            streams: value.streams.map(|rs| match rs {
1870                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1871                    StreamMatcher::Exact(e)
1872                }
1873                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1874                    StreamMatcher::None
1875                }
1876                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1877            }),
1878            access_tokens: value.access_tokens.map(|rs| match rs {
1879                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1880                    AccessTokenMatcher::Exact(e)
1881                }
1882                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1883                    AccessTokenMatcher::None
1884                }
1885                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1886            }),
1887            op_group_perms: value.op_groups.map(Into::into),
1888            ops: value
1889                .ops
1890                .map(|ops| ops.into_iter().map(Into::into).collect())
1891                .unwrap_or_default(),
1892        }
1893    }
1894}
1895
1896impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1897    fn from(value: AccessTokenScopeInput) -> Self {
1898        Self {
1899            basins: value.basins.map(|rs| match rs {
1900                BasinMatcher::None => {
1901                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1902                }
1903                BasinMatcher::Exact(e) => {
1904                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1905                }
1906                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1907            }),
1908            streams: value.streams.map(|rs| match rs {
1909                StreamMatcher::None => {
1910                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1911                }
1912                StreamMatcher::Exact(e) => {
1913                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1914                }
1915                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1916            }),
1917            access_tokens: value.access_tokens.map(|rs| match rs {
1918                AccessTokenMatcher::None => {
1919                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1920                }
1921                AccessTokenMatcher::Exact(e) => {
1922                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1923                }
1924                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1925            }),
1926            op_groups: value.op_group_perms.map(Into::into),
1927            ops: if value.ops.is_empty() {
1928                None
1929            } else {
1930                Some(value.ops.into_iter().map(Into::into).collect())
1931            },
1932        }
1933    }
1934}
1935
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`issue_access_token`](crate::S2::issue_access_token).
pub struct IssueAccessTokenInput {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    ///
    /// Defaults to the expiration time of the requestor's access token passed via
    /// [`S2Config::new`](S2Config::new).
    pub expires_at: Option<S2DateTime>,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    ///
    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
    /// prefix.
    ///
    /// Defaults to `false`.
    pub auto_prefix_streams: bool,
    /// Scope of the token.
    pub scope: AccessTokenScopeInput,
}
1958
1959impl IssueAccessTokenInput {
1960    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
1961    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1962        Self {
1963            id,
1964            expires_at: None,
1965            auto_prefix_streams: false,
1966            scope,
1967        }
1968    }
1969
1970    /// Set the expiration time.
1971    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1972        Self {
1973            expires_at: Some(expires_at),
1974            ..self
1975        }
1976    }
1977
1978    /// Set whether to automatically prefix stream names during creation and strip the prefix during
1979    /// listing.
1980    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1981        Self {
1982            auto_prefix_streams,
1983            ..self
1984        }
1985    }
1986}
1987
1988impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1989    fn from(value: IssueAccessTokenInput) -> Self {
1990        Self {
1991            id: value.id,
1992            expires_at: value.expires_at.map(Into::into),
1993            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1994            scope: value.scope.into(),
1995        }
1996    }
1997}
1998
#[derive(Debug, Clone, Copy)]
/// Interval to accumulate over for timeseries metric sets.
pub enum TimeseriesInterval {
    /// One-minute accumulation interval.
    Minute,
    /// One-hour accumulation interval.
    Hour,
    /// One-day accumulation interval.
    Day,
}
2009
2010impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2011    fn from(value: TimeseriesInterval) -> Self {
2012        match value {
2013            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2014            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2015            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2016        }
2017    }
2018}
2019
2020impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2021    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2022        match value {
2023            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2024            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2025            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2026        }
2027    }
2028}
2029
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds, covering `[start, end)`.
pub struct TimeRange {
    /// Start timestamp in Unix epoch seconds (inclusive).
    pub start: u32,
    /// End timestamp in Unix epoch seconds (exclusive).
    pub end: u32,
}
2039
2040impl TimeRange {
2041    /// Create a new [`TimeRange`] with the given start and end timestamps.
2042    pub fn new(start: u32, end: u32) -> Self {
2043        Self { start, end }
2044    }
2045}
2046
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds, plus an optional accumulation interval.
pub struct TimeRangeAndInterval {
    /// Start timestamp in Unix epoch seconds (inclusive).
    pub start: u32,
    /// End timestamp in Unix epoch seconds (exclusive).
    pub end: u32,
    /// Interval to accumulate over for timeseries metric sets.
    ///
    /// Default is dependent on the requested metric set.
    pub interval: Option<TimeseriesInterval>,
}
2060
2061impl TimeRangeAndInterval {
2062    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2063    pub fn new(start: u32, end: u32) -> Self {
2064        Self {
2065            start,
2066            end,
2067            interval: None,
2068        }
2069    }
2070
2071    /// Set the interval to accumulate over for timeseries metric sets.
2072    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2073        Self {
2074            interval: Some(interval),
2075            ..self
2076        }
2077    }
2078}
2079
#[derive(Debug, Clone, Copy)]
/// Account metric set to return.
///
/// Each variant carries the time range (and, where applicable, accumulation interval) to query.
pub enum AccountMetricSet {
    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
    /// specified time range.
    ActiveBasins(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per account operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    AccountOps(TimeRangeAndInterval),
}
2094
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
pub struct GetAccountMetricsInput {
    /// Metric set to return, along with its time range parameters.
    pub set: AccountMetricSet,
}
2102
2103impl GetAccountMetricsInput {
2104    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2105    pub fn new(set: AccountMetricSet) -> Self {
2106        Self { set }
2107    }
2108}
2109
2110impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2111    fn from(value: GetAccountMetricsInput) -> Self {
2112        let (set, start, end, interval) = match value.set {
2113            AccountMetricSet::ActiveBasins(args) => (
2114                api::metrics::AccountMetricSet::ActiveBasins,
2115                args.start,
2116                args.end,
2117                None,
2118            ),
2119            AccountMetricSet::AccountOps(args) => (
2120                api::metrics::AccountMetricSet::AccountOps,
2121                args.start,
2122                args.end,
2123                args.interval,
2124            ),
2125        };
2126        Self {
2127            set,
2128            start: Some(start),
2129            end: Some(end),
2130            interval: interval.map(Into::into),
2131        }
2132    }
2133}
2134
#[derive(Debug, Clone, Copy)]
/// Basin metric set to return.
///
/// Each variant carries the time range (and, where applicable, accumulation interval) to query.
pub enum BasinMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
    /// in the basin, with one observed value for each hour over the requested time range.
    Storage(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
    ///
    /// Each metric represents a timeseries of the number of append operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendOps(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
    ///
    /// Each metric represents a timeseries of the number of read operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadOps(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadThroughput(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendThroughput(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per basin operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    BasinOps(TimeRangeAndInterval),
}
2179
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
pub struct GetBasinMetricsInput {
    /// Name of the basin to query.
    pub name: BasinName,
    /// Metric set to return, along with its time range parameters.
    pub set: BasinMetricSet,
}
2189
2190impl GetBasinMetricsInput {
2191    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2192    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2193        Self { name, set }
2194    }
2195}
2196
2197impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2198    fn from(value: GetBasinMetricsInput) -> Self {
2199        let (set, start, end, interval) = match value.set {
2200            BasinMetricSet::Storage(args) => (
2201                api::metrics::BasinMetricSet::Storage,
2202                args.start,
2203                args.end,
2204                None,
2205            ),
2206            BasinMetricSet::AppendOps(args) => (
2207                api::metrics::BasinMetricSet::AppendOps,
2208                args.start,
2209                args.end,
2210                args.interval,
2211            ),
2212            BasinMetricSet::ReadOps(args) => (
2213                api::metrics::BasinMetricSet::ReadOps,
2214                args.start,
2215                args.end,
2216                args.interval,
2217            ),
2218            BasinMetricSet::ReadThroughput(args) => (
2219                api::metrics::BasinMetricSet::ReadThroughput,
2220                args.start,
2221                args.end,
2222                args.interval,
2223            ),
2224            BasinMetricSet::AppendThroughput(args) => (
2225                api::metrics::BasinMetricSet::AppendThroughput,
2226                args.start,
2227                args.end,
2228                args.interval,
2229            ),
2230            BasinMetricSet::BasinOps(args) => (
2231                api::metrics::BasinMetricSet::BasinOps,
2232                args.start,
2233                args.end,
2234                args.interval,
2235            ),
2236        };
2237        (
2238            value.name,
2239            api::metrics::BasinMetricSetRequest {
2240                set,
2241                start: Some(start),
2242                end: Some(end),
2243                interval: interval.map(Into::into),
2244            },
2245        )
2246    }
2247}
2248
#[derive(Debug, Clone, Copy)]
/// Stream metric set to return.
pub enum StreamMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
    /// with one observed value for each minute over the requested time range.
    ///
    /// No accumulation interval applies to this metric set.
    Storage(TimeRange),
}
2256
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
pub struct GetStreamMetricsInput {
    /// Name of the basin containing the stream.
    pub basin_name: BasinName,
    /// Name of the stream to query.
    pub stream_name: StreamName,
    /// Metric set to return, along with its time range parameters.
    pub set: StreamMetricSet,
}
2268
2269impl GetStreamMetricsInput {
2270    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2271    /// set.
2272    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2273        Self {
2274            basin_name,
2275            stream_name,
2276            set,
2277        }
2278    }
2279}
2280
2281impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2282    fn from(value: GetStreamMetricsInput) -> Self {
2283        let (set, start, end, interval) = match value.set {
2284            StreamMetricSet::Storage(args) => (
2285                api::metrics::StreamMetricSet::Storage,
2286                args.start,
2287                args.end,
2288                None,
2289            ),
2290        };
2291        (
2292            value.basin_name,
2293            value.stream_name,
2294            api::metrics::StreamMetricSetRequest {
2295                set,
2296                start: Some(start),
2297                end: Some(end),
2298                interval,
2299            },
2300        )
2301    }
2302}
2303
#[derive(Debug, Clone, Copy)]
/// Unit in which the values of a [`Metric`] are measured.
pub enum MetricUnit {
    /// Size in bytes.
    Bytes,
    /// Number of operations.
    Operations,
}
2312
2313impl From<api::metrics::MetricUnit> for MetricUnit {
2314    fn from(value: api::metrics::MetricUnit) -> Self {
2315        match value {
2316            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2317            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2318        }
2319    }
2320}
2321
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Single named value.
pub struct ScalarMetric {
    /// Metric name.
    pub name: String,
    /// Unit in which [`value`](ScalarMetric::value) is measured.
    pub unit: MetricUnit,
    /// Metric value.
    pub value: f64,
}
2333
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
/// specified interval.
pub struct AccumulationMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit in which the accumulated values are measured.
    pub unit: MetricUnit,
    /// The interval at which datapoints are accumulated.
    pub interval: TimeseriesInterval,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
    /// one `interval`.
    pub values: Vec<(u32, f64)>,
}
2350
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
pub struct GaugeMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit in which the instantaneous values are measured.
    pub unit: MetricUnit,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
    /// instant of the `timestamp` (in Unix epoch seconds).
    pub values: Vec<(u32, f64)>,
}
2363
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Set of string labels.
pub struct LabelMetric {
    /// Label name.
    pub name: String,
    /// Label values.
    pub values: Vec<String>,
}
2373
#[derive(Debug, Clone)]
/// Individual metric in a returned metric set.
///
/// Converted from the wire-level metric representation.
pub enum Metric {
    /// Single named value.
    Scalar(ScalarMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
    /// specified interval.
    Accumulation(AccumulationMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
    Gauge(GaugeMetric),
    /// Set of string labels.
    Label(LabelMetric),
}
2387
2388impl From<api::metrics::Metric> for Metric {
2389    fn from(value: api::metrics::Metric) -> Self {
2390        match value {
2391            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2392                name: sm.name.into(),
2393                unit: sm.unit.into(),
2394                value: sm.value,
2395            }),
2396            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2397                name: am.name.into(),
2398                unit: am.unit.into(),
2399                interval: am.interval.into(),
2400                values: am.values,
2401            }),
2402            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2403                name: gm.name.into(),
2404                unit: gm.unit.into(),
2405                values: gm.values,
2406            }),
2407            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2408                name: lm.name.into(),
2409                values: lm.values,
2410            }),
2411        }
2412    }
2413}
2414
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
pub struct ListStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
2434
2435impl ListStreamsInput {
2436    /// Create a new [`ListStreamsInput`] with default values.
2437    pub fn new() -> Self {
2438        Self::default()
2439    }
2440
2441    /// Set the prefix used to filter streams whose names begin with this value.
2442    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2443        Self { prefix, ..self }
2444    }
2445
2446    /// Set the value used to filter streams whose names are lexicographically greater than this
2447    /// value.
2448    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2449        Self {
2450            start_after,
2451            ..self
2452        }
2453    }
2454
2455    /// Set the limit on number of streams to return in a page.
2456    pub fn with_limit(self, limit: usize) -> Self {
2457        Self {
2458            limit: Some(limit),
2459            ..self
2460        }
2461    }
2462}
2463
2464impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2465    fn from(value: ListStreamsInput) -> Self {
2466        Self {
2467            prefix: Some(value.prefix),
2468            start_after: Some(value.start_after),
2469            limit: value.limit,
2470        }
2471    }
2472}
2473
#[derive(Debug, Clone, Default)]
/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
pub struct ListAllStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    /// Whether to include streams that are being deleted.
    ///
    /// Defaults to `false`.
    pub include_deleted: bool,
}
2492
2493impl ListAllStreamsInput {
2494    /// Create a new [`ListAllStreamsInput`] with default values.
2495    pub fn new() -> Self {
2496        Self::default()
2497    }
2498
2499    /// Set the prefix used to filter streams whose names begin with this value.
2500    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2501        Self { prefix, ..self }
2502    }
2503
2504    /// Set the value used to filter streams whose names are lexicographically greater than this
2505    /// value.
2506    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2507        Self {
2508            start_after,
2509            ..self
2510        }
2511    }
2512
2513    /// Set whether to include streams that are being deleted.
2514    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2515        Self {
2516            include_deleted,
2517            ..self
2518        }
2519    }
2520}
2521
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Stream information.
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time, present only if the stream is being deleted.
    pub deleted_at: Option<S2DateTime>,
}
2533
2534impl From<api::stream::StreamInfo> for StreamInfo {
2535    fn from(value: api::stream::StreamInfo) -> Self {
2536        Self {
2537            name: value.name,
2538            created_at: value.created_at.into(),
2539            deleted_at: value.deleted_at.map(Into::into),
2540        }
2541    }
2542}
2543
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
pub struct CreateStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Configuration for the stream.
    ///
    /// See [`StreamConfig`] for defaults.
    pub config: Option<StreamConfig>,
    /// Idempotency token for the operation.
    ///
    /// When a request is retried, the same token is reused,
    /// causing the server to return the original response instead of
    /// performing the operation again.
    ///
    /// Defaults to a random UUID.
    pub idempotency_token: String,
}
2563
2564impl CreateStreamInput {
2565    /// Create a new [`CreateStreamInput`] with the given stream name.
2566    pub fn new(name: StreamName) -> Self {
2567        Self {
2568            name,
2569            config: None,
2570            idempotency_token: idempotency_token(),
2571        }
2572    }
2573
2574    /// Set the configuration for the stream.
2575    pub fn with_config(self, config: StreamConfig) -> Self {
2576        Self {
2577            config: Some(config),
2578            ..self
2579        }
2580    }
2581
2582    /// Set the idempotency token for the operation.
2583    pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
2584        Self {
2585            idempotency_token: idempotency_token.into(),
2586            ..self
2587        }
2588    }
2589}
2590
2591impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2592    fn from(value: CreateStreamInput) -> Self {
2593        (
2594            api::stream::CreateStreamRequest {
2595                stream: value.name,
2596                config: value.config.map(Into::into),
2597            },
2598            value.idempotency_token,
2599        )
2600    }
2601}
2602
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
pub struct DeleteStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Whether to ignore `Not Found` error if the stream doesn't exist.
    ///
    /// Defaults to `false`.
    pub ignore_not_found: bool,
}
2612
2613impl DeleteStreamInput {
2614    /// Create a new [`DeleteStreamInput`] with the given stream name.
2615    pub fn new(name: StreamName) -> Self {
2616        Self {
2617            name,
2618            ignore_not_found: false,
2619        }
2620    }
2621
2622    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2623    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2624        Self {
2625            ignore_not_found,
2626            ..self
2627        }
2628    }
2629}
2630
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
pub struct ReconfigureStreamInput {
    /// Name of the stream to reconfigure.
    pub name: StreamName,
    /// Reconfiguration to apply to the stream's [`StreamConfig`].
    pub config: StreamReconfiguration,
}
2640
2641impl ReconfigureStreamInput {
2642    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2643    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2644        Self { name, config }
2645    }
2646}
2647
#[derive(Debug, Clone, PartialEq, Eq)]
/// Token for fencing appends to a stream.
///
/// **Note:** It must not exceed 36 bytes in length.
///
/// Construct via [`str::parse`] (validated) or [`FencingToken::generate`].
///
/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
pub struct FencingToken(String);
2655
2656impl FencingToken {
2657    /// Generate a random alphanumeric fencing token of `n` bytes.
2658    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2659        rand::rng()
2660            .sample_iter(&rand::distr::Alphanumeric)
2661            .take(n)
2662            .map(char::from)
2663            .collect::<String>()
2664            .parse()
2665    }
2666}
2667
2668impl FromStr for FencingToken {
2669    type Err = ValidationError;
2670
2671    fn from_str(s: &str) -> Result<Self, Self::Err> {
2672        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2673            return Err(ValidationError(format!(
2674                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2675            )));
2676        }
2677        Ok(FencingToken(s.to_string()))
2678    }
2679}
2680
2681impl std::fmt::Display for FencingToken {
2682    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2683        write!(f, "{}", self.0)
2684    }
2685}
2686
2687impl Deref for FencingToken {
2688    type Target = str;
2689
2690    fn deref(&self) -> &Self::Target {
2691        &self.0
2692    }
2693}
2694
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// A position in a stream.
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
    /// User-specified timestamps are passed through as-is.
    pub timestamp: u64,
}
2705
2706impl std::fmt::Display for StreamPosition {
2707    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2708        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2709    }
2710}
2711
impl From<api::stream::proto::StreamPosition> for StreamPosition {
    // Field-for-field conversion from the wire (proto) representation.
    fn from(value: api::stream::proto::StreamPosition) -> Self {
        Self {
            seq_num: value.seq_num,
            timestamp: value.timestamp,
        }
    }
}
2720
impl From<api::stream::StreamPosition> for StreamPosition {
    // Field-for-field conversion from the non-proto API representation.
    fn from(value: api::stream::StreamPosition) -> Self {
        Self {
            seq_num: value.seq_num,
            timestamp: value.timestamp,
        }
    }
}
2729
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// A name-value pair.
///
/// Both fields are arbitrary bytes; neither is required to be UTF-8.
pub struct Header {
    /// Name.
    pub name: Bytes,
    /// Value.
    pub value: Bytes,
}
2739
2740impl Header {
2741    /// Create a new [`Header`] with the given name and value.
2742    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2743        Self {
2744            name: name.into(),
2745            value: value.into(),
2746        }
2747    }
2748}
2749
impl From<Header> for api::stream::proto::Header {
    // Field-for-field conversion into the wire (proto) representation.
    fn from(value: Header) -> Self {
        Self {
            name: value.name,
            value: value.value,
        }
    }
}
2758
impl From<api::stream::proto::Header> for Header {
    // Field-for-field conversion from the wire (proto) representation.
    fn from(value: api::stream::proto::Header) -> Self {
        Self {
            name: value.name,
            value: value.value,
        }
    }
}
2767
#[derive(Debug, Clone, PartialEq)]
/// A record to append.
///
/// Construct via [`AppendRecord::new`]; size is validated against
/// [`RECORD_BATCH_MAX`] on construction and whenever headers change.
pub struct AppendRecord {
    // Record payload.
    body: Bytes,
    // Name-value pairs attached to the record (may be empty).
    headers: Vec<Header>,
    // Optional user-specified timestamp; semantics depend on the stream's
    // timestamping configuration.
    timestamp: Option<u64>,
}
2775
2776impl AppendRecord {
2777    fn validate(self) -> Result<Self, ValidationError> {
2778        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2779            Err(ValidationError(format!(
2780                "metered_bytes: {} exceeds {}",
2781                self.metered_bytes(),
2782                RECORD_BATCH_MAX.bytes
2783            )))
2784        } else {
2785            Ok(self)
2786        }
2787    }
2788
2789    /// Create a new [`AppendRecord`] with the given record body.
2790    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2791        let record = Self {
2792            body: body.into(),
2793            headers: Vec::default(),
2794            timestamp: None,
2795        };
2796        record.validate()
2797    }
2798
2799    /// Set the headers for this record.
2800    pub fn with_headers(
2801        self,
2802        headers: impl IntoIterator<Item = Header>,
2803    ) -> Result<Self, ValidationError> {
2804        let record = Self {
2805            headers: headers.into_iter().collect(),
2806            ..self
2807        };
2808        record.validate()
2809    }
2810
2811    /// Set the timestamp for this record.
2812    ///
2813    /// Precise semantics depend on [`StreamConfig::timestamping`].
2814    pub fn with_timestamp(self, timestamp: u64) -> Self {
2815        Self {
2816            timestamp: Some(timestamp),
2817            ..self
2818        }
2819    }
2820}
2821
impl From<AppendRecord> for api::stream::proto::AppendRecord {
    // Convert into the wire (proto) representation, consuming the record.
    fn from(value: AppendRecord) -> Self {
        Self {
            timestamp: value.timestamp,
            headers: value.headers.into_iter().map(Into::into).collect(),
            body: value.body,
        }
    }
}
2831
/// Metered byte size calculation.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
///
/// Implemented for append-side and sequenced records as well as batches.
pub trait MeteredBytes {
    /// Returns the metered byte size.
    fn metered_bytes(&self) -> usize;
}
2842
// Implements `MeteredBytes` for any record-like type exposing `headers` and
// `body` fields, following the formula documented on the trait.
macro_rules! metered_bytes_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> usize {
                // Fold the fixed 2-byte per-header overhead into the sum:
                // 8 + 2*n + sum(name+value) == 8 + sum(2+name+value).
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|h| 2 + h.name.len() + h.value.len())
                    .sum();
                8 + header_bytes + self.body.len()
            }
        }
    };
}
2858
// Metered-size accounting for records being appended.
metered_bytes_impl!(AppendRecord);
2860
#[derive(Debug, Clone)]
/// A batch of records to append atomically.
///
/// **Note:** It must contain at least `1` record and no more than `1000`.
/// The total size of the batch must not exceed `1MiB` in metered bytes.
///
/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
/// that takes care of the abovementioned constraints.
pub struct AppendRecordBatch {
    // Records in append order.
    records: Vec<AppendRecord>,
    // Cached sum of `metered_bytes()` over `records`, maintained on push.
    metered_bytes: usize,
}
2874
2875impl AppendRecordBatch {
2876    pub(crate) fn with_capacity(capacity: usize) -> Self {
2877        Self {
2878            records: Vec::with_capacity(capacity),
2879            metered_bytes: 0,
2880        }
2881    }
2882
2883    pub(crate) fn push(&mut self, record: AppendRecord) {
2884        self.metered_bytes += record.metered_bytes();
2885        self.records.push(record);
2886    }
2887
2888    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
2889    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2890    where
2891        I: IntoIterator<Item = AppendRecord>,
2892    {
2893        let mut records = Vec::new();
2894        let mut metered_bytes = 0;
2895
2896        for record in iter {
2897            metered_bytes += record.metered_bytes();
2898            records.push(record);
2899
2900            if metered_bytes > RECORD_BATCH_MAX.bytes {
2901                return Err(ValidationError(format!(
2902                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
2903                    RECORD_BATCH_MAX.bytes
2904                )));
2905            }
2906
2907            if records.len() > RECORD_BATCH_MAX.count {
2908                return Err(ValidationError(format!(
2909                    "number of records in the batch exceeds {}",
2910                    RECORD_BATCH_MAX.count
2911                )));
2912            }
2913        }
2914
2915        if records.is_empty() {
2916            return Err(ValidationError("batch is empty".into()));
2917        }
2918
2919        Ok(Self {
2920            records,
2921            metered_bytes,
2922        })
2923    }
2924}
2925
2926impl Deref for AppendRecordBatch {
2927    type Target = [AppendRecord];
2928
2929    fn deref(&self) -> &Self::Target {
2930        &self.records
2931    }
2932}
2933
impl MeteredBytes for AppendRecordBatch {
    // Returns the cached running total maintained by `push`, avoiding a
    // recomputation over all records.
    fn metered_bytes(&self) -> usize {
        self.metered_bytes
    }
}
2939
#[derive(Debug, Clone)]
/// Command to signal an operation.
///
/// Encoded as the body/header of a [`CommandRecord`] when appended.
pub enum Command {
    /// Fence operation.
    Fence {
        /// Fencing token.
        fencing_token: FencingToken,
    },
    /// Trim operation.
    Trim {
        /// Trim point, i.e. the desired earliest sequence number.
        trim_point: u64,
    },
}
2954
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Command record for signaling operations to the service.
///
/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
pub struct CommandRecord {
    /// Command to signal an operation.
    pub command: Command,
    /// Timestamp for this record.
    pub timestamp: Option<u64>,
}
2966
2967impl CommandRecord {
2968    const FENCE: &[u8] = b"fence";
2969    const TRIM: &[u8] = b"trim";
2970
2971    /// Create a fence command record with the given fencing token.
2972    ///
2973    /// Fencing is strongly consistent, and subsequent appends that specify a
2974    /// fencing token will fail if it does not match.
2975    pub fn fence(fencing_token: FencingToken) -> Self {
2976        Self {
2977            command: Command::Fence { fencing_token },
2978            timestamp: None,
2979        }
2980    }
2981
2982    /// Create a trim command record with the given trim point.
2983    ///
2984    /// Trim point is the desired earliest sequence number for the stream.
2985    ///
2986    /// Trimming is eventually consistent, and trimmed records may be visible
2987    /// for a brief period.
2988    pub fn trim(trim_point: u64) -> Self {
2989        Self {
2990            command: Command::Trim { trim_point },
2991            timestamp: None,
2992        }
2993    }
2994
2995    /// Set the timestamp for this record.
2996    pub fn with_timestamp(self, timestamp: u64) -> Self {
2997        Self {
2998            timestamp: Some(timestamp),
2999            ..self
3000        }
3001    }
3002}
3003
3004impl From<CommandRecord> for AppendRecord {
3005    fn from(value: CommandRecord) -> Self {
3006        let (header_value, body) = match value.command {
3007            Command::Fence { fencing_token } => (
3008                CommandRecord::FENCE,
3009                Bytes::copy_from_slice(fencing_token.as_bytes()),
3010            ),
3011            Command::Trim { trim_point } => (
3012                CommandRecord::TRIM,
3013                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3014            ),
3015        };
3016        Self {
3017            body,
3018            headers: vec![Header::new("", header_value)],
3019            timestamp: value.timestamp,
3020        }
3021    }
3022}
3023
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`append`](crate::S2Stream::append) operation and
/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
///
/// Construct via [`AppendInput::new`] and the `with_*` builder methods.
pub struct AppendInput {
    /// Batch of records to append atomically.
    pub records: AppendRecordBatch,
    /// Expected sequence number for the first record in the batch.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
    pub match_seq_num: Option<u64>,
    /// Fencing token to match against the stream's current fencing token.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched,
    /// the append fails. A stream defaults to `""` as its fencing token.
    pub fencing_token: Option<FencingToken>,
}
3041
3042impl AppendInput {
3043    /// Create a new [`AppendInput`] with the given batch of records.
3044    pub fn new(records: AppendRecordBatch) -> Self {
3045        Self {
3046            records,
3047            match_seq_num: None,
3048            fencing_token: None,
3049        }
3050    }
3051
3052    /// Set the expected sequence number for the first record in the batch.
3053    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3054        Self {
3055            match_seq_num: Some(match_seq_num),
3056            ..self
3057        }
3058    }
3059
3060    /// Set the fencing token to match against the stream's current fencing token.
3061    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3062        Self {
3063            fencing_token: Some(fencing_token),
3064            ..self
3065        }
3066    }
3067}
3068
3069impl From<AppendInput> for api::stream::proto::AppendInput {
3070    fn from(value: AppendInput) -> Self {
3071        Self {
3072            records: value.records.iter().cloned().map(Into::into).collect(),
3073            match_seq_num: value.match_seq_num,
3074            fencing_token: value.fencing_token.map(|t| t.to_string()),
3075        }
3076    }
3077}
3078
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Acknowledgement for an [`AppendInput`].
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
    /// that was appended.
    ///
    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
    /// appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record on the stream.
    ///
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
3097
impl From<api::stream::proto::AppendAck> for AppendAck {
    fn from(value: api::stream::proto::AppendAck) -> Self {
        // NOTE(review): absent positions are substituted with a zeroed
        // default rather than treated as an error — presumably the server
        // always populates all three fields; confirm against the API spec.
        Self {
            start: value.start.unwrap_or_default().into(),
            end: value.end.unwrap_or_default().into(),
            tail: value.tail.unwrap_or_default().into(),
        }
    }
}
3107
#[derive(Debug, Clone, Copy)]
/// Starting position for reading from a stream.
///
/// Exactly one addressing mode is used per read.
pub enum ReadFrom {
    /// Read from this sequence number.
    SeqNum(u64),
    /// Read from this timestamp.
    Timestamp(u64),
    /// Read from N records before the tail.
    TailOffset(u64),
}
3118
impl Default for ReadFrom {
    // Default to reading from the very beginning of the stream.
    fn default() -> Self {
        Self::SeqNum(0)
    }
}
3124
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
/// Where to start reading.
///
/// Construct via [`ReadStart::new`] and the `with_*` builder methods.
pub struct ReadStart {
    /// Starting position.
    ///
    /// Defaults to reading from sequence number `0`.
    pub from: ReadFrom,
    /// Whether to start from tail if the requested starting position is beyond it.
    ///
    /// Defaults to `false` (errors if position is beyond tail).
    pub clamp_to_tail: bool,
}
3138
3139impl ReadStart {
3140    /// Create a new [`ReadStart`] with default values.
3141    pub fn new() -> Self {
3142        Self::default()
3143    }
3144
3145    /// Set the starting position.
3146    pub fn with_from(self, from: ReadFrom) -> Self {
3147        Self { from, ..self }
3148    }
3149
3150    /// Set whether to start from tail if the requested starting position is beyond it.
3151    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3152        Self {
3153            clamp_to_tail,
3154            ..self
3155        }
3156    }
3157}
3158
3159impl From<ReadStart> for api::stream::ReadStart {
3160    fn from(value: ReadStart) -> Self {
3161        let (seq_num, timestamp, tail_offset) = match value.from {
3162            ReadFrom::SeqNum(n) => (Some(n), None, None),
3163            ReadFrom::Timestamp(t) => (None, Some(t), None),
3164            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3165        };
3166        Self {
3167            seq_num,
3168            timestamp,
3169            tail_offset,
3170            clamp: if value.clamp_to_tail {
3171                Some(true)
3172            } else {
3173                None
3174            },
3175        }
3176    }
3177}
3178
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Limits on how much to read.
///
/// Construct via [`ReadLimits::new`] and the `with_*` builder methods.
pub struct ReadLimits {
    /// Limit on number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Limit on total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}
3192
3193impl ReadLimits {
3194    /// Create a new [`ReadLimits`] with default values.
3195    pub fn new() -> Self {
3196        Self::default()
3197    }
3198
3199    /// Set the limit on number of records.
3200    pub fn with_count(self, count: usize) -> Self {
3201        Self {
3202            count: Some(count),
3203            ..self
3204        }
3205    }
3206
3207    /// Set the limit on total metered bytes of records.
3208    pub fn with_bytes(self, bytes: usize) -> Self {
3209        Self {
3210            bytes: Some(bytes),
3211            ..self
3212        }
3213    }
3214}
3215
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// When to stop reading.
///
/// Construct via [`ReadStop::new`] and the `with_*` builder methods.
pub struct ReadStop {
    /// Limits on how much to read.
    pub limits: ReadLimits,
    /// Timestamp at which to stop (exclusive).
    pub until: Option<RangeTo<u64>>,
    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
    /// seconds for [`read`](crate::S2Stream::read).
    ///
    /// Defaults to:
    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
    ///   is specified.
    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
    ///   `until` is specified.
    pub wait: Option<u32>,
}
3235
3236impl ReadStop {
3237    /// Create a new [`ReadStop`] with default values.
3238    pub fn new() -> Self {
3239        Self::default()
3240    }
3241
3242    /// Set the limits on how much to read.
3243    pub fn with_limits(self, limits: ReadLimits) -> Self {
3244        Self { limits, ..self }
3245    }
3246
3247    /// Set the timestamp at which to stop (exclusive).
3248    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3249        Self {
3250            until: Some(until),
3251            ..self
3252        }
3253    }
3254
3255    /// Set the duration in seconds to wait for new records before stopping.
3256    pub fn with_wait(self, wait: u32) -> Self {
3257        Self {
3258            wait: Some(wait),
3259            ..self
3260        }
3261    }
3262}
3263
3264impl From<ReadStop> for api::stream::ReadEnd {
3265    fn from(value: ReadStop) -> Self {
3266        Self {
3267            count: value.limits.count,
3268            bytes: value.limits.bytes,
3269            until: value.until.map(|r| r.end),
3270            wait: value.wait,
3271        }
3272    }
3273}
3274
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
/// operations.
///
/// Construct via [`ReadInput::new`] and the `with_*` builder methods.
pub struct ReadInput {
    /// Where to start reading.
    pub start: ReadStart,
    /// When to stop reading.
    pub stop: ReadStop,
    /// Whether to filter out command records from the stream when reading.
    ///
    /// Defaults to `false`.
    pub ignore_command_records: bool,
}
3289
3290impl ReadInput {
3291    /// Create a new [`ReadInput`] with default values.
3292    pub fn new() -> Self {
3293        Self::default()
3294    }
3295
3296    /// Set where to start reading.
3297    pub fn with_start(self, start: ReadStart) -> Self {
3298        Self { start, ..self }
3299    }
3300
3301    /// Set when to stop reading.
3302    pub fn with_stop(self, stop: ReadStop) -> Self {
3303        Self { stop, ..self }
3304    }
3305
3306    /// Set whether to filter out command records from the stream when reading.
3307    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3308        Self {
3309            ignore_command_records,
3310            ..self
3311        }
3312    }
3313}
3314
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Record that is durably sequenced on a stream.
pub struct SequencedRecord {
    /// Sequence number assigned to this record.
    pub seq_num: u64,
    /// Body of this record.
    pub body: Bytes,
    /// Headers for this record.
    pub headers: Vec<Header>,
    /// Timestamp for this record.
    pub timestamp: u64,
}
3328
3329impl SequencedRecord {
3330    /// Whether this is a command record.
3331    pub fn is_command_record(&self) -> bool {
3332        self.headers.len() == 1 && *self.headers[0].name == *b""
3333    }
3334}
3335
impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
    // Field-for-field conversion from the wire (proto) representation.
    fn from(value: api::stream::proto::SequencedRecord) -> Self {
        Self {
            seq_num: value.seq_num,
            body: value.body,
            headers: value.headers.into_iter().map(Into::into).collect(),
            timestamp: value.timestamp,
        }
    }
}
3346
// Metered-size accounting for records read back from a stream.
metered_bytes_impl!(SequencedRecord);
3348
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
/// [`read_session`](crate::S2Stream::read_session).
pub struct ReadBatch {
    /// Records that are durably sequenced on the stream.
    ///
    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
    /// - the [`stop condition`](ReadInput::stop) was already met, or
    /// - all records in the batch were command records and
    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
    pub records: Vec<SequencedRecord>,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record.
    ///
    /// It will only be present when reading recent records.
    pub tail: Option<StreamPosition>,
}
3367
3368impl ReadBatch {
3369    pub(crate) fn from_api(
3370        batch: api::stream::proto::ReadBatch,
3371        ignore_command_records: bool,
3372    ) -> Self {
3373        Self {
3374            records: batch
3375                .records
3376                .into_iter()
3377                .map(Into::into)
3378                .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3379                .collect(),
3380            tail: batch.tail.map(Into::into),
3381        }
3382    }
3383}
3384
/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
///
/// Boxed, pinned, and `Send`.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3387
#[derive(Debug, Clone, thiserror::Error)]
/// Why an append condition check failed.
///
/// Carried by [`S2Error::AppendConditionFailed`].
pub enum AppendConditionFailed {
    #[error("fencing token mismatch, expected: {0}")]
    /// Fencing token did not match. Contains the expected fencing token.
    FencingTokenMismatch(FencingToken),
    #[error("sequence number mismatch, expected: {0}")]
    /// Sequence number did not match. Contains the expected sequence number.
    SeqNumMismatch(u64),
}
3398
impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
    // Map the API-level condition failure onto the public type.
    fn from(value: api::stream::AppendConditionFailed) -> Self {
        match value {
            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
                // Wraps the server-reported token directly, bypassing the
                // `FromStr` length validation — server-provided values are
                // trusted here.
                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
            }
            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
                AppendConditionFailed::SeqNumMismatch(seq)
            }
        }
    }
}
3411
#[derive(Debug, Clone, thiserror::Error)]
/// Errors from S2 operations.
pub enum S2Error {
    #[error("{0}")]
    /// Client-side error (e.g. a failure before the request reached the service).
    Client(String),
    #[error(transparent)]
    /// Validation error.
    Validation(#[from] ValidationError),
    #[error("{0}")]
    /// Append condition check failed. Contains the failure reason.
    AppendConditionFailed(AppendConditionFailed),
    #[error("read from an unwritten position. current tail: {0}")]
    /// Read from an unwritten position. Contains the current tail.
    ReadUnwritten(StreamPosition),
    #[error("{0}")]
    /// Other server-side error.
    Server(ErrorResponse),
}
3431
impl From<ApiError> for S2Error {
    // Map transport-level API errors onto the public error type.
    fn from(err: ApiError) -> Self {
        match err {
            // Special-cased errors carry structured payloads.
            ApiError::ReadUnwritten(tail_response) => {
                Self::ReadUnwritten(tail_response.tail.into())
            }
            ApiError::AppendConditionFailed(condition_failed) => {
                Self::AppendConditionFailed(condition_failed.into())
            }
            ApiError::Server(_, response) => Self::Server(response.into()),
            // Everything else is collapsed into a client-side error string
            // via `ApiError`'s `Display` impl.
            other => Self::Client(other.to_string()),
        }
    }
}
3446
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
/// Error response from S2 server.
///
/// Displayed as `code: message`.
pub struct ErrorResponse {
    /// Error code.
    pub code: String,
    /// Error message.
    pub message: String,
}
3457
impl From<ApiErrorResponse> for ErrorResponse {
    // Field-for-field conversion from the internal API error payload.
    fn from(response: ApiErrorResponse) -> Self {
        Self {
            code: response.code,
            message: response.message,
        }
    }
}
3466
// Generate a random token: a v4 UUID rendered in simple (hyphen-less) form.
// NOTE(review): presumably attached to requests so retries can be
// deduplicated — confirm against the callers.
fn idempotency_token() -> String {
    uuid::Uuid::new_v4().simple().to_string()
}