// s2_sdk/types.rs
1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Validation error.
23pub use s2_common::types::ValidationError;
24/// Access token ID.
25///
26/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
27pub use s2_common::types::access::AccessTokenId;
28/// See [`ListAccessTokensInput::prefix`].
29pub use s2_common::types::access::AccessTokenIdPrefix;
30/// See [`ListAccessTokensInput::start_after`].
31pub use s2_common::types::access::AccessTokenIdStartAfter;
32/// Basin name.
33///
34/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
35/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
36pub use s2_common::types::basin::BasinName;
37/// See [`ListBasinsInput::prefix`].
38pub use s2_common::types::basin::BasinNamePrefix;
39/// See [`ListBasinsInput::start_after`].
40pub use s2_common::types::basin::BasinNameStartAfter;
41/// Stream name.
42///
43/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
44pub use s2_common::types::stream::StreamName;
45/// See [`ListStreamsInput::prefix`].
46pub use s2_common::types::stream::StreamNamePrefix;
47/// See [`ListStreamsInput::start_after`].
48pub use s2_common::types::stream::StreamNameStartAfter;
49
/// One mebibyte (1024 * 1024 bytes); used as a size unit within the crate.
pub(crate) const ONE_MIB: u32 = 1024 * 1024;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
/// An RFC 3339 datetime.
///
/// It can be created in either of the following ways:
/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
/// - Convert from [`time::OffsetDateTime`] using [`TryFrom`]/[`TryInto`].
///
/// [`Display`](fmt::Display) renders the value back as an RFC 3339 string.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct S2DateTime(time::OffsetDateTime);
64
65impl TryFrom<time::OffsetDateTime> for S2DateTime {
66    type Error = ValidationError;
67
68    fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
69        dt.format(&time::format_description::well_known::Rfc3339)
70            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
71        Ok(Self(dt))
72    }
73}
74
75impl From<S2DateTime> for time::OffsetDateTime {
76    fn from(dt: S2DateTime) -> Self {
77        dt.0
78    }
79}
80
81impl FromStr for S2DateTime {
82    type Err = ValidationError;
83
84    fn from_str(s: &str) -> Result<Self, Self::Err> {
85        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
86            .map(Self)
87            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
88    }
89}
90
91impl fmt::Display for S2DateTime {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        write!(
94            f,
95            "{}",
96            self.0
97                .format(&time::format_description::well_known::Rfc3339)
98                .expect("RFC3339 formatting should not fail for S2DateTime")
99        )
100    }
101}
102
/// Authority for connecting to an S2 basin.
///
/// Produced when parsing a [`BasinEndpoint`] string.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
    ParentZone(Authority),
    /// Direct cell authority. Basin is expected to be hosted by this cell.
    Direct(Authority),
}
111
/// Account endpoint.
///
/// Parsed from a string of the form `[scheme://]authority`; the scheme
/// defaults to HTTPS when omitted (see the [`FromStr`] impl).
#[derive(Debug, Clone)]
pub struct AccountEndpoint {
    // Scheme of the endpoint; must match the basin endpoint's scheme
    // (checked in `S2Endpoints::new`).
    scheme: Scheme,
    // Authority (host and optional port) component of the endpoint.
    authority: Authority,
}
118
119impl AccountEndpoint {
120    /// Create a new [`AccountEndpoint`] with the given endpoint.
121    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
122        endpoint.parse()
123    }
124}
125
126impl FromStr for AccountEndpoint {
127    type Err = ValidationError;
128
129    fn from_str(s: &str) -> Result<Self, Self::Err> {
130        let (scheme, authority) = match s.find("://") {
131            Some(idx) => {
132                let scheme: Scheme = s[..idx]
133                    .parse()
134                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
135                (scheme, &s[idx + 3..])
136            }
137            None => (Scheme::HTTPS, s),
138        };
139        Ok(Self {
140            scheme,
141            authority: authority
142                .parse()
143                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
144        })
145    }
146}
147
/// Basin endpoint.
///
/// Parsed from a string of the form `[scheme://]authority`, where the
/// authority may begin with a `{basin}.` placeholder to indicate a parent
/// zone (see the [`FromStr`] impl).
#[derive(Debug, Clone)]
pub struct BasinEndpoint {
    // Scheme of the endpoint; must match the account endpoint's scheme
    // (checked in `S2Endpoints::new`).
    scheme: Scheme,
    // Parent-zone or direct-cell authority for the basin.
    authority: BasinAuthority,
}
154
155impl BasinEndpoint {
156    /// Create a new [`BasinEndpoint`] with the given endpoint.
157    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
158        endpoint.parse()
159    }
160}
161
162impl FromStr for BasinEndpoint {
163    type Err = ValidationError;
164
165    fn from_str(s: &str) -> Result<Self, Self::Err> {
166        let (scheme, authority) = match s.find("://") {
167            Some(idx) => {
168                let scheme: Scheme = s[..idx]
169                    .parse()
170                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
171                (scheme, &s[idx + 3..])
172            }
173            None => (Scheme::HTTPS, s),
174        };
175        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
176            BasinAuthority::ParentZone(
177                authority
178                    .parse()
179                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
180            )
181        } else {
182            BasinAuthority::Direct(
183                authority
184                    .parse()
185                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
186            )
187        };
188        Ok(Self { scheme, authority })
189    }
190}
191
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Endpoints for the S2 environment.
pub struct S2Endpoints {
    // Single scheme shared by both authorities (enforced by the constructors).
    pub(crate) scheme: Scheme,
    // Authority for account-level operations.
    pub(crate) account_authority: Authority,
    // Authority (direct or parent-zone) for basin-level operations.
    pub(crate) basin_authority: BasinAuthority,
}
200
201impl S2Endpoints {
202    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
203    pub fn new(
204        account_endpoint: AccountEndpoint,
205        basin_endpoint: BasinEndpoint,
206    ) -> Result<Self, ValidationError> {
207        if account_endpoint.scheme != basin_endpoint.scheme {
208            return Err("account and basin endpoints must have the same scheme".into());
209        }
210        Ok(Self {
211            scheme: account_endpoint.scheme,
212            account_authority: account_endpoint.authority,
213            basin_authority: basin_endpoint.authority,
214        })
215    }
216
217    /// Create a new [`S2Endpoints`] from environment variables.
218    ///
219    /// The following environment variables are expected to be set:
220    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
221    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
222    pub fn from_env() -> Result<Self, ValidationError> {
223        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
224            Ok(endpoint) => endpoint.parse()?,
225            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
226            Err(VarError::NotUnicode(_)) => {
227                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
228            }
229        };
230
231        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
232            Ok(endpoint) => endpoint.parse()?,
233            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
234            Err(VarError::NotUnicode(_)) => {
235                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
236            }
237        };
238
239        if account_endpoint.scheme != basin_endpoint.scheme {
240            return Err(
241                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
242            );
243        }
244
245        Ok(Self {
246            scheme: account_endpoint.scheme,
247            account_authority: account_endpoint.authority,
248            basin_authority: basin_endpoint.authority,
249        })
250    }
251
252    pub(crate) fn for_aws() -> Self {
253        Self {
254            scheme: Scheme::HTTPS,
255            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
256            basin_authority: BasinAuthority::ParentZone(
257                "b.aws.s2.dev".try_into().expect("valid authority"),
258            ),
259        }
260    }
261}
262
#[derive(Debug, Clone, Copy)]
/// Compression algorithm for request and response bodies.
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
273
274impl From<Compression> for CompressionAlgorithm {
275    fn from(value: Compression) -> Self {
276        match value {
277            Compression::None => CompressionAlgorithm::None,
278            Compression::Gzip => CompressionAlgorithm::Gzip,
279            Compression::Zstd => CompressionAlgorithm::Zstd,
280        }
281    }
282}
283
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// Retry policy for [`append`](crate::S2Stream::append) and
/// [`append_session`](crate::S2Stream::append_session) operations.
pub enum AppendRetryPolicy {
    /// Retry all appends. Use when duplicate records on the stream are acceptable.
    All,
    /// Retry when it can be determined that the request had no side effects.
    ///
    /// Uses a frame-level signal to detect whether any body frames were consumed
    /// by the HTTP transport. If no frames were sent, the server never saw the
    /// request, so retry is safe and will not cause duplicate records.
    ///
    /// Certain server errors (`rate_limited`, `hot_server`) are also safe to
    /// retry regardless of frame signal state, since they guarantee no mutation
    /// occurred.
    NoSideEffects,
}
302
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for retrying requests in case of transient failures.
///
/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
/// ```text
/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
///     jitter = rand[0, base_delay]
///     delay  = base_delay + jitter
/// ```
pub struct RetryConfig {
    /// Total number of attempts including the initial try. A value of `1` means no retries.
    ///
    /// Defaults to `3`.
    pub max_attempts: NonZeroU32,
    /// Minimum base delay for retries.
    ///
    /// Defaults to `100ms`.
    pub min_base_delay: Duration,
    /// Maximum base delay for retries.
    ///
    /// Defaults to `1s`.
    pub max_base_delay: Duration,
    /// Retry policy for [`append`](crate::S2Stream::append) and
    /// [`append_session`](crate::S2Stream::append_session) operations.
    ///
    /// Defaults to `All`.
    pub append_retry_policy: AppendRetryPolicy,
}
332
333impl Default for RetryConfig {
334    fn default() -> Self {
335        Self {
336            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
337            min_base_delay: Duration::from_millis(100),
338            max_base_delay: Duration::from_secs(1),
339            append_retry_policy: AppendRetryPolicy::All,
340        }
341    }
342}
343
344impl RetryConfig {
345    /// Create a new [`RetryConfig`] with default settings.
346    pub fn new() -> Self {
347        Self::default()
348    }
349
350    pub(crate) fn max_retries(&self) -> u32 {
351        self.max_attempts.get() - 1
352    }
353
354    /// Set the total number of attempts including the initial try.
355    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
356        Self {
357            max_attempts,
358            ..self
359        }
360    }
361
362    /// Set the minimum base delay for retries.
363    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
364        Self {
365            min_base_delay,
366            ..self
367        }
368    }
369
370    /// Set the maximum base delay for retries.
371    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
372        Self {
373            max_base_delay,
374            ..self
375        }
376    }
377
378    /// Set the retry policy for [`append`](crate::S2Stream::append) and
379    /// [`append_session`](crate::S2Stream::append_session) operations.
380    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
381        Self {
382            append_retry_policy,
383            ..self
384        }
385    }
386}
387
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for [`S2`](crate::S2).
pub struct S2Config {
    // Bearer token, wrapped in `SecretString` to keep it redacted in debug output.
    pub(crate) access_token: SecretString,
    // Where to connect; defaults to the AWS environment (`S2Endpoints::for_aws`).
    pub(crate) endpoints: S2Endpoints,
    // Timeout for establishing a connection (default `3s`).
    pub(crate) connection_timeout: Duration,
    // Per-request timeout (default `5s`).
    pub(crate) request_timeout: Duration,
    // Retry strategy for transient failures.
    pub(crate) retry: RetryConfig,
    // Body compression for requests and responses (default: none).
    pub(crate) compression: Compression,
    // `User-Agent` header value sent with requests.
    pub(crate) user_agent: HeaderValue,
    // Disables TLS certificate verification when true (testing only).
    pub(crate) insecure_skip_cert_verification: bool,
}
401
402impl S2Config {
403    /// Create a new [`S2Config`] with the given access token and default settings.
404    pub fn new(access_token: impl Into<String>) -> Self {
405        Self {
406            access_token: access_token.into().into(),
407            endpoints: S2Endpoints::for_aws(),
408            connection_timeout: Duration::from_secs(3),
409            request_timeout: Duration::from_secs(5),
410            retry: RetryConfig::new(),
411            compression: Compression::None,
412            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
413                .parse()
414                .expect("valid user agent"),
415            insecure_skip_cert_verification: false,
416        }
417    }
418
419    /// Set the S2 endpoints to connect to.
420    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
421        Self { endpoints, ..self }
422    }
423
424    /// Set the timeout for establishing a connection to the server.
425    ///
426    /// Defaults to `3s`.
427    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
428        Self {
429            connection_timeout,
430            ..self
431        }
432    }
433
434    /// Set the timeout for requests.
435    ///
436    /// Defaults to `5s`.
437    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
438        Self {
439            request_timeout,
440            ..self
441        }
442    }
443
444    /// Set the retry configuration for requests.
445    ///
446    /// See [`RetryConfig`] for defaults.
447    pub fn with_retry(self, retry: RetryConfig) -> Self {
448        Self { retry, ..self }
449    }
450
451    /// Set the compression algorithm for requests and responses.
452    ///
453    /// Defaults to no compression.
454    pub fn with_compression(self, compression: Compression) -> Self {
455        Self {
456            compression,
457            ..self
458        }
459    }
460
461    /// Skip TLS certificate verification (insecure).
462    ///
463    /// This is useful for connecting to endpoints with self-signed certificates
464    /// or certificates that don't match the hostname (similar to `curl -k`).
465    ///
466    /// # Warning
467    ///
468    /// This disables certificate verification and should only be used for
469    /// testing or development purposes. **Never use this in production.**
470    ///
471    /// Defaults to `false`.
472    pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
473        Self {
474            insecure_skip_cert_verification: skip,
475            ..self
476        }
477    }
478
479    #[doc(hidden)]
480    #[cfg(feature = "_hidden")]
481    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
482        let user_agent = user_agent
483            .into()
484            .parse()
485            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
486        Ok(Self { user_agent, ..self })
487    }
488}
489
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// A page of values.
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether there are more pages.
    pub has_more: bool,
}
499
500impl<T> Page<T> {
501    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
502        Self {
503            values: values.into(),
504            has_more,
505        }
506    }
507}
508
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Storage class for recent appends.
pub enum StorageClass {
    /// Standard storage class that offers append latencies under `500ms`.
    Standard,
    /// Express storage class that offers append latencies under `50ms`.
    Express,
}
517
518impl From<api::config::StorageClass> for StorageClass {
519    fn from(value: api::config::StorageClass) -> Self {
520        match value {
521            api::config::StorageClass::Standard => StorageClass::Standard,
522            api::config::StorageClass::Express => StorageClass::Express,
523        }
524    }
525}
526
527impl From<StorageClass> for api::config::StorageClass {
528    fn from(value: StorageClass) -> Self {
529        match value {
530            StorageClass::Standard => api::config::StorageClass::Standard,
531            StorageClass::Express => api::config::StorageClass::Express,
532        }
533    }
534}
535
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Retention policy for records in a stream.
pub enum RetentionPolicy {
    /// Age in seconds. Records older than this age are automatically trimmed.
    Age(u64),
    /// Records are retained indefinitely unless explicitly trimmed.
    Infinite,
}
544
545impl From<api::config::RetentionPolicy> for RetentionPolicy {
546    fn from(value: api::config::RetentionPolicy) -> Self {
547        match value {
548            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
549            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
550        }
551    }
552}
553
554impl From<RetentionPolicy> for api::config::RetentionPolicy {
555    fn from(value: RetentionPolicy) -> Self {
556        match value {
557            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
558            RetentionPolicy::Infinite => {
559                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
560            }
561        }
562    }
563}
564
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Timestamping mode for appends that influences how timestamps are handled.
pub enum TimestampingMode {
    /// Prefer client-specified timestamp if present otherwise use arrival time.
    ClientPrefer,
    /// Require a client-specified timestamp and reject the append if it is missing.
    ClientRequire,
    /// Use the arrival time and ignore any client-specified timestamp.
    Arrival,
}
575
576impl From<api::config::TimestampingMode> for TimestampingMode {
577    fn from(value: api::config::TimestampingMode) -> Self {
578        match value {
579            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
580            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
581            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
582        }
583    }
584}
585
586impl From<TimestampingMode> for api::config::TimestampingMode {
587    fn from(value: TimestampingMode) -> Self {
588        match value {
589            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
590            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
591            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
592        }
593    }
594}
595
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for timestamping behavior.
pub struct TimestampingConfig {
    /// Timestamping mode for appends that influences how timestamps are handled.
    ///
    /// `None` leaves the choice to the server; the server-side default is
    /// [`ClientPrefer`](TimestampingMode::ClientPrefer).
    pub mode: Option<TimestampingMode>,
    /// Whether client-specified timestamps are allowed to exceed the arrival time.
    ///
    /// Defaults to `false` (client timestamps are capped at the arrival time).
    pub uncapped: bool,
}
609
610impl TimestampingConfig {
611    /// Create a new [`TimestampingConfig`] with default settings.
612    pub fn new() -> Self {
613        Self::default()
614    }
615
616    /// Set the timestamping mode for appends that influences how timestamps are handled.
617    pub fn with_mode(self, mode: TimestampingMode) -> Self {
618        Self {
619            mode: Some(mode),
620            ..self
621        }
622    }
623
624    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
625    pub fn with_uncapped(self, uncapped: bool) -> Self {
626        Self { uncapped, ..self }
627    }
628}
629
630impl From<api::config::TimestampingConfig> for TimestampingConfig {
631    fn from(value: api::config::TimestampingConfig) -> Self {
632        Self {
633            mode: value.mode.map(Into::into),
634            uncapped: value.uncapped.unwrap_or_default(),
635        }
636    }
637}
638
639impl From<TimestampingConfig> for api::config::TimestampingConfig {
640    fn from(value: TimestampingConfig) -> Self {
641        Self {
642            mode: value.mode.map(Into::into),
643            uncapped: Some(value.uncapped),
644        }
645    }
646}
647
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for automatically deleting a stream when it becomes empty.
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}
657
658impl DeleteOnEmptyConfig {
659    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
660    pub fn new() -> Self {
661        Self::default()
662    }
663
664    /// Set the minimum age in seconds before an empty stream can be deleted.
665    pub fn with_min_age(self, min_age: Duration) -> Self {
666        Self {
667            min_age_secs: min_age.as_secs(),
668        }
669    }
670}
671
672impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
673    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
674        Self {
675            min_age_secs: value.min_age_secs,
676        }
677    }
678}
679
680impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
681    fn from(value: DeleteOnEmptyConfig) -> Self {
682        Self {
683            min_age_secs: value.min_age_secs,
684        }
685    }
686}
687
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for a stream.
///
/// `None` fields defer to server-side defaults.
pub struct StreamConfig {
    /// Storage class for the stream.
    ///
    /// Defaults to [`Express`](StorageClass::Express).
    pub storage_class: Option<StorageClass>,
    /// Retention policy for records in the stream.
    ///
    /// Defaults to `7 days` of retention.
    pub retention_policy: Option<RetentionPolicy>,
    /// Configuration for timestamping behavior.
    ///
    /// See [`TimestampingConfig`] for defaults.
    pub timestamping: Option<TimestampingConfig>,
    /// Configuration for automatically deleting the stream when it becomes empty.
    ///
    /// See [`DeleteOnEmptyConfig`] for defaults.
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
}
709
710impl StreamConfig {
711    /// Create a new [`StreamConfig`] with default settings.
712    pub fn new() -> Self {
713        Self::default()
714    }
715
716    /// Set the storage class for the stream.
717    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
718        Self {
719            storage_class: Some(storage_class),
720            ..self
721        }
722    }
723
724    /// Set the retention policy for records in the stream.
725    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
726        Self {
727            retention_policy: Some(retention_policy),
728            ..self
729        }
730    }
731
732    /// Set the configuration for timestamping behavior.
733    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
734        Self {
735            timestamping: Some(timestamping),
736            ..self
737        }
738    }
739
740    /// Set the configuration for automatically deleting the stream when it becomes empty.
741    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
742        Self {
743            delete_on_empty: Some(delete_on_empty),
744            ..self
745        }
746    }
747}
748
749impl From<api::config::StreamConfig> for StreamConfig {
750    fn from(value: api::config::StreamConfig) -> Self {
751        Self {
752            storage_class: value.storage_class.map(Into::into),
753            retention_policy: value.retention_policy.map(Into::into),
754            timestamping: value.timestamping.map(Into::into),
755            delete_on_empty: value.delete_on_empty.map(Into::into),
756        }
757    }
758}
759
760impl From<StreamConfig> for api::config::StreamConfig {
761    fn from(value: StreamConfig) -> Self {
762        Self {
763            storage_class: value.storage_class.map(Into::into),
764            retention_policy: value.retention_policy.map(Into::into),
765            timestamping: value.timestamping.map(Into::into),
766            delete_on_empty: value.delete_on_empty.map(Into::into),
767        }
768    }
769}
770
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Configuration for a basin.
pub struct BasinConfig {
    /// Default configuration for all streams in the basin.
    ///
    /// See [`StreamConfig`] for defaults.
    pub default_stream_config: Option<StreamConfig>,
    /// Whether to create stream on append if it doesn't exist using default stream configuration.
    ///
    /// Defaults to `false`.
    pub create_stream_on_append: bool,
    /// Whether to create stream on read if it doesn't exist using default stream configuration.
    ///
    /// Defaults to `false`.
    pub create_stream_on_read: bool,
}
788
789impl BasinConfig {
790    /// Create a new [`BasinConfig`] with default settings.
791    pub fn new() -> Self {
792        Self::default()
793    }
794
795    /// Set the default configuration for all streams in the basin.
796    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
797        Self {
798            default_stream_config: Some(config),
799            ..self
800        }
801    }
802
803    /// Set whether to create stream on append if it doesn't exist using default stream
804    /// configuration.
805    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
806        Self {
807            create_stream_on_append,
808            ..self
809        }
810    }
811
812    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
813    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
814        Self {
815            create_stream_on_read,
816            ..self
817        }
818    }
819}
820
821impl From<api::config::BasinConfig> for BasinConfig {
822    fn from(value: api::config::BasinConfig) -> Self {
823        Self {
824            default_stream_config: value.default_stream_config.map(Into::into),
825            create_stream_on_append: value.create_stream_on_append,
826            create_stream_on_read: value.create_stream_on_read,
827        }
828    }
829}
830
831impl From<BasinConfig> for api::config::BasinConfig {
832    fn from(value: BasinConfig) -> Self {
833        Self {
834            default_stream_config: value.default_stream_config.map(Into::into),
835            create_stream_on_append: value.create_stream_on_append,
836            create_stream_on_read: value.create_stream_on_read,
837        }
838    }
839}
840
#[derive(Debug, Clone, PartialEq, Eq)]
/// Scope of a basin.
pub enum BasinScope {
    /// AWS `us-east-1` region.
    AwsUsEast1,
}
847
848impl From<api::basin::BasinScope> for BasinScope {
849    fn from(value: api::basin::BasinScope) -> Self {
850        match value {
851            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
852        }
853    }
854}
855
856impl From<BasinScope> for api::basin::BasinScope {
857    fn from(value: BasinScope) -> Self {
858        match value {
859            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
860        }
861    }
862}
863
/// Result of a create-or-reconfigure operation.
///
/// Indicates whether the resource was newly created or already existed and was
/// reconfigured. Both variants hold the resource's current state.
///
/// Only available behind the `_hidden` feature.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CreateOrReconfigured<T> {
    /// Resource was newly created.
    Created(T),
    /// Resource already existed and was reconfigured to match the spec.
    Reconfigured(T),
}
877
#[cfg(feature = "_hidden")]
impl<T> CreateOrReconfigured<T> {
    /// Returns `true` if the resource was newly created.
    pub fn is_created(&self) -> bool {
        match self {
            Self::Created(_) => true,
            Self::Reconfigured(_) => false,
        }
    }

