Skip to main content

s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Validation error.
23pub use s2_common::types::ValidationError;
24/// Access token ID.
25///
26/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
27pub use s2_common::types::access::AccessTokenId;
28/// See [`ListAccessTokensInput::prefix`].
29pub use s2_common::types::access::AccessTokenIdPrefix;
30/// See [`ListAccessTokensInput::start_after`].
31pub use s2_common::types::access::AccessTokenIdStartAfter;
32/// Basin name.
33///
34/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
35/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
36pub use s2_common::types::basin::BasinName;
37/// See [`ListBasinsInput::prefix`].
38pub use s2_common::types::basin::BasinNamePrefix;
39/// See [`ListBasinsInput::start_after`].
40pub use s2_common::types::basin::BasinNameStartAfter;
41/// Stream name.
42///
43/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
44pub use s2_common::types::stream::StreamName;
45/// See [`ListStreamsInput::prefix`].
46pub use s2_common::types::stream::StreamNamePrefix;
47/// See [`ListStreamsInput::start_after`].
48pub use s2_common::types::stream::StreamNameStartAfter;
49
/// One mebibyte, in bytes (1024 * 1024).
pub(crate) const ONE_MIB: u32 = 1024 * 1024;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57/// An RFC 3339 datetime.
58///
59/// It can be created in either of the following ways:
60/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
61/// - Convert from [`time::OffsetDateTime`] using [`TryFrom`]/[`TryInto`].
62#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl TryFrom<time::OffsetDateTime> for S2DateTime {
66    type Error = ValidationError;
67
68    fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
69        dt.format(&time::format_description::well_known::Rfc3339)
70            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
71        Ok(Self(dt))
72    }
73}
74
75impl From<S2DateTime> for time::OffsetDateTime {
76    fn from(dt: S2DateTime) -> Self {
77        dt.0
78    }
79}
80
81impl FromStr for S2DateTime {
82    type Err = ValidationError;
83
84    fn from_str(s: &str) -> Result<Self, Self::Err> {
85        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
86            .map(Self)
87            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
88    }
89}
90
91impl fmt::Display for S2DateTime {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        write!(
94            f,
95            "{}",
96            self.0
97                .format(&time::format_description::well_known::Rfc3339)
98                .expect("RFC3339 formatting should not fail for S2DateTime")
99        )
100    }
101}
102
/// Authority for connecting to an S2 basin.
///
/// Produced when parsing a [`BasinEndpoint`]: an authority with a leading
/// `{basin}.` placeholder becomes [`ParentZone`](Self::ParentZone); any other
/// authority becomes [`Direct`](Self::Direct).
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
    ParentZone(Authority),
    /// Direct cell authority. Basin is expected to be hosted by this cell.
    Direct(Authority),
}
111
112/// Account endpoint.
113#[derive(Debug, Clone)]
114pub struct AccountEndpoint {
115    scheme: Scheme,
116    authority: Authority,
117}
118
119impl AccountEndpoint {
120    /// Create a new [`AccountEndpoint`] with the given endpoint.
121    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
122        endpoint.parse()
123    }
124}
125
126impl FromStr for AccountEndpoint {
127    type Err = ValidationError;
128
129    fn from_str(s: &str) -> Result<Self, Self::Err> {
130        let (scheme, authority) = match s.find("://") {
131            Some(idx) => {
132                let scheme: Scheme = s[..idx]
133                    .parse()
134                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
135                (scheme, &s[idx + 3..])
136            }
137            None => (Scheme::HTTPS, s),
138        };
139        Ok(Self {
140            scheme,
141            authority: authority
142                .parse()
143                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
144        })
145    }
146}
147
148/// Basin endpoint.
149#[derive(Debug, Clone)]
150pub struct BasinEndpoint {
151    scheme: Scheme,
152    authority: BasinAuthority,
153}
154
155impl BasinEndpoint {
156    /// Create a new [`BasinEndpoint`] with the given endpoint.
157    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
158        endpoint.parse()
159    }
160}
161
162impl FromStr for BasinEndpoint {
163    type Err = ValidationError;
164
165    fn from_str(s: &str) -> Result<Self, Self::Err> {
166        let (scheme, authority) = match s.find("://") {
167            Some(idx) => {
168                let scheme: Scheme = s[..idx]
169                    .parse()
170                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
171                (scheme, &s[idx + 3..])
172            }
173            None => (Scheme::HTTPS, s),
174        };
175        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
176            BasinAuthority::ParentZone(
177                authority
178                    .parse()
179                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
180            )
181        } else {
182            BasinAuthority::Direct(
183                authority
184                    .parse()
185                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
186            )
187        };
188        Ok(Self { scheme, authority })
189    }
190}
191
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Endpoints for the S2 environment.
///
/// A single scheme is shared by both authorities; [`S2Endpoints::new`] and
/// [`S2Endpoints::from_env`] reject endpoint pairs with mismatched schemes.
pub struct S2Endpoints {
    // Shared URI scheme for both account and basin connections.
    pub(crate) scheme: Scheme,
    // Authority of the account-level service.
    pub(crate) account_authority: Authority,
    // Authority (parent zone or direct cell) of the basin-level service.
    pub(crate) basin_authority: BasinAuthority,
}
200
201impl S2Endpoints {
202    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
203    pub fn new(
204        account_endpoint: AccountEndpoint,
205        basin_endpoint: BasinEndpoint,
206    ) -> Result<Self, ValidationError> {
207        if account_endpoint.scheme != basin_endpoint.scheme {
208            return Err("account and basin endpoints must have the same scheme".into());
209        }
210        Ok(Self {
211            scheme: account_endpoint.scheme,
212            account_authority: account_endpoint.authority,
213            basin_authority: basin_endpoint.authority,
214        })
215    }
216
217    /// Create a new [`S2Endpoints`] from environment variables.
218    ///
219    /// The following environment variables are expected to be set:
220    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
221    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
222    pub fn from_env() -> Result<Self, ValidationError> {
223        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
224            Ok(endpoint) => endpoint.parse()?,
225            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
226            Err(VarError::NotUnicode(_)) => {
227                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
228            }
229        };
230
231        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
232            Ok(endpoint) => endpoint.parse()?,
233            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
234            Err(VarError::NotUnicode(_)) => {
235                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
236            }
237        };
238
239        if account_endpoint.scheme != basin_endpoint.scheme {
240            return Err(
241                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
242            );
243        }
244
245        Ok(Self {
246            scheme: account_endpoint.scheme,
247            account_authority: account_endpoint.authority,
248            basin_authority: basin_endpoint.authority,
249        })
250    }
251
252    pub(crate) fn for_aws() -> Self {
253        Self {
254            scheme: Scheme::HTTPS,
255            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
256            basin_authority: BasinAuthority::ParentZone(
257                "b.aws.s2.dev".try_into().expect("valid authority"),
258            ),
259        }
260    }
261}
262
/// Compression algorithm for request and response bodies.
///
/// Derives `PartialEq`/`Eq` for consistency with the other plain enums in this
/// module (e.g. [`AppendRetryPolicy`], [`StorageClass`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Zstd compression.
    Zstd,
}
273
// Maps the public SDK enum onto the wire-protocol enum; variants correspond 1:1.
impl From<Compression> for CompressionAlgorithm {
    fn from(value: Compression) -> Self {
        match value {
            Compression::None => CompressionAlgorithm::None,
            Compression::Gzip => CompressionAlgorithm::Gzip,
            Compression::Zstd => CompressionAlgorithm::Zstd,
        }
    }
}
283
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// Retry policy for [`append`](crate::S2Stream::append) and
/// [`append_session`](crate::S2Stream::append_session) operations.
pub enum AppendRetryPolicy {
    /// Retry all appends. Use when duplicate records on the stream are acceptable.
    All,
    /// Only retry appends that include [`match_seq_num`](AppendInput::match_seq_num).
    NoSideEffects,
}

impl AppendRetryPolicy {
    /// Whether `input` is eligible for retries under this policy.
    ///
    /// `NoSideEffects` only permits appends carrying `match_seq_num` —
    /// presumably because a sequence-number precondition makes a retried append
    /// safe from duplication; confirm against server append semantics.
    pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
        match self {
            Self::All => true,
            Self::NoSideEffects => input.match_seq_num.is_some(),
        }
    }
}
303
304#[derive(Debug, Clone)]
305#[non_exhaustive]
306/// Configuration for retrying requests in case of transient failures.
307///
308/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
309/// ```text
310/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
311///     jitter = rand[0, base_delay]
312///     delay  = base_delay + jitter
313/// ````
314pub struct RetryConfig {
315    /// Total number of attempts including the initial try. A value of `1` means no retries.
316    ///
317    /// Defaults to `3`.
318    pub max_attempts: NonZeroU32,
319    /// Minimum base delay for retries.
320    ///
321    /// Defaults to `100ms`.
322    pub min_base_delay: Duration,
323    /// Maximum base delay for retries.
324    ///
325    /// Defaults to `1s`.
326    pub max_base_delay: Duration,
327    /// Retry policy for [`append`](crate::S2Stream::append) and
328    /// [`append_session`](crate::S2Stream::append_session) operations.
329    ///
330    /// Defaults to `All`.
331    pub append_retry_policy: AppendRetryPolicy,
332}
333
334impl Default for RetryConfig {
335    fn default() -> Self {
336        Self {
337            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
338            min_base_delay: Duration::from_millis(100),
339            max_base_delay: Duration::from_secs(1),
340            append_retry_policy: AppendRetryPolicy::All,
341        }
342    }
343}
344
345impl RetryConfig {
346    /// Create a new [`RetryConfig`] with default settings.
347    pub fn new() -> Self {
348        Self::default()
349    }
350
351    pub(crate) fn max_retries(&self) -> u32 {
352        self.max_attempts.get() - 1
353    }
354
355    /// Set the total number of attempts including the initial try.
356    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
357        Self {
358            max_attempts,
359            ..self
360        }
361    }
362
363    /// Set the minimum base delay for retries.
364    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
365        Self {
366            min_base_delay,
367            ..self
368        }
369    }
370
371    /// Set the maximum base delay for retries.
372    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
373        Self {
374            max_base_delay,
375            ..self
376        }
377    }
378
379    /// Set the retry policy for [`append`](crate::S2Stream::append) and
380    /// [`append_session`](crate::S2Stream::append_session) operations.
381    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
382        Self {
383            append_retry_policy,
384            ..self
385        }
386    }
387}
388
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for [`S2`](crate::S2).
///
/// Construct via [`S2Config::new`]; defaults for each knob are documented on
/// the corresponding `with_*` setter.
pub struct S2Config {
    // Bearer token; held as `SecretString` to reduce accidental exposure.
    pub(crate) access_token: SecretString,
    // Environment to connect to; defaults to the hosted AWS endpoints.
    pub(crate) endpoints: S2Endpoints,
    // Timeout for establishing a connection (default 3s).
    pub(crate) connection_timeout: Duration,
    // Per-request timeout (default 5s).
    pub(crate) request_timeout: Duration,
    // Retry strategy for transient failures.
    pub(crate) retry: RetryConfig,
    // Body compression (default none).
    pub(crate) compression: Compression,
    // `User-Agent` header; defaults to `s2-sdk-rust/<crate version>`.
    pub(crate) user_agent: HeaderValue,
    // Disables TLS certificate verification when true — testing only.
    pub(crate) insecure_skip_cert_verification: bool,
}
402
403impl S2Config {
404    /// Create a new [`S2Config`] with the given access token and default settings.
405    pub fn new(access_token: impl Into<String>) -> Self {
406        Self {
407            access_token: access_token.into().into(),
408            endpoints: S2Endpoints::for_aws(),
409            connection_timeout: Duration::from_secs(3),
410            request_timeout: Duration::from_secs(5),
411            retry: RetryConfig::new(),
412            compression: Compression::None,
413            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
414                .parse()
415                .expect("valid user agent"),
416            insecure_skip_cert_verification: false,
417        }
418    }
419
420    /// Set the S2 endpoints to connect to.
421    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
422        Self { endpoints, ..self }
423    }
424
425    /// Set the timeout for establishing a connection to the server.
426    ///
427    /// Defaults to `3s`.
428    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
429        Self {
430            connection_timeout,
431            ..self
432        }
433    }
434
435    /// Set the timeout for requests.
436    ///
437    /// Defaults to `5s`.
438    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
439        Self {
440            request_timeout,
441            ..self
442        }
443    }
444
445    /// Set the retry configuration for requests.
446    ///
447    /// See [`RetryConfig`] for defaults.
448    pub fn with_retry(self, retry: RetryConfig) -> Self {
449        Self { retry, ..self }
450    }
451
452    /// Set the compression algorithm for requests and responses.
453    ///
454    /// Defaults to no compression.
455    pub fn with_compression(self, compression: Compression) -> Self {
456        Self {
457            compression,
458            ..self
459        }
460    }
461
462    /// Skip TLS certificate verification (insecure).
463    ///
464    /// This is useful for connecting to endpoints with self-signed certificates
465    /// or certificates that don't match the hostname (similar to `curl -k`).
466    ///
467    /// # Warning
468    ///
469    /// This disables certificate verification and should only be used for
470    /// testing or development purposes. **Never use this in production.**
471    ///
472    /// Defaults to `false`.
473    pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
474        Self {
475            insecure_skip_cert_verification: skip,
476            ..self
477        }
478    }
479
480    #[doc(hidden)]
481    #[cfg(feature = "_hidden")]
482    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
483        let user_agent = user_agent
484            .into()
485            .parse()
486            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
487        Ok(Self { user_agent, ..self })
488    }
489}
490
/// A page of values.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether there are more pages.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Build a page from any collection convertible into a `Vec`, together
    /// with the pagination flag.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values = values.into();
        Self { values, has_more }
    }
}
509
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Storage class for recent appends.
pub enum StorageClass {
    /// Standard storage class that offers append latencies under `500ms`.
    Standard,
    /// Express storage class that offers append latencies under `50ms`.
    Express,
}