    /// Unwrap the inner value regardless of variant.
    pub fn into_inner(self) -> T {
        match self {
            Self::Created(inner) => inner,
            Self::Reconfigured(inner) => inner,
        }
    }
}
892
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_basin`](crate::S2::create_basin) operation.
pub struct CreateBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Configuration for the basin.
    ///
    /// See [`BasinConfig`] for defaults.
    pub config: Option<BasinConfig>,
    /// Scope of the basin.
    ///
    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
    pub scope: Option<BasinScope>,
    // Generated once in `new()` and carried through to the request;
    // intentionally not user-settable.
    idempotency_token: String,
}
909
910impl CreateBasinInput {
911    /// Create a new [`CreateBasinInput`] with the given basin name.
912    pub fn new(name: BasinName) -> Self {
913        Self {
914            name,
915            config: None,
916            scope: None,
917            idempotency_token: idempotency_token(),
918        }
919    }
920
921    /// Set the configuration for the basin.
922    pub fn with_config(self, config: BasinConfig) -> Self {
923        Self {
924            config: Some(config),
925            ..self
926        }
927    }
928
929    /// Set the scope of the basin.
930    pub fn with_scope(self, scope: BasinScope) -> Self {
931        Self {
932            scope: Some(scope),
933            ..self
934        }
935    }
936}
937
938impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
939    fn from(value: CreateBasinInput) -> Self {
940        (
941            api::basin::CreateBasinRequest {
942                basin: value.name,
943                config: value.config.map(Into::into),
944                scope: value.scope.map(Into::into),
945            },
946            value.idempotency_token,
947        )
948    }
949}
950
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_or_reconfigure_basin`](crate::S2::create_or_reconfigure_basin) operation.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Reconfiguration for the basin.
    ///
    /// If `None`, the basin is created with default configuration or left unchanged if it exists.
    pub config: Option<BasinReconfiguration>,
    /// Scope of the basin.
    ///
    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1). Cannot be changed once set.
    pub scope: Option<BasinScope>,
}

#[cfg(feature = "_hidden")]
impl CreateOrReconfigureBasinInput {
    /// Create a new [`CreateOrReconfigureBasinInput`] with the given basin name.
    pub fn new(name: BasinName) -> Self {
        Self {
            name,
            config: None,
            scope: None,
        }
    }

    /// Set the reconfiguration for the basin.
    pub fn with_config(mut self, config: BasinReconfiguration) -> Self {
        self.config = Some(config);
        self
    }

    /// Set the scope of the basin.
    pub fn with_scope(mut self, scope: BasinScope) -> Self {
        self.scope = Some(scope);
        self
    }
}

#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureBasinInput>
    for (
        BasinName,
        Option<api::basin::CreateOrReconfigureBasinRequest>,
    )
{
    fn from(input: CreateOrReconfigureBasinInput) -> Self {
        let CreateOrReconfigureBasinInput { name, config, scope } = input;
        // Only build a request body when there is something to send.
        let request = if config.is_none() && scope.is_none() {
            None
        } else {
            Some(api::basin::CreateOrReconfigureBasinRequest {
                config: config.map(Into::into),
                scope: scope.map(Into::into),
            })
        };
        (name, request)
    }
}
1016
1017#[derive(Debug, Clone, Default)]
1018#[non_exhaustive]
1019/// Input for [`list_basins`](crate::S2::list_basins) operation.
1020pub struct ListBasinsInput {
1021    /// Filter basins whose names begin with this value.
1022    ///
1023    /// Defaults to `""`.
1024    pub prefix: BasinNamePrefix,
1025    /// Filter basins whose names are lexicographically greater than this value.
1026    ///
1027    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
1028    ///
1029    /// Defaults to `""`.
1030    pub start_after: BasinNameStartAfter,
1031    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
1032    ///
1033    /// Defaults to `1000`.
1034    pub limit: Option<usize>,
1035}
1036
1037impl ListBasinsInput {
1038    /// Create a new [`ListBasinsInput`] with default values.
1039    pub fn new() -> Self {
1040        Self::default()
1041    }
1042
1043    /// Set the prefix used to filter basins whose names begin with this value.
1044    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1045        Self { prefix, ..self }
1046    }
1047
1048    /// Set the value used to filter basins whose names are lexicographically greater than this
1049    /// value.
1050    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1051        Self {
1052            start_after,
1053            ..self
1054        }
1055    }
1056
1057    /// Set the limit on number of basins to return in a page.
1058    pub fn with_limit(self, limit: usize) -> Self {
1059        Self {
1060            limit: Some(limit),
1061            ..self
1062        }
1063    }
1064}
1065
1066impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1067    fn from(value: ListBasinsInput) -> Self {
1068        Self {
1069            prefix: Some(value.prefix),
1070            start_after: Some(value.start_after),
1071            limit: value.limit,
1072        }
1073    }
1074}
1075
1076#[derive(Debug, Clone, Default)]
1077/// Input for [`S2::list_all_basins`](crate::S2::list_all_basins).
1078pub struct ListAllBasinsInput {
1079    /// Filter basins whose names begin with this value.
1080    ///
1081    /// Defaults to `""`.
1082    pub prefix: BasinNamePrefix,
1083    /// Filter basins whose names are lexicographically greater than this value.
1084    ///
1085    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
1086    ///
1087    /// Defaults to `""`.
1088    pub start_after: BasinNameStartAfter,
1089    /// Whether to include basins that are being deleted.
1090    ///
1091    /// Defaults to `false`.
1092    pub include_deleted: bool,
1093}
1094
1095impl ListAllBasinsInput {
1096    /// Create a new [`ListAllBasinsInput`] with default values.
1097    pub fn new() -> Self {
1098        Self::default()
1099    }
1100
1101    /// Set the prefix used to filter basins whose names begin with this value.
1102    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1103        Self { prefix, ..self }
1104    }
1105
1106    /// Set the value used to filter basins whose names are lexicographically greater than this
1107    /// value.
1108    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1109        Self {
1110            start_after,
1111            ..self
1112        }
1113    }
1114
1115    /// Set whether to include basins that are being deleted.
1116    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1117        Self {
1118            include_deleted,
1119            ..self
1120        }
1121    }
1122}
1123
1124#[derive(Debug, Clone, PartialEq, Eq)]
1125/// Current state of a basin.
1126pub enum BasinState {
1127    /// Active
1128    Active,
1129    /// Creating
1130    Creating,
1131    /// Deleting
1132    Deleting,
1133}
1134
1135impl From<api::basin::BasinState> for BasinState {
1136    fn from(value: api::basin::BasinState) -> Self {
1137        match value {
1138            api::basin::BasinState::Active => BasinState::Active,
1139            api::basin::BasinState::Creating => BasinState::Creating,
1140            api::basin::BasinState::Deleting => BasinState::Deleting,
1141        }
1142    }
1143}
1144
1145#[derive(Debug, Clone, PartialEq, Eq)]
1146#[non_exhaustive]
1147/// Basin information.
1148pub struct BasinInfo {
1149    /// Basin name.
1150    pub name: BasinName,
1151    /// Scope of the basin.
1152    pub scope: Option<BasinScope>,
1153    /// Current state of the basin.
1154    pub state: BasinState,
1155}
1156
1157impl From<api::basin::BasinInfo> for BasinInfo {
1158    fn from(value: api::basin::BasinInfo) -> Self {
1159        Self {
1160            name: value.name,
1161            scope: value.scope.map(Into::into),
1162            state: value.state.into(),
1163        }
1164    }
1165}
1166
1167#[derive(Debug, Clone)]
1168#[non_exhaustive]
1169/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
1170pub struct DeleteBasinInput {
1171    /// Basin name.
1172    pub name: BasinName,
1173    /// Whether to ignore `Not Found` error if the basin doesn't exist.
1174    pub ignore_not_found: bool,
1175}
1176
1177impl DeleteBasinInput {
1178    /// Create a new [`DeleteBasinInput`] with the given basin name.
1179    pub fn new(name: BasinName) -> Self {
1180        Self {
1181            name,
1182            ignore_not_found: false,
1183        }
1184    }
1185
1186    /// Set whether to ignore `Not Found` error if the basin is not existing.
1187    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1188        Self {
1189            ignore_not_found,
1190            ..self
1191        }
1192    }
1193}
1194
1195#[derive(Debug, Clone, Default)]
1196#[non_exhaustive]
1197/// Reconfiguration for [`TimestampingConfig`].
1198pub struct TimestampingReconfiguration {
1199    /// Override for the existing [`mode`](TimestampingConfig::mode).
1200    pub mode: Maybe<Option<TimestampingMode>>,
1201    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
1202    pub uncapped: Maybe<Option<bool>>,
1203}
1204
1205impl TimestampingReconfiguration {
1206    /// Create a new [`TimestampingReconfiguration`].
1207    pub fn new() -> Self {
1208        Self::default()
1209    }
1210
1211    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1212    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1213        Self {
1214            mode: Maybe::Specified(Some(mode)),
1215            ..self
1216        }
1217    }
1218
1219    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1220    pub fn with_uncapped(self, uncapped: bool) -> Self {
1221        Self {
1222            uncapped: Maybe::Specified(Some(uncapped)),
1223            ..self
1224        }
1225    }
1226}
1227
1228impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1229    fn from(value: TimestampingReconfiguration) -> Self {
1230        Self {
1231            mode: value.mode.map(|m| m.map(Into::into)),
1232            uncapped: value.uncapped,
1233        }
1234    }
1235}
1236
1237#[derive(Debug, Clone, Default)]
1238#[non_exhaustive]
1239/// Reconfiguration for [`DeleteOnEmptyConfig`].
1240pub struct DeleteOnEmptyReconfiguration {
1241    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1242    pub min_age_secs: Maybe<Option<u64>>,
1243}
1244
1245impl DeleteOnEmptyReconfiguration {
1246    /// Create a new [`DeleteOnEmptyReconfiguration`].
1247    pub fn new() -> Self {
1248        Self::default()
1249    }
1250
1251    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1252    pub fn with_min_age(self, min_age: Duration) -> Self {
1253        Self {
1254            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1255        }
1256    }
1257}
1258
1259impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1260    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1261        Self {
1262            min_age_secs: value.min_age_secs,
1263        }
1264    }
1265}
1266
1267#[derive(Debug, Clone, Default)]
1268#[non_exhaustive]
1269/// Reconfiguration for [`StreamConfig`].
1270pub struct StreamReconfiguration {
1271    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
1272    pub storage_class: Maybe<Option<StorageClass>>,
1273    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
1274    pub retention_policy: Maybe<Option<RetentionPolicy>>,
1275    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
1276    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1277    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1278    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1279}
1280
1281impl StreamReconfiguration {
1282    /// Create a new [`StreamReconfiguration`].
1283    pub fn new() -> Self {
1284        Self::default()
1285    }
1286
1287    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1288    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1289        Self {
1290            storage_class: Maybe::Specified(Some(storage_class)),
1291            ..self
1292        }
1293    }
1294
1295    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1296    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1297        Self {
1298            retention_policy: Maybe::Specified(Some(retention_policy)),
1299            ..self
1300        }
1301    }
1302
1303    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1304    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1305        Self {
1306            timestamping: Maybe::Specified(Some(timestamping)),
1307            ..self
1308        }
1309    }
1310
1311    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1312    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1313        Self {
1314            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1315            ..self
1316        }
1317    }
1318}
1319
1320impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1321    fn from(value: StreamReconfiguration) -> Self {
1322        Self {
1323            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1324            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1325            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1326            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1327        }
1328    }
1329}
1330
1331#[derive(Debug, Clone, Default)]
1332#[non_exhaustive]
1333/// Reconfiguration for [`BasinConfig`].
1334pub struct BasinReconfiguration {
1335    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
1336    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1337    /// Override for the existing
1338    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1339    pub create_stream_on_append: Maybe<bool>,
1340    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1341    pub create_stream_on_read: Maybe<bool>,
1342}
1343
1344impl BasinReconfiguration {
1345    /// Create a new [`BasinReconfiguration`].
1346    pub fn new() -> Self {
1347        Self::default()
1348    }
1349
1350    /// Set the override for the existing
1351    /// [`default_stream_config`](BasinConfig::default_stream_config).
1352    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1353        Self {
1354            default_stream_config: Maybe::Specified(Some(config)),
1355            ..self
1356        }
1357    }
1358
1359    /// Set the override for the existing
1360    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1361    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1362        Self {
1363            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1364            ..self
1365        }
1366    }
1367
1368    /// Set the override for the existing
1369    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1370    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1371        Self {
1372            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1373            ..self
1374        }
1375    }
1376}
1377
1378impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1379    fn from(value: BasinReconfiguration) -> Self {
1380        Self {
1381            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1382            create_stream_on_append: value.create_stream_on_append,
1383            create_stream_on_read: value.create_stream_on_read,
1384        }
1385    }
1386}
1387
1388#[derive(Debug, Clone)]
1389#[non_exhaustive]
1390/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
1391pub struct ReconfigureBasinInput {
1392    /// Basin name.
1393    pub name: BasinName,
1394    /// Reconfiguration for [`BasinConfig`].
1395    pub config: BasinReconfiguration,
1396}
1397
1398impl ReconfigureBasinInput {
1399    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1400    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1401        Self { name, config }
1402    }
1403}
1404
1405#[derive(Debug, Clone, Default)]
1406#[non_exhaustive]
1407/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
1408pub struct ListAccessTokensInput {
1409    /// Filter access tokens whose IDs begin with this value.
1410    ///
1411    /// Defaults to `""`.
1412    pub prefix: AccessTokenIdPrefix,
1413    /// Filter access tokens whose IDs are lexicographically greater than this value.
1414    ///
1415    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
1416    ///
1417    /// Defaults to `""`.
1418    pub start_after: AccessTokenIdStartAfter,
1419    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
1420    ///
1421    /// Defaults to `1000`.
1422    pub limit: Option<usize>,
1423}
1424
1425impl ListAccessTokensInput {
1426    /// Create a new [`ListAccessTokensInput`] with default values.
1427    pub fn new() -> Self {
1428        Self::default()
1429    }
1430
1431    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1432    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1433        Self { prefix, ..self }
1434    }
1435
1436    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1437    /// value.
1438    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1439        Self {
1440            start_after,
1441            ..self
1442        }
1443    }
1444
1445    /// Set the limit on number of access tokens to return in a page.
1446    pub fn with_limit(self, limit: usize) -> Self {
1447        Self {
1448            limit: Some(limit),
1449            ..self
1450        }
1451    }
1452}
1453
1454impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1455    fn from(value: ListAccessTokensInput) -> Self {
1456        Self {
1457            prefix: Some(value.prefix),
1458            start_after: Some(value.start_after),
1459            limit: value.limit,
1460        }
1461    }
1462}
1463
1464#[derive(Debug, Clone, Default)]
1465/// Input for [`S2::list_all_access_tokens`](crate::S2::list_all_access_tokens).
1466pub struct ListAllAccessTokensInput {
1467    /// Filter access tokens whose IDs begin with this value.
1468    ///
1469    /// Defaults to `""`.
1470    pub prefix: AccessTokenIdPrefix,
1471    /// Filter access tokens whose IDs are lexicographically greater than this value.
1472    ///
1473    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
1474    ///
1475    /// Defaults to `""`.
1476    pub start_after: AccessTokenIdStartAfter,
1477}
1478
1479impl ListAllAccessTokensInput {
1480    /// Create a new [`ListAllAccessTokensInput`] with default values.
1481    pub fn new() -> Self {
1482        Self::default()
1483    }
1484
1485    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1486    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1487        Self { prefix, ..self }
1488    }
1489
1490    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1491    /// this value.
1492    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1493        Self {
1494            start_after,
1495            ..self
1496        }
1497    }
1498}
1499
1500#[derive(Debug, Clone)]
1501#[non_exhaustive]
1502/// Access token information.
1503pub struct AccessTokenInfo {
1504    /// Access token ID.
1505    pub id: AccessTokenId,
1506    /// Expiration time.
1507    pub expires_at: S2DateTime,
1508    /// Whether to automatically prefix stream names during creation and strip the prefix during
1509    /// listing.
1510    pub auto_prefix_streams: bool,
1511    /// Scope of the access token.
1512    pub scope: AccessTokenScope,
1513}
1514
1515impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1516    type Error = ValidationError;
1517
1518    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1519        let expires_at = value
1520            .expires_at
1521            .map(S2DateTime::try_from)
1522            .transpose()?
1523            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1524        Ok(Self {
1525            id: value.id,
1526            expires_at,
1527            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1528            scope: value.scope.into(),
1529        })
1530    }
1531}
1532
#[derive(Debug, Clone)]
/// Pattern for matching basins.
///
/// One of the matcher dimensions of an access token scope.
///
/// See [`AccessTokenScope::basins`].
pub enum BasinMatcher {
    /// Match no basins.
    None,
    /// Match exactly this basin.
    Exact(BasinName),
    /// Match all basins with this prefix.
    Prefix(BasinNamePrefix),
}
1545
#[derive(Debug, Clone)]
/// Pattern for matching streams.
///
/// One of the matcher dimensions of an access token scope.
///
/// See [`AccessTokenScope::streams`].
pub enum StreamMatcher {
    /// Match no streams.
    None,
    /// Match exactly this stream.
    Exact(StreamName),
    /// Match all streams with this prefix.
    Prefix(StreamNamePrefix),
}
1558
#[derive(Debug, Clone)]
/// Pattern for matching access tokens.
///
/// One of the matcher dimensions of an access token scope.
///
/// See [`AccessTokenScope::access_tokens`].
pub enum AccessTokenMatcher {
    /// Match no access tokens.
    None,
    /// Match exactly this access token.
    Exact(AccessTokenId),
    /// Match all access tokens with this prefix.
    Prefix(AccessTokenIdPrefix),
}
1571
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions indicating allowed operations.
pub struct ReadWritePermissions {
    /// Read permission.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Write permission.
    ///
    /// Defaults to `false`.
    pub write: bool,
}