// SDK <-> API wire-type conversions; variants correspond 1:1 in both directions.
impl From<api::config::StorageClass> for StorageClass {
    fn from(value: api::config::StorageClass) -> Self {
        match value {
            api::config::StorageClass::Standard => StorageClass::Standard,
            api::config::StorageClass::Express => StorageClass::Express,
        }
    }
}

impl From<StorageClass> for api::config::StorageClass {
    fn from(value: StorageClass) -> Self {
        match value {
            StorageClass::Standard => api::config::StorageClass::Standard,
            StorageClass::Express => api::config::StorageClass::Express,
        }
    }
}
536
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Retention policy for records in a stream.
pub enum RetentionPolicy {
    /// Age in seconds. Records older than this age are automatically trimmed.
    Age(u64),
    /// Records are retained indefinitely unless explicitly trimmed.
    Infinite,
}

// SDK <-> API conversions. `Age` carries seconds in both representations; the
// API models `Infinite` with a unit payload (`InfiniteRetention`) that the SDK
// drops on the way in and re-creates on the way out.
impl From<api::config::RetentionPolicy> for RetentionPolicy {
    fn from(value: api::config::RetentionPolicy) -> Self {
        match value {
            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
        }
    }
}

impl From<RetentionPolicy> for api::config::RetentionPolicy {
    fn from(value: RetentionPolicy) -> Self {
        match value {
            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
            RetentionPolicy::Infinite => {
                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
            }
        }
    }
}
565
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Timestamping mode for appends that influences how timestamps are handled.
pub enum TimestampingMode {
    /// Prefer client-specified timestamp if present otherwise use arrival time.
    ClientPrefer,
    /// Require a client-specified timestamp and reject the append if it is missing.
    ClientRequire,
    /// Use the arrival time and ignore any client-specified timestamp.
    Arrival,
}

// SDK <-> API wire-type conversions; variants correspond 1:1 in both directions.
impl From<api::config::TimestampingMode> for TimestampingMode {
    fn from(value: api::config::TimestampingMode) -> Self {
        match value {
            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
        }
    }
}

impl From<TimestampingMode> for api::config::TimestampingMode {
    fn from(value: TimestampingMode) -> Self {
        match value {
            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
        }
    }
}
596
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for timestamping behavior.
pub struct TimestampingConfig {
    /// Timestamping mode for appends that influences how timestamps are handled.
    ///
    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
    // NOTE(review): `None` here means "unset"; the `ClientPrefer` default is
    // presumably applied service-side — confirm against the API behavior.
    pub mode: Option<TimestampingMode>,
    /// Whether client-specified timestamps are allowed to exceed the arrival time.
    ///
    /// Defaults to `false` (client timestamps are capped at the arrival time).
    pub uncapped: bool,
}
610
611impl TimestampingConfig {
612    /// Create a new [`TimestampingConfig`] with default settings.
613    pub fn new() -> Self {
614        Self::default()
615    }
616
617    /// Set the timestamping mode for appends that influences how timestamps are handled.
618    pub fn with_mode(self, mode: TimestampingMode) -> Self {
619        Self {
620            mode: Some(mode),
621            ..self
622        }
623    }
624
625    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
626    pub fn with_uncapped(self, uncapped: bool) -> Self {
627        Self { uncapped, ..self }
628    }
629}
630
// SDK <-> API conversions. The API's `uncapped` is optional: a missing value
// maps to the SDK default (`false`), and the SDK always sends back an explicit
// `Some` value.
impl From<api::config::TimestampingConfig> for TimestampingConfig {
    fn from(value: api::config::TimestampingConfig) -> Self {
        Self {
            mode: value.mode.map(Into::into),
            uncapped: value.uncapped.unwrap_or_default(),
        }
    }
}

impl From<TimestampingConfig> for api::config::TimestampingConfig {
    fn from(value: TimestampingConfig) -> Self {
        Self {
            mode: value.mode.map(Into::into),
            uncapped: Some(value.uncapped),
        }
    }
}
648
/// Configuration for automatically deleting a stream when it becomes empty.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the minimum age before an empty stream can be deleted.
    ///
    /// Only whole seconds are kept; sub-second precision is discarded.
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
672
// SDK <-> API conversions; `min_age_secs` is carried through unchanged in both
// directions.
impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
        Self {
            min_age_secs: value.min_age_secs,
        }
    }
}

impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
    fn from(value: DeleteOnEmptyConfig) -> Self {
        Self {
            min_age_secs: value.min_age_secs,
        }
    }
}
688
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for a stream.
// NOTE(review): each `None` field means "unset"; the documented defaults are
// presumably resolved by the service (or the basin's default stream config) —
// confirm against the API.
pub struct StreamConfig {
    /// Storage class for the stream.
    ///
    /// Defaults to [`Express`](StorageClass::Express).
    pub storage_class: Option<StorageClass>,
    /// Retention policy for records in the stream.
    ///
    /// Defaults to `7 days` of retention.
    pub retention_policy: Option<RetentionPolicy>,
    /// Configuration for timestamping behavior.
    ///
    /// See [`TimestampingConfig`] for defaults.
    pub timestamping: Option<TimestampingConfig>,
    /// Configuration for automatically deleting the stream when it becomes empty.
    ///
    /// See [`DeleteOnEmptyConfig`] for defaults.
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
}
710
711impl StreamConfig {
712    /// Create a new [`StreamConfig`] with default settings.
713    pub fn new() -> Self {
714        Self::default()
715    }
716
717    /// Set the storage class for the stream.
718    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
719        Self {
720            storage_class: Some(storage_class),
721            ..self
722        }
723    }
724
725    /// Set the retention policy for records in the stream.
726    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
727        Self {
728            retention_policy: Some(retention_policy),
729            ..self
730        }
731    }
732
733    /// Set the configuration for timestamping behavior.
734    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
735        Self {
736            timestamping: Some(timestamping),
737            ..self
738        }
739    }
740
741    /// Set the configuration for automatically deleting the stream when it becomes empty.
742    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
743        Self {
744            delete_on_empty: Some(delete_on_empty),
745            ..self
746        }
747    }
748}
749
// SDK <-> API conversions; each optional section converts independently via
// its own `From` impl, with `None` passed through untouched.
impl From<api::config::StreamConfig> for StreamConfig {
    fn from(value: api::config::StreamConfig) -> Self {
        Self {
            storage_class: value.storage_class.map(Into::into),
            retention_policy: value.retention_policy.map(Into::into),
            timestamping: value.timestamping.map(Into::into),
            delete_on_empty: value.delete_on_empty.map(Into::into),
        }
    }
}

impl From<StreamConfig> for api::config::StreamConfig {
    fn from(value: StreamConfig) -> Self {
        Self {
            storage_class: value.storage_class.map(Into::into),
            retention_policy: value.retention_policy.map(Into::into),
            timestamping: value.timestamping.map(Into::into),
            delete_on_empty: value.delete_on_empty.map(Into::into),
        }
    }
}
771
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Configuration for a basin.
pub struct BasinConfig {
    /// Default configuration for all streams in the basin.
    ///
    /// See [`StreamConfig`] for defaults.
    // `None` means "unset"; see the `From` conversions below for wire mapping.
    pub default_stream_config: Option<StreamConfig>,
    /// Whether to create stream on append if it doesn't exist using default stream configuration.
    ///
    /// Defaults to `false`.
    pub create_stream_on_append: bool,
    /// Whether to create stream on read if it doesn't exist using default stream configuration.
    ///
    /// Defaults to `false`.
    pub create_stream_on_read: bool,
}
789
790impl BasinConfig {
791    /// Create a new [`BasinConfig`] with default settings.
792    pub fn new() -> Self {
793        Self::default()
794    }
795
796    /// Set the default configuration for all streams in the basin.
797    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
798        Self {
799            default_stream_config: Some(config),
800            ..self
801        }
802    }
803
804    /// Set whether to create stream on append if it doesn't exist using default stream
805    /// configuration.
806    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
807        Self {
808            create_stream_on_append,
809            ..self
810        }
811    }
812
813    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
814    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
815        Self {
816            create_stream_on_read,
817            ..self
818        }
819    }
820}
821
// SDK <-> API conversions; the booleans are copied verbatim and the optional
// stream config converts via its own `From` impl.
impl From<api::config::BasinConfig> for BasinConfig {
    fn from(value: api::config::BasinConfig) -> Self {
        Self {
            default_stream_config: value.default_stream_config.map(Into::into),
            create_stream_on_append: value.create_stream_on_append,
            create_stream_on_read: value.create_stream_on_read,
        }
    }
}