impl ReadWritePermissions {
    /// Create a new [`ReadWritePermissions`] with default values (no permissions).
    pub fn new() -> Self {
        Self::default()
    }

    /// Create read-only permissions.
    pub fn read_only() -> Self {
        Self {
            read: true,
            ..Self::default()
        }
    }

    /// Create write-only permissions.
    pub fn write_only() -> Self {
        Self {
            write: true,
            ..Self::default()
        }
    }

    /// Create read-write permissions.
    pub fn read_write() -> Self {
        Self {
            read: true,
            write: true,
        }
    }
}
1616
1617impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1618    fn from(value: ReadWritePermissions) -> Self {
1619        Self {
1620            read: Some(value.read),
1621            write: Some(value.write),
1622        }
1623    }
1624}
1625
1626impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1627    fn from(value: api::access::ReadWritePermissions) -> Self {
1628        Self {
1629            read: value.read.unwrap_or_default(),
1630            write: value.write.unwrap_or_default(),
1631        }
1632    }
1633}
1634
1635#[derive(Debug, Clone, Default)]
1636#[non_exhaustive]
1637/// Permissions at the operation group level.
1638///
1639/// See [`AccessTokenScope::op_group_perms`].
1640pub struct OperationGroupPermissions {
1641    /// Account-level access permissions.
1642    ///
1643    /// Defaults to `None`.
1644    pub account: Option<ReadWritePermissions>,
1645    /// Basin-level access permissions.
1646    ///
1647    /// Defaults to `None`.
1648    pub basin: Option<ReadWritePermissions>,
1649    /// Stream-level access permissions.
1650    ///
1651    /// Defaults to `None`.
1652    pub stream: Option<ReadWritePermissions>,
1653}
1654
1655impl OperationGroupPermissions {
1656    /// Create a new [`OperationGroupPermissions`] with default values.
1657    pub fn new() -> Self {
1658        Self::default()
1659    }
1660
1661    /// Create read-only permissions for all groups.
1662    pub fn read_only_all() -> Self {
1663        Self {
1664            account: Some(ReadWritePermissions::read_only()),
1665            basin: Some(ReadWritePermissions::read_only()),
1666            stream: Some(ReadWritePermissions::read_only()),
1667        }
1668    }
1669
1670    /// Create write-only permissions for all groups.
1671    pub fn write_only_all() -> Self {
1672        Self {
1673            account: Some(ReadWritePermissions::write_only()),
1674            basin: Some(ReadWritePermissions::write_only()),
1675            stream: Some(ReadWritePermissions::write_only()),
1676        }
1677    }
1678
1679    /// Create read-write permissions for all groups.
1680    pub fn read_write_all() -> Self {
1681        Self {
1682            account: Some(ReadWritePermissions::read_write()),
1683            basin: Some(ReadWritePermissions::read_write()),
1684            stream: Some(ReadWritePermissions::read_write()),
1685        }
1686    }
1687
1688    /// Set account-level access permissions.
1689    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1690        Self {
1691            account: Some(account),
1692            ..self
1693        }
1694    }
1695
1696    /// Set basin-level access permissions.
1697    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1698        Self {
1699            basin: Some(basin),
1700            ..self
1701        }
1702    }
1703
1704    /// Set stream-level access permissions.
1705    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1706        Self {
1707            stream: Some(stream),
1708            ..self
1709        }
1710    }
1711}
1712
1713impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1714    fn from(value: OperationGroupPermissions) -> Self {
1715        Self {
1716            account: value.account.map(Into::into),
1717            basin: value.basin.map(Into::into),
1718            stream: value.stream.map(Into::into),
1719        }
1720    }
1721}
1722
1723impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1724    fn from(value: api::access::PermittedOperationGroups) -> Self {
1725        Self {
1726            account: value.account.map(Into::into),
1727            basin: value.basin.map(Into::into),
1728            stream: value.stream.map(Into::into),
1729        }
1730    }
1731}
1732
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Individual operation that can be permitted.
///
/// Converted to and from [`api::access::Operation`]; note that the metrics
/// variants correspond to API operations named without the `Get` prefix.
///
/// See [`AccessTokenScope::ops`].
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics.
    GetAccountMetrics,
    /// Get basin metrics.
    GetBasinMetrics,
    /// Get stream metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
}
1781
1782impl From<Operation> for api::access::Operation {
1783    fn from(value: Operation) -> Self {
1784        match value {
1785            Operation::ListBasins => api::access::Operation::ListBasins,
1786            Operation::CreateBasin => api::access::Operation::CreateBasin,
1787            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1788            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1789            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1790            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1791            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1792            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1793            Operation::ListStreams => api::access::Operation::ListStreams,
1794            Operation::CreateStream => api::access::Operation::CreateStream,
1795            Operation::DeleteStream => api::access::Operation::DeleteStream,
1796            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1797            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1798            Operation::CheckTail => api::access::Operation::CheckTail,
1799            Operation::Append => api::access::Operation::Append,
1800            Operation::Read => api::access::Operation::Read,
1801            Operation::Trim => api::access::Operation::Trim,
1802            Operation::Fence => api::access::Operation::Fence,
1803            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1804            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1805            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1806        }
1807    }
1808}
1809
impl From<api::access::Operation> for Operation {
    // Wire-to-SDK mapping for access-token operations. Every variant maps one-to-one; the only
    // naming divergence is the metrics trio, which the API names without the `Get` prefix
    // (`AccountMetrics` -> `GetAccountMetrics`, etc.).
    fn from(value: api::access::Operation) -> Self {
        match value {
            api::access::Operation::ListBasins => Operation::ListBasins,
            api::access::Operation::CreateBasin => Operation::CreateBasin,
            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
            api::access::Operation::ListStreams => Operation::ListStreams,
            api::access::Operation::CreateStream => Operation::CreateStream,
            api::access::Operation::DeleteStream => Operation::DeleteStream,
            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
            api::access::Operation::CheckTail => Operation::CheckTail,
            api::access::Operation::Append => Operation::Append,
            api::access::Operation::Read => Operation::Read,
            api::access::Operation::Trim => Operation::Trim,
            api::access::Operation::Fence => Operation::Fence,
            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
        }
    }
}
1837
1838#[derive(Debug, Clone)]
1839#[non_exhaustive]
1840/// Scope of an access token.
1841///
1842/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
1843/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
1844/// final set must not be empty.
1845///
1846/// See [`IssueAccessTokenInput::scope`].
1847pub struct AccessTokenScopeInput {
1848    basins: Option<BasinMatcher>,
1849    streams: Option<StreamMatcher>,
1850    access_tokens: Option<AccessTokenMatcher>,
1851    op_group_perms: Option<OperationGroupPermissions>,
1852    ops: HashSet<Operation>,
1853}
1854
1855impl AccessTokenScopeInput {
1856    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1857    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1858        Self {
1859            basins: None,
1860            streams: None,
1861            access_tokens: None,
1862            op_group_perms: None,
1863            ops: ops.into_iter().collect(),
1864        }
1865    }
1866
1867    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1868    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1869        Self {
1870            basins: None,
1871            streams: None,
1872            access_tokens: None,
1873            op_group_perms: Some(op_group_perms),
1874            ops: HashSet::default(),
1875        }
1876    }
1877
1878    /// Set the permitted operations.
1879    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1880        Self {
1881            ops: ops.into_iter().collect(),
1882            ..self
1883        }
1884    }
1885
1886    /// Set the access permissions at the operation group level.
1887    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1888        Self {
1889            op_group_perms: Some(op_group_perms),
1890            ..self
1891        }
1892    }
1893
1894    /// Set the permitted basins.
1895    ///
1896    /// Defaults to no basins.
1897    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1898        Self {
1899            basins: Some(basins),
1900            ..self
1901        }
1902    }
1903
1904    /// Set the permitted streams.
1905    ///
1906    /// Defaults to no streams.
1907    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1908        Self {
1909            streams: Some(streams),
1910            ..self
1911        }
1912    }
1913
1914    /// Set the permitted access tokens.
1915    ///
1916    /// Defaults to no access tokens.
1917    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1918        Self {
1919            access_tokens: Some(access_tokens),
1920            ..self
1921        }
1922    }
1923}
1924
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token.
pub struct AccessTokenScope {
    /// Permitted basins.
    pub basins: Option<BasinMatcher>,
    /// Permitted streams.
    pub streams: Option<StreamMatcher>,
    /// Permitted access tokens.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions at the operation group level.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Permitted operations.
    ///
    /// Empty when no individual operations are granted; access may still be granted via
    /// [`op_group_perms`](AccessTokenScope::op_group_perms).
    pub ops: HashSet<Operation>,
}
1940
1941impl From<api::access::AccessTokenScope> for AccessTokenScope {
1942    fn from(value: api::access::AccessTokenScope) -> Self {
1943        Self {
1944            basins: value.basins.map(|rs| match rs {
1945                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1946                    BasinMatcher::Exact(e)
1947                }
1948                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1949                    BasinMatcher::None
1950                }
1951                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1952            }),
1953            streams: value.streams.map(|rs| match rs {
1954                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1955                    StreamMatcher::Exact(e)
1956                }
1957                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1958                    StreamMatcher::None
1959                }
1960                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1961            }),
1962            access_tokens: value.access_tokens.map(|rs| match rs {
1963                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1964                    AccessTokenMatcher::Exact(e)
1965                }
1966                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1967                    AccessTokenMatcher::None
1968                }
1969                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1970            }),
1971            op_group_perms: value.op_groups.map(Into::into),
1972            ops: value
1973                .ops
1974                .map(|ops| ops.into_iter().map(Into::into).collect())
1975                .unwrap_or_default(),
1976        }
1977    }
1978}
1979
1980impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1981    fn from(value: AccessTokenScopeInput) -> Self {
1982        Self {
1983            basins: value.basins.map(|rs| match rs {
1984                BasinMatcher::None => {
1985                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1986                }
1987                BasinMatcher::Exact(e) => {
1988                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1989                }
1990                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1991            }),
1992            streams: value.streams.map(|rs| match rs {
1993                StreamMatcher::None => {
1994                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1995                }
1996                StreamMatcher::Exact(e) => {
1997                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1998                }
1999                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2000            }),
2001            access_tokens: value.access_tokens.map(|rs| match rs {
2002                AccessTokenMatcher::None => {
2003                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2004                }
2005                AccessTokenMatcher::Exact(e) => {
2006                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2007                }
2008                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2009            }),
2010            op_groups: value.op_group_perms.map(Into::into),
2011            ops: if value.ops.is_empty() {
2012                None
2013            } else {
2014                Some(value.ops.into_iter().map(Into::into).collect())
2015            },
2016        }
2017    }
2018}
2019
2020#[derive(Debug, Clone)]
2021#[non_exhaustive]
2022/// Input for [`issue_access_token`](crate::S2::issue_access_token).
2023pub struct IssueAccessTokenInput {
2024    /// Access token ID.
2025    pub id: AccessTokenId,
2026    /// Expiration time.
2027    ///
2028    /// Defaults to the expiration time of requestor's access token passed via
2029    /// [`S2Config`](S2Config::new).
2030    pub expires_at: Option<S2DateTime>,
2031    /// Whether to automatically prefix stream names during creation and strip the prefix during
2032    /// listing.
2033    ///
2034    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
2035    /// prefix.
2036    ///
2037    /// Defaults to `false`.
2038    pub auto_prefix_streams: bool,
2039    /// Scope of the token.
2040    pub scope: AccessTokenScopeInput,
2041}
2042
2043impl IssueAccessTokenInput {
2044    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
2045    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2046        Self {
2047            id,
2048            expires_at: None,
2049            auto_prefix_streams: false,
2050            scope,
2051        }
2052    }
2053
2054    /// Set the expiration time.
2055    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2056        Self {
2057            expires_at: Some(expires_at),
2058            ..self
2059        }
2060    }
2061
2062    /// Set whether to automatically prefix stream names during creation and strip the prefix during
2063    /// listing.
2064    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2065        Self {
2066            auto_prefix_streams,
2067            ..self
2068        }
2069    }
2070}
2071
2072impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2073    fn from(value: IssueAccessTokenInput) -> Self {
2074        Self {
2075            id: value.id,
2076            expires_at: value.expires_at.map(Into::into),
2077            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2078            scope: value.scope.into(),
2079        }
2080    }
2081}
2082
2083#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2084/// Interval to accumulate over for timeseries metric sets.
2085pub enum TimeseriesInterval {
2086    /// Minute.
2087    Minute,
2088    /// Hour.
2089    Hour,
2090    /// Day.
2091    Day,
2092}
2093
2094impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2095    fn from(value: TimeseriesInterval) -> Self {
2096        match value {
2097            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2098            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2099            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2100        }
2101    }
2102}
2103
2104impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2105    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2106        match value {
2107            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2108            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2109            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2110        }
2111    }
2112}
2113
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
/// Time range as Unix epoch seconds.
// `PartialEq`/`Eq` added for consistency with the other `Copy` value types in this module
// (`TimeseriesInterval`, `MetricUnit`); this is a backward-compatible, additive change.
pub struct TimeRange {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
}