impl From<BasinConfig> for api::config::BasinConfig {
    fn from(value: BasinConfig) -> Self {
        Self {
            default_stream_config: value.default_stream_config.map(Into::into),
            create_stream_on_append: value.create_stream_on_append,
            create_stream_on_read: value.create_stream_on_read,
        }
    }
}
841
#[derive(Debug, Clone, PartialEq, Eq)]
/// Scope of a basin.
pub enum BasinScope {
    /// AWS `us-east-1` region.
    AwsUsEast1,
}

// SDK <-> API conversions; `AwsUsEast1` is currently the only scope defined here.
impl From<api::basin::BasinScope> for BasinScope {
    fn from(value: api::basin::BasinScope) -> Self {
        match value {
            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
        }
    }
}

impl From<BasinScope> for api::basin::BasinScope {
    fn from(value: BasinScope) -> Self {
        match value {
            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
        }
    }
}
864
/// Result of a create-or-reconfigure operation.
///
/// Indicates whether the resource was newly created or already existed and was
/// reconfigured. Both variants hold the resource's current state.
// Only compiled with the private `_hidden` feature; not yet part of the public API.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CreateOrReconfigured<T> {
    /// Resource was newly created.
    Created(T),
    /// Resource already existed and was reconfigured to match the spec.
    Reconfigured(T),
}

#[cfg(feature = "_hidden")]
impl<T> CreateOrReconfigured<T> {
    /// Returns `true` if the resource was newly created.
    pub fn is_created(&self) -> bool {
        matches!(self, Self::Created(_))
    }

    /// Unwrap the inner value regardless of variant.
    pub fn into_inner(self) -> T {
        match self {
            Self::Created(t) | Self::Reconfigured(t) => t,
        }
    }
}
893
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_basin`](crate::S2::create_basin) operation.
pub struct CreateBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Configuration for the basin.
    ///
    /// See [`BasinConfig`] for defaults.
    pub config: Option<BasinConfig>,
    /// Scope of the basin.
    ///
    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
    pub scope: Option<BasinScope>,
    // Generated once in `new()` and forwarded alongside the request (see the
    // `From` impl below). NOTE(review): presumably makes retried creates
    // idempotent — confirm against the service contract.
    idempotency_token: String,
}
910
911impl CreateBasinInput {
912    /// Create a new [`CreateBasinInput`] with the given basin name.
913    pub fn new(name: BasinName) -> Self {
914        Self {
915            name,
916            config: None,
917            scope: None,
918            idempotency_token: idempotency_token(),
919        }
920    }
921
922    /// Set the configuration for the basin.
923    pub fn with_config(self, config: BasinConfig) -> Self {
924        Self {
925            config: Some(config),
926            ..self
927        }
928    }
929
930    /// Set the scope of the basin.
931    pub fn with_scope(self, scope: BasinScope) -> Self {
932        Self {
933            scope: Some(scope),
934            ..self
935        }
936    }
937}
938
939impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
940    fn from(value: CreateBasinInput) -> Self {
941        (
942            api::basin::CreateBasinRequest {
943                basin: value.name,
944                config: value.config.map(Into::into),
945                scope: value.scope.map(Into::into),
946            },
947            value.idempotency_token,
948        )
949    }
950}
951
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_or_reconfigure_basin`](crate::S2::create_or_reconfigure_basin) operation.
///
/// Unlike [`CreateBasinInput`], this input carries no idempotency token.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Reconfiguration for the basin.
    ///
    /// If `None`, the basin is created with default configuration or left unchanged if it exists.
    pub config: Option<BasinReconfiguration>,
    /// Scope of the basin.
    ///
    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1). Cannot be changed once set.
    pub scope: Option<BasinScope>,
}
969
#[cfg(feature = "_hidden")]
impl CreateOrReconfigureBasinInput {
    /// Create a new [`CreateOrReconfigureBasinInput`] with the given basin name.
    ///
    /// Neither a reconfiguration nor a scope is set initially.
    pub fn new(name: BasinName) -> Self {
        Self {
            name,
            config: None,
            scope: None,
        }
    }

    /// Set the reconfiguration for the basin.
    pub fn with_config(mut self, config: BasinReconfiguration) -> Self {
        self.config = Some(config);
        self
    }

    /// Set the scope of the basin.
    pub fn with_scope(mut self, scope: BasinScope) -> Self {
        self.scope = Some(scope);
        self
    }
}
997
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureBasinInput>
    for (
        BasinName,
        Option<api::basin::CreateOrReconfigureBasinRequest>,
    )
{
    fn from(value: CreateOrReconfigureBasinInput) -> Self {
        let CreateOrReconfigureBasinInput {
            name,
            config,
            scope,
        } = value;
        // Only materialize a request body when there is something to apply.
        let request = match (config, scope) {
            (None, None) => None,
            (config, scope) => Some(api::basin::CreateOrReconfigureBasinRequest {
                config: config.map(Into::into),
                scope: scope.map(Into::into),
            }),
        };
        (name, request)
    }
}
1017
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_basins`](crate::S2::list_basins) operation.
pub struct ListBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""` (matches every basin).
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000` when `None`.
    pub limit: Option<usize>,
}
1037
1038impl ListBasinsInput {
1039    /// Create a new [`ListBasinsInput`] with default values.
1040    pub fn new() -> Self {
1041        Self::default()
1042    }
1043
1044    /// Set the prefix used to filter basins whose names begin with this value.
1045    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1046        Self { prefix, ..self }
1047    }
1048
1049    /// Set the value used to filter basins whose names are lexicographically greater than this
1050    /// value.
1051    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1052        Self {
1053            start_after,
1054            ..self
1055        }
1056    }
1057
1058    /// Set the limit on number of basins to return in a page.
1059    pub fn with_limit(self, limit: usize) -> Self {
1060        Self {
1061            limit: Some(limit),
1062            ..self
1063        }
1064    }
1065}
1066
1067impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1068    fn from(value: ListBasinsInput) -> Self {
1069        Self {
1070            prefix: Some(value.prefix),
1071            start_after: Some(value.start_after),
1072            limit: value.limit,
1073        }
1074    }
1075}
1076
#[derive(Debug, Clone, Default)]
/// Input for [`S2::list_all_basins`](crate::S2::list_all_basins).
// NOTE(review): `ListBasinsInput` is `#[non_exhaustive]` but this struct is
// not — confirm whether that asymmetry is intentional.
pub struct ListAllBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Whether to include basins that are being deleted.
    ///
    /// Defaults to `false`.
    pub include_deleted: bool,
}
1095
1096impl ListAllBasinsInput {
1097    /// Create a new [`ListAllBasinsInput`] with default values.
1098    pub fn new() -> Self {
1099        Self::default()
1100    }
1101
1102    /// Set the prefix used to filter basins whose names begin with this value.
1103    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1104        Self { prefix, ..self }
1105    }
1106
1107    /// Set the value used to filter basins whose names are lexicographically greater than this
1108    /// value.
1109    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1110        Self {
1111            start_after,
1112            ..self
1113        }
1114    }
1115
1116    /// Set whether to include basins that are being deleted.
1117    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1118        Self {
1119            include_deleted,
1120            ..self
1121        }
1122    }
1123}
1124
#[derive(Debug, Clone, PartialEq, Eq)]
/// Current state of a basin.
pub enum BasinState {
    /// The basin is active and ready for use.
    Active,
    /// The basin is in the process of being created.
    Creating,
    /// The basin is in the process of being deleted.
    Deleting,
}
1135
1136impl From<api::basin::BasinState> for BasinState {
1137    fn from(value: api::basin::BasinState) -> Self {
1138        match value {
1139            api::basin::BasinState::Active => BasinState::Active,
1140            api::basin::BasinState::Creating => BasinState::Creating,
1141            api::basin::BasinState::Deleting => BasinState::Deleting,
1142        }
1143    }
1144}
1145
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// Basin information.
pub struct BasinInfo {
    /// Basin name.
    pub name: BasinName,
    /// Scope of the basin.
    ///
    /// `None` when the service did not report a scope.
    pub scope: Option<BasinScope>,
    /// Current state of the basin.
    pub state: BasinState,
}
1157
1158impl From<api::basin::BasinInfo> for BasinInfo {
1159    fn from(value: api::basin::BasinInfo) -> Self {
1160        Self {
1161            name: value.name,
1162            scope: value.scope.map(Into::into),
1163            state: value.state.into(),
1164        }
1165    }
1166}
1167
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
pub struct DeleteBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Whether to ignore `Not Found` error if the basin doesn't exist.
    ///
    /// Defaults to `false` (see [`DeleteBasinInput::new`]).
    pub ignore_not_found: bool,
}
1177
1178impl DeleteBasinInput {
1179    /// Create a new [`DeleteBasinInput`] with the given basin name.
1180    pub fn new(name: BasinName) -> Self {
1181        Self {
1182            name,
1183            ignore_not_found: false,
1184        }
1185    }
1186
1187    /// Set whether to ignore `Not Found` error if the basin is not existing.
1188    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1189        Self {
1190            ignore_not_found,
1191            ..self
1192        }
1193    }
1194}
1195
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`TimestampingConfig`].
// NOTE(review): fields are `Maybe<Option<_>>` — presumably unspecified leaves
// the existing setting untouched while `Specified(None)` clears it; confirm
// against `Maybe`'s documentation.
pub struct TimestampingReconfiguration {
    /// Override for the existing [`mode`](TimestampingConfig::mode).
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
    pub uncapped: Maybe<Option<bool>>,
}
1205
1206impl TimestampingReconfiguration {
1207    /// Create a new [`TimestampingReconfiguration`].
1208    pub fn new() -> Self {
1209        Self::default()
1210    }
1211
1212    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1213    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1214        Self {
1215            mode: Maybe::Specified(Some(mode)),
1216            ..self
1217        }
1218    }
1219
1220    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1221    pub fn with_uncapped(self, uncapped: bool) -> Self {
1222        Self {
1223            uncapped: Maybe::Specified(Some(uncapped)),
1224            ..self
1225        }
1226    }
1227}
1228
1229impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1230    fn from(value: TimestampingReconfiguration) -> Self {
1231        Self {
1232            mode: value.mode.map(|m| m.map(Into::into)),
1233            uncapped: value.uncapped,
1234        }
1235    }
1236}
1237
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`DeleteOnEmptyConfig`].
pub struct DeleteOnEmptyReconfiguration {
    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
    ///
    /// Expressed in whole seconds (see [`with_min_age`](Self::with_min_age)).
    pub min_age_secs: Maybe<Option<u64>>,
}
1245
1246impl DeleteOnEmptyReconfiguration {
1247    /// Create a new [`DeleteOnEmptyReconfiguration`].
1248    pub fn new() -> Self {
1249        Self::default()
1250    }
1251
1252    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1253    pub fn with_min_age(self, min_age: Duration) -> Self {
1254        Self {
1255            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1256        }
1257    }
1258}
1259
1260impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1261    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1262        Self {
1263            min_age_secs: value.min_age_secs,
1264        }
1265    }
1266}
1267
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`StreamConfig`].
pub struct StreamReconfiguration {
    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
}
1281
1282impl StreamReconfiguration {
1283    /// Create a new [`StreamReconfiguration`].
1284    pub fn new() -> Self {
1285        Self::default()
1286    }
1287
1288    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1289    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1290        Self {
1291            storage_class: Maybe::Specified(Some(storage_class)),
1292            ..self
1293        }
1294    }
1295
1296    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1297    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1298        Self {
1299            retention_policy: Maybe::Specified(Some(retention_policy)),
1300            ..self
1301        }
1302    }
1303
1304    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1305    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1306        Self {
1307            timestamping: Maybe::Specified(Some(timestamping)),
1308            ..self
1309        }
1310    }
1311
1312    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1313    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1314        Self {
1315            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1316            ..self
1317        }
1318    }
1319}
1320
1321impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1322    fn from(value: StreamReconfiguration) -> Self {
1323        Self {
1324            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1325            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1326            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1327            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1328        }
1329    }
1330}
1331
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`BasinConfig`].
pub struct BasinReconfiguration {
    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Override for the existing
    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
    pub create_stream_on_append: Maybe<bool>,
    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
    pub create_stream_on_read: Maybe<bool>,
}
1344
1345impl BasinReconfiguration {
1346    /// Create a new [`BasinReconfiguration`].
1347    pub fn new() -> Self {
1348        Self::default()
1349    }
1350
1351    /// Set the override for the existing
1352    /// [`default_stream_config`](BasinConfig::default_stream_config).
1353    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1354        Self {
1355            default_stream_config: Maybe::Specified(Some(config)),
1356            ..self
1357        }
1358    }
1359
1360    /// Set the override for the existing
1361    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1362    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1363        Self {
1364            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1365            ..self
1366        }
1367    }
1368
1369    /// Set the override for the existing
1370    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1371    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1372        Self {
1373            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1374            ..self
1375        }
1376    }
1377}
1378
1379impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1380    fn from(value: BasinReconfiguration) -> Self {
1381        Self {
1382            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1383            create_stream_on_append: value.create_stream_on_append,
1384            create_stream_on_read: value.create_stream_on_read,
1385        }
1386    }
1387}
1388
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
pub struct ReconfigureBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Reconfiguration for [`BasinConfig`].
    ///
    /// Only the specified overrides are applied; unspecified settings keep
    /// their current values (see [`BasinReconfiguration`]).
    pub config: BasinReconfiguration,
}
1398
1399impl ReconfigureBasinInput {
1400    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1401    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1402        Self { name, config }
1403    }
1404}
1405
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
pub struct ListAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""` (matches every token).
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000` when `None`.
    pub limit: Option<usize>,
}
1425
1426impl ListAccessTokensInput {
1427    /// Create a new [`ListAccessTokensInput`] with default values.
1428    pub fn new() -> Self {
1429        Self::default()
1430    }
1431
1432    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1433    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1434        Self { prefix, ..self }
1435    }
1436
1437    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1438    /// value.
1439    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1440        Self {
1441            start_after,
1442            ..self
1443        }
1444    }
1445
1446    /// Set the limit on number of access tokens to return in a page.
1447    pub fn with_limit(self, limit: usize) -> Self {
1448        Self {
1449            limit: Some(limit),
1450            ..self
1451        }
1452    }
1453}
1454
1455impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1456    fn from(value: ListAccessTokensInput) -> Self {
1457        Self {
1458            prefix: Some(value.prefix),
1459            start_after: Some(value.start_after),
1460            limit: value.limit,
1461        }
1462    }
1463}
1464
#[derive(Debug, Clone, Default)]
/// Input for [`S2::list_all_access_tokens`](crate::S2::list_all_access_tokens).
// NOTE(review): `ListAccessTokensInput` is `#[non_exhaustive]` but this
// struct is not — confirm whether that asymmetry is intentional.
pub struct ListAllAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
}
1479
1480impl ListAllAccessTokensInput {
1481    /// Create a new [`ListAllAccessTokensInput`] with default values.
1482    pub fn new() -> Self {
1483        Self::default()
1484    }
1485
1486    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1487    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1488        Self { prefix, ..self }
1489    }
1490
1491    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1492    /// this value.
1493    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1494        Self {
1495            start_after,
1496            ..self
1497        }
1498    }
1499}
1500
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Access token information.
pub struct AccessTokenInfo {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    ///
    /// Always present: conversion from the API type fails if it is missing.
    pub expires_at: S2DateTime,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    pub auto_prefix_streams: bool,
    /// Scope of the access token.
    pub scope: AccessTokenScope,
}
1515
1516impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1517    type Error = ValidationError;
1518
1519    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1520        let expires_at = value
1521            .expires_at
1522            .map(S2DateTime::try_from)
1523            .transpose()?
1524            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1525        Ok(Self {
1526            id: value.id,
1527            expires_at,
1528            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1529            scope: value.scope.into(),
1530        })
1531    }
1532}
1533
#[derive(Debug, Clone)]
/// Pattern for matching basins.
///
/// See [`AccessTokenScope::basins`].
pub enum BasinMatcher {
    /// Match no basins at all.
    None,
    /// Match exactly this one basin, by name.
    Exact(BasinName),
    /// Match every basin whose name starts with this prefix.
    Prefix(BasinNamePrefix),
}
1546
#[derive(Debug, Clone)]
/// Pattern for matching streams.
///
/// See [`AccessTokenScope::streams`].
pub enum StreamMatcher {
    /// Match no streams at all.
    None,
    /// Match exactly this one stream, by name.
    Exact(StreamName),
    /// Match every stream whose name starts with this prefix.
    Prefix(StreamNamePrefix),
}
1559
#[derive(Debug, Clone)]
/// Pattern for matching access tokens.
///
/// See [`AccessTokenScope::access_tokens`].
pub enum AccessTokenMatcher {
    /// Match no access tokens at all.
    None,
    /// Match exactly this one access token, by ID.
    Exact(AccessTokenId),
    /// Match every access token whose ID starts with this prefix.
    Prefix(AccessTokenIdPrefix),
}
1572
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions indicating allowed operations.
///
/// Both flags default to `false`, i.e. no access.
pub struct ReadWritePermissions {
    /// Read permission.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Write permission.
    ///
    /// Defaults to `false`.
    pub write: bool,
}
1586
1587impl ReadWritePermissions {
1588    /// Create a new [`ReadWritePermissions`] with default values.
1589    pub fn new() -> Self {
1590        Self::default()
1591    }
1592
1593    /// Create read-only permissions.
1594    pub fn read_only() -> Self {
1595        Self {
1596            read: true,
1597            write: false,
1598        }
1599    }
1600
1601    /// Create write-only permissions.
1602    pub fn write_only() -> Self {
1603        Self {
1604            read: false,
1605            write: true,
1606        }
1607    }
1608
1609    /// Create read-write permissions.
1610    pub fn read_write() -> Self {
1611        Self {
1612            read: true,
1613            write: true,
1614        }
1615    }
1616}
1617
1618impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1619    fn from(value: ReadWritePermissions) -> Self {
1620        Self {
1621            read: Some(value.read),
1622            write: Some(value.write),
1623        }
1624    }
1625}
1626
1627impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1628    fn from(value: api::access::ReadWritePermissions) -> Self {
1629        Self {
1630            read: value.read.unwrap_or_default(),
1631            write: value.write.unwrap_or_default(),
1632        }
1633    }
1634}
1635
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions at the operation group level.
///
/// See [`AccessTokenScope::op_group_perms`].
pub struct OperationGroupPermissions {
    /// Account-level access permissions.
    ///
    /// Defaults to `None`.
    pub account: Option<ReadWritePermissions>,
    /// Basin-level access permissions.
    ///
    /// Defaults to `None`.
    pub basin: Option<ReadWritePermissions>,
    /// Stream-level access permissions.
    ///
    /// Defaults to `None`.
    pub stream: Option<ReadWritePermissions>,
}
1655
1656impl OperationGroupPermissions {
1657    /// Create a new [`OperationGroupPermissions`] with default values.
1658    pub fn new() -> Self {
1659        Self::default()
1660    }
1661
1662    /// Create read-only permissions for all groups.
1663    pub fn read_only_all() -> Self {
1664        Self {
1665            account: Some(ReadWritePermissions::read_only()),
1666            basin: Some(ReadWritePermissions::read_only()),
1667            stream: Some(ReadWritePermissions::read_only()),
1668        }
1669    }
1670
1671    /// Create write-only permissions for all groups.
1672    pub fn write_only_all() -> Self {
1673        Self {
1674            account: Some(ReadWritePermissions::write_only()),
1675            basin: Some(ReadWritePermissions::write_only()),
1676            stream: Some(ReadWritePermissions::write_only()),
1677        }
1678    }
1679
1680    /// Create read-write permissions for all groups.
1681    pub fn read_write_all() -> Self {
1682        Self {
1683            account: Some(ReadWritePermissions::read_write()),
1684            basin: Some(ReadWritePermissions::read_write()),
1685            stream: Some(ReadWritePermissions::read_write()),
1686        }
1687    }
1688
1689    /// Set account-level access permissions.
1690    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1691        Self {
1692            account: Some(account),
1693            ..self
1694        }
1695    }
1696
1697    /// Set basin-level access permissions.
1698    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1699        Self {
1700            basin: Some(basin),
1701            ..self
1702        }
1703    }
1704
1705    /// Set stream-level access permissions.
1706    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1707        Self {
1708            stream: Some(stream),
1709            ..self
1710        }
1711    }
1712}
1713
1714impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1715    fn from(value: OperationGroupPermissions) -> Self {
1716        Self {
1717            account: value.account.map(Into::into),
1718            basin: value.basin.map(Into::into),
1719            stream: value.stream.map(Into::into),
1720        }
1721    }
1722}
1723
1724impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1725    fn from(value: api::access::PermittedOperationGroups) -> Self {
1726        Self {
1727            account: value.account.map(Into::into),
1728            basin: value.basin.map(Into::into),
1729            stream: value.stream.map(Into::into),
1730        }
1731    }
1732}
1733
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Individual operation that can be permitted.
///
/// See [`AccessTokenScope::ops`].
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    // The three metrics variants carry a `Get` prefix here, while the
    // API-level names do not (e.g. `api::access::Operation::AccountMetrics`).
    /// Get account metrics.
    GetAccountMetrics,
    /// Get basin metrics.
    GetBasinMetrics,
    /// Get stream metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
}
1782
1783impl From<Operation> for api::access::Operation {
1784    fn from(value: Operation) -> Self {
1785        match value {
1786            Operation::ListBasins => api::access::Operation::ListBasins,
1787            Operation::CreateBasin => api::access::Operation::CreateBasin,
1788            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1789            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1790            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1791            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1792            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1793            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1794            Operation::ListStreams => api::access::Operation::ListStreams,
1795            Operation::CreateStream => api::access::Operation::CreateStream,
1796            Operation::DeleteStream => api::access::Operation::DeleteStream,
1797            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1798            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1799            Operation::CheckTail => api::access::Operation::CheckTail,
1800            Operation::Append => api::access::Operation::Append,
1801            Operation::Read => api::access::Operation::Read,
1802            Operation::Trim => api::access::Operation::Trim,
1803            Operation::Fence => api::access::Operation::Fence,
1804            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1805            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1806            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1807        }
1808    }
1809}
1810
1811impl From<api::access::Operation> for Operation {
1812    fn from(value: api::access::Operation) -> Self {
1813        match value {
1814            api::access::Operation::ListBasins => Operation::ListBasins,
1815            api::access::Operation::CreateBasin => Operation::CreateBasin,
1816            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1817            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1818            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1819            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1820            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1821            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1822            api::access::Operation::ListStreams => Operation::ListStreams,
1823            api::access::Operation::CreateStream => Operation::CreateStream,
1824            api::access::Operation::DeleteStream => Operation::DeleteStream,
1825            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1826            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1827            api::access::Operation::CheckTail => Operation::CheckTail,
1828            api::access::Operation::Append => Operation::Append,
1829            api::access::Operation::Read => Operation::Read,
1830            api::access::Operation::Trim => Operation::Trim,
1831            api::access::Operation::Fence => Operation::Fence,
1832            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1833            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1834            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1835        }
1836    }
1837}
1838
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token.
///
/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
/// final set must not be empty.
///
/// Construct via [`AccessTokenScopeInput::from_ops`] or
/// [`AccessTokenScopeInput::from_op_group_perms`], then refine with the `with_*` builder methods.
///
/// See [`IssueAccessTokenInput::scope`].
pub struct AccessTokenScopeInput {
    // Resource matchers; `None` leaves the corresponding resource set unset in the request.
    basins: Option<BasinMatcher>,
    streams: Option<StreamMatcher>,
    access_tokens: Option<AccessTokenMatcher>,
    // Coarse-grained (group-level) permissions; `None` leaves the field unset.
    op_group_perms: Option<OperationGroupPermissions>,
    // Fine-grained permitted operations; an empty set is encoded as an absent field.
    ops: HashSet<Operation>,
}
1855
1856impl AccessTokenScopeInput {
1857    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1858    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1859        Self {
1860            basins: None,
1861            streams: None,
1862            access_tokens: None,
1863            op_group_perms: None,
1864            ops: ops.into_iter().collect(),
1865        }
1866    }
1867
1868    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1869    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1870        Self {
1871            basins: None,
1872            streams: None,
1873            access_tokens: None,
1874            op_group_perms: Some(op_group_perms),
1875            ops: HashSet::default(),
1876        }
1877    }
1878
1879    /// Set the permitted operations.
1880    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1881        Self {
1882            ops: ops.into_iter().collect(),
1883            ..self
1884        }
1885    }
1886
1887    /// Set the access permissions at the operation group level.
1888    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1889        Self {
1890            op_group_perms: Some(op_group_perms),
1891            ..self
1892        }
1893    }
1894
1895    /// Set the permitted basins.
1896    ///
1897    /// Defaults to no basins.
1898    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1899        Self {
1900            basins: Some(basins),
1901            ..self
1902        }
1903    }
1904
1905    /// Set the permitted streams.
1906    ///
1907    /// Defaults to no streams.
1908    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1909        Self {
1910            streams: Some(streams),
1911            ..self
1912        }
1913    }
1914
1915    /// Set the permitted access tokens.
1916    ///
1917    /// Defaults to no access tokens.
1918    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1919        Self {
1920            access_tokens: Some(access_tokens),
1921            ..self
1922        }
1923    }
1924}
1925
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token.
///
/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms).
pub struct AccessTokenScope {
    /// Permitted basins.
    pub basins: Option<BasinMatcher>,
    /// Permitted streams.
    pub streams: Option<StreamMatcher>,
    /// Permitted access tokens.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions at the operation group level.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Permitted operations.
    pub ops: HashSet<Operation>,
}
1941
1942impl From<api::access::AccessTokenScope> for AccessTokenScope {
1943    fn from(value: api::access::AccessTokenScope) -> Self {
1944        Self {
1945            basins: value.basins.map(|rs| match rs {
1946                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1947                    BasinMatcher::Exact(e)
1948                }
1949                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1950                    BasinMatcher::None
1951                }
1952                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1953            }),
1954            streams: value.streams.map(|rs| match rs {
1955                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1956                    StreamMatcher::Exact(e)
1957                }
1958                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1959                    StreamMatcher::None
1960                }
1961                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1962            }),
1963            access_tokens: value.access_tokens.map(|rs| match rs {
1964                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1965                    AccessTokenMatcher::Exact(e)
1966                }
1967                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1968                    AccessTokenMatcher::None
1969                }
1970                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1971            }),
1972            op_group_perms: value.op_groups.map(Into::into),
1973            ops: value
1974                .ops
1975                .map(|ops| ops.into_iter().map(Into::into).collect())
1976                .unwrap_or_default(),
1977        }
1978    }
1979}
1980
1981impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1982    fn from(value: AccessTokenScopeInput) -> Self {
1983        Self {
1984            basins: value.basins.map(|rs| match rs {
1985                BasinMatcher::None => {
1986                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1987                }
1988                BasinMatcher::Exact(e) => {
1989                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1990                }
1991                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1992            }),
1993            streams: value.streams.map(|rs| match rs {
1994                StreamMatcher::None => {
1995                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1996                }
1997                StreamMatcher::Exact(e) => {
1998                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1999                }
2000                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2001            }),
2002            access_tokens: value.access_tokens.map(|rs| match rs {
2003                AccessTokenMatcher::None => {
2004                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
2005                }
2006                AccessTokenMatcher::Exact(e) => {
2007                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2008                }
2009                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2010            }),
2011            op_groups: value.op_group_perms.map(Into::into),
2012            ops: if value.ops.is_empty() {
2013                None
2014            } else {
2015                Some(value.ops.into_iter().map(Into::into).collect())
2016            },
2017        }
2018    }
2019}
2020
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`issue_access_token`](crate::S2::issue_access_token).
///
/// Construct via [`IssueAccessTokenInput::new`].
pub struct IssueAccessTokenInput {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    ///
    /// Defaults to the expiration time of requestor's access token passed via
    /// [`S2Config`](S2Config::new).
    pub expires_at: Option<S2DateTime>,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    ///
    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
    /// prefix.
    ///
    /// Defaults to `false`.
    pub auto_prefix_streams: bool,
    /// Scope of the token.
    pub scope: AccessTokenScopeInput,
}
2043
2044impl IssueAccessTokenInput {
2045    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
2046    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2047        Self {
2048            id,
2049            expires_at: None,
2050            auto_prefix_streams: false,
2051            scope,
2052        }
2053    }
2054
2055    /// Set the expiration time.
2056    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2057        Self {
2058            expires_at: Some(expires_at),
2059            ..self
2060        }
2061    }
2062
2063    /// Set whether to automatically prefix stream names during creation and strip the prefix during
2064    /// listing.
2065    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2066        Self {
2067            auto_prefix_streams,
2068            ..self
2069        }
2070    }
2071}
2072
2073impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2074    fn from(value: IssueAccessTokenInput) -> Self {
2075        Self {
2076            id: value.id,
2077            expires_at: value.expires_at.map(Into::into),
2078            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2079            scope: value.scope.into(),
2080        }
2081    }
2082}
2083
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Interval to accumulate over for timeseries metric sets.
///
/// See [`TimeRangeAndInterval::interval`].
pub enum TimeseriesInterval {
    /// Minute.
    Minute,
    /// Hour.
    Hour,
    /// Day.
    Day,
}
2094
2095impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2096    fn from(value: TimeseriesInterval) -> Self {
2097        match value {
2098            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2099            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2100            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2101        }
2102    }
2103}
2104
2105impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2106    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2107        match value {
2108            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2109            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2110            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2111        }
2112    }
2113}
2114
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds.
///
/// The range is half-open: `start` is included, `end` is excluded.
pub struct TimeRange {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
}