impl TimeRange {
    /// Create a new [`TimeRange`] with the given start (inclusive) and end (exclusive)
    /// timestamps, both in Unix epoch seconds.
    pub fn new(start: u32, end: u32) -> Self {
        Self { start, end }
    }
}
2130
2131#[derive(Debug, Clone, Copy)]
2132#[non_exhaustive]
2133/// Time range as Unix epoch seconds and accumulation interval.
2134pub struct TimeRangeAndInterval {
2135    /// Start timestamp (inclusive).
2136    pub start: u32,
2137    /// End timestamp (exclusive).
2138    pub end: u32,
2139    /// Interval to accumulate over for timeseries metric sets.
2140    ///
2141    /// Default is dependent on the requested metric set.
2142    pub interval: Option<TimeseriesInterval>,
2143}
2144
2145impl TimeRangeAndInterval {
2146    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2147    pub fn new(start: u32, end: u32) -> Self {
2148        Self {
2149            start,
2150            end,
2151            interval: None,
2152        }
2153    }
2154
2155    /// Set the interval to accumulate over for timeseries metric sets.
2156    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2157        Self {
2158            interval: Some(interval),
2159            ..self
2160        }
2161    }
2162}
2163
#[derive(Debug, Clone, Copy)]
/// Account metric set to return.
///
/// Timestamps in the carried ranges are Unix epoch seconds; see [`TimeRange`] and
/// [`TimeRangeAndInterval`].
pub enum AccountMetricSet {
    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
    /// specified time range.
    ActiveBasins(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per account operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    AccountOps(TimeRangeAndInterval),
}
2178
2179#[derive(Debug, Clone)]
2180#[non_exhaustive]
2181/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
2182pub struct GetAccountMetricsInput {
2183    /// Metric set to return.
2184    pub set: AccountMetricSet,
2185}
2186
2187impl GetAccountMetricsInput {
2188    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2189    pub fn new(set: AccountMetricSet) -> Self {
2190        Self { set }
2191    }
2192}
2193
2194impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2195    fn from(value: GetAccountMetricsInput) -> Self {
2196        let (set, start, end, interval) = match value.set {
2197            AccountMetricSet::ActiveBasins(args) => (
2198                api::metrics::AccountMetricSet::ActiveBasins,
2199                args.start,
2200                args.end,
2201                None,
2202            ),
2203            AccountMetricSet::AccountOps(args) => (
2204                api::metrics::AccountMetricSet::AccountOps,
2205                args.start,
2206                args.end,
2207                args.interval,
2208            ),
2209        };
2210        Self {
2211            set,
2212            start: Some(start),
2213            end: Some(end),
2214            interval: interval.map(Into::into),
2215        }
2216    }
2217}
2218
#[derive(Debug, Clone, Copy)]
/// Basin metric set to return.
///
/// Timestamps in the carried ranges are Unix epoch seconds; see [`TimeRange`] and
/// [`TimeRangeAndInterval`].
pub enum BasinMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
    /// in the basin, with one observed value for each hour over the requested time range.
    Storage(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
    ///
    /// Each metric represents a timeseries of the number of append operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendOps(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
    ///
    /// Each metric represents a timeseries of the number of read operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadOps(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadThroughput(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendThroughput(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per basin operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    BasinOps(TimeRangeAndInterval),
}
2263
2264#[derive(Debug, Clone)]
2265#[non_exhaustive]
2266/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
2267pub struct GetBasinMetricsInput {
2268    /// Basin name.
2269    pub name: BasinName,
2270    /// Metric set to return.
2271    pub set: BasinMetricSet,
2272}
2273
2274impl GetBasinMetricsInput {
2275    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2276    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2277        Self { name, set }
2278    }
2279}
2280
2281impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2282    fn from(value: GetBasinMetricsInput) -> Self {
2283        let (set, start, end, interval) = match value.set {
2284            BasinMetricSet::Storage(args) => (
2285                api::metrics::BasinMetricSet::Storage,
2286                args.start,
2287                args.end,
2288                None,
2289            ),
2290            BasinMetricSet::AppendOps(args) => (
2291                api::metrics::BasinMetricSet::AppendOps,
2292                args.start,
2293                args.end,
2294                args.interval,
2295            ),
2296            BasinMetricSet::ReadOps(args) => (
2297                api::metrics::BasinMetricSet::ReadOps,
2298                args.start,
2299                args.end,
2300                args.interval,
2301            ),
2302            BasinMetricSet::ReadThroughput(args) => (
2303                api::metrics::BasinMetricSet::ReadThroughput,
2304                args.start,
2305                args.end,
2306                args.interval,
2307            ),
2308            BasinMetricSet::AppendThroughput(args) => (
2309                api::metrics::BasinMetricSet::AppendThroughput,
2310                args.start,
2311                args.end,
2312                args.interval,
2313            ),
2314            BasinMetricSet::BasinOps(args) => (
2315                api::metrics::BasinMetricSet::BasinOps,
2316                args.start,
2317                args.end,
2318                args.interval,
2319            ),
2320        };
2321        (
2322            value.name,
2323            api::metrics::BasinMetricSetRequest {
2324                set,
2325                start: Some(start),
2326                end: Some(end),
2327                interval: interval.map(Into::into),
2328            },
2329        )
2330    }
2331}
2332
#[derive(Debug, Clone, Copy)]
/// Stream metric set to return.
///
/// Timestamps in the carried range are Unix epoch seconds; see [`TimeRange`].
pub enum StreamMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
    /// with one observed value for each minute over the requested time range.
    Storage(TimeRange),
}
2340
2341#[derive(Debug, Clone)]
2342#[non_exhaustive]
2343/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
2344pub struct GetStreamMetricsInput {
2345    /// Basin name.
2346    pub basin_name: BasinName,
2347    /// Stream name.
2348    pub stream_name: StreamName,
2349    /// Metric set to return.
2350    pub set: StreamMetricSet,
2351}
2352
2353impl GetStreamMetricsInput {
2354    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2355    /// set.
2356    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2357        Self {
2358            basin_name,
2359            stream_name,
2360            set,
2361        }
2362    }
2363}
2364
2365impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2366    fn from(value: GetStreamMetricsInput) -> Self {
2367        let (set, start, end, interval) = match value.set {
2368            StreamMetricSet::Storage(args) => (
2369                api::metrics::StreamMetricSet::Storage,
2370                args.start,
2371                args.end,
2372                None,
2373            ),
2374        };
2375        (
2376            value.basin_name,
2377            value.stream_name,
2378            api::metrics::StreamMetricSetRequest {
2379                set,
2380                start: Some(start),
2381                end: Some(end),
2382                interval,
2383            },
2384        )
2385    }
2386}
2387
2388#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2389/// Unit in which metric values are measured.
2390pub enum MetricUnit {
2391    /// Size in bytes.
2392    Bytes,
2393    /// Number of operations.
2394    Operations,
2395}
2396
2397impl From<api::metrics::MetricUnit> for MetricUnit {
2398    fn from(value: api::metrics::MetricUnit) -> Self {
2399        match value {
2400            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2401            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2402        }
2403    }
2404}
2405
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Single named value.
///
/// See [`Metric::Scalar`].
pub struct ScalarMetric {
    /// Metric name.
    pub name: String,
    /// Unit for the metric value.
    pub unit: MetricUnit,
    /// Metric value.
    pub value: f64,
}
2417
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
/// specified interval.
///
/// See [`Metric::Accumulation`].
pub struct AccumulationMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the accumulated values.
    pub unit: MetricUnit,
    /// The interval at which datapoints are accumulated.
    pub interval: TimeseriesInterval,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
    /// one `interval`.
    pub values: Vec<(u32, f64)>,
}
2434
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
///
/// See [`Metric::Gauge`].
pub struct GaugeMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the instantaneous values.
    pub unit: MetricUnit,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
    /// instant of the `timestamp` (in Unix epoch seconds).
    pub values: Vec<(u32, f64)>,
}
2447
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Set of string labels.
///
/// See [`Metric::Label`].
pub struct LabelMetric {
    /// Label name.
    pub name: String,
    /// Label values.
    pub values: Vec<String>,
}
2457
2458#[derive(Debug, Clone)]
2459/// Individual metric in a returned metric set.
2460pub enum Metric {
2461    /// Single named value.
2462    Scalar(ScalarMetric),
2463    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2464    /// specified interval.
2465    Accumulation(AccumulationMetric),
2466    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2467    Gauge(GaugeMetric),
2468    /// Set of string labels.
2469    Label(LabelMetric),
2470}
2471
2472impl From<api::metrics::Metric> for Metric {
2473    fn from(value: api::metrics::Metric) -> Self {
2474        match value {
2475            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2476                name: sm.name.into(),
2477                unit: sm.unit.into(),
2478                value: sm.value,
2479            }),
2480            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2481                name: am.name.into(),
2482                unit: am.unit.into(),
2483                interval: am.interval.into(),
2484                values: am.values,
2485            }),
2486            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2487                name: gm.name.into(),
2488                unit: gm.unit.into(),
2489                values: gm.values,
2490            }),
2491            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2492                name: lm.name.into(),
2493                values: lm.values,
2494            }),
2495        }
2496    }
2497}
2498
2499#[derive(Debug, Clone, Default)]
2500#[non_exhaustive]
2501/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
2502pub struct ListStreamsInput {
2503    /// Filter streams whose names begin with this value.
2504    ///
2505    /// Defaults to `""`.
2506    pub prefix: StreamNamePrefix,
2507    /// Filter streams whose names are lexicographically greater than this value.
2508    ///
2509    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
2510    ///
2511    /// Defaults to `""`.
2512    pub start_after: StreamNameStartAfter,
2513    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
2514    ///
2515    /// Defaults to `1000`.
2516    pub limit: Option<usize>,
2517}
2518
2519impl ListStreamsInput {
2520    /// Create a new [`ListStreamsInput`] with default values.
2521    pub fn new() -> Self {
2522        Self::default()
2523    }
2524
2525    /// Set the prefix used to filter streams whose names begin with this value.
2526    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2527        Self { prefix, ..self }
2528    }
2529
2530    /// Set the value used to filter streams whose names are lexicographically greater than this
2531    /// value.
2532    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2533        Self {
2534            start_after,
2535            ..self
2536        }
2537    }
2538
2539    /// Set the limit on number of streams to return in a page.
2540    pub fn with_limit(self, limit: usize) -> Self {
2541        Self {
2542            limit: Some(limit),
2543            ..self
2544        }
2545    }
2546}
2547
2548impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2549    fn from(value: ListStreamsInput) -> Self {
2550        Self {
2551            prefix: Some(value.prefix),
2552            start_after: Some(value.start_after),
2553            limit: value.limit,
2554        }
2555    }
2556}
2557
2558#[derive(Debug, Clone, Default)]
2559/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
2560pub struct ListAllStreamsInput {
2561    /// Filter streams whose names begin with this value.
2562    ///
2563    /// Defaults to `""`.
2564    pub prefix: StreamNamePrefix,
2565    /// Filter streams whose names are lexicographically greater than this value.
2566    ///
2567    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
2568    ///
2569    /// Defaults to `""`.
2570    pub start_after: StreamNameStartAfter,
2571    /// Whether to include streams that are being deleted.
2572    ///
2573    /// Defaults to `false`.
2574    pub include_deleted: bool,
2575}
2576
2577impl ListAllStreamsInput {
2578    /// Create a new [`ListAllStreamsInput`] with default values.
2579    pub fn new() -> Self {
2580        Self::default()
2581    }
2582
2583    /// Set the prefix used to filter streams whose names begin with this value.
2584    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2585        Self { prefix, ..self }
2586    }
2587
2588    /// Set the value used to filter streams whose names are lexicographically greater than this
2589    /// value.
2590    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2591        Self {
2592            start_after,
2593            ..self
2594        }
2595    }
2596
2597    /// Set whether to include streams that are being deleted.
2598    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2599        Self {
2600            include_deleted,
2601            ..self
2602        }
2603    }
2604}
2605
2606#[derive(Debug, Clone, PartialEq)]
2607#[non_exhaustive]
2608/// Stream information.
2609pub struct StreamInfo {
2610    /// Stream name.
2611    pub name: StreamName,
2612    /// Creation time.
2613    pub created_at: S2DateTime,
2614    /// Deletion time if the stream is being deleted.
2615    pub deleted_at: Option<S2DateTime>,
2616}
2617
2618impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2619    type Error = ValidationError;
2620
2621    fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2622        Ok(Self {
2623            name: value.name,
2624            created_at: value.created_at.try_into()?,
2625            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2626        })
2627    }
2628}
2629
2630#[derive(Debug, Clone)]
2631#[non_exhaustive]
2632/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
2633pub struct CreateStreamInput {
2634    /// Stream name.
2635    pub name: StreamName,
2636    /// Configuration for the stream.
2637    ///
2638    /// See [`StreamConfig`] for defaults.
2639    pub config: Option<StreamConfig>,
2640    idempotency_token: String,
2641}
2642
2643impl CreateStreamInput {
2644    /// Create a new [`CreateStreamInput`] with the given stream name.
2645    pub fn new(name: StreamName) -> Self {
2646        Self {
2647            name,
2648            config: None,
2649            idempotency_token: idempotency_token(),
2650        }
2651    }
2652
2653    /// Set the configuration for the stream.
2654    pub fn with_config(self, config: StreamConfig) -> Self {
2655        Self {
2656            config: Some(config),
2657            ..self
2658        }
2659    }
2660}
2661
2662impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2663    fn from(value: CreateStreamInput) -> Self {
2664        (
2665            api::stream::CreateStreamRequest {
2666                stream: value.name,
2667                config: value.config.map(Into::into),
2668            },
2669            value.idempotency_token,
2670        )
2671    }
2672}
2673
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_or_reconfigure_stream`](crate::S2Basin::create_or_reconfigure_stream)
/// operation.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Reconfiguration for the stream.
    ///
    /// If `None`, the stream is created with default configuration or left unchanged if it exists.
    pub config: Option<StreamReconfiguration>,
}

#[cfg(feature = "_hidden")]
impl CreateOrReconfigureStreamInput {
    /// Create a new [`CreateOrReconfigureStreamInput`] for the given stream name, with no
    /// reconfiguration.
    pub fn new(name: StreamName) -> Self {
        Self { name, config: None }
    }

    /// Set the reconfiguration for the stream.
    pub fn with_config(mut self, config: StreamReconfiguration) -> Self {
        self.config = Some(config);
        self
    }
}

#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureStreamInput>
    for (StreamName, Option<api::config::StreamReconfiguration>)
{
    fn from(value: CreateOrReconfigureStreamInput) -> Self {
        let CreateOrReconfigureStreamInput { name, config } = value;
        (name, config.map(Into::into))
    }
}
2713
2714#[derive(Debug, Clone)]
2715#[non_exhaustive]
2716/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
2717pub struct DeleteStreamInput {
2718    /// Stream name.
2719    pub name: StreamName,
2720    /// Whether to ignore `Not Found` error if the stream doesn't exist.
2721    pub ignore_not_found: bool,
2722}
2723
2724impl DeleteStreamInput {
2725    /// Create a new [`DeleteStreamInput`] with the given stream name.
2726    pub fn new(name: StreamName) -> Self {
2727        Self {
2728            name,
2729            ignore_not_found: false,
2730        }
2731    }
2732
2733    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2734    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2735        Self {
2736            ignore_not_found,
2737            ..self
2738        }
2739    }
2740}
2741
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
pub struct ReconfigureStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Reconfiguration for [`StreamConfig`].
    ///
    /// NOTE(review): presumably only the fields set in the reconfiguration are
    /// updated — confirm against `StreamReconfiguration` semantics.
    pub config: StreamReconfiguration,
}

impl ReconfigureStreamInput {
    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
    // Both fields are required, so no builder-style `with_*` methods are provided here.
    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
        Self { name, config }
    }
}
2758
2759#[derive(Debug, Clone, PartialEq, Eq)]
2760/// Token for fencing appends to a stream.
2761///
2762/// **Note:** It must not exceed 36 bytes in length.
2763///
2764/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
2765pub struct FencingToken(String);
2766
2767impl FencingToken {
2768    /// Generate a random alphanumeric fencing token of `n` bytes.
2769    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2770        rand::rng()
2771            .sample_iter(&rand::distr::Alphanumeric)
2772            .take(n)
2773            .map(char::from)
2774            .collect::<String>()
2775            .parse()
2776    }
2777}
2778
2779impl FromStr for FencingToken {
2780    type Err = ValidationError;
2781
2782    fn from_str(s: &str) -> Result<Self, Self::Err> {
2783        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2784            return Err(ValidationError(format!(
2785                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2786            )));
2787        }
2788        Ok(FencingToken(s.to_string()))
2789    }
2790}
2791
2792impl std::fmt::Display for FencingToken {
2793    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2794        write!(f, "{}", self.0)
2795    }
2796}
2797
2798impl Deref for FencingToken {
2799    type Target = str;
2800
2801    fn deref(&self) -> &Self::Target {
2802        &self.0
2803    }
2804}
2805
2806#[derive(Debug, Clone, Copy, PartialEq)]
2807#[non_exhaustive]
2808/// A position in a stream.
2809pub struct StreamPosition {
2810    /// Sequence number assigned by the service.
2811    pub seq_num: u64,
2812    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
2813    /// User-specified timestamps are passed through as-is.
2814    pub timestamp: u64,
2815}
2816
2817impl std::fmt::Display for StreamPosition {
2818    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2819        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2820    }
2821}
2822
2823impl From<api::stream::proto::StreamPosition> for StreamPosition {
2824    fn from(value: api::stream::proto::StreamPosition) -> Self {
2825        Self {
2826            seq_num: value.seq_num,
2827            timestamp: value.timestamp,
2828        }
2829    }
2830}
2831
2832impl From<api::stream::StreamPosition> for StreamPosition {
2833    fn from(value: api::stream::StreamPosition) -> Self {
2834        Self {
2835            seq_num: value.seq_num,
2836            timestamp: value.timestamp,
2837        }
2838    }
2839}
2840
2841#[derive(Debug, Clone, PartialEq)]
2842#[non_exhaustive]
2843/// A name-value pair.
2844pub struct Header {
2845    /// Name.
2846    pub name: Bytes,
2847    /// Value.
2848    pub value: Bytes,
2849}
2850
2851impl Header {
2852    /// Create a new [`Header`] with the given name and value.
2853    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2854        Self {
2855            name: name.into(),
2856            value: value.into(),
2857        }
2858    }
2859}
2860
2861impl From<Header> for api::stream::proto::Header {
2862    fn from(value: Header) -> Self {
2863        Self {
2864            name: value.name,
2865            value: value.value,
2866        }
2867    }
2868}
2869
2870impl From<api::stream::proto::Header> for Header {
2871    fn from(value: api::stream::proto::Header) -> Self {
2872        Self {
2873            name: value.name,
2874            value: value.value,
2875        }
2876    }
2877}
2878
2879#[derive(Debug, Clone, PartialEq)]
2880/// A record to append.
2881pub struct AppendRecord {
2882    body: Bytes,
2883    headers: Vec<Header>,
2884    timestamp: Option<u64>,
2885}
2886
2887impl AppendRecord {
2888    fn validate(self) -> Result<Self, ValidationError> {
2889        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2890            Err(ValidationError(format!(
2891                "metered_bytes: {} exceeds {}",
2892                self.metered_bytes(),
2893                RECORD_BATCH_MAX.bytes
2894            )))
2895        } else {
2896            Ok(self)
2897        }
2898    }
2899
2900    /// Create a new [`AppendRecord`] with the given record body.
2901    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2902        let record = Self {
2903            body: body.into(),
2904            headers: Vec::default(),
2905            timestamp: None,
2906        };
2907        record.validate()
2908    }
2909
2910    /// Set the headers for this record.
2911    pub fn with_headers(
2912        self,
2913        headers: impl IntoIterator<Item = Header>,
2914    ) -> Result<Self, ValidationError> {
2915        let record = Self {
2916            headers: headers.into_iter().collect(),
2917            ..self
2918        };
2919        record.validate()
2920    }
2921
2922    /// Set the timestamp for this record.
2923    ///
2924    /// Precise semantics depend on [`StreamConfig::timestamping`].
2925    pub fn with_timestamp(self, timestamp: u64) -> Self {
2926        Self {
2927            timestamp: Some(timestamp),
2928            ..self
2929        }
2930    }
2931
2932    /// Get the body of this record.
2933    pub fn body(&self) -> &[u8] {
2934        &self.body
2935    }
2936
2937    /// Get the headers of this record.
2938    pub fn headers(&self) -> &[Header] {
2939        &self.headers
2940    }
2941
2942    /// Get the timestamp of this record.
2943    pub fn timestamp(&self) -> Option<u64> {
2944        self.timestamp
2945    }
2946}
2947
2948impl From<AppendRecord> for api::stream::proto::AppendRecord {
2949    fn from(value: AppendRecord) -> Self {
2950        Self {
2951            timestamp: value.timestamp,
2952            headers: value.headers.into_iter().map(Into::into).collect(),
2953            body: value.body,
2954        }
2955    }
2956}
2957
/// Metered byte size calculation.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
pub trait MeteredBytes {
    /// Returns the metered byte size.
    fn metered_bytes(&self) -> usize;
}

// Implements `MeteredBytes` for any type with `headers` (name/value pairs) and `body`
// fields, per the formula documented on the trait: a fixed 8-byte overhead, plus 2 bytes
// per header, plus every header's name and value lengths, plus the body length.
// A macro is used (rather than a generic impl) because the fields are accessed
// structurally on each concrete type.
macro_rules! metered_bytes_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> usize {
                8 + (2 * self.headers.len())
                    + self
                        .headers
                        .iter()
                        .map(|h| h.name.len() + h.value.len())
                        .sum::<usize>()
                    + self.body.len()
            }
        }
    };
}

metered_bytes_impl!(AppendRecord);
2986
2987#[derive(Debug, Clone)]
2988/// A batch of records to append atomically.
2989///
2990/// **Note:** It must contain at least `1` record and no more than `1000`.
2991/// The total size of the batch must not exceed `1MiB` in metered bytes.
2992///
2993/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
2994/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
2995/// that takes care of the abovementioned constraints.
2996pub struct AppendRecordBatch {
2997    records: Vec<AppendRecord>,
2998    metered_bytes: usize,
2999}
3000
3001impl AppendRecordBatch {
3002    pub(crate) fn with_capacity(capacity: usize) -> Self {
3003        Self {
3004            records: Vec::with_capacity(capacity),
3005            metered_bytes: 0,
3006        }
3007    }
3008
3009    pub(crate) fn push(&mut self, record: AppendRecord) {
3010        self.metered_bytes += record.metered_bytes();
3011        self.records.push(record);
3012    }
3013
3014    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
3015    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3016    where
3017        I: IntoIterator<Item = AppendRecord>,
3018    {
3019        let mut records = Vec::new();
3020        let mut metered_bytes = 0;
3021
3022        for record in iter {
3023            metered_bytes += record.metered_bytes();
3024            records.push(record);
3025
3026            if metered_bytes > RECORD_BATCH_MAX.bytes {
3027                return Err(ValidationError(format!(
3028                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
3029                    RECORD_BATCH_MAX.bytes
3030                )));
3031            }
3032
3033            if records.len() > RECORD_BATCH_MAX.count {
3034                return Err(ValidationError(format!(
3035                    "number of records in the batch exceeds {}",
3036                    RECORD_BATCH_MAX.count
3037                )));
3038            }
3039        }
3040
3041        if records.is_empty() {
3042            return Err(ValidationError("batch is empty".into()));
3043        }
3044
3045        Ok(Self {
3046            records,
3047            metered_bytes,
3048        })
3049    }
3050}
3051
3052impl Deref for AppendRecordBatch {
3053    type Target = [AppendRecord];
3054
3055    fn deref(&self) -> &Self::Target {
3056        &self.records
3057    }
3058}
3059
3060impl MeteredBytes for AppendRecordBatch {
3061    fn metered_bytes(&self) -> usize {
3062        self.metered_bytes
3063    }
3064}
3065
3066#[derive(Debug, Clone)]
3067/// Command to signal an operation.
3068pub enum Command {
3069    /// Fence operation.
3070    Fence {
3071        /// Fencing token.
3072        fencing_token: FencingToken,
3073    },
3074    /// Trim operation.
3075    Trim {
3076        /// Trim point.
3077        trim_point: u64,
3078    },
3079}
3080
3081#[derive(Debug, Clone)]
3082#[non_exhaustive]
3083/// Command record for signaling operations to the service.
3084///
3085/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
3086pub struct CommandRecord {
3087    /// Command to signal an operation.
3088    pub command: Command,
3089    /// Timestamp for this record.
3090    pub timestamp: Option<u64>,
3091}
3092
3093impl CommandRecord {
3094    const FENCE: &[u8] = b"fence";
3095    const TRIM: &[u8] = b"trim";
3096
3097    /// Create a fence command record with the given fencing token.
3098    ///
3099    /// Fencing is strongly consistent, and subsequent appends that specify a
3100    /// fencing token will fail if it does not match.
3101    pub fn fence(fencing_token: FencingToken) -> Self {
3102        Self {
3103            command: Command::Fence { fencing_token },
3104            timestamp: None,
3105        }
3106    }
3107
3108    /// Create a trim command record with the given trim point.
3109    ///
3110    /// Trim point is the desired earliest sequence number for the stream.
3111    ///
3112    /// Trimming is eventually consistent, and trimmed records may be visible
3113    /// for a brief period.
3114    pub fn trim(trim_point: u64) -> Self {
3115        Self {
3116            command: Command::Trim { trim_point },
3117            timestamp: None,
3118        }
3119    }
3120
3121    /// Set the timestamp for this record.
3122    pub fn with_timestamp(self, timestamp: u64) -> Self {
3123        Self {
3124            timestamp: Some(timestamp),
3125            ..self
3126        }
3127    }
3128}
3129
3130impl From<CommandRecord> for AppendRecord {
3131    fn from(value: CommandRecord) -> Self {
3132        let (header_value, body) = match value.command {
3133            Command::Fence { fencing_token } => (
3134                CommandRecord::FENCE,
3135                Bytes::copy_from_slice(fencing_token.as_bytes()),
3136            ),
3137            Command::Trim { trim_point } => (
3138                CommandRecord::TRIM,
3139                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3140            ),
3141        };
3142        Self {
3143            body,
3144            headers: vec![Header::new("", header_value)],
3145            timestamp: value.timestamp,
3146        }
3147    }
3148}
3149
3150#[derive(Debug, Clone)]
3151#[non_exhaustive]
3152/// Input for [`append`](crate::S2Stream::append) operation and
3153/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
3154pub struct AppendInput {
3155    /// Batch of records to append atomically.
3156    pub records: AppendRecordBatch,
3157    /// Expected sequence number for the first record in the batch.
3158    ///
3159    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
3160    pub match_seq_num: Option<u64>,
3161    /// Fencing token to match against the stream's current fencing token.
3162    ///
3163    /// If unspecified, no matching is performed. If specified and mismatched,
3164    /// the append fails. A stream defaults to `""` as its fencing token.
3165    pub fencing_token: Option<FencingToken>,
3166}
3167
3168impl AppendInput {
3169    /// Create a new [`AppendInput`] with the given batch of records.
3170    pub fn new(records: AppendRecordBatch) -> Self {
3171        Self {
3172            records,
3173            match_seq_num: None,
3174            fencing_token: None,
3175        }
3176    }
3177
3178    /// Set the expected sequence number for the first record in the batch.
3179    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3180        Self {
3181            match_seq_num: Some(match_seq_num),
3182            ..self
3183        }
3184    }
3185
3186    /// Set the fencing token to match against the stream's current fencing token.
3187    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3188        Self {
3189            fencing_token: Some(fencing_token),
3190            ..self
3191        }
3192    }
3193}
3194
3195impl From<AppendInput> for api::stream::proto::AppendInput {
3196    fn from(value: AppendInput) -> Self {
3197        Self {
3198            records: value.records.iter().cloned().map(Into::into).collect(),
3199            match_seq_num: value.match_seq_num,
3200            fencing_token: value.fencing_token.map(|t| t.to_string()),
3201        }
3202    }
3203}
3204
3205#[derive(Debug, Clone, PartialEq)]
3206#[non_exhaustive]
3207/// Acknowledgement for an [`AppendInput`].
3208pub struct AppendAck {
3209    /// Sequence number and timestamp of the first record that was appended.
3210    pub start: StreamPosition,
3211    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
3212    /// that was appended.
3213    ///
3214    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
3215    /// appended.
3216    pub end: StreamPosition,
3217    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3218    /// the last record on the stream.
3219    ///
3220    /// This can be greater than the `end` position in case of concurrent appends.
3221    pub tail: StreamPosition,
3222}
3223
3224impl From<api::stream::proto::AppendAck> for AppendAck {
3225    fn from(value: api::stream::proto::AppendAck) -> Self {
3226        Self {
3227            start: value.start.unwrap_or_default().into(),
3228            end: value.end.unwrap_or_default().into(),
3229            tail: value.tail.unwrap_or_default().into(),
3230        }
3231    }
3232}
3233
#[derive(Debug, Clone, Copy)]
/// Starting position for reading from a stream.
pub enum ReadFrom {
    /// Read from this sequence number.
    SeqNum(u64),
    /// Read from this timestamp.
    Timestamp(u64),
    /// Read from N records before the tail.
    TailOffset(u64),
}

impl Default for ReadFrom {
    /// The beginning of the stream: sequence number `0`.
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3250
3251#[derive(Debug, Default, Clone)]
3252#[non_exhaustive]
3253/// Where to start reading.
3254pub struct ReadStart {
3255    /// Starting position.
3256    ///
3257    /// Defaults to reading from sequence number `0`.
3258    pub from: ReadFrom,
3259    /// Whether to start from tail if the requested starting position is beyond it.
3260    ///
3261    /// Defaults to `false` (errors if position is beyond tail).
3262    pub clamp_to_tail: bool,
3263}
3264
3265impl ReadStart {
3266    /// Create a new [`ReadStart`] with default values.
3267    pub fn new() -> Self {
3268        Self::default()
3269    }
3270
3271    /// Set the starting position.
3272    pub fn with_from(self, from: ReadFrom) -> Self {
3273        Self { from, ..self }
3274    }
3275
3276    /// Set whether to start from tail if the requested starting position is beyond it.
3277    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3278        Self {
3279            clamp_to_tail,
3280            ..self
3281        }
3282    }
3283}
3284
3285impl From<ReadStart> for api::stream::ReadStart {
3286    fn from(value: ReadStart) -> Self {
3287        let (seq_num, timestamp, tail_offset) = match value.from {
3288            ReadFrom::SeqNum(n) => (Some(n), None, None),
3289            ReadFrom::Timestamp(t) => (None, Some(t), None),
3290            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3291        };
3292        Self {
3293            seq_num,
3294            timestamp,
3295            tail_offset,
3296            clamp: if value.clamp_to_tail {
3297                Some(true)
3298            } else {
3299                None
3300            },
3301        }
3302    }
3303}
3304
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Limits on how much to read.
pub struct ReadLimits {
    /// Limit on number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Limit on total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Create a new [`ReadLimits`] with default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the limit on number of records.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Set the limit on total metered bytes of records.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3341
3342#[derive(Debug, Clone, Default)]
3343#[non_exhaustive]
3344/// When to stop reading.
3345pub struct ReadStop {
3346    /// Limits on how much to read.
3347    ///
3348    /// See [`ReadLimits`] for defaults.
3349    pub limits: ReadLimits,
3350    /// Timestamp at which to stop (exclusive).
3351    ///
3352    /// Defaults to `None`.
3353    pub until: Option<RangeTo<u64>>,
3354    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
3355    /// seconds for [`read`](crate::S2Stream::read).
3356    ///
3357    /// Defaults to:
3358    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
3359    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
3360    ///   is specified.
3361    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
3362    ///   `until` is specified.
3363    pub wait: Option<u32>,
3364}
3365
3366impl ReadStop {
3367    /// Create a new [`ReadStop`] with default values.
3368    pub fn new() -> Self {
3369        Self::default()
3370    }
3371
3372    /// Set the limits on how much to read.
3373    pub fn with_limits(self, limits: ReadLimits) -> Self {
3374        Self { limits, ..self }
3375    }
3376
3377    /// Set the timestamp at which to stop (exclusive).
3378    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3379        Self {
3380            until: Some(until),
3381            ..self
3382        }
3383    }
3384
3385    /// Set the duration in seconds to wait for new records before stopping.
3386    pub fn with_wait(self, wait: u32) -> Self {
3387        Self {
3388            wait: Some(wait),
3389            ..self
3390        }
3391    }
3392}
3393
3394impl From<ReadStop> for api::stream::ReadEnd {
3395    fn from(value: ReadStop) -> Self {
3396        Self {
3397            count: value.limits.count,
3398            bytes: value.limits.bytes,
3399            until: value.until.map(|r| r.end),
3400            wait: value.wait,
3401        }
3402    }
3403}
3404
3405#[derive(Debug, Clone, Default)]
3406#[non_exhaustive]
3407/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
3408/// operations.
3409pub struct ReadInput {
3410    /// Where to start reading.
3411    ///
3412    /// See [`ReadStart`] for defaults.
3413    pub start: ReadStart,
3414    /// When to stop reading.
3415    ///
3416    /// See [`ReadStop`] for defaults.
3417    pub stop: ReadStop,
3418    /// Whether to filter out command records from the stream when reading.
3419    ///
3420    /// Defaults to `false`.
3421    pub ignore_command_records: bool,
3422}
3423
3424impl ReadInput {
3425    /// Create a new [`ReadInput`] with default values.
3426    pub fn new() -> Self {
3427        Self::default()
3428    }
3429
3430    /// Set where to start reading.
3431    pub fn with_start(self, start: ReadStart) -> Self {
3432        Self { start, ..self }
3433    }
3434
3435    /// Set when to stop reading.
3436    pub fn with_stop(self, stop: ReadStop) -> Self {
3437        Self { stop, ..self }
3438    }
3439
3440    /// Set whether to filter out command records from the stream when reading.
3441    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3442        Self {
3443            ignore_command_records,
3444            ..self
3445        }
3446    }
3447}
3448
3449#[derive(Debug, Clone)]
3450#[non_exhaustive]
3451/// Record that is durably sequenced on a stream.
3452pub struct SequencedRecord {
3453    /// Sequence number assigned to this record.
3454    pub seq_num: u64,
3455    /// Body of this record.
3456    pub body: Bytes,
3457    /// Headers for this record.
3458    pub headers: Vec<Header>,
3459    /// Timestamp for this record.
3460    pub timestamp: u64,
3461}
3462
3463impl SequencedRecord {
3464    /// Whether this is a command record.
3465    pub fn is_command_record(&self) -> bool {
3466        self.headers.len() == 1 && *self.headers[0].name == *b""
3467    }
3468}
3469
3470impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3471    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3472        Self {
3473            seq_num: value.seq_num,
3474            body: value.body,
3475            headers: value.headers.into_iter().map(Into::into).collect(),
3476            timestamp: value.timestamp,
3477        }
3478    }
3479}
3480
3481metered_bytes_impl!(SequencedRecord);
3482
3483#[derive(Debug, Clone)]
3484#[non_exhaustive]
3485/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
3486/// [`read_session`](crate::S2Stream::read_session).
3487pub struct ReadBatch {
3488    /// Records that are durably sequenced on the stream.
3489    ///
3490    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
3491    /// - the [`stop condition`](ReadInput::stop) was already met, or
3492    /// - all records in the batch were command records and
3493    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
3494    pub records: Vec<SequencedRecord>,
3495    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3496    /// the last record.
3497    ///
3498    /// It will only be present when reading recent records.
3499    pub tail: Option<StreamPosition>,
3500}
3501
3502impl ReadBatch {
3503    pub(crate) fn from_api(
3504        batch: api::stream::proto::ReadBatch,
3505        ignore_command_records: bool,
3506    ) -> Self {
3507        Self {
3508            records: batch
3509                .records
3510                .into_iter()
3511                .map(Into::into)
3512                .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3513                .collect(),
3514            tail: batch.tail.map(Into::into),
3515        }
3516    }
3517}
3518
/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
///
/// The stream is boxed, pinned, and `Send`.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3521
3522#[derive(Debug, Clone, thiserror::Error)]
3523/// Why an append condition check failed.
3524pub enum AppendConditionFailed {
3525    #[error("fencing token mismatch, expected: {0}")]
3526    /// Fencing token did not match. Contains the expected fencing token.
3527    FencingTokenMismatch(FencingToken),
3528    #[error("sequence number mismatch, expected: {0}")]
3529    /// Sequence number did not match. Contains the expected sequence number.
3530    SeqNumMismatch(u64),
3531}
3532
3533impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3534    fn from(value: api::stream::AppendConditionFailed) -> Self {
3535        match value {
3536            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3537                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3538            }
3539            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3540                AppendConditionFailed::SeqNumMismatch(seq)
3541            }
3542        }
3543    }
3544}
3545
3546#[derive(Debug, Clone, thiserror::Error)]
3547/// Errors from S2 operations.
3548pub enum S2Error {
3549    #[error("{0}")]
3550    /// Client-side error.
3551    Client(String),
3552    #[error(transparent)]
3553    /// Validation error.
3554    Validation(#[from] ValidationError),
3555    #[error("{0}")]
3556    /// Append condition check failed. Contains the failure reason.
3557    AppendConditionFailed(AppendConditionFailed),
3558    #[error("read from an unwritten position. current tail: {0}")]
3559    /// Read from an unwritten position. Contains the current tail.
3560    ReadUnwritten(StreamPosition),
3561    #[error("{0}")]
3562    /// Other server-side error.
3563    Server(ErrorResponse),
3564}
3565
3566impl From<ApiError> for S2Error {
3567    fn from(err: ApiError) -> Self {
3568        match err {
3569            ApiError::ReadUnwritten(tail_response) => {
3570                Self::ReadUnwritten(tail_response.tail.into())
3571            }
3572            ApiError::AppendConditionFailed(condition_failed) => {
3573                Self::AppendConditionFailed(condition_failed.into())
3574            }
3575            ApiError::Server(_, response) => Self::Server(response.into()),
3576            other => Self::Client(other.to_string()),
3577        }
3578    }
3579}
3580
3581#[derive(Debug, Clone, thiserror::Error)]
3582#[error("{code}: {message}")]
3583#[non_exhaustive]
3584/// Error response from S2 server.
3585pub struct ErrorResponse {
3586    /// Error code.
3587    pub code: String,
3588    /// Error message.
3589    pub message: String,
3590}
3591
3592impl From<ApiErrorResponse> for ErrorResponse {
3593    fn from(response: ApiErrorResponse) -> Self {
3594        Self {
3595            code: response.code,
3596            message: response.message,
3597        }
3598    }
3599}
3600
3601fn idempotency_token() -> String {
3602    uuid::Uuid::new_v4().simple().to_string()
3603}