impl TimeRange {
    /// Create a new [`TimeRange`] with the given start and end timestamps.
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
2131
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds and accumulation interval.
///
/// Like [`TimeRange`], but with an optional [`TimeseriesInterval`] for metric sets that
/// accumulate over an interval.
pub struct TimeRangeAndInterval {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
    /// Interval to accumulate over for timeseries metric sets.
    ///
    /// Default is dependent on the requested metric set.
    pub interval: Option<TimeseriesInterval>,
}
2145
2146impl TimeRangeAndInterval {
2147    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2148    pub fn new(start: u32, end: u32) -> Self {
2149        Self {
2150            start,
2151            end,
2152            interval: None,
2153        }
2154    }
2155
2156    /// Set the interval to accumulate over for timeseries metric sets.
2157    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2158        Self {
2159            interval: Some(interval),
2160            ..self
2161        }
2162    }
2163}
2164
#[derive(Debug, Clone, Copy)]
/// Account metric set to return.
///
/// See [`GetAccountMetricsInput::set`].
pub enum AccountMetricSet {
    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
    /// specified time range.
    ActiveBasins(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per account operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    AccountOps(TimeRangeAndInterval),
}
2179
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
///
/// Construct via [`GetAccountMetricsInput::new`].
pub struct GetAccountMetricsInput {
    /// Metric set to return.
    pub set: AccountMetricSet,
}
2187
2188impl GetAccountMetricsInput {
2189    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2190    pub fn new(set: AccountMetricSet) -> Self {
2191        Self { set }
2192    }
2193}
2194
2195impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2196    fn from(value: GetAccountMetricsInput) -> Self {
2197        let (set, start, end, interval) = match value.set {
2198            AccountMetricSet::ActiveBasins(args) => (
2199                api::metrics::AccountMetricSet::ActiveBasins,
2200                args.start,
2201                args.end,
2202                None,
2203            ),
2204            AccountMetricSet::AccountOps(args) => (
2205                api::metrics::AccountMetricSet::AccountOps,
2206                args.start,
2207                args.end,
2208                args.interval,
2209            ),
2210        };
2211        Self {
2212            set,
2213            start: Some(start),
2214            end: Some(end),
2215            interval: interval.map(Into::into),
2216        }
2217    }
2218}
2219
#[derive(Debug, Clone, Copy)]
/// Basin metric set to return.
///
/// See [`GetBasinMetricsInput::set`].
pub enum BasinMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
    /// in the basin, with one observed value for each hour over the requested time range.
    Storage(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
    ///
    /// Each metric represents a timeseries of the number of append operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendOps(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
    ///
    /// Each metric represents a timeseries of the number of read operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadOps(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadThroughput(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendThroughput(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per basin operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    BasinOps(TimeRangeAndInterval),
}
2264
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
///
/// Construct via [`GetBasinMetricsInput::new`].
pub struct GetBasinMetricsInput {
    /// Basin name.
    pub name: BasinName,
    /// Metric set to return.
    pub set: BasinMetricSet,
}
2274
2275impl GetBasinMetricsInput {
2276    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2277    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2278        Self { name, set }
2279    }
2280}
2281
2282impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2283    fn from(value: GetBasinMetricsInput) -> Self {
2284        let (set, start, end, interval) = match value.set {
2285            BasinMetricSet::Storage(args) => (
2286                api::metrics::BasinMetricSet::Storage,
2287                args.start,
2288                args.end,
2289                None,
2290            ),
2291            BasinMetricSet::AppendOps(args) => (
2292                api::metrics::BasinMetricSet::AppendOps,
2293                args.start,
2294                args.end,
2295                args.interval,
2296            ),
2297            BasinMetricSet::ReadOps(args) => (
2298                api::metrics::BasinMetricSet::ReadOps,
2299                args.start,
2300                args.end,
2301                args.interval,
2302            ),
2303            BasinMetricSet::ReadThroughput(args) => (
2304                api::metrics::BasinMetricSet::ReadThroughput,
2305                args.start,
2306                args.end,
2307                args.interval,
2308            ),
2309            BasinMetricSet::AppendThroughput(args) => (
2310                api::metrics::BasinMetricSet::AppendThroughput,
2311                args.start,
2312                args.end,
2313                args.interval,
2314            ),
2315            BasinMetricSet::BasinOps(args) => (
2316                api::metrics::BasinMetricSet::BasinOps,
2317                args.start,
2318                args.end,
2319                args.interval,
2320            ),
2321        };
2322        (
2323            value.name,
2324            api::metrics::BasinMetricSetRequest {
2325                set,
2326                start: Some(start),
2327                end: Some(end),
2328                interval: interval.map(Into::into),
2329            },
2330        )
2331    }
2332}
2333
#[derive(Debug, Clone, Copy)]
/// Stream metric set to return.
///
/// See [`GetStreamMetricsInput::set`].
pub enum StreamMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
    /// with one observed value for each minute over the requested time range.
    Storage(TimeRange),
}
2341
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
///
/// Construct via [`GetStreamMetricsInput::new`].
pub struct GetStreamMetricsInput {
    /// Basin name.
    pub basin_name: BasinName,
    /// Stream name.
    pub stream_name: StreamName,
    /// Metric set to return.
    pub set: StreamMetricSet,
}
2353
2354impl GetStreamMetricsInput {
2355    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2356    /// set.
2357    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2358        Self {
2359            basin_name,
2360            stream_name,
2361            set,
2362        }
2363    }
2364}
2365
2366impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2367    fn from(value: GetStreamMetricsInput) -> Self {
2368        let (set, start, end, interval) = match value.set {
2369            StreamMetricSet::Storage(args) => (
2370                api::metrics::StreamMetricSet::Storage,
2371                args.start,
2372                args.end,
2373                None,
2374            ),
2375        };
2376        (
2377            value.basin_name,
2378            value.stream_name,
2379            api::metrics::StreamMetricSetRequest {
2380                set,
2381                start: Some(start),
2382                end: Some(end),
2383                interval,
2384            },
2385        )
2386    }
2387}
2388
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Unit in which metric values are measured.
///
/// See [`ScalarMetric::unit`], [`AccumulationMetric::unit`], and [`GaugeMetric::unit`].
pub enum MetricUnit {
    /// Size in bytes.
    Bytes,
    /// Number of operations.
    Operations,
}
2397
2398impl From<api::metrics::MetricUnit> for MetricUnit {
2399    fn from(value: api::metrics::MetricUnit) -> Self {
2400        match value {
2401            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2402            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2403        }
2404    }
2405}
2406
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Single named value.
///
/// See [`Metric::Scalar`].
pub struct ScalarMetric {
    /// Metric name.
    pub name: String,
    /// Unit for the metric value.
    pub unit: MetricUnit,
    /// Metric value.
    pub value: f64,
}
2418
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
/// specified interval.
///
/// See [`Metric::Accumulation`].
pub struct AccumulationMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the accumulated values.
    pub unit: MetricUnit,
    /// The interval at which datapoints are accumulated.
    pub interval: TimeseriesInterval,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
    /// one `interval`.
    pub values: Vec<(u32, f64)>,
}
2435
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
///
/// See [`Metric::Gauge`].
pub struct GaugeMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the instantaneous values.
    pub unit: MetricUnit,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
    /// instant of the `timestamp` (in Unix epoch seconds).
    pub values: Vec<(u32, f64)>,
}
2448
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Set of string labels.
///
/// See [`Metric::Label`].
pub struct LabelMetric {
    /// Label name.
    pub name: String,
    /// Label values.
    pub values: Vec<String>,
}
2458
#[derive(Debug, Clone)]
/// Individual metric in a returned metric set.
///
/// Converted from the API representation via `From<api::metrics::Metric>`.
pub enum Metric {
    /// Single named value.
    Scalar(ScalarMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
    /// specified interval.
    Accumulation(AccumulationMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
    Gauge(GaugeMetric),
    /// Set of string labels.
    Label(LabelMetric),
}
2472
2473impl From<api::metrics::Metric> for Metric {
2474    fn from(value: api::metrics::Metric) -> Self {
2475        match value {
2476            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2477                name: sm.name.into(),
2478                unit: sm.unit.into(),
2479                value: sm.value,
2480            }),
2481            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2482                name: am.name.into(),
2483                unit: am.unit.into(),
2484                interval: am.interval.into(),
2485                values: am.values,
2486            }),
2487            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2488                name: gm.name.into(),
2489                unit: gm.unit.into(),
2490                values: gm.values,
2491            }),
2492            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2493                name: lm.name.into(),
2494                values: lm.values,
2495            }),
2496        }
2497    }
2498}
2499
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
///
/// Construct via [`ListStreamsInput::new`] and refine with the `with_*` builder methods.
pub struct ListStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
2519
2520impl ListStreamsInput {
2521    /// Create a new [`ListStreamsInput`] with default values.
2522    pub fn new() -> Self {
2523        Self::default()
2524    }
2525
2526    /// Set the prefix used to filter streams whose names begin with this value.
2527    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2528        Self { prefix, ..self }
2529    }
2530
2531    /// Set the value used to filter streams whose names are lexicographically greater than this
2532    /// value.
2533    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2534        Self {
2535            start_after,
2536            ..self
2537        }
2538    }
2539
2540    /// Set the limit on number of streams to return in a page.
2541    pub fn with_limit(self, limit: usize) -> Self {
2542        Self {
2543            limit: Some(limit),
2544            ..self
2545        }
2546    }
2547}
2548
2549impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2550    fn from(value: ListStreamsInput) -> Self {
2551        Self {
2552            prefix: Some(value.prefix),
2553            start_after: Some(value.start_after),
2554            limit: value.limit,
2555        }
2556    }
2557}
2558
#[derive(Debug, Clone, Default)]
/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
///
/// Construct via [`ListAllStreamsInput::new`] and refine with the `with_*` builder methods.
pub struct ListAllStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    /// Whether to include streams that are being deleted.
    ///
    /// Defaults to `false`.
    pub include_deleted: bool,
}
2577
2578impl ListAllStreamsInput {
2579    /// Create a new [`ListAllStreamsInput`] with default values.
2580    pub fn new() -> Self {
2581        Self::default()
2582    }
2583
2584    /// Set the prefix used to filter streams whose names begin with this value.
2585    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2586        Self { prefix, ..self }
2587    }
2588
2589    /// Set the value used to filter streams whose names are lexicographically greater than this
2590    /// value.
2591    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2592        Self {
2593            start_after,
2594            ..self
2595        }
2596    }
2597
2598    /// Set whether to include streams that are being deleted.
2599    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2600        Self {
2601            include_deleted,
2602            ..self
2603        }
2604    }
2605}
2606
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Stream information.
///
/// Decoded from the API representation via `TryFrom<api::stream::StreamInfo>`.
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time if the stream is being deleted.
    pub deleted_at: Option<S2DateTime>,
}
2618
2619impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2620    type Error = ValidationError;
2621
2622    fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2623        Ok(Self {
2624            name: value.name,
2625            created_at: value.created_at.try_into()?,
2626            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2627        })
2628    }
2629}
2630
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
///
/// Construct via [`CreateStreamInput::new`].
pub struct CreateStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Configuration for the stream.
    ///
    /// See [`StreamConfig`] for defaults.
    pub config: Option<StreamConfig>,
    // Generated in [`CreateStreamInput::new`]; not settable by callers.
    idempotency_token: String,
}
2643
2644impl CreateStreamInput {
2645    /// Create a new [`CreateStreamInput`] with the given stream name.
2646    pub fn new(name: StreamName) -> Self {
2647        Self {
2648            name,
2649            config: None,
2650            idempotency_token: idempotency_token(),
2651        }
2652    }
2653
2654    /// Set the configuration for the stream.
2655    pub fn with_config(self, config: StreamConfig) -> Self {
2656        Self {
2657            config: Some(config),
2658            ..self
2659        }
2660    }
2661}
2662
2663impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2664    fn from(value: CreateStreamInput) -> Self {
2665        (
2666            api::stream::CreateStreamRequest {
2667                stream: value.name,
2668                config: value.config.map(Into::into),
2669            },
2670            value.idempotency_token,
2671        )
2672    }
2673}
2674
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_or_reconfigure_stream`](crate::S2Basin::create_or_reconfigure_stream)
/// operation.
///
/// Only available with the private `_hidden` feature.
#[doc(hidden)]
#[cfg(feature = "_hidden")]
pub struct CreateOrReconfigureStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Reconfiguration for the stream.
    ///
    /// If `None`, the stream is created with default configuration or left unchanged if it exists.
    pub config: Option<StreamReconfiguration>,
}
2689
#[cfg(feature = "_hidden")]
impl CreateOrReconfigureStreamInput {
    /// Create a new [`CreateOrReconfigureStreamInput`] with the given stream name.
    ///
    /// The reconfiguration is left unset.
    pub fn new(name: StreamName) -> Self {
        Self { name, config: None }
    }

    /// Set the reconfiguration for the stream.
    pub fn with_config(mut self, config: StreamReconfiguration) -> Self {
        self.config = Some(config);
        self
    }
}
2705
#[cfg(feature = "_hidden")]
impl From<CreateOrReconfigureStreamInput>
    for (StreamName, Option<api::config::StreamReconfiguration>)
{
    /// Splits the SDK input into the stream name and optional API reconfiguration.
    fn from(value: CreateOrReconfigureStreamInput) -> Self {
        let CreateOrReconfigureStreamInput { name, config } = value;
        (name, config.map(Into::into))
    }
}
2714
2715#[derive(Debug, Clone)]
2716#[non_exhaustive]
2717/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
2718pub struct DeleteStreamInput {
2719    /// Stream name.
2720    pub name: StreamName,
2721    /// Whether to ignore `Not Found` error if the stream doesn't exist.
2722    pub ignore_not_found: bool,
2723}
2724
2725impl DeleteStreamInput {
2726    /// Create a new [`DeleteStreamInput`] with the given stream name.
2727    pub fn new(name: StreamName) -> Self {
2728        Self {
2729            name,
2730            ignore_not_found: false,
2731        }
2732    }
2733
2734    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2735    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2736        Self {
2737            ignore_not_found,
2738            ..self
2739        }
2740    }
2741}
2742
2743#[derive(Debug, Clone)]
2744#[non_exhaustive]
2745/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
2746pub struct ReconfigureStreamInput {
2747    /// Stream name.
2748    pub name: StreamName,
2749    /// Reconfiguration for [`StreamConfig`].
2750    pub config: StreamReconfiguration,
2751}
2752
2753impl ReconfigureStreamInput {
2754    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2755    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2756        Self { name, config }
2757    }
2758}
2759
2760#[derive(Debug, Clone, PartialEq, Eq)]
2761/// Token for fencing appends to a stream.
2762///
2763/// **Note:** It must not exceed 36 bytes in length.
2764///
2765/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
2766pub struct FencingToken(String);
2767
2768impl FencingToken {
2769    /// Generate a random alphanumeric fencing token of `n` bytes.
2770    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2771        rand::rng()
2772            .sample_iter(&rand::distr::Alphanumeric)
2773            .take(n)
2774            .map(char::from)
2775            .collect::<String>()
2776            .parse()
2777    }
2778}
2779
2780impl FromStr for FencingToken {
2781    type Err = ValidationError;
2782
2783    fn from_str(s: &str) -> Result<Self, Self::Err> {
2784        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2785            return Err(ValidationError(format!(
2786                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2787            )));
2788        }
2789        Ok(FencingToken(s.to_string()))
2790    }
2791}
2792
2793impl std::fmt::Display for FencingToken {
2794    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2795        write!(f, "{}", self.0)
2796    }
2797}
2798
2799impl Deref for FencingToken {
2800    type Target = str;
2801
2802    fn deref(&self) -> &Self::Target {
2803        &self.0
2804    }
2805}
2806
/// A position in a stream.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
    /// User-specified timestamps are passed through as-is.
    pub timestamp: u64,
}

impl std::fmt::Display for StreamPosition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = self;
        write!(f, "seq_num={seq_num}, timestamp={timestamp}")
    }
}
2823
2824impl From<api::stream::proto::StreamPosition> for StreamPosition {
2825    fn from(value: api::stream::proto::StreamPosition) -> Self {
2826        Self {
2827            seq_num: value.seq_num,
2828            timestamp: value.timestamp,
2829        }
2830    }
2831}
2832
2833impl From<api::stream::StreamPosition> for StreamPosition {
2834    fn from(value: api::stream::StreamPosition) -> Self {
2835        Self {
2836            seq_num: value.seq_num,
2837            timestamp: value.timestamp,
2838        }
2839    }
2840}
2841
2842#[derive(Debug, Clone, PartialEq)]
2843#[non_exhaustive]
2844/// A name-value pair.
2845pub struct Header {
2846    /// Name.
2847    pub name: Bytes,
2848    /// Value.
2849    pub value: Bytes,
2850}
2851
2852impl Header {
2853    /// Create a new [`Header`] with the given name and value.
2854    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2855        Self {
2856            name: name.into(),
2857            value: value.into(),
2858        }
2859    }
2860}
2861
2862impl From<Header> for api::stream::proto::Header {
2863    fn from(value: Header) -> Self {
2864        Self {
2865            name: value.name,
2866            value: value.value,
2867        }
2868    }
2869}
2870
2871impl From<api::stream::proto::Header> for Header {
2872    fn from(value: api::stream::proto::Header) -> Self {
2873        Self {
2874            name: value.name,
2875            value: value.value,
2876        }
2877    }
2878}
2879
2880#[derive(Debug, Clone, PartialEq)]
2881/// A record to append.
2882pub struct AppendRecord {
2883    body: Bytes,
2884    headers: Vec<Header>,
2885    timestamp: Option<u64>,
2886}
2887
2888impl AppendRecord {
2889    fn validate(self) -> Result<Self, ValidationError> {
2890        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2891            Err(ValidationError(format!(
2892                "metered_bytes: {} exceeds {}",
2893                self.metered_bytes(),
2894                RECORD_BATCH_MAX.bytes
2895            )))
2896        } else {
2897            Ok(self)
2898        }
2899    }
2900
2901    /// Create a new [`AppendRecord`] with the given record body.
2902    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2903        let record = Self {
2904            body: body.into(),
2905            headers: Vec::default(),
2906            timestamp: None,
2907        };
2908        record.validate()
2909    }
2910
2911    /// Set the headers for this record.
2912    pub fn with_headers(
2913        self,
2914        headers: impl IntoIterator<Item = Header>,
2915    ) -> Result<Self, ValidationError> {
2916        let record = Self {
2917            headers: headers.into_iter().collect(),
2918            ..self
2919        };
2920        record.validate()
2921    }
2922
2923    /// Set the timestamp for this record.
2924    ///
2925    /// Precise semantics depend on [`StreamConfig::timestamping`].
2926    pub fn with_timestamp(self, timestamp: u64) -> Self {
2927        Self {
2928            timestamp: Some(timestamp),
2929            ..self
2930        }
2931    }
2932
2933    /// Get the body of this record.
2934    pub fn body(&self) -> &[u8] {
2935        &self.body
2936    }
2937
2938    /// Get the headers of this record.
2939    pub fn headers(&self) -> &[Header] {
2940        &self.headers
2941    }
2942
2943    /// Get the timestamp of this record.
2944    pub fn timestamp(&self) -> Option<u64> {
2945        self.timestamp
2946    }
2947}
2948
2949impl From<AppendRecord> for api::stream::proto::AppendRecord {
2950    fn from(value: AppendRecord) -> Self {
2951        Self {
2952            timestamp: value.timestamp,
2953            headers: value.headers.into_iter().map(Into::into).collect(),
2954            body: value.body,
2955        }
2956    }
2957}
2958
/// Metered byte size calculation.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
pub trait MeteredBytes {
    /// Returns the metered byte size.
    fn metered_bytes(&self) -> usize;
}

/// Implements [`MeteredBytes`] for any record-shaped type exposing
/// `headers` (with `name`/`value` byte fields) and `body`.
macro_rules! metered_bytes_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> usize {
                // 8 fixed bytes, 2 per header, plus header payloads and the body.
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|h| h.name.len() + h.value.len())
                    .sum();
                8 + (2 * self.headers.len()) + header_bytes + self.body.len()
            }
        }
    };
}
2985
// Metered-size accounting for records being appended (formula documented on `MeteredBytes`).
metered_bytes_impl!(AppendRecord);
2987
2988#[derive(Debug, Clone)]
2989/// A batch of records to append atomically.
2990///
2991/// **Note:** It must contain at least `1` record and no more than `1000`.
2992/// The total size of the batch must not exceed `1MiB` in metered bytes.
2993///
2994/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
2995/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
2996/// that takes care of the abovementioned constraints.
2997pub struct AppendRecordBatch {
2998    records: Vec<AppendRecord>,
2999    metered_bytes: usize,
3000}
3001
3002impl AppendRecordBatch {
3003    pub(crate) fn with_capacity(capacity: usize) -> Self {
3004        Self {
3005            records: Vec::with_capacity(capacity),
3006            metered_bytes: 0,
3007        }
3008    }
3009
3010    pub(crate) fn push(&mut self, record: AppendRecord) {
3011        self.metered_bytes += record.metered_bytes();
3012        self.records.push(record);
3013    }
3014
3015    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
3016    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3017    where
3018        I: IntoIterator<Item = AppendRecord>,
3019    {
3020        let mut records = Vec::new();
3021        let mut metered_bytes = 0;
3022
3023        for record in iter {
3024            metered_bytes += record.metered_bytes();
3025            records.push(record);
3026
3027            if metered_bytes > RECORD_BATCH_MAX.bytes {
3028                return Err(ValidationError(format!(
3029                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
3030                    RECORD_BATCH_MAX.bytes
3031                )));
3032            }
3033
3034            if records.len() > RECORD_BATCH_MAX.count {
3035                return Err(ValidationError(format!(
3036                    "number of records in the batch exceeds {}",
3037                    RECORD_BATCH_MAX.count
3038                )));
3039            }
3040        }
3041
3042        if records.is_empty() {
3043            return Err(ValidationError("batch is empty".into()));
3044        }
3045
3046        Ok(Self {
3047            records,
3048            metered_bytes,
3049        })
3050    }
3051}
3052
3053impl Deref for AppendRecordBatch {
3054    type Target = [AppendRecord];
3055
3056    fn deref(&self) -> &Self::Target {
3057        &self.records
3058    }
3059}
3060
3061impl MeteredBytes for AppendRecordBatch {
3062    fn metered_bytes(&self) -> usize {
3063        self.metered_bytes
3064    }
3065}
3066
3067#[derive(Debug, Clone)]
3068/// Command to signal an operation.
3069pub enum Command {
3070    /// Fence operation.
3071    Fence {
3072        /// Fencing token.
3073        fencing_token: FencingToken,
3074    },
3075    /// Trim operation.
3076    Trim {
3077        /// Trim point.
3078        trim_point: u64,
3079    },
3080}
3081
3082#[derive(Debug, Clone)]
3083#[non_exhaustive]
3084/// Command record for signaling operations to the service.
3085///
3086/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
3087pub struct CommandRecord {
3088    /// Command to signal an operation.
3089    pub command: Command,
3090    /// Timestamp for this record.
3091    pub timestamp: Option<u64>,
3092}
3093
3094impl CommandRecord {
3095    const FENCE: &[u8] = b"fence";
3096    const TRIM: &[u8] = b"trim";
3097
3098    /// Create a fence command record with the given fencing token.
3099    ///
3100    /// Fencing is strongly consistent, and subsequent appends that specify a
3101    /// fencing token will fail if it does not match.
3102    pub fn fence(fencing_token: FencingToken) -> Self {
3103        Self {
3104            command: Command::Fence { fencing_token },
3105            timestamp: None,
3106        }
3107    }
3108
3109    /// Create a trim command record with the given trim point.
3110    ///
3111    /// Trim point is the desired earliest sequence number for the stream.
3112    ///
3113    /// Trimming is eventually consistent, and trimmed records may be visible
3114    /// for a brief period.
3115    pub fn trim(trim_point: u64) -> Self {
3116        Self {
3117            command: Command::Trim { trim_point },
3118            timestamp: None,
3119        }
3120    }
3121
3122    /// Set the timestamp for this record.
3123    pub fn with_timestamp(self, timestamp: u64) -> Self {
3124        Self {
3125            timestamp: Some(timestamp),
3126            ..self
3127        }
3128    }
3129}
3130
3131impl From<CommandRecord> for AppendRecord {
3132    fn from(value: CommandRecord) -> Self {
3133        let (header_value, body) = match value.command {
3134            Command::Fence { fencing_token } => (
3135                CommandRecord::FENCE,
3136                Bytes::copy_from_slice(fencing_token.as_bytes()),
3137            ),
3138            Command::Trim { trim_point } => (
3139                CommandRecord::TRIM,
3140                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3141            ),
3142        };
3143        Self {
3144            body,
3145            headers: vec![Header::new("", header_value)],
3146            timestamp: value.timestamp,
3147        }
3148    }
3149}
3150
3151#[derive(Debug, Clone)]
3152#[non_exhaustive]
3153/// Input for [`append`](crate::S2Stream::append) operation and
3154/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
3155pub struct AppendInput {
3156    /// Batch of records to append atomically.
3157    pub records: AppendRecordBatch,
3158    /// Expected sequence number for the first record in the batch.
3159    ///
3160    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
3161    pub match_seq_num: Option<u64>,
3162    /// Fencing token to match against the stream's current fencing token.
3163    ///
3164    /// If unspecified, no matching is performed. If specified and mismatched,
3165    /// the append fails. A stream defaults to `""` as its fencing token.
3166    pub fencing_token: Option<FencingToken>,
3167}
3168
3169impl AppendInput {
3170    /// Create a new [`AppendInput`] with the given batch of records.
3171    pub fn new(records: AppendRecordBatch) -> Self {
3172        Self {
3173            records,
3174            match_seq_num: None,
3175            fencing_token: None,
3176        }
3177    }
3178
3179    /// Set the expected sequence number for the first record in the batch.
3180    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3181        Self {
3182            match_seq_num: Some(match_seq_num),
3183            ..self
3184        }
3185    }
3186
3187    /// Set the fencing token to match against the stream's current fencing token.
3188    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3189        Self {
3190            fencing_token: Some(fencing_token),
3191            ..self
3192        }
3193    }
3194}
3195
3196impl From<AppendInput> for api::stream::proto::AppendInput {
3197    fn from(value: AppendInput) -> Self {
3198        Self {
3199            records: value.records.iter().cloned().map(Into::into).collect(),
3200            match_seq_num: value.match_seq_num,
3201            fencing_token: value.fencing_token.map(|t| t.to_string()),
3202        }
3203    }
3204}
3205
3206#[derive(Debug, Clone, PartialEq)]
3207#[non_exhaustive]
3208/// Acknowledgement for an [`AppendInput`].
3209pub struct AppendAck {
3210    /// Sequence number and timestamp of the first record that was appended.
3211    pub start: StreamPosition,
3212    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
3213    /// that was appended.
3214    ///
3215    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
3216    /// appended.
3217    pub end: StreamPosition,
3218    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3219    /// the last record on the stream.
3220    ///
3221    /// This can be greater than the `end` position in case of concurrent appends.
3222    pub tail: StreamPosition,
3223}
3224
3225impl From<api::stream::proto::AppendAck> for AppendAck {
3226    fn from(value: api::stream::proto::AppendAck) -> Self {
3227        Self {
3228            start: value.start.unwrap_or_default().into(),
3229            end: value.end.unwrap_or_default().into(),
3230            tail: value.tail.unwrap_or_default().into(),
3231        }
3232    }
3233}
3234
/// Starting position for reading from a stream.
#[derive(Debug, Clone, Copy)]
pub enum ReadFrom {
    /// Read from this sequence number.
    SeqNum(u64),
    /// Read from this timestamp.
    Timestamp(u64),
    /// Read from N records before the tail.
    TailOffset(u64),
}

impl Default for ReadFrom {
    /// Defaults to the beginning of the stream: sequence number `0`.
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3251
3252#[derive(Debug, Default, Clone)]
3253#[non_exhaustive]
3254/// Where to start reading.
3255pub struct ReadStart {
3256    /// Starting position.
3257    ///
3258    /// Defaults to reading from sequence number `0`.
3259    pub from: ReadFrom,
3260    /// Whether to start from tail if the requested starting position is beyond it.
3261    ///
3262    /// Defaults to `false` (errors if position is beyond tail).
3263    pub clamp_to_tail: bool,
3264}
3265
3266impl ReadStart {
3267    /// Create a new [`ReadStart`] with default values.
3268    pub fn new() -> Self {
3269        Self::default()
3270    }
3271
3272    /// Set the starting position.
3273    pub fn with_from(self, from: ReadFrom) -> Self {
3274        Self { from, ..self }
3275    }
3276
3277    /// Set whether to start from tail if the requested starting position is beyond it.
3278    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3279        Self {
3280            clamp_to_tail,
3281            ..self
3282        }
3283    }
3284}
3285
3286impl From<ReadStart> for api::stream::ReadStart {
3287    fn from(value: ReadStart) -> Self {
3288        let (seq_num, timestamp, tail_offset) = match value.from {
3289            ReadFrom::SeqNum(n) => (Some(n), None, None),
3290            ReadFrom::Timestamp(t) => (None, Some(t), None),
3291            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3292        };
3293        Self {
3294            seq_num,
3295            timestamp,
3296            tail_offset,
3297            clamp: if value.clamp_to_tail {
3298                Some(true)
3299            } else {
3300                None
3301            },
3302        }
3303    }
3304}
3305
/// Limits on how much to read.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadLimits {
    /// Limit on number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Limit on total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Create a new [`ReadLimits`] with default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the limit on number of records.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Set the limit on total metered bytes of records.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3342
3343#[derive(Debug, Clone, Default)]
3344#[non_exhaustive]
3345/// When to stop reading.
3346pub struct ReadStop {
3347    /// Limits on how much to read.
3348    ///
3349    /// See [`ReadLimits`] for defaults.
3350    pub limits: ReadLimits,
3351    /// Timestamp at which to stop (exclusive).
3352    ///
3353    /// Defaults to `None`.
3354    pub until: Option<RangeTo<u64>>,
3355    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
3356    /// seconds for [`read`](crate::S2Stream::read).
3357    ///
3358    /// Defaults to:
3359    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
3360    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
3361    ///   is specified.
3362    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
3363    ///   `until` is specified.
3364    pub wait: Option<u32>,
3365}
3366
3367impl ReadStop {
3368    /// Create a new [`ReadStop`] with default values.
3369    pub fn new() -> Self {
3370        Self::default()
3371    }
3372
3373    /// Set the limits on how much to read.
3374    pub fn with_limits(self, limits: ReadLimits) -> Self {
3375        Self { limits, ..self }
3376    }
3377
3378    /// Set the timestamp at which to stop (exclusive).
3379    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3380        Self {
3381            until: Some(until),
3382            ..self
3383        }
3384    }
3385
3386    /// Set the duration in seconds to wait for new records before stopping.
3387    pub fn with_wait(self, wait: u32) -> Self {
3388        Self {
3389            wait: Some(wait),
3390            ..self
3391        }
3392    }
3393}
3394
3395impl From<ReadStop> for api::stream::ReadEnd {
3396    fn from(value: ReadStop) -> Self {
3397        Self {
3398            count: value.limits.count,
3399            bytes: value.limits.bytes,
3400            until: value.until.map(|r| r.end),
3401            wait: value.wait,
3402        }
3403    }
3404}
3405
3406#[derive(Debug, Clone, Default)]
3407#[non_exhaustive]
3408/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
3409/// operations.
3410pub struct ReadInput {
3411    /// Where to start reading.
3412    ///
3413    /// See [`ReadStart`] for defaults.
3414    pub start: ReadStart,
3415    /// When to stop reading.
3416    ///
3417    /// See [`ReadStop`] for defaults.
3418    pub stop: ReadStop,
3419    /// Whether to filter out command records from the stream when reading.
3420    ///
3421    /// Defaults to `false`.
3422    pub ignore_command_records: bool,
3423}
3424
3425impl ReadInput {
3426    /// Create a new [`ReadInput`] with default values.
3427    pub fn new() -> Self {
3428        Self::default()
3429    }
3430
3431    /// Set where to start reading.
3432    pub fn with_start(self, start: ReadStart) -> Self {
3433        Self { start, ..self }
3434    }
3435
3436    /// Set when to stop reading.
3437    pub fn with_stop(self, stop: ReadStop) -> Self {
3438        Self { stop, ..self }
3439    }
3440
3441    /// Set whether to filter out command records from the stream when reading.
3442    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3443        Self {
3444            ignore_command_records,
3445            ..self
3446        }
3447    }
3448}
3449
3450#[derive(Debug, Clone)]
3451#[non_exhaustive]
3452/// Record that is durably sequenced on a stream.
3453pub struct SequencedRecord {
3454    /// Sequence number assigned to this record.
3455    pub seq_num: u64,
3456    /// Body of this record.
3457    pub body: Bytes,
3458    /// Headers for this record.
3459    pub headers: Vec<Header>,
3460    /// Timestamp for this record.
3461    pub timestamp: u64,
3462}
3463
3464impl SequencedRecord {
3465    /// Whether this is a command record.
3466    pub fn is_command_record(&self) -> bool {
3467        self.headers.len() == 1 && *self.headers[0].name == *b""
3468    }
3469}
3470
3471impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3472    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3473        Self {
3474            seq_num: value.seq_num,
3475            body: value.body,
3476            headers: value.headers.into_iter().map(Into::into).collect(),
3477            timestamp: value.timestamp,
3478        }
3479    }
3480}
3481
3482metered_bytes_impl!(SequencedRecord);
3483
3484#[derive(Debug, Clone)]
3485#[non_exhaustive]
3486/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
3487/// [`read_session`](crate::S2Stream::read_session).
3488pub struct ReadBatch {
3489    /// Records that are durably sequenced on the stream.
3490    ///
3491    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
3492    /// - the [`stop condition`](ReadInput::stop) was already met, or
3493    /// - all records in the batch were command records and
3494    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
3495    pub records: Vec<SequencedRecord>,
3496    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3497    /// the last record.
3498    ///
3499    /// It will only be present when reading recent records.
3500    pub tail: Option<StreamPosition>,
3501}
3502
3503impl ReadBatch {
3504    pub(crate) fn from_api(
3505        batch: api::stream::proto::ReadBatch,
3506        ignore_command_records: bool,
3507    ) -> Self {
3508        Self {
3509            records: batch
3510                .records
3511                .into_iter()
3512                .map(Into::into)
3513                .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3514                .collect(),
3515            tail: batch.tail.map(Into::into),
3516        }
3517    }
3518}
3519
3520/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
3521pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3522
3523#[derive(Debug, Clone, thiserror::Error)]
3524/// Why an append condition check failed.
3525pub enum AppendConditionFailed {
3526    #[error("fencing token mismatch, expected: {0}")]
3527    /// Fencing token did not match. Contains the expected fencing token.
3528    FencingTokenMismatch(FencingToken),
3529    #[error("sequence number mismatch, expected: {0}")]
3530    /// Sequence number did not match. Contains the expected sequence number.
3531    SeqNumMismatch(u64),
3532}
3533
3534impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3535    fn from(value: api::stream::AppendConditionFailed) -> Self {
3536        match value {
3537            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3538                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3539            }
3540            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3541                AppendConditionFailed::SeqNumMismatch(seq)
3542            }
3543        }
3544    }
3545}
3546
3547#[derive(Debug, Clone, thiserror::Error)]
3548/// Errors from S2 operations.
3549pub enum S2Error {
3550    #[error("{0}")]
3551    /// Client-side error.
3552    Client(String),
3553    #[error(transparent)]
3554    /// Validation error.
3555    Validation(#[from] ValidationError),
3556    #[error("{0}")]
3557    /// Append condition check failed. Contains the failure reason.
3558    AppendConditionFailed(AppendConditionFailed),
3559    #[error("read from an unwritten position. current tail: {0}")]
3560    /// Read from an unwritten position. Contains the current tail.
3561    ReadUnwritten(StreamPosition),
3562    #[error("{0}")]
3563    /// Other server-side error.
3564    Server(ErrorResponse),
3565}
3566
3567impl From<ApiError> for S2Error {
3568    fn from(err: ApiError) -> Self {
3569        match err {
3570            ApiError::ReadUnwritten(tail_response) => {
3571                Self::ReadUnwritten(tail_response.tail.into())
3572            }
3573            ApiError::AppendConditionFailed(condition_failed) => {
3574                Self::AppendConditionFailed(condition_failed.into())
3575            }
3576            ApiError::Server(_, response) => Self::Server(response.into()),
3577            other => Self::Client(other.to_string()),
3578        }
3579    }
3580}
3581
3582#[derive(Debug, Clone, thiserror::Error)]
3583#[error("{code}: {message}")]
3584#[non_exhaustive]
3585/// Error response from S2 server.
3586pub struct ErrorResponse {
3587    /// Error code.
3588    pub code: String,
3589    /// Error message.
3590    pub message: String,
3591}
3592
3593impl From<ApiErrorResponse> for ErrorResponse {
3594    fn from(response: ApiErrorResponse) -> Self {
3595        Self {
3596            code: response.code,
3597            message: response.message,
3598        }
3599    }
3600}
3601
3602fn idempotency_token() -> String {
3603    uuid::Uuid::new_v4().simple().to_string()
3604}