Skip to main content

s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Encryption algorithm.
23pub use s2_common::encryption::EncryptionAlgorithm;
24/// Encryption key for stream operations.
25pub use s2_common::encryption::EncryptionKey;
26/// Validation error.
27pub use s2_common::types::ValidationError;
28/// Access token ID.
29///
30/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
31pub use s2_common::types::access::AccessTokenId;
32/// See [`ListAccessTokensInput::prefix`].
33pub use s2_common::types::access::AccessTokenIdPrefix;
34/// See [`ListAccessTokensInput::start_after`].
35pub use s2_common::types::access::AccessTokenIdStartAfter;
36/// Basin name.
37///
38/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
39/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
40pub use s2_common::types::basin::BasinName;
41/// See [`ListBasinsInput::prefix`].
42pub use s2_common::types::basin::BasinNamePrefix;
43/// See [`ListBasinsInput::start_after`].
44pub use s2_common::types::basin::BasinNameStartAfter;
45/// Result of provisioning a resource.
46#[doc(hidden)]
47#[cfg(feature = "_hidden")]
48pub use s2_common::types::resources::ProvisionResult;
49/// Stream name.
50///
51/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
52pub use s2_common::types::stream::StreamName;
53/// See [`ListStreamsInput::prefix`].
54pub use s2_common::types::stream::StreamNamePrefix;
55/// See [`ListStreamsInput::start_after`].
56pub use s2_common::types::stream::StreamNameStartAfter;
57
58pub(crate) const ONE_MIB: u32 = 1024 * 1024;
59
60use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
61use secrecy::SecretString;
62
63use crate::api::{ApiError, ApiErrorResponse};
64
65/// An RFC 3339 datetime.
66///
67/// It can be created in either of the following ways:
68/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
69/// - Convert from [`time::OffsetDateTime`] using [`TryFrom`]/[`TryInto`].
70#[derive(Debug, Clone, Copy, PartialEq, Eq)]
71pub struct S2DateTime(time::OffsetDateTime);
72
73impl TryFrom<time::OffsetDateTime> for S2DateTime {
74    type Error = ValidationError;
75
76    fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
77        dt.format(&time::format_description::well_known::Rfc3339)
78            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
79        Ok(Self(dt))
80    }
81}
82
83impl From<S2DateTime> for time::OffsetDateTime {
84    fn from(dt: S2DateTime) -> Self {
85        dt.0
86    }
87}
88
89impl FromStr for S2DateTime {
90    type Err = ValidationError;
91
92    fn from_str(s: &str) -> Result<Self, Self::Err> {
93        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
94            .map(Self)
95            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
96    }
97}
98
99impl fmt::Display for S2DateTime {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        write!(
102            f,
103            "{}",
104            self.0
105                .format(&time::format_description::well_known::Rfc3339)
106                .expect("RFC3339 formatting should not fail for S2DateTime")
107        )
108    }
109}
110
111/// Authority for connecting to an S2 basin.
112#[derive(Debug, Clone, PartialEq)]
113pub(crate) enum BasinAuthority {
114    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
115    ParentZone(Authority),
116    /// Direct cell authority. Basin is expected to be hosted by this cell.
117    Direct(Authority),
118}
119
120/// Account endpoint.
121#[derive(Debug, Clone)]
122pub struct AccountEndpoint {
123    scheme: Scheme,
124    authority: Authority,
125}
126
127impl AccountEndpoint {
128    /// Create a new [`AccountEndpoint`] with the given endpoint.
129    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
130        endpoint.parse()
131    }
132}
133
134impl FromStr for AccountEndpoint {
135    type Err = ValidationError;
136
137    fn from_str(s: &str) -> Result<Self, Self::Err> {
138        let (scheme, authority) = match s.find("://") {
139            Some(idx) => {
140                let scheme: Scheme = s[..idx]
141                    .parse()
142                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
143                (scheme, &s[idx + 3..])
144            }
145            None => (Scheme::HTTPS, s),
146        };
147        Ok(Self {
148            scheme,
149            authority: authority
150                .parse()
151                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
152        })
153    }
154}
155
156/// Basin endpoint.
157#[derive(Debug, Clone)]
158pub struct BasinEndpoint {
159    scheme: Scheme,
160    authority: BasinAuthority,
161}
162
163impl BasinEndpoint {
164    /// Create a new [`BasinEndpoint`] with the given endpoint.
165    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
166        endpoint.parse()
167    }
168}
169
170impl FromStr for BasinEndpoint {
171    type Err = ValidationError;
172
173    fn from_str(s: &str) -> Result<Self, Self::Err> {
174        let (scheme, authority) = match s.find("://") {
175            Some(idx) => {
176                let scheme: Scheme = s[..idx]
177                    .parse()
178                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
179                (scheme, &s[idx + 3..])
180            }
181            None => (Scheme::HTTPS, s),
182        };
183        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
184            BasinAuthority::ParentZone(
185                authority
186                    .parse()
187                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
188            )
189        } else {
190            BasinAuthority::Direct(
191                authority
192                    .parse()
193                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
194            )
195        };
196        Ok(Self { scheme, authority })
197    }
198}
199
200#[derive(Debug, Clone)]
201#[non_exhaustive]
202/// Endpoints for the S2 environment.
203pub struct S2Endpoints {
204    pub(crate) scheme: Scheme,
205    pub(crate) account_authority: Authority,
206    pub(crate) basin_authority: BasinAuthority,
207}
208
209impl S2Endpoints {
210    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
211    pub fn new(
212        account_endpoint: AccountEndpoint,
213        basin_endpoint: BasinEndpoint,
214    ) -> Result<Self, ValidationError> {
215        if account_endpoint.scheme != basin_endpoint.scheme {
216            return Err("account and basin endpoints must have the same scheme".into());
217        }
218        Ok(Self {
219            scheme: account_endpoint.scheme,
220            account_authority: account_endpoint.authority,
221            basin_authority: basin_endpoint.authority,
222        })
223    }
224
225    /// Create a new [`S2Endpoints`] from environment variables.
226    ///
227    /// The following environment variables are expected to be set:
228    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
229    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
230    pub fn from_env() -> Result<Self, ValidationError> {
231        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
232            Ok(endpoint) => endpoint.parse()?,
233            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
234            Err(VarError::NotUnicode(_)) => {
235                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
236            }
237        };
238
239        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
240            Ok(endpoint) => endpoint.parse()?,
241            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
242            Err(VarError::NotUnicode(_)) => {
243                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
244            }
245        };
246
247        if account_endpoint.scheme != basin_endpoint.scheme {
248            return Err(
249                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
250            );
251        }
252
253        Ok(Self {
254            scheme: account_endpoint.scheme,
255            account_authority: account_endpoint.authority,
256            basin_authority: basin_endpoint.authority,
257        })
258    }
259
260    pub(crate) fn for_aws() -> Self {
261        Self {
262            scheme: Scheme::HTTPS,
263            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
264            basin_authority: BasinAuthority::ParentZone(
265                "b.s2.dev".try_into().expect("valid authority"),
266            ),
267        }
268    }
269}
270
271#[derive(Debug, Clone, Copy)]
272/// Compression algorithm for request and response bodies.
273pub enum Compression {
274    /// No compression.
275    None,
276    /// Gzip compression.
277    Gzip,
278    /// Zstd compression.
279    Zstd,
280}
281
282impl From<Compression> for CompressionAlgorithm {
283    fn from(value: Compression) -> Self {
284        match value {
285            Compression::None => CompressionAlgorithm::None,
286            Compression::Gzip => CompressionAlgorithm::Gzip,
287            Compression::Zstd => CompressionAlgorithm::Zstd,
288        }
289    }
290}
291
292#[derive(Debug, Clone, Copy, PartialEq)]
293#[non_exhaustive]
294/// Retry policy for [`append`](crate::S2Stream::append) and
295/// [`append_session`](crate::S2Stream::append_session) operations.
296pub enum AppendRetryPolicy {
297    /// Retry all appends. Use when duplicate records on the stream are acceptable.
298    All,
299    /// Retry when it can be determined that the request had no side effects.
300    ///
301    /// Uses a frame-level signal to detect whether any body frames were consumed
302    /// by the HTTP transport. If no frames were sent, the server never saw the
303    /// request, so retry is safe and will not cause duplicate records.
304    ///
305    /// Certain server errors (`rate_limited`, `hot_server`) are also safe to
306    /// retry regardless of frame signal state, since they guarantee no mutation
307    /// occurred.
308    NoSideEffects,
309}
310
311#[derive(Debug, Clone)]
312#[non_exhaustive]
313/// Configuration for retrying requests in case of transient failures.
314///
315/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
316/// ```text
317/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
318///     jitter = rand[0, base_delay]
319///     delay  = base_delay + jitter
320/// ````
321pub struct RetryConfig {
322    /// Total number of attempts including the initial try. A value of `1` means no retries.
323    ///
324    /// Defaults to `3`.
325    pub max_attempts: NonZeroU32,
326    /// Minimum base delay for retries.
327    ///
328    /// Defaults to `100ms`.
329    pub min_base_delay: Duration,
330    /// Maximum base delay for retries.
331    ///
332    /// Defaults to `1s`.
333    pub max_base_delay: Duration,
334    /// Retry policy for [`append`](crate::S2Stream::append) and
335    /// [`append_session`](crate::S2Stream::append_session) operations.
336    ///
337    /// Defaults to `All`.
338    pub append_retry_policy: AppendRetryPolicy,
339}
340
341impl Default for RetryConfig {
342    fn default() -> Self {
343        Self {
344            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
345            min_base_delay: Duration::from_millis(100),
346            max_base_delay: Duration::from_secs(1),
347            append_retry_policy: AppendRetryPolicy::All,
348        }
349    }
350}
351
352impl RetryConfig {
353    /// Create a new [`RetryConfig`] with default settings.
354    pub fn new() -> Self {
355        Self::default()
356    }
357
358    pub(crate) fn max_retries(&self) -> u32 {
359        self.max_attempts.get() - 1
360    }
361
362    /// Set the total number of attempts including the initial try.
363    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
364        Self {
365            max_attempts,
366            ..self
367        }
368    }
369
370    /// Set the minimum base delay for retries.
371    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
372        Self {
373            min_base_delay,
374            ..self
375        }
376    }
377
378    /// Set the maximum base delay for retries.
379    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
380        Self {
381            max_base_delay,
382            ..self
383        }
384    }
385
386    /// Set the retry policy for [`append`](crate::S2Stream::append) and
387    /// [`append_session`](crate::S2Stream::append_session) operations.
388    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
389        Self {
390            append_retry_policy,
391            ..self
392        }
393    }
394}
395
396#[derive(Debug, Clone)]
397#[non_exhaustive]
398/// Configuration for [`S2`](crate::S2).
399pub struct S2Config {
400    pub(crate) access_token: SecretString,
401    pub(crate) endpoints: S2Endpoints,
402    pub(crate) connection_timeout: Duration,
403    pub(crate) request_timeout: Duration,
404    pub(crate) retry: RetryConfig,
405    pub(crate) compression: Compression,
406    pub(crate) user_agent: HeaderValue,
407    pub(crate) insecure_skip_cert_verification: bool,
408}
409
410impl S2Config {
411    /// Create a new [`S2Config`] with the given access token and default settings.
412    pub fn new(access_token: impl Into<String>) -> Self {
413        Self {
414            access_token: access_token.into().into(),
415            endpoints: S2Endpoints::for_aws(),
416            connection_timeout: Duration::from_secs(3),
417            request_timeout: Duration::from_secs(5),
418            retry: RetryConfig::new(),
419            compression: Compression::None,
420            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
421                .parse()
422                .expect("valid user agent"),
423            insecure_skip_cert_verification: false,
424        }
425    }
426
427    /// Set the S2 endpoints to connect to.
428    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
429        Self { endpoints, ..self }
430    }
431
432    /// Set the timeout for establishing a connection to the server.
433    ///
434    /// Defaults to `3s`.
435    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
436        Self {
437            connection_timeout,
438            ..self
439        }
440    }
441
442    /// Set the timeout for requests.
443    ///
444    /// Defaults to `5s`.
445    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
446        Self {
447            request_timeout,
448            ..self
449        }
450    }
451
452    /// Set the retry configuration for requests.
453    ///
454    /// See [`RetryConfig`] for defaults.
455    pub fn with_retry(self, retry: RetryConfig) -> Self {
456        Self { retry, ..self }
457    }
458
459    /// Set the compression algorithm for requests and responses.
460    ///
461    /// Defaults to no compression.
462    pub fn with_compression(self, compression: Compression) -> Self {
463        Self {
464            compression,
465            ..self
466        }
467    }
468
469    /// Skip TLS certificate verification (insecure).
470    ///
471    /// This is useful for connecting to endpoints with self-signed certificates
472    /// or certificates that don't match the hostname (similar to `curl -k`).
473    ///
474    /// # Warning
475    ///
476    /// This disables certificate verification and should only be used for
477    /// testing or development purposes. **Never use this in production.**
478    ///
479    /// Defaults to `false`.
480    pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
481        Self {
482            insecure_skip_cert_verification: skip,
483            ..self
484        }
485    }
486
487    #[doc(hidden)]
488    #[cfg(feature = "_hidden")]
489    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
490        let user_agent = user_agent
491            .into()
492            .parse()
493            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
494        Ok(Self { user_agent, ..self })
495    }
496}
497
498#[derive(Debug, Default, Clone, PartialEq, Eq)]
499#[non_exhaustive]
500/// A page of values.
501pub struct Page<T> {
502    /// Values in this page.
503    pub values: Vec<T>,
504    /// Whether there are more pages.
505    pub has_more: bool,
506}
507
508impl<T> Page<T> {
509    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
510        Self {
511            values: values.into(),
512            has_more,
513        }
514    }
515}
516
517#[derive(Debug, Clone, Copy, PartialEq, Eq)]
518/// Storage class for recent appends.
519pub enum StorageClass {
520    /// Standard storage class that offers append latencies under `500ms`.
521    Standard,
522    /// Express storage class that offers append latencies under `50ms`.
523    Express,
524}
525
526impl From<api::config::StorageClass> for StorageClass {
527    fn from(value: api::config::StorageClass) -> Self {
528        match value {
529            api::config::StorageClass::Standard => StorageClass::Standard,
530            api::config::StorageClass::Express => StorageClass::Express,
531        }
532    }
533}
534
535impl From<StorageClass> for api::config::StorageClass {
536    fn from(value: StorageClass) -> Self {
537        match value {
538            StorageClass::Standard => api::config::StorageClass::Standard,
539            StorageClass::Express => api::config::StorageClass::Express,
540        }
541    }
542}
543
544#[derive(Debug, Clone, Copy, PartialEq, Eq)]
545/// Retention policy for records in a stream.
546pub enum RetentionPolicy {
547    /// Age in seconds. Records older than this age are automatically trimmed.
548    Age(u64),
549    /// Records are retained indefinitely unless explicitly trimmed.
550    Infinite,
551}
552
553impl From<api::config::RetentionPolicy> for RetentionPolicy {
554    fn from(value: api::config::RetentionPolicy) -> Self {
555        match value {
556            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
557            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
558        }
559    }
560}
561
562impl From<RetentionPolicy> for api::config::RetentionPolicy {
563    fn from(value: RetentionPolicy) -> Self {
564        match value {
565            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
566            RetentionPolicy::Infinite => {
567                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
568            }
569        }
570    }
571}
572
573#[derive(Debug, Clone, Copy, PartialEq, Eq)]
574/// Timestamping mode for appends that influences how timestamps are handled.
575pub enum TimestampingMode {
576    /// Prefer client-specified timestamp if present otherwise use arrival time.
577    ClientPrefer,
578    /// Require a client-specified timestamp and reject the append if it is missing.
579    ClientRequire,
580    /// Use the arrival time and ignore any client-specified timestamp.
581    Arrival,
582}
583
584impl From<api::config::TimestampingMode> for TimestampingMode {
585    fn from(value: api::config::TimestampingMode) -> Self {
586        match value {
587            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
588            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
589            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
590        }
591    }
592}
593
594impl From<TimestampingMode> for api::config::TimestampingMode {
595    fn from(value: TimestampingMode) -> Self {
596        match value {
597            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
598            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
599            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
600        }
601    }
602}
603
604#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
605#[non_exhaustive]
606/// Configuration for timestamping behavior.
607pub struct TimestampingConfig {
608    /// Timestamping mode for appends that influences how timestamps are handled.
609    ///
610    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
611    pub mode: Option<TimestampingMode>,
612    /// Whether client-specified timestamps are allowed to exceed the arrival time.
613    ///
614    /// Defaults to `false` (client timestamps are capped at the arrival time).
615    pub uncapped: Option<bool>,
616}
617
618impl TimestampingConfig {
619    /// Create a new [`TimestampingConfig`] with default settings.
620    pub fn new() -> Self {
621        Self::default()
622    }
623
624    /// Set the timestamping mode for appends that influences how timestamps are handled.
625    pub fn with_mode(self, mode: TimestampingMode) -> Self {
626        Self {
627            mode: Some(mode),
628            ..self
629        }
630    }
631
632    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
633    pub fn with_uncapped(self, uncapped: bool) -> Self {
634        Self {
635            uncapped: Some(uncapped),
636            ..self
637        }
638    }
639}
640
641impl From<api::config::TimestampingConfig> for TimestampingConfig {
642    fn from(value: api::config::TimestampingConfig) -> Self {
643        Self {
644            mode: value.mode.map(Into::into),
645            uncapped: value.uncapped,
646        }
647    }
648}
649
650impl From<TimestampingConfig> for api::config::TimestampingConfig {
651    fn from(value: TimestampingConfig) -> Self {
652        Self {
653            mode: value.mode.map(Into::into),
654            uncapped: value.uncapped,
655        }
656    }
657}
658
659#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
660#[non_exhaustive]
661/// Configuration for automatically deleting a stream when it becomes empty.
662pub struct DeleteOnEmptyConfig {
663    /// Minimum age in seconds before an empty stream can be deleted.
664    ///
665    /// Defaults to `0` (disables automatic deletion).
666    pub min_age_secs: u64,
667}
668
669impl DeleteOnEmptyConfig {
670    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
671    pub fn new() -> Self {
672        Self::default()
673    }
674
675    /// Set the minimum age in seconds before an empty stream can be deleted.
676    pub fn with_min_age(self, min_age: Duration) -> Self {
677        Self {
678            min_age_secs: min_age.as_secs(),
679        }
680    }
681}
682
683impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
684    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
685        Self {
686            min_age_secs: value.min_age_secs,
687        }
688    }
689}
690
691impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
692    fn from(value: DeleteOnEmptyConfig) -> Self {
693        Self {
694            min_age_secs: value.min_age_secs,
695        }
696    }
697}
698
699#[derive(Debug, Clone, Default, PartialEq, Eq)]
700#[non_exhaustive]
701/// Configuration for a stream.
702pub struct StreamConfig {
703    /// Storage class for the stream.
704    ///
705    /// Defaults to [`Express`](StorageClass::Express).
706    pub storage_class: Option<StorageClass>,
707    /// Retention policy for records in the stream.
708    ///
709    /// Defaults to `7 days` of retention.
710    pub retention_policy: Option<RetentionPolicy>,
711    /// Configuration for timestamping behavior.
712    ///
713    /// See [`TimestampingConfig`] for defaults.
714    pub timestamping: Option<TimestampingConfig>,
715    /// Configuration for automatically deleting the stream when it becomes empty.
716    ///
717    /// See [`DeleteOnEmptyConfig`] for defaults.
718    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
719}
720
721impl StreamConfig {
722    /// Create a new [`StreamConfig`] with default settings.
723    pub fn new() -> Self {
724        Self::default()
725    }
726
727    /// Set the storage class for the stream.
728    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
729        Self {
730            storage_class: Some(storage_class),
731            ..self
732        }
733    }
734
735    /// Set the retention policy for records in the stream.
736    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
737        Self {
738            retention_policy: Some(retention_policy),
739            ..self
740        }
741    }
742
743    /// Set the configuration for timestamping behavior.
744    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
745        Self {
746            timestamping: Some(timestamping),
747            ..self
748        }
749    }
750
751    /// Set the configuration for automatically deleting the stream when it becomes empty.
752    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
753        Self {
754            delete_on_empty: Some(delete_on_empty),
755            ..self
756        }
757    }
758}
759
760impl From<api::config::StreamConfig> for StreamConfig {
761    fn from(value: api::config::StreamConfig) -> Self {
762        Self {
763            storage_class: value.storage_class.map(Into::into),
764            retention_policy: value.retention_policy.map(Into::into),
765            timestamping: value.timestamping.map(Into::into),
766            delete_on_empty: value.delete_on_empty.map(Into::into),
767        }
768    }
769}
770
771impl From<StreamConfig> for api::config::StreamConfig {
772    fn from(value: StreamConfig) -> Self {
773        Self {
774            storage_class: value.storage_class.map(Into::into),
775            retention_policy: value.retention_policy.map(Into::into),
776            timestamping: value.timestamping.map(Into::into),
777            delete_on_empty: value.delete_on_empty.map(Into::into),
778        }
779    }
780}
781
782#[derive(Debug, Clone, Default)]
783#[non_exhaustive]
784/// Configuration for a basin.
785pub struct BasinConfig {
786    /// Default configuration for all streams in the basin.
787    ///
788    /// See [`StreamConfig`] for defaults.
789    pub default_stream_config: Option<StreamConfig>,
790    /// Encryption algorithm to apply to newly created streams in the basin.
791    pub stream_cipher: Option<EncryptionAlgorithm>,
792    /// Whether to create stream on append if it doesn't exist using default stream configuration.
793    ///
794    /// Defaults to `false`.
795    pub create_stream_on_append: bool,
796    /// Whether to create stream on read if it doesn't exist using default stream configuration.
797    ///
798    /// Defaults to `false`.
799    pub create_stream_on_read: bool,
800}
801
802impl BasinConfig {
803    /// Create a new [`BasinConfig`] with default settings.
804    pub fn new() -> Self {
805        Self::default()
806    }
807
808    /// Set the default configuration for all streams in the basin.
809    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
810        Self {
811            default_stream_config: Some(config),
812            ..self
813        }
814    }
815
816    /// Set the encryption algorithm to apply to newly created streams in the basin.
817    pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
818        Self {
819            stream_cipher: Some(stream_cipher),
820            ..self
821        }
822    }
823
824    /// Set whether to create stream on append if it doesn't exist using default stream
825    /// configuration.
826    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
827        Self {
828            create_stream_on_append,
829            ..self
830        }
831    }
832
833    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
834    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
835        Self {
836            create_stream_on_read,
837            ..self
838        }
839    }
840}
841
842impl From<api::config::BasinConfig> for BasinConfig {
843    fn from(value: api::config::BasinConfig) -> Self {
844        Self {
845            default_stream_config: value.default_stream_config.map(Into::into),
846            stream_cipher: value.stream_cipher.map(Into::into),
847            create_stream_on_append: value.create_stream_on_append,
848            create_stream_on_read: value.create_stream_on_read,
849        }
850    }
851}
852
853impl From<BasinConfig> for api::config::BasinConfig {
854    fn from(value: BasinConfig) -> Self {
855        Self {
856            default_stream_config: value.default_stream_config.map(Into::into),
857            stream_cipher: value.stream_cipher.map(Into::into),
858            create_stream_on_append: value.create_stream_on_append,
859            create_stream_on_read: value.create_stream_on_read,
860        }
861    }
862}
863
864#[derive(Debug, Clone, PartialEq, Eq)]
865/// Scope of a basin.
866#[non_exhaustive]
867pub enum BasinScope {
868    /// AWS `us-east-1` region.
869    AwsUsEast1,
870    /// AWS `us-west-2` region.
871    AwsUsWest2,
872    /// AWS `eu-north-1` region.
873    AwsEuNorth1,
874}
875
876impl From<api::basin::BasinScope> for BasinScope {
877    fn from(value: api::basin::BasinScope) -> Self {
878        match value {
879            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
880            api::basin::BasinScope::AwsUsWest2 => BasinScope::AwsUsWest2,
881            api::basin::BasinScope::AwsEuNorth1 => BasinScope::AwsEuNorth1,
882        }
883    }
884}
885
886impl From<BasinScope> for api::basin::BasinScope {
887    fn from(value: BasinScope) -> Self {
888        match value {
889            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
890            BasinScope::AwsUsWest2 => api::basin::BasinScope::AwsUsWest2,
891            BasinScope::AwsEuNorth1 => api::basin::BasinScope::AwsEuNorth1,
892        }
893    }
894}
895
896#[derive(Debug, Clone)]
897#[non_exhaustive]
898/// Input for [`create_basin`](crate::S2::create_basin) operation.
899pub struct CreateBasinInput {
900    /// Basin name.
901    pub name: BasinName,
902    /// Configuration for the basin.
903    ///
904    /// See [`BasinConfig`] for defaults.
905    pub config: Option<BasinConfig>,
906    /// Scope of the basin.
907    ///
908    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
909    pub scope: Option<BasinScope>,
910    idempotency_token: String,
911}
912
913impl CreateBasinInput {
914    /// Create a new [`CreateBasinInput`] with the given basin name.
915    pub fn new(name: BasinName) -> Self {
916        Self {
917            name,
918            config: None,
919            scope: None,
920            idempotency_token: idempotency_token(),
921        }
922    }
923
924    /// Set the configuration for the basin.
925    pub fn with_config(self, config: BasinConfig) -> Self {
926        Self {
927            config: Some(config),
928            ..self
929        }
930    }
931
932    /// Set the scope of the basin.
933    pub fn with_scope(self, scope: BasinScope) -> Self {
934        Self {
935            scope: Some(scope),
936            ..self
937        }
938    }
939}
940
941impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
942    fn from(value: CreateBasinInput) -> Self {
943        (
944            api::basin::CreateBasinRequest {
945                basin: value.name,
946                config: value.config.map(Into::into),
947                scope: value.scope.map(Into::into),
948            },
949            value.idempotency_token,
950        )
951    }
952}
953
954#[derive(Debug, Clone)]
955#[non_exhaustive]
956/// Input for [`ensure_basin`](crate::S2::ensure_basin) operation.
957#[doc(hidden)]
958#[cfg(feature = "_hidden")]
959pub struct EnsureBasinInput {
960    /// Basin name.
961    pub name: BasinName,
962    /// Desired configuration for the basin.
963    ///
964    /// If `None`, the basin is ensured with the default configuration.
965    config: Option<api::config::BasinConfig>,
966    /// Scope of the basin.
967    ///
968    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1). Cannot be changed once set.
969    pub scope: Option<BasinScope>,
970}
971
972#[cfg(feature = "_hidden")]
973impl EnsureBasinInput {
974    /// Create a new [`EnsureBasinInput`] with the given basin name.
975    pub fn new(name: BasinName) -> Self {
976        Self {
977            name,
978            config: None,
979            scope: None,
980        }
981    }
982
983    /// Set the desired configuration for the basin.
984    pub fn with_config(self, config: impl Into<s2_api::v1::config::BasinConfig>) -> Self {
985        Self {
986            config: Some(config.into()),
987            ..self
988        }
989    }
990
991    /// Set the scope of the basin.
992    pub fn with_scope(self, scope: BasinScope) -> Self {
993        Self {
994            scope: Some(scope),
995            ..self
996        }
997    }
998}
999
1000#[cfg(feature = "_hidden")]
1001impl From<EnsureBasinInput> for (BasinName, Option<api::basin::EnsureBasinRequest>) {
1002    fn from(value: EnsureBasinInput) -> Self {
1003        let config = value.config;
1004        let request = if config.is_some() || value.scope.is_some() {
1005            Some(api::basin::EnsureBasinRequest {
1006                config,
1007                scope: value.scope.map(Into::into),
1008            })
1009        } else {
1010            None
1011        };
1012        (value.name, request)
1013    }
1014}
1015
1016#[derive(Debug, Clone, Default)]
1017#[non_exhaustive]
1018/// Input for [`list_basins`](crate::S2::list_basins) operation.
1019pub struct ListBasinsInput {
1020    /// Filter basins whose names begin with this value.
1021    ///
1022    /// Defaults to `""`.
1023    pub prefix: BasinNamePrefix,
1024    /// Filter basins whose names are lexicographically greater than this value.
1025    ///
1026    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
1027    ///
1028    /// Defaults to `""`.
1029    pub start_after: BasinNameStartAfter,
1030    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
1031    ///
1032    /// Defaults to `1000`.
1033    pub limit: Option<usize>,
1034}
1035
1036impl ListBasinsInput {
1037    /// Create a new [`ListBasinsInput`] with default values.
1038    pub fn new() -> Self {
1039        Self::default()
1040    }
1041
1042    /// Set the prefix used to filter basins whose names begin with this value.
1043    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1044        Self { prefix, ..self }
1045    }
1046
1047    /// Set the value used to filter basins whose names are lexicographically greater than this
1048    /// value.
1049    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1050        Self {
1051            start_after,
1052            ..self
1053        }
1054    }
1055
1056    /// Set the limit on number of basins to return in a page.
1057    pub fn with_limit(self, limit: usize) -> Self {
1058        Self {
1059            limit: Some(limit),
1060            ..self
1061        }
1062    }
1063}
1064
1065impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
1066    fn from(value: ListBasinsInput) -> Self {
1067        Self {
1068            prefix: Some(value.prefix),
1069            start_after: Some(value.start_after),
1070            limit: value.limit,
1071        }
1072    }
1073}
1074
1075#[derive(Debug, Clone, Default)]
1076/// Input for [`S2::list_all_basins`](crate::S2::list_all_basins).
1077pub struct ListAllBasinsInput {
1078    /// Filter basins whose names begin with this value.
1079    ///
1080    /// Defaults to `""`.
1081    pub prefix: BasinNamePrefix,
1082    /// Filter basins whose names are lexicographically greater than this value.
1083    ///
1084    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
1085    ///
1086    /// Defaults to `""`.
1087    pub start_after: BasinNameStartAfter,
1088    /// Whether to include basins that are being deleted.
1089    ///
1090    /// Defaults to `false`.
1091    pub include_deleted: bool,
1092}
1093
1094impl ListAllBasinsInput {
1095    /// Create a new [`ListAllBasinsInput`] with default values.
1096    pub fn new() -> Self {
1097        Self::default()
1098    }
1099
1100    /// Set the prefix used to filter basins whose names begin with this value.
1101    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1102        Self { prefix, ..self }
1103    }
1104
1105    /// Set the value used to filter basins whose names are lexicographically greater than this
1106    /// value.
1107    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1108        Self {
1109            start_after,
1110            ..self
1111        }
1112    }
1113
1114    /// Set whether to include basins that are being deleted.
1115    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1116        Self {
1117            include_deleted,
1118            ..self
1119        }
1120    }
1121}
1122
1123#[derive(Debug, Clone, PartialEq, Eq)]
1124#[non_exhaustive]
1125/// Basin information.
1126pub struct BasinInfo {
1127    /// Basin name.
1128    pub name: BasinName,
1129    /// Scope of the basin.
1130    pub scope: Option<BasinScope>,
1131    /// Creation time.
1132    pub created_at: S2DateTime,
1133    /// Deletion time if the basin is being deleted.
1134    pub deleted_at: Option<S2DateTime>,
1135}
1136
1137impl TryFrom<api::basin::BasinInfo> for BasinInfo {
1138    type Error = ValidationError;
1139
1140    fn try_from(value: api::basin::BasinInfo) -> Result<Self, Self::Error> {
1141        Ok(Self {
1142            name: value.name,
1143            scope: value.scope.map(Into::into),
1144            created_at: value.created_at.try_into()?,
1145            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
1146        })
1147    }
1148}
1149
1150#[derive(Debug, Clone)]
1151#[non_exhaustive]
1152/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
1153pub struct DeleteBasinInput {
1154    /// Basin name.
1155    pub name: BasinName,
1156    /// Whether to ignore `Not Found` error if the basin doesn't exist.
1157    pub ignore_not_found: bool,
1158}
1159
1160impl DeleteBasinInput {
1161    /// Create a new [`DeleteBasinInput`] with the given basin name.
1162    pub fn new(name: BasinName) -> Self {
1163        Self {
1164            name,
1165            ignore_not_found: false,
1166        }
1167    }
1168
1169    /// Set whether to ignore `Not Found` error if the basin is not existing.
1170    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1171        Self {
1172            ignore_not_found,
1173            ..self
1174        }
1175    }
1176}
1177
1178#[derive(Debug, Clone, Default)]
1179#[non_exhaustive]
1180/// Reconfiguration for [`TimestampingConfig`].
1181pub struct TimestampingReconfiguration {
1182    /// Override for the existing [`mode`](TimestampingConfig::mode).
1183    pub mode: Maybe<Option<TimestampingMode>>,
1184    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
1185    pub uncapped: Maybe<Option<bool>>,
1186}
1187
1188impl TimestampingReconfiguration {
1189    /// Create a new [`TimestampingReconfiguration`].
1190    pub fn new() -> Self {
1191        Self::default()
1192    }
1193
1194    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1195    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1196        Self {
1197            mode: Maybe::Specified(Some(mode)),
1198            ..self
1199        }
1200    }
1201
1202    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1203    pub fn with_uncapped(self, uncapped: bool) -> Self {
1204        Self {
1205            uncapped: Maybe::Specified(Some(uncapped)),
1206            ..self
1207        }
1208    }
1209}
1210
1211impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1212    fn from(value: TimestampingReconfiguration) -> Self {
1213        Self {
1214            mode: value.mode.map(|m| m.map(Into::into)),
1215            uncapped: value.uncapped,
1216        }
1217    }
1218}
1219
1220#[derive(Debug, Clone, Default)]
1221#[non_exhaustive]
1222/// Reconfiguration for [`DeleteOnEmptyConfig`].
1223pub struct DeleteOnEmptyReconfiguration {
1224    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1225    pub min_age_secs: Maybe<Option<u64>>,
1226}
1227
1228impl DeleteOnEmptyReconfiguration {
1229    /// Create a new [`DeleteOnEmptyReconfiguration`].
1230    pub fn new() -> Self {
1231        Self::default()
1232    }
1233
1234    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1235    pub fn with_min_age(self, min_age: Duration) -> Self {
1236        Self {
1237            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1238        }
1239    }
1240}
1241
1242impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1243    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1244        Self {
1245            min_age_secs: value.min_age_secs,
1246        }
1247    }
1248}
1249
1250#[derive(Debug, Clone, Default)]
1251#[non_exhaustive]
1252/// Reconfiguration for [`StreamConfig`].
1253pub struct StreamReconfiguration {
1254    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
1255    pub storage_class: Maybe<Option<StorageClass>>,
1256    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
1257    pub retention_policy: Maybe<Option<RetentionPolicy>>,
1258    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
1259    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1260    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1261    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1262}
1263
1264impl StreamReconfiguration {
1265    /// Create a new [`StreamReconfiguration`].
1266    pub fn new() -> Self {
1267        Self::default()
1268    }
1269
1270    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1271    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1272        Self {
1273            storage_class: Maybe::Specified(Some(storage_class)),
1274            ..self
1275        }
1276    }
1277
1278    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1279    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1280        Self {
1281            retention_policy: Maybe::Specified(Some(retention_policy)),
1282            ..self
1283        }
1284    }
1285
1286    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1287    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1288        Self {
1289            timestamping: Maybe::Specified(Some(timestamping)),
1290            ..self
1291        }
1292    }
1293
1294    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1295    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1296        Self {
1297            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1298            ..self
1299        }
1300    }
1301}
1302
1303impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1304    fn from(value: StreamReconfiguration) -> Self {
1305        Self {
1306            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1307            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1308            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1309            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1310        }
1311    }
1312}
1313
1314#[derive(Debug, Clone, Default)]
1315#[non_exhaustive]
1316/// Reconfiguration for [`BasinConfig`].
1317pub struct BasinReconfiguration {
1318    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
1319    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1320    /// Override for the existing [`stream_cipher`](BasinConfig::stream_cipher).
1321    pub stream_cipher: Maybe<Option<EncryptionAlgorithm>>,
1322    /// Override for the existing
1323    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1324    pub create_stream_on_append: Maybe<bool>,
1325    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1326    pub create_stream_on_read: Maybe<bool>,
1327}
1328
1329impl BasinReconfiguration {
1330    /// Create a new [`BasinReconfiguration`].
1331    pub fn new() -> Self {
1332        Self::default()
1333    }
1334
1335    /// Set the override for the existing
1336    /// [`default_stream_config`](BasinConfig::default_stream_config).
1337    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1338        Self {
1339            default_stream_config: Maybe::Specified(Some(config)),
1340            ..self
1341        }
1342    }
1343
1344    /// Set the override for the existing [`stream_cipher`](BasinConfig::stream_cipher).
1345    pub fn with_stream_cipher(self, stream_cipher: EncryptionAlgorithm) -> Self {
1346        Self {
1347            stream_cipher: Maybe::Specified(Some(stream_cipher)),
1348            ..self
1349        }
1350    }
1351
1352    /// Set the override for the existing
1353    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1354    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1355        Self {
1356            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1357            ..self
1358        }
1359    }
1360
1361    /// Set the override for the existing
1362    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1363    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1364        Self {
1365            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1366            ..self
1367        }
1368    }
1369}
1370
1371impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1372    fn from(value: BasinReconfiguration) -> Self {
1373        Self {
1374            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1375            stream_cipher: value.stream_cipher.map(|m| m.map(Into::into)),
1376            create_stream_on_append: value.create_stream_on_append,
1377            create_stream_on_read: value.create_stream_on_read,
1378        }
1379    }
1380}
1381
1382#[derive(Debug, Clone)]
1383#[non_exhaustive]
1384/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
1385pub struct ReconfigureBasinInput {
1386    /// Basin name.
1387    pub name: BasinName,
1388    /// Reconfiguration for [`BasinConfig`].
1389    pub config: BasinReconfiguration,
1390}
1391
1392impl ReconfigureBasinInput {
1393    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1394    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1395        Self { name, config }
1396    }
1397}
1398
1399#[derive(Debug, Clone, Default)]
1400#[non_exhaustive]
1401/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
1402pub struct ListAccessTokensInput {
1403    /// Filter access tokens whose IDs begin with this value.
1404    ///
1405    /// Defaults to `""`.
1406    pub prefix: AccessTokenIdPrefix,
1407    /// Filter access tokens whose IDs are lexicographically greater than this value.
1408    ///
1409    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
1410    ///
1411    /// Defaults to `""`.
1412    pub start_after: AccessTokenIdStartAfter,
1413    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
1414    ///
1415    /// Defaults to `1000`.
1416    pub limit: Option<usize>,
1417}
1418
1419impl ListAccessTokensInput {
1420    /// Create a new [`ListAccessTokensInput`] with default values.
1421    pub fn new() -> Self {
1422        Self::default()
1423    }
1424
1425    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1426    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1427        Self { prefix, ..self }
1428    }
1429
1430    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1431    /// value.
1432    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1433        Self {
1434            start_after,
1435            ..self
1436        }
1437    }
1438
1439    /// Set the limit on number of access tokens to return in a page.
1440    pub fn with_limit(self, limit: usize) -> Self {
1441        Self {
1442            limit: Some(limit),
1443            ..self
1444        }
1445    }
1446}
1447
1448impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1449    fn from(value: ListAccessTokensInput) -> Self {
1450        Self {
1451            prefix: Some(value.prefix),
1452            start_after: Some(value.start_after),
1453            limit: value.limit,
1454        }
1455    }
1456}
1457
1458#[derive(Debug, Clone, Default)]
1459/// Input for [`S2::list_all_access_tokens`](crate::S2::list_all_access_tokens).
1460pub struct ListAllAccessTokensInput {
1461    /// Filter access tokens whose IDs begin with this value.
1462    ///
1463    /// Defaults to `""`.
1464    pub prefix: AccessTokenIdPrefix,
1465    /// Filter access tokens whose IDs are lexicographically greater than this value.
1466    ///
1467    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
1468    ///
1469    /// Defaults to `""`.
1470    pub start_after: AccessTokenIdStartAfter,
1471}
1472
1473impl ListAllAccessTokensInput {
1474    /// Create a new [`ListAllAccessTokensInput`] with default values.
1475    pub fn new() -> Self {
1476        Self::default()
1477    }
1478
1479    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1480    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1481        Self { prefix, ..self }
1482    }
1483
1484    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1485    /// this value.
1486    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1487        Self {
1488            start_after,
1489            ..self
1490        }
1491    }
1492}
1493
1494#[derive(Debug, Clone)]
1495#[non_exhaustive]
1496/// Access token information.
1497pub struct AccessTokenInfo {
1498    /// Access token ID.
1499    pub id: AccessTokenId,
1500    /// Expiration time.
1501    pub expires_at: S2DateTime,
1502    /// Whether to automatically prefix stream names during creation and strip the prefix during
1503    /// listing.
1504    pub auto_prefix_streams: bool,
1505    /// Scope of the access token.
1506    pub scope: AccessTokenScope,
1507}
1508
1509impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1510    type Error = ValidationError;
1511
1512    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1513        let expires_at = value
1514            .expires_at
1515            .map(S2DateTime::try_from)
1516            .transpose()?
1517            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1518        Ok(Self {
1519            id: value.id,
1520            expires_at,
1521            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1522            scope: value.scope.into(),
1523        })
1524    }
1525}
1526
1527#[derive(Debug, Clone)]
1528/// Pattern for matching basins.
1529///
1530/// See [`AccessTokenScope::basins`].
1531pub enum BasinMatcher {
1532    /// Match no basins.
1533    None,
1534    /// Match exactly this basin.
1535    Exact(BasinName),
1536    /// Match all basins with this prefix.
1537    Prefix(BasinNamePrefix),
1538}
1539
1540#[derive(Debug, Clone)]
1541/// Pattern for matching streams.
1542///
1543/// See [`AccessTokenScope::streams`].
1544pub enum StreamMatcher {
1545    /// Match no streams.
1546    None,
1547    /// Match exactly this stream.
1548    Exact(StreamName),
1549    /// Match all streams with this prefix.
1550    Prefix(StreamNamePrefix),
1551}
1552
1553#[derive(Debug, Clone)]
1554/// Pattern for matching access tokens.
1555///
1556/// See [`AccessTokenScope::access_tokens`].
1557pub enum AccessTokenMatcher {
1558    /// Match no access tokens.
1559    None,
1560    /// Match exactly this access token.
1561    Exact(AccessTokenId),
1562    /// Match all access tokens with this prefix.
1563    Prefix(AccessTokenIdPrefix),
1564}
1565
1566#[derive(Debug, Clone, Default)]
1567#[non_exhaustive]
1568/// Permissions indicating allowed operations.
1569pub struct ReadWritePermissions {
1570    /// Read permission.
1571    ///
1572    /// Defaults to `false`.
1573    pub read: bool,
1574    /// Write permission.
1575    ///
1576    /// Defaults to `false`.
1577    pub write: bool,
1578}
1579
1580impl ReadWritePermissions {
1581    /// Create a new [`ReadWritePermissions`] with default values.
1582    pub fn new() -> Self {
1583        Self::default()
1584    }
1585
1586    /// Create read-only permissions.
1587    pub fn read_only() -> Self {
1588        Self {
1589            read: true,
1590            write: false,
1591        }
1592    }
1593
1594    /// Create write-only permissions.
1595    pub fn write_only() -> Self {
1596        Self {
1597            read: false,
1598            write: true,
1599        }
1600    }
1601
1602    /// Create read-write permissions.
1603    pub fn read_write() -> Self {
1604        Self {
1605            read: true,
1606            write: true,
1607        }
1608    }
1609}
1610
1611impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1612    fn from(value: ReadWritePermissions) -> Self {
1613        Self {
1614            read: Some(value.read),
1615            write: Some(value.write),
1616        }
1617    }
1618}
1619
1620impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1621    fn from(value: api::access::ReadWritePermissions) -> Self {
1622        Self {
1623            read: value.read.unwrap_or_default(),
1624            write: value.write.unwrap_or_default(),
1625        }
1626    }
1627}
1628
1629#[derive(Debug, Clone, Default)]
1630#[non_exhaustive]
1631/// Permissions at the operation group level.
1632///
1633/// See [`AccessTokenScope::op_group_perms`].
1634pub struct OperationGroupPermissions {
1635    /// Account-level access permissions.
1636    ///
1637    /// Defaults to `None`.
1638    pub account: Option<ReadWritePermissions>,
1639    /// Basin-level access permissions.
1640    ///
1641    /// Defaults to `None`.
1642    pub basin: Option<ReadWritePermissions>,
1643    /// Stream-level access permissions.
1644    ///
1645    /// Defaults to `None`.
1646    pub stream: Option<ReadWritePermissions>,
1647}
1648
1649impl OperationGroupPermissions {
1650    /// Create a new [`OperationGroupPermissions`] with default values.
1651    pub fn new() -> Self {
1652        Self::default()
1653    }
1654
1655    /// Create read-only permissions for all groups.
1656    pub fn read_only_all() -> Self {
1657        Self {
1658            account: Some(ReadWritePermissions::read_only()),
1659            basin: Some(ReadWritePermissions::read_only()),
1660            stream: Some(ReadWritePermissions::read_only()),
1661        }
1662    }
1663
1664    /// Create write-only permissions for all groups.
1665    pub fn write_only_all() -> Self {
1666        Self {
1667            account: Some(ReadWritePermissions::write_only()),
1668            basin: Some(ReadWritePermissions::write_only()),
1669            stream: Some(ReadWritePermissions::write_only()),
1670        }
1671    }
1672
1673    /// Create read-write permissions for all groups.
1674    pub fn read_write_all() -> Self {
1675        Self {
1676            account: Some(ReadWritePermissions::read_write()),
1677            basin: Some(ReadWritePermissions::read_write()),
1678            stream: Some(ReadWritePermissions::read_write()),
1679        }
1680    }
1681
1682    /// Set account-level access permissions.
1683    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1684        Self {
1685            account: Some(account),
1686            ..self
1687        }
1688    }
1689
1690    /// Set basin-level access permissions.
1691    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1692        Self {
1693            basin: Some(basin),
1694            ..self
1695        }
1696    }
1697
1698    /// Set stream-level access permissions.
1699    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1700        Self {
1701            stream: Some(stream),
1702            ..self
1703        }
1704    }
1705}
1706
1707impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1708    fn from(value: OperationGroupPermissions) -> Self {
1709        Self {
1710            account: value.account.map(Into::into),
1711            basin: value.basin.map(Into::into),
1712            stream: value.stream.map(Into::into),
1713        }
1714    }
1715}
1716
1717impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1718    fn from(value: api::access::PermittedOperationGroups) -> Self {
1719        Self {
1720            account: value.account.map(Into::into),
1721            basin: value.basin.map(Into::into),
1722            stream: value.stream.map(Into::into),
1723        }
1724    }
1725}
1726
1727#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1728/// Individual operation that can be permitted.
1729///
1730/// See [`AccessTokenScope::ops`].
1731pub enum Operation {
1732    /// List basins.
1733    ListBasins,
1734    /// Create a basin.
1735    CreateBasin,
1736    /// Get basin configuration.
1737    GetBasinConfig,
1738    /// Delete a basin.
1739    DeleteBasin,
1740    /// Reconfigure a basin.
1741    ReconfigureBasin,
1742    /// List access tokens.
1743    ListAccessTokens,
1744    /// Issue an access token.
1745    IssueAccessToken,
1746    /// Revoke an access token.
1747    RevokeAccessToken,
1748    /// Get account metrics.
1749    GetAccountMetrics,
1750    /// Get basin metrics.
1751    GetBasinMetrics,
1752    /// Get stream metrics.
1753    GetStreamMetrics,
1754    /// List streams.
1755    ListStreams,
1756    /// Create a stream.
1757    CreateStream,
1758    /// Get stream configuration.
1759    GetStreamConfig,
1760    /// Delete a stream.
1761    DeleteStream,
1762    /// Reconfigure a stream.
1763    ReconfigureStream,
1764    /// Check the tail of a stream.
1765    CheckTail,
1766    /// Append records to a stream.
1767    Append,
1768    /// Read records from a stream.
1769    Read,
1770    /// Trim records on a stream.
1771    Trim,
1772    /// Set the fencing token on a stream.
1773    Fence,
1774}
1775
1776impl From<Operation> for api::access::Operation {
1777    fn from(value: Operation) -> Self {
1778        match value {
1779            Operation::ListBasins => api::access::Operation::ListBasins,
1780            Operation::CreateBasin => api::access::Operation::CreateBasin,
1781            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1782            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1783            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1784            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1785            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1786            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1787            Operation::ListStreams => api::access::Operation::ListStreams,
1788            Operation::CreateStream => api::access::Operation::CreateStream,
1789            Operation::DeleteStream => api::access::Operation::DeleteStream,
1790            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1791            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1792            Operation::CheckTail => api::access::Operation::CheckTail,
1793            Operation::Append => api::access::Operation::Append,
1794            Operation::Read => api::access::Operation::Read,
1795            Operation::Trim => api::access::Operation::Trim,
1796            Operation::Fence => api::access::Operation::Fence,
1797            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1798            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1799            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1800        }
1801    }
1802}
1803
1804impl From<api::access::Operation> for Operation {
1805    fn from(value: api::access::Operation) -> Self {
1806        match value {
1807            api::access::Operation::ListBasins => Operation::ListBasins,
1808            api::access::Operation::CreateBasin => Operation::CreateBasin,
1809            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1810            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1811            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1812            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1813            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1814            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1815            api::access::Operation::ListStreams => Operation::ListStreams,
1816            api::access::Operation::CreateStream => Operation::CreateStream,
1817            api::access::Operation::DeleteStream => Operation::DeleteStream,
1818            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1819            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1820            api::access::Operation::CheckTail => Operation::CheckTail,
1821            api::access::Operation::Append => Operation::Append,
1822            api::access::Operation::Read => Operation::Read,
1823            api::access::Operation::Trim => Operation::Trim,
1824            api::access::Operation::Fence => Operation::Fence,
1825            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1826            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1827            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1828        }
1829    }
1830}
1831
1832#[derive(Debug, Clone)]
1833#[non_exhaustive]
1834/// Scope of an access token.
1835///
1836/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
1837/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
1838/// final set must not be empty.
1839///
1840/// See [`IssueAccessTokenInput::scope`].
1841pub struct AccessTokenScopeInput {
1842    basins: Option<BasinMatcher>,
1843    streams: Option<StreamMatcher>,
1844    access_tokens: Option<AccessTokenMatcher>,
1845    op_group_perms: Option<OperationGroupPermissions>,
1846    ops: HashSet<Operation>,
1847}
1848
1849impl AccessTokenScopeInput {
1850    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1851    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1852        Self {
1853            basins: None,
1854            streams: None,
1855            access_tokens: None,
1856            op_group_perms: None,
1857            ops: ops.into_iter().collect(),
1858        }
1859    }
1860
1861    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1862    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1863        Self {
1864            basins: None,
1865            streams: None,
1866            access_tokens: None,
1867            op_group_perms: Some(op_group_perms),
1868            ops: HashSet::default(),
1869        }
1870    }
1871
1872    /// Set the permitted operations.
1873    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1874        Self {
1875            ops: ops.into_iter().collect(),
1876            ..self
1877        }
1878    }
1879
1880    /// Set the access permissions at the operation group level.
1881    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1882        Self {
1883            op_group_perms: Some(op_group_perms),
1884            ..self
1885        }
1886    }
1887
1888    /// Set the permitted basins.
1889    ///
1890    /// Defaults to no basins.
1891    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1892        Self {
1893            basins: Some(basins),
1894            ..self
1895        }
1896    }
1897
1898    /// Set the permitted streams.
1899    ///
1900    /// Defaults to no streams.
1901    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1902        Self {
1903            streams: Some(streams),
1904            ..self
1905        }
1906    }
1907
1908    /// Set the permitted access tokens.
1909    ///
1910    /// Defaults to no access tokens.
1911    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1912        Self {
1913            access_tokens: Some(access_tokens),
1914            ..self
1915        }
1916    }
1917}
1918
1919#[derive(Debug, Clone)]
1920#[non_exhaustive]
1921/// Scope of an access token.
1922pub struct AccessTokenScope {
1923    /// Permitted basins.
1924    pub basins: Option<BasinMatcher>,
1925    /// Permitted streams.
1926    pub streams: Option<StreamMatcher>,
1927    /// Permitted access tokens.
1928    pub access_tokens: Option<AccessTokenMatcher>,
1929    /// Permissions at the operation group level.
1930    pub op_group_perms: Option<OperationGroupPermissions>,
1931    /// Permitted operations.
1932    pub ops: HashSet<Operation>,
1933}
1934
1935impl From<api::access::AccessTokenScope> for AccessTokenScope {
1936    fn from(value: api::access::AccessTokenScope) -> Self {
1937        Self {
1938            basins: value.basins.map(|rs| match rs {
1939                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1940                    BasinMatcher::Exact(e)
1941                }
1942                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1943                    BasinMatcher::None
1944                }
1945                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1946            }),
1947            streams: value.streams.map(|rs| match rs {
1948                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1949                    StreamMatcher::Exact(e)
1950                }
1951                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1952                    StreamMatcher::None
1953                }
1954                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1955            }),
1956            access_tokens: value.access_tokens.map(|rs| match rs {
1957                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1958                    AccessTokenMatcher::Exact(e)
1959                }
1960                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1961                    AccessTokenMatcher::None
1962                }
1963                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1964            }),
1965            op_group_perms: value.op_groups.map(Into::into),
1966            ops: value
1967                .ops
1968                .map(|ops| ops.into_iter().map(Into::into).collect())
1969                .unwrap_or_default(),
1970        }
1971    }
1972}
1973
1974impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1975    fn from(value: AccessTokenScopeInput) -> Self {
1976        Self {
1977            basins: value.basins.map(|rs| match rs {
1978                BasinMatcher::None => {
1979                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1980                }
1981                BasinMatcher::Exact(e) => {
1982                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1983                }
1984                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1985            }),
1986            streams: value.streams.map(|rs| match rs {
1987                StreamMatcher::None => {
1988                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1989                }
1990                StreamMatcher::Exact(e) => {
1991                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1992                }
1993                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1994            }),
1995            access_tokens: value.access_tokens.map(|rs| match rs {
1996                AccessTokenMatcher::None => {
1997                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1998                }
1999                AccessTokenMatcher::Exact(e) => {
2000                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
2001                }
2002                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
2003            }),
2004            op_groups: value.op_group_perms.map(Into::into),
2005            ops: if value.ops.is_empty() {
2006                None
2007            } else {
2008                Some(value.ops.into_iter().map(Into::into).collect())
2009            },
2010        }
2011    }
2012}
2013
2014#[derive(Debug, Clone)]
2015#[non_exhaustive]
2016/// Input for [`issue_access_token`](crate::S2::issue_access_token).
2017pub struct IssueAccessTokenInput {
2018    /// Access token ID.
2019    pub id: AccessTokenId,
2020    /// Expiration time.
2021    ///
2022    /// Defaults to the expiration time of requestor's access token passed via
2023    /// [`S2Config`](S2Config::new).
2024    pub expires_at: Option<S2DateTime>,
2025    /// Whether to automatically prefix stream names during creation and strip the prefix during
2026    /// listing.
2027    ///
2028    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
2029    /// prefix.
2030    ///
2031    /// Defaults to `false`.
2032    pub auto_prefix_streams: bool,
2033    /// Scope of the token.
2034    pub scope: AccessTokenScopeInput,
2035}
2036
2037impl IssueAccessTokenInput {
2038    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
2039    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
2040        Self {
2041            id,
2042            expires_at: None,
2043            auto_prefix_streams: false,
2044            scope,
2045        }
2046    }
2047
2048    /// Set the expiration time.
2049    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
2050        Self {
2051            expires_at: Some(expires_at),
2052            ..self
2053        }
2054    }
2055
2056    /// Set whether to automatically prefix stream names during creation and strip the prefix during
2057    /// listing.
2058    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2059        Self {
2060            auto_prefix_streams,
2061            ..self
2062        }
2063    }
2064}
2065
2066impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
2067    fn from(value: IssueAccessTokenInput) -> Self {
2068        Self {
2069            id: value.id,
2070            expires_at: value.expires_at.map(Into::into),
2071            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
2072            scope: value.scope.into(),
2073        }
2074    }
2075}
2076
2077#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2078/// Interval to accumulate over for timeseries metric sets.
2079pub enum TimeseriesInterval {
2080    /// Minute.
2081    Minute,
2082    /// Hour.
2083    Hour,
2084    /// Day.
2085    Day,
2086}
2087
2088impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2089    fn from(value: TimeseriesInterval) -> Self {
2090        match value {
2091            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2092            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2093            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2094        }
2095    }
2096}
2097
2098impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2099    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2100        match value {
2101            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2102            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2103            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2104        }
2105    }
2106}
2107
2108#[derive(Debug, Clone, Copy)]
2109#[non_exhaustive]
2110/// Time range as Unix epoch seconds.
2111pub struct TimeRange {
2112    /// Start timestamp (inclusive).
2113    pub start: u32,
2114    /// End timestamp (exclusive).
2115    pub end: u32,
2116}
2117
2118impl TimeRange {
2119    /// Create a new [`TimeRange`] with the given start and end timestamps.
2120    pub fn new(start: u32, end: u32) -> Self {
2121        Self { start, end }
2122    }
2123}
2124
2125#[derive(Debug, Clone, Copy)]
2126#[non_exhaustive]
2127/// Time range as Unix epoch seconds and accumulation interval.
2128pub struct TimeRangeAndInterval {
2129    /// Start timestamp (inclusive).
2130    pub start: u32,
2131    /// End timestamp (exclusive).
2132    pub end: u32,
2133    /// Interval to accumulate over for timeseries metric sets.
2134    ///
2135    /// Default is dependent on the requested metric set.
2136    pub interval: Option<TimeseriesInterval>,
2137}
2138
2139impl TimeRangeAndInterval {
2140    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2141    pub fn new(start: u32, end: u32) -> Self {
2142        Self {
2143            start,
2144            end,
2145            interval: None,
2146        }
2147    }
2148
2149    /// Set the interval to accumulate over for timeseries metric sets.
2150    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2151        Self {
2152            interval: Some(interval),
2153            ..self
2154        }
2155    }
2156}
2157
2158#[derive(Debug, Clone, Copy)]
2159/// Account metric set to return.
2160pub enum AccountMetricSet {
2161    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
2162    /// specified time range.
2163    ActiveBasins(TimeRange),
2164    /// Returns [`AccumulationMetric`]s, one per account operation type.
2165    ///
2166    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2167    /// per interval over the requested time range.
2168    ///
2169    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2170    AccountOps(TimeRangeAndInterval),
2171}
2172
2173#[derive(Debug, Clone)]
2174#[non_exhaustive]
2175/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
2176pub struct GetAccountMetricsInput {
2177    /// Metric set to return.
2178    pub set: AccountMetricSet,
2179}
2180
2181impl GetAccountMetricsInput {
2182    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2183    pub fn new(set: AccountMetricSet) -> Self {
2184        Self { set }
2185    }
2186}
2187
2188impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2189    fn from(value: GetAccountMetricsInput) -> Self {
2190        let (set, start, end, interval) = match value.set {
2191            AccountMetricSet::ActiveBasins(args) => (
2192                api::metrics::AccountMetricSet::ActiveBasins,
2193                args.start,
2194                args.end,
2195                None,
2196            ),
2197            AccountMetricSet::AccountOps(args) => (
2198                api::metrics::AccountMetricSet::AccountOps,
2199                args.start,
2200                args.end,
2201                args.interval,
2202            ),
2203        };
2204        Self {
2205            set,
2206            start: Some(start),
2207            end: Some(end),
2208            interval: interval.map(Into::into),
2209        }
2210    }
2211}
2212
2213#[derive(Debug, Clone, Copy)]
2214/// Basin metric set to return.
2215pub enum BasinMetricSet {
2216    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
2217    /// in the basin, with one observed value for each hour over the requested time range.
2218    Storage(TimeRange),
2219    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
2220    ///
2221    /// Each metric represents a timeseries of the number of append operations across all streams
2222    /// in the basin, with one accumulated value per interval over the requested time range.
2223    ///
2224    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2225    /// [`minute`](TimeseriesInterval::Minute).
2226    AppendOps(TimeRangeAndInterval),
2227    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
2228    ///
2229    /// Each metric represents a timeseries of the number of read operations across all streams
2230    /// in the basin, with one accumulated value per interval over the requested time range.
2231    ///
2232    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2233    /// [`minute`](TimeseriesInterval::Minute).
2234    ReadOps(TimeRangeAndInterval),
2235    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
2236    /// across all streams in the basin, with one accumulated value per interval
2237    /// over the requested time range.
2238    ///
2239    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2240    /// [`minute`](TimeseriesInterval::Minute).
2241    ReadThroughput(TimeRangeAndInterval),
2242    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
2243    /// across all streams in the basin, with one accumulated value per interval
2244    /// over the requested time range.
2245    ///
2246    /// [`interval`](TimeRangeAndInterval::interval) defaults to
2247    /// [`minute`](TimeseriesInterval::Minute).
2248    AppendThroughput(TimeRangeAndInterval),
2249    /// Returns [`AccumulationMetric`]s, one per basin operation type.
2250    ///
2251    /// Each metric represents a timeseries of the number of operations, with one accumulated value
2252    /// per interval over the requested time range.
2253    ///
2254    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
2255    BasinOps(TimeRangeAndInterval),
2256}
2257
2258#[derive(Debug, Clone)]
2259#[non_exhaustive]
2260/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
2261pub struct GetBasinMetricsInput {
2262    /// Basin name.
2263    pub name: BasinName,
2264    /// Metric set to return.
2265    pub set: BasinMetricSet,
2266}
2267
2268impl GetBasinMetricsInput {
2269    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2270    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2271        Self { name, set }
2272    }
2273}
2274
2275impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2276    fn from(value: GetBasinMetricsInput) -> Self {
2277        let (set, start, end, interval) = match value.set {
2278            BasinMetricSet::Storage(args) => (
2279                api::metrics::BasinMetricSet::Storage,
2280                args.start,
2281                args.end,
2282                None,
2283            ),
2284            BasinMetricSet::AppendOps(args) => (
2285                api::metrics::BasinMetricSet::AppendOps,
2286                args.start,
2287                args.end,
2288                args.interval,
2289            ),
2290            BasinMetricSet::ReadOps(args) => (
2291                api::metrics::BasinMetricSet::ReadOps,
2292                args.start,
2293                args.end,
2294                args.interval,
2295            ),
2296            BasinMetricSet::ReadThroughput(args) => (
2297                api::metrics::BasinMetricSet::ReadThroughput,
2298                args.start,
2299                args.end,
2300                args.interval,
2301            ),
2302            BasinMetricSet::AppendThroughput(args) => (
2303                api::metrics::BasinMetricSet::AppendThroughput,
2304                args.start,
2305                args.end,
2306                args.interval,
2307            ),
2308            BasinMetricSet::BasinOps(args) => (
2309                api::metrics::BasinMetricSet::BasinOps,
2310                args.start,
2311                args.end,
2312                args.interval,
2313            ),
2314        };
2315        (
2316            value.name,
2317            api::metrics::BasinMetricSetRequest {
2318                set,
2319                start: Some(start),
2320                end: Some(end),
2321                interval: interval.map(Into::into),
2322            },
2323        )
2324    }
2325}
2326
2327#[derive(Debug, Clone, Copy)]
2328/// Stream metric set to return.
2329pub enum StreamMetricSet {
2330    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
2331    /// with one observed value for each minute over the requested time range.
2332    Storage(TimeRange),
2333}
2334
2335#[derive(Debug, Clone)]
2336#[non_exhaustive]
2337/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
2338pub struct GetStreamMetricsInput {
2339    /// Basin name.
2340    pub basin_name: BasinName,
2341    /// Stream name.
2342    pub stream_name: StreamName,
2343    /// Metric set to return.
2344    pub set: StreamMetricSet,
2345}
2346
2347impl GetStreamMetricsInput {
2348    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2349    /// set.
2350    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2351        Self {
2352            basin_name,
2353            stream_name,
2354            set,
2355        }
2356    }
2357}
2358
2359impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2360    fn from(value: GetStreamMetricsInput) -> Self {
2361        let (set, start, end, interval) = match value.set {
2362            StreamMetricSet::Storage(args) => (
2363                api::metrics::StreamMetricSet::Storage,
2364                args.start,
2365                args.end,
2366                None,
2367            ),
2368        };
2369        (
2370            value.basin_name,
2371            value.stream_name,
2372            api::metrics::StreamMetricSetRequest {
2373                set,
2374                start: Some(start),
2375                end: Some(end),
2376                interval,
2377            },
2378        )
2379    }
2380}
2381
2382#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2383/// Unit in which metric values are measured.
2384pub enum MetricUnit {
2385    /// Size in bytes.
2386    Bytes,
2387    /// Number of operations.
2388    Operations,
2389}
2390
2391impl From<api::metrics::MetricUnit> for MetricUnit {
2392    fn from(value: api::metrics::MetricUnit) -> Self {
2393        match value {
2394            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2395            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2396        }
2397    }
2398}
2399
2400#[derive(Debug, Clone)]
2401#[non_exhaustive]
2402/// Single named value.
2403pub struct ScalarMetric {
2404    /// Metric name.
2405    pub name: String,
2406    /// Unit for the metric value.
2407    pub unit: MetricUnit,
2408    /// Metric value.
2409    pub value: f64,
2410}
2411
2412#[derive(Debug, Clone)]
2413#[non_exhaustive]
2414/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2415/// specified interval.
2416pub struct AccumulationMetric {
2417    /// Timeseries name.
2418    pub name: String,
2419    /// Unit for the accumulated values.
2420    pub unit: MetricUnit,
2421    /// The interval at which datapoints are accumulated.
2422    pub interval: TimeseriesInterval,
2423    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
2424    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
2425    /// one `interval`.
2426    pub values: Vec<(u32, f64)>,
2427}
2428
2429#[derive(Debug, Clone)]
2430#[non_exhaustive]
2431/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2432pub struct GaugeMetric {
2433    /// Timeseries name.
2434    pub name: String,
2435    /// Unit for the instantaneous values.
2436    pub unit: MetricUnit,
2437    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
2438    /// instant of the `timestamp` (in Unix epoch seconds).
2439    pub values: Vec<(u32, f64)>,
2440}
2441
2442#[derive(Debug, Clone)]
2443#[non_exhaustive]
2444/// Set of string labels.
2445pub struct LabelMetric {
2446    /// Label name.
2447    pub name: String,
2448    /// Label values.
2449    pub values: Vec<String>,
2450}
2451
2452#[derive(Debug, Clone)]
2453/// Individual metric in a returned metric set.
2454pub enum Metric {
2455    /// Single named value.
2456    Scalar(ScalarMetric),
2457    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
2458    /// specified interval.
2459    Accumulation(AccumulationMetric),
2460    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
2461    Gauge(GaugeMetric),
2462    /// Set of string labels.
2463    Label(LabelMetric),
2464}
2465
2466impl From<api::metrics::Metric> for Metric {
2467    fn from(value: api::metrics::Metric) -> Self {
2468        match value {
2469            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2470                name: sm.name.into(),
2471                unit: sm.unit.into(),
2472                value: sm.value,
2473            }),
2474            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2475                name: am.name.into(),
2476                unit: am.unit.into(),
2477                interval: am.interval.into(),
2478                values: am.values,
2479            }),
2480            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2481                name: gm.name.into(),
2482                unit: gm.unit.into(),
2483                values: gm.values,
2484            }),
2485            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2486                name: lm.name.into(),
2487                values: lm.values,
2488            }),
2489        }
2490    }
2491}
2492
2493#[derive(Debug, Clone, Default)]
2494#[non_exhaustive]
2495/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
2496pub struct ListStreamsInput {
2497    /// Filter streams whose names begin with this value.
2498    ///
2499    /// Defaults to `""`.
2500    pub prefix: StreamNamePrefix,
2501    /// Filter streams whose names are lexicographically greater than this value.
2502    ///
2503    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
2504    ///
2505    /// Defaults to `""`.
2506    pub start_after: StreamNameStartAfter,
2507    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
2508    ///
2509    /// Defaults to `1000`.
2510    pub limit: Option<usize>,
2511}
2512
2513impl ListStreamsInput {
2514    /// Create a new [`ListStreamsInput`] with default values.
2515    pub fn new() -> Self {
2516        Self::default()
2517    }
2518
2519    /// Set the prefix used to filter streams whose names begin with this value.
2520    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2521        Self { prefix, ..self }
2522    }
2523
2524    /// Set the value used to filter streams whose names are lexicographically greater than this
2525    /// value.
2526    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2527        Self {
2528            start_after,
2529            ..self
2530        }
2531    }
2532
2533    /// Set the limit on number of streams to return in a page.
2534    pub fn with_limit(self, limit: usize) -> Self {
2535        Self {
2536            limit: Some(limit),
2537            ..self
2538        }
2539    }
2540}
2541
2542impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2543    fn from(value: ListStreamsInput) -> Self {
2544        Self {
2545            prefix: Some(value.prefix),
2546            start_after: Some(value.start_after),
2547            limit: value.limit,
2548        }
2549    }
2550}
2551
2552#[derive(Debug, Clone, Default)]
2553/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
2554pub struct ListAllStreamsInput {
2555    /// Filter streams whose names begin with this value.
2556    ///
2557    /// Defaults to `""`.
2558    pub prefix: StreamNamePrefix,
2559    /// Filter streams whose names are lexicographically greater than this value.
2560    ///
2561    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
2562    ///
2563    /// Defaults to `""`.
2564    pub start_after: StreamNameStartAfter,
2565    /// Whether to include streams that are being deleted.
2566    ///
2567    /// Defaults to `false`.
2568    pub include_deleted: bool,
2569}
2570
2571impl ListAllStreamsInput {
2572    /// Create a new [`ListAllStreamsInput`] with default values.
2573    pub fn new() -> Self {
2574        Self::default()
2575    }
2576
2577    /// Set the prefix used to filter streams whose names begin with this value.
2578    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2579        Self { prefix, ..self }
2580    }
2581
2582    /// Set the value used to filter streams whose names are lexicographically greater than this
2583    /// value.
2584    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2585        Self {
2586            start_after,
2587            ..self
2588        }
2589    }
2590
2591    /// Set whether to include streams that are being deleted.
2592    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2593        Self {
2594            include_deleted,
2595            ..self
2596        }
2597    }
2598}
2599
2600#[derive(Debug, Clone, PartialEq, Eq)]
2601#[non_exhaustive]
2602/// Stream information.
2603pub struct StreamInfo {
2604    /// Stream name.
2605    pub name: StreamName,
2606    /// Creation time.
2607    pub created_at: S2DateTime,
2608    /// Deletion time if the stream is being deleted.
2609    pub deleted_at: Option<S2DateTime>,
2610    /// Encryption algorithm for this stream, if encryption is enabled.
2611    pub cipher: Option<EncryptionAlgorithm>,
2612}
2613
2614impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2615    type Error = ValidationError;
2616
2617    fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2618        Ok(Self {
2619            name: value.name,
2620            created_at: value.created_at.try_into()?,
2621            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2622            cipher: value.cipher.map(Into::into),
2623        })
2624    }
2625}
2626
2627#[derive(Debug, Clone)]
2628#[non_exhaustive]
2629/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
2630pub struct CreateStreamInput {
2631    /// Stream name.
2632    pub name: StreamName,
2633    /// Configuration for the stream.
2634    ///
2635    /// See [`StreamConfig`] for defaults.
2636    pub config: Option<StreamConfig>,
2637    idempotency_token: String,
2638}
2639
2640impl CreateStreamInput {
2641    /// Create a new [`CreateStreamInput`] with the given stream name.
2642    pub fn new(name: StreamName) -> Self {
2643        Self {
2644            name,
2645            config: None,
2646            idempotency_token: idempotency_token(),
2647        }
2648    }
2649
2650    /// Set the configuration for the stream.
2651    pub fn with_config(self, config: StreamConfig) -> Self {
2652        Self {
2653            config: Some(config),
2654            ..self
2655        }
2656    }
2657}
2658
2659impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2660    fn from(value: CreateStreamInput) -> Self {
2661        (
2662            api::stream::CreateStreamRequest {
2663                stream: value.name,
2664                config: value.config.map(Into::into),
2665            },
2666            value.idempotency_token,
2667        )
2668    }
2669}
2670
2671#[derive(Debug, Clone)]
2672#[non_exhaustive]
2673/// Input for [`ensure_stream`](crate::S2Basin::ensure_stream)
2674/// operation.
2675#[doc(hidden)]
2676#[cfg(feature = "_hidden")]
2677pub struct EnsureStreamInput {
2678    /// Stream name.
2679    pub name: StreamName,
2680    /// Desired stream configuration before basin defaults are applied.
2681    ///
2682    /// Missing fields are filled from the current basin default stream configuration and then
2683    /// global defaults before comparing or writing. If `None`, the stream is ensured using those
2684    /// defaults.
2685    config: Option<api::config::StreamConfig>,
2686}
2687
2688#[cfg(feature = "_hidden")]
2689impl EnsureStreamInput {
2690    /// Create a new [`EnsureStreamInput`] with the given stream name.
2691    pub fn new(name: StreamName) -> Self {
2692        Self { name, config: None }
2693    }
2694
2695    /// Set the desired configuration for the stream.
2696    pub fn with_config(self, config: impl Into<s2_api::v1::config::StreamConfig>) -> Self {
2697        Self {
2698            config: Some(config.into()),
2699            ..self
2700        }
2701    }
2702}
2703
2704#[cfg(feature = "_hidden")]
2705impl From<EnsureStreamInput> for (StreamName, Option<api::config::StreamConfig>) {
2706    fn from(value: EnsureStreamInput) -> Self {
2707        (value.name, value.config)
2708    }
2709}
2710
2711#[derive(Debug, Clone)]
2712#[non_exhaustive]
2713/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
2714pub struct DeleteStreamInput {
2715    /// Stream name.
2716    pub name: StreamName,
2717    /// Whether to ignore `Not Found` error if the stream doesn't exist.
2718    pub ignore_not_found: bool,
2719}
2720
2721impl DeleteStreamInput {
2722    /// Create a new [`DeleteStreamInput`] with the given stream name.
2723    pub fn new(name: StreamName) -> Self {
2724        Self {
2725            name,
2726            ignore_not_found: false,
2727        }
2728    }
2729
2730    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2731    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2732        Self {
2733            ignore_not_found,
2734            ..self
2735        }
2736    }
2737}
2738
2739#[derive(Debug, Clone)]
2740#[non_exhaustive]
2741/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
2742pub struct ReconfigureStreamInput {
2743    /// Stream name.
2744    pub name: StreamName,
2745    /// Reconfiguration for [`StreamConfig`].
2746    pub config: StreamReconfiguration,
2747}
2748
2749impl ReconfigureStreamInput {
2750    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2751    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2752        Self { name, config }
2753    }
2754}
2755
2756#[derive(Debug, Clone, PartialEq, Eq)]
2757/// Token for fencing appends to a stream.
2758///
2759/// **Note:** It must not exceed 36 bytes in length.
2760///
2761/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
2762pub struct FencingToken(String);
2763
2764impl FencingToken {
2765    /// Generate a random alphanumeric fencing token of `n` bytes.
2766    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2767        rand::rng()
2768            .sample_iter(&rand::distr::Alphanumeric)
2769            .take(n)
2770            .map(char::from)
2771            .collect::<String>()
2772            .parse()
2773    }
2774}
2775
2776impl FromStr for FencingToken {
2777    type Err = ValidationError;
2778
2779    fn from_str(s: &str) -> Result<Self, Self::Err> {
2780        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2781            return Err(ValidationError(format!(
2782                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2783            )));
2784        }
2785        Ok(FencingToken(s.to_string()))
2786    }
2787}
2788
2789impl std::fmt::Display for FencingToken {
2790    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2791        write!(f, "{}", self.0)
2792    }
2793}
2794
2795impl Deref for FencingToken {
2796    type Target = str;
2797
2798    fn deref(&self) -> &Self::Target {
2799        &self.0
2800    }
2801}
2802
2803#[derive(Debug, Clone, Copy, PartialEq)]
2804#[non_exhaustive]
2805/// A position in a stream.
2806pub struct StreamPosition {
2807    /// Sequence number assigned by the service.
2808    pub seq_num: u64,
2809    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
2810    /// User-specified timestamps are passed through as-is.
2811    pub timestamp: u64,
2812}
2813
2814impl std::fmt::Display for StreamPosition {
2815    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2816        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2817    }
2818}
2819
2820impl From<api::stream::proto::StreamPosition> for StreamPosition {
2821    fn from(value: api::stream::proto::StreamPosition) -> Self {
2822        Self {
2823            seq_num: value.seq_num,
2824            timestamp: value.timestamp,
2825        }
2826    }
2827}
2828
2829impl From<api::stream::StreamPosition> for StreamPosition {
2830    fn from(value: api::stream::StreamPosition) -> Self {
2831        Self {
2832            seq_num: value.seq_num,
2833            timestamp: value.timestamp,
2834        }
2835    }
2836}
2837
2838#[derive(Debug, Clone, PartialEq)]
2839#[non_exhaustive]
2840/// A name-value pair.
2841pub struct Header {
2842    /// Name.
2843    pub name: Bytes,
2844    /// Value.
2845    pub value: Bytes,
2846}
2847
2848impl Header {
2849    /// Create a new [`Header`] with the given name and value.
2850    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2851        Self {
2852            name: name.into(),
2853            value: value.into(),
2854        }
2855    }
2856}
2857
2858impl From<Header> for api::stream::proto::Header {
2859    fn from(value: Header) -> Self {
2860        Self {
2861            name: value.name,
2862            value: value.value,
2863        }
2864    }
2865}
2866
2867impl From<api::stream::proto::Header> for Header {
2868    fn from(value: api::stream::proto::Header) -> Self {
2869        Self {
2870            name: value.name,
2871            value: value.value,
2872        }
2873    }
2874}
2875
2876#[derive(Debug, Clone, PartialEq)]
2877/// A record to append.
2878pub struct AppendRecord {
2879    body: Bytes,
2880    headers: Vec<Header>,
2881    timestamp: Option<u64>,
2882}
2883
2884impl AppendRecord {
2885    fn validate(self) -> Result<Self, ValidationError> {
2886        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2887            Err(ValidationError(format!(
2888                "metered_bytes: {} exceeds {}",
2889                self.metered_bytes(),
2890                RECORD_BATCH_MAX.bytes
2891            )))
2892        } else {
2893            Ok(self)
2894        }
2895    }
2896
2897    /// Create a new [`AppendRecord`] with the given record body.
2898    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2899        let record = Self {
2900            body: body.into(),
2901            headers: Vec::default(),
2902            timestamp: None,
2903        };
2904        record.validate()
2905    }
2906
2907    /// Set the headers for this record.
2908    pub fn with_headers(
2909        self,
2910        headers: impl IntoIterator<Item = Header>,
2911    ) -> Result<Self, ValidationError> {
2912        let record = Self {
2913            headers: headers.into_iter().collect(),
2914            ..self
2915        };
2916        record.validate()
2917    }
2918
2919    /// Set the timestamp for this record.
2920    ///
2921    /// Precise semantics depend on [`StreamConfig::timestamping`].
2922    pub fn with_timestamp(self, timestamp: u64) -> Self {
2923        Self {
2924            timestamp: Some(timestamp),
2925            ..self
2926        }
2927    }
2928
2929    /// Get the body of this record.
2930    pub fn body(&self) -> &[u8] {
2931        &self.body
2932    }
2933
2934    /// Get the headers of this record.
2935    pub fn headers(&self) -> &[Header] {
2936        &self.headers
2937    }
2938
2939    /// Get the timestamp of this record.
2940    pub fn timestamp(&self) -> Option<u64> {
2941        self.timestamp
2942    }
2943}
2944
2945impl From<AppendRecord> for api::stream::proto::AppendRecord {
2946    fn from(value: AppendRecord) -> Self {
2947        Self {
2948            timestamp: value.timestamp,
2949            headers: value.headers.into_iter().map(Into::into).collect(),
2950            body: value.body,
2951        }
2952    }
2953}
2954
2955/// Metered byte size calculation.
2956///
2957/// Formula for a record:
2958/// ```text
2959/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
2960/// ```
2961pub trait MeteredBytes {
2962    /// Returns the metered byte size.
2963    fn metered_bytes(&self) -> usize;
2964}
2965
2966macro_rules! metered_bytes_impl {
2967    ($ty:ty) => {
2968        impl MeteredBytes for $ty {
2969            fn metered_bytes(&self) -> usize {
2970                8 + (2 * self.headers.len())
2971                    + self
2972                        .headers
2973                        .iter()
2974                        .map(|h| h.name.len() + h.value.len())
2975                        .sum::<usize>()
2976                    + self.body.len()
2977            }
2978        }
2979    };
2980}
2981
2982metered_bytes_impl!(AppendRecord);
2983
2984#[derive(Debug, Clone)]
2985/// A batch of records to append atomically.
2986///
2987/// **Note:** It must contain at least `1` record and no more than `1000`.
2988/// The total size of the batch must not exceed `1MiB` in metered bytes.
2989///
2990/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
2991/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
2992/// that takes care of the abovementioned constraints.
2993pub struct AppendRecordBatch {
2994    records: Vec<AppendRecord>,
2995    metered_bytes: usize,
2996}
2997
2998impl AppendRecordBatch {
2999    pub(crate) fn with_capacity(capacity: usize) -> Self {
3000        Self {
3001            records: Vec::with_capacity(capacity),
3002            metered_bytes: 0,
3003        }
3004    }
3005
3006    pub(crate) fn push(&mut self, record: AppendRecord) {
3007        self.metered_bytes += record.metered_bytes();
3008        self.records.push(record);
3009    }
3010
3011    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
3012    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
3013    where
3014        I: IntoIterator<Item = AppendRecord>,
3015    {
3016        let mut records = Vec::new();
3017        let mut metered_bytes = 0;
3018
3019        for record in iter {
3020            metered_bytes += record.metered_bytes();
3021            records.push(record);
3022
3023            if metered_bytes > RECORD_BATCH_MAX.bytes {
3024                return Err(ValidationError(format!(
3025                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
3026                    RECORD_BATCH_MAX.bytes
3027                )));
3028            }
3029
3030            if records.len() > RECORD_BATCH_MAX.count {
3031                return Err(ValidationError(format!(
3032                    "number of records in the batch exceeds {}",
3033                    RECORD_BATCH_MAX.count
3034                )));
3035            }
3036        }
3037
3038        if records.is_empty() {
3039            return Err(ValidationError("batch is empty".into()));
3040        }
3041
3042        Ok(Self {
3043            records,
3044            metered_bytes,
3045        })
3046    }
3047}
3048
3049impl Deref for AppendRecordBatch {
3050    type Target = [AppendRecord];
3051
3052    fn deref(&self) -> &Self::Target {
3053        &self.records
3054    }
3055}
3056
3057impl MeteredBytes for AppendRecordBatch {
3058    fn metered_bytes(&self) -> usize {
3059        self.metered_bytes
3060    }
3061}
3062
3063#[derive(Debug, Clone)]
3064/// Command to signal an operation.
3065pub enum Command {
3066    /// Fence operation.
3067    Fence {
3068        /// Fencing token.
3069        fencing_token: FencingToken,
3070    },
3071    /// Trim operation.
3072    Trim {
3073        /// Trim point.
3074        trim_point: u64,
3075    },
3076}
3077
3078#[derive(Debug, Clone)]
3079#[non_exhaustive]
3080/// Command record for signaling operations to the service.
3081///
3082/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
3083pub struct CommandRecord {
3084    /// Command to signal an operation.
3085    pub command: Command,
3086    /// Timestamp for this record.
3087    pub timestamp: Option<u64>,
3088}
3089
3090impl CommandRecord {
3091    const FENCE: &[u8] = b"fence";
3092    const TRIM: &[u8] = b"trim";
3093
3094    /// Create a fence command record with the given fencing token.
3095    ///
3096    /// Fencing is strongly consistent, and subsequent appends that specify a
3097    /// fencing token will fail if it does not match.
3098    pub fn fence(fencing_token: FencingToken) -> Self {
3099        Self {
3100            command: Command::Fence { fencing_token },
3101            timestamp: None,
3102        }
3103    }
3104
3105    /// Create a trim command record with the given trim point.
3106    ///
3107    /// Trim point is the desired earliest sequence number for the stream.
3108    ///
3109    /// Trimming is eventually consistent, and trimmed records may be visible
3110    /// for a brief period.
3111    pub fn trim(trim_point: u64) -> Self {
3112        Self {
3113            command: Command::Trim { trim_point },
3114            timestamp: None,
3115        }
3116    }
3117
3118    /// Set the timestamp for this record.
3119    pub fn with_timestamp(self, timestamp: u64) -> Self {
3120        Self {
3121            timestamp: Some(timestamp),
3122            ..self
3123        }
3124    }
3125}
3126
3127impl From<CommandRecord> for AppendRecord {
3128    fn from(value: CommandRecord) -> Self {
3129        let (header_value, body) = match value.command {
3130            Command::Fence { fencing_token } => (
3131                CommandRecord::FENCE,
3132                Bytes::copy_from_slice(fencing_token.as_bytes()),
3133            ),
3134            Command::Trim { trim_point } => (
3135                CommandRecord::TRIM,
3136                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3137            ),
3138        };
3139        Self {
3140            body,
3141            headers: vec![Header::new("", header_value)],
3142            timestamp: value.timestamp,
3143        }
3144    }
3145}
3146
3147#[derive(Debug, Clone)]
3148#[non_exhaustive]
3149/// Input for [`append`](crate::S2Stream::append) operation and
3150/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
3151pub struct AppendInput {
3152    /// Batch of records to append atomically.
3153    pub records: AppendRecordBatch,
3154    /// Expected sequence number for the first record in the batch.
3155    ///
3156    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
3157    pub match_seq_num: Option<u64>,
3158    /// Fencing token to match against the stream's current fencing token.
3159    ///
3160    /// If unspecified, no matching is performed. If specified and mismatched,
3161    /// the append fails. A stream defaults to `""` as its fencing token.
3162    pub fencing_token: Option<FencingToken>,
3163}
3164
3165impl AppendInput {
3166    /// Create a new [`AppendInput`] with the given batch of records.
3167    pub fn new(records: AppendRecordBatch) -> Self {
3168        Self {
3169            records,
3170            match_seq_num: None,
3171            fencing_token: None,
3172        }
3173    }
3174
3175    /// Set the expected sequence number for the first record in the batch.
3176    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3177        Self {
3178            match_seq_num: Some(match_seq_num),
3179            ..self
3180        }
3181    }
3182
3183    /// Set the fencing token to match against the stream's current fencing token.
3184    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3185        Self {
3186            fencing_token: Some(fencing_token),
3187            ..self
3188        }
3189    }
3190}
3191
3192impl From<AppendInput> for api::stream::proto::AppendInput {
3193    fn from(value: AppendInput) -> Self {
3194        Self {
3195            records: value.records.iter().cloned().map(Into::into).collect(),
3196            match_seq_num: value.match_seq_num,
3197            fencing_token: value.fencing_token.map(|t| t.to_string()),
3198        }
3199    }
3200}
3201
3202#[derive(Debug, Clone, PartialEq)]
3203#[non_exhaustive]
3204/// Acknowledgement for an [`AppendInput`].
3205pub struct AppendAck {
3206    /// Sequence number and timestamp of the first record that was appended.
3207    pub start: StreamPosition,
3208    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
3209    /// that was appended.
3210    ///
3211    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
3212    /// appended.
3213    pub end: StreamPosition,
3214    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3215    /// the last record on the stream.
3216    ///
3217    /// This can be greater than the `end` position in case of concurrent appends.
3218    pub tail: StreamPosition,
3219}
3220
3221impl From<api::stream::proto::AppendAck> for AppendAck {
3222    fn from(value: api::stream::proto::AppendAck) -> Self {
3223        Self {
3224            start: value.start.unwrap_or_default().into(),
3225            end: value.end.unwrap_or_default().into(),
3226            tail: value.tail.unwrap_or_default().into(),
3227        }
3228    }
3229}
3230
3231#[derive(Debug, Clone, Copy)]
3232/// Starting position for reading from a stream.
3233pub enum ReadFrom {
3234    /// Read from this sequence number.
3235    SeqNum(u64),
3236    /// Read from this timestamp.
3237    Timestamp(u64),
3238    /// Read from N records before the tail.
3239    TailOffset(u64),
3240}
3241
3242impl Default for ReadFrom {
3243    fn default() -> Self {
3244        Self::SeqNum(0)
3245    }
3246}
3247
3248#[derive(Debug, Default, Clone)]
3249#[non_exhaustive]
3250/// Where to start reading.
3251pub struct ReadStart {
3252    /// Starting position.
3253    ///
3254    /// Defaults to reading from sequence number `0`.
3255    pub from: ReadFrom,
3256    /// Whether to start from tail if the requested starting position is beyond it.
3257    ///
3258    /// Defaults to `false` (errors if position is beyond tail).
3259    pub clamp_to_tail: bool,
3260}
3261
3262impl ReadStart {
3263    /// Create a new [`ReadStart`] with default values.
3264    pub fn new() -> Self {
3265        Self::default()
3266    }
3267
3268    /// Set the starting position.
3269    pub fn with_from(self, from: ReadFrom) -> Self {
3270        Self { from, ..self }
3271    }
3272
3273    /// Set whether to start from tail if the requested starting position is beyond it.
3274    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3275        Self {
3276            clamp_to_tail,
3277            ..self
3278        }
3279    }
3280}
3281
3282impl From<ReadStart> for api::stream::ReadStart {
3283    fn from(value: ReadStart) -> Self {
3284        let (seq_num, timestamp, tail_offset) = match value.from {
3285            ReadFrom::SeqNum(n) => (Some(n), None, None),
3286            ReadFrom::Timestamp(t) => (None, Some(t), None),
3287            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3288        };
3289        Self {
3290            seq_num,
3291            timestamp,
3292            tail_offset,
3293            clamp: if value.clamp_to_tail {
3294                Some(true)
3295            } else {
3296                None
3297            },
3298        }
3299    }
3300}
3301
3302#[derive(Debug, Clone, Default)]
3303#[non_exhaustive]
3304/// Limits on how much to read.
3305pub struct ReadLimits {
3306    /// Limit on number of records.
3307    ///
3308    /// Defaults to `1000` for non-streaming read.
3309    pub count: Option<usize>,
3310    /// Limit on total metered bytes of records.
3311    ///
3312    /// Defaults to `1MiB` for non-streaming read.
3313    pub bytes: Option<usize>,
3314}
3315
3316impl ReadLimits {
3317    /// Create a new [`ReadLimits`] with default values.
3318    pub fn new() -> Self {
3319        Self::default()
3320    }
3321
3322    /// Set the limit on number of records.
3323    pub fn with_count(self, count: usize) -> Self {
3324        Self {
3325            count: Some(count),
3326            ..self
3327        }
3328    }
3329
3330    /// Set the limit on total metered bytes of records.
3331    pub fn with_bytes(self, bytes: usize) -> Self {
3332        Self {
3333            bytes: Some(bytes),
3334            ..self
3335        }
3336    }
3337}
3338
3339#[derive(Debug, Clone, Default)]
3340#[non_exhaustive]
3341/// When to stop reading.
3342pub struct ReadStop {
3343    /// Limits on how much to read.
3344    ///
3345    /// See [`ReadLimits`] for defaults.
3346    pub limits: ReadLimits,
3347    /// Timestamp at which to stop (exclusive).
3348    ///
3349    /// Defaults to `None`.
3350    pub until: Option<RangeTo<u64>>,
3351    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
3352    /// seconds for [`read`](crate::S2Stream::read).
3353    ///
3354    /// Defaults to:
3355    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
3356    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
3357    ///   is specified.
3358    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
3359    ///   `until` is specified.
3360    pub wait: Option<u32>,
3361}
3362
3363impl ReadStop {
3364    /// Create a new [`ReadStop`] with default values.
3365    pub fn new() -> Self {
3366        Self::default()
3367    }
3368
3369    /// Set the limits on how much to read.
3370    pub fn with_limits(self, limits: ReadLimits) -> Self {
3371        Self { limits, ..self }
3372    }
3373
3374    /// Set the timestamp at which to stop (exclusive).
3375    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3376        Self {
3377            until: Some(until),
3378            ..self
3379        }
3380    }
3381
3382    /// Set the duration in seconds to wait for new records before stopping.
3383    pub fn with_wait(self, wait: u32) -> Self {
3384        Self {
3385            wait: Some(wait),
3386            ..self
3387        }
3388    }
3389}
3390
3391impl From<ReadStop> for api::stream::ReadEnd {
3392    fn from(value: ReadStop) -> Self {
3393        Self {
3394            count: value.limits.count,
3395            bytes: value.limits.bytes,
3396            until: value.until.map(|r| r.end),
3397            wait: value.wait,
3398        }
3399    }
3400}
3401
3402#[derive(Debug, Clone, Default)]
3403#[non_exhaustive]
3404/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
3405/// operations.
3406pub struct ReadInput {
3407    /// Where to start reading.
3408    ///
3409    /// See [`ReadStart`] for defaults.
3410    pub start: ReadStart,
3411    /// When to stop reading.
3412    ///
3413    /// See [`ReadStop`] for defaults.
3414    pub stop: ReadStop,
3415    /// Whether to filter out command records from the stream when reading.
3416    ///
3417    /// Defaults to `false`.
3418    pub ignore_command_records: bool,
3419}
3420
3421impl ReadInput {
3422    /// Create a new [`ReadInput`] with default values.
3423    pub fn new() -> Self {
3424        Self::default()
3425    }
3426
3427    /// Set where to start reading.
3428    pub fn with_start(self, start: ReadStart) -> Self {
3429        Self { start, ..self }
3430    }
3431
3432    /// Set when to stop reading.
3433    pub fn with_stop(self, stop: ReadStop) -> Self {
3434        Self { stop, ..self }
3435    }
3436
3437    /// Set whether to filter out command records from the stream when reading.
3438    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3439        Self {
3440            ignore_command_records,
3441            ..self
3442        }
3443    }
3444}
3445
3446#[derive(Debug, Clone)]
3447#[non_exhaustive]
3448/// Record that is durably sequenced on a stream.
3449pub struct SequencedRecord {
3450    /// Sequence number assigned to this record.
3451    pub seq_num: u64,
3452    /// Body of this record.
3453    pub body: Bytes,
3454    /// Headers for this record.
3455    pub headers: Vec<Header>,
3456    /// Timestamp for this record.
3457    pub timestamp: u64,
3458}
3459
3460impl SequencedRecord {
3461    /// Whether this is a command record.
3462    pub fn is_command_record(&self) -> bool {
3463        self.headers.len() == 1 && *self.headers[0].name == *b""
3464    }
3465}
3466
3467impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3468    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3469        Self {
3470            seq_num: value.seq_num,
3471            body: value.body,
3472            headers: value.headers.into_iter().map(Into::into).collect(),
3473            timestamp: value.timestamp,
3474        }
3475    }
3476}
3477
3478metered_bytes_impl!(SequencedRecord);
3479
3480#[derive(Debug, Clone)]
3481#[non_exhaustive]
3482/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
3483/// [`read_session`](crate::S2Stream::read_session).
3484pub struct ReadBatch {
3485    /// Records that are durably sequenced on the stream.
3486    ///
3487    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
3488    /// - the [`stop condition`](ReadInput::stop) was already met, or
3489    /// - all records in the batch were command records and
3490    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
3491    pub records: Vec<SequencedRecord>,
3492    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
3493    /// the last record.
3494    ///
3495    /// It will only be present when reading recent records.
3496    pub tail: Option<StreamPosition>,
3497}
3498
3499impl ReadBatch {
3500    pub(crate) fn from_api(batch: api::stream::proto::ReadBatch) -> Self {
3501        Self {
3502            records: batch.records.into_iter().map(Into::into).collect(),
3503            tail: batch.tail.map(Into::into),
3504        }
3505    }
3506}
3507
3508/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
3509pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3510
3511#[derive(Debug, Clone, thiserror::Error)]
3512/// Why an append condition check failed.
3513pub enum AppendConditionFailed {
3514    #[error("fencing token mismatch, expected: {0}")]
3515    /// Fencing token did not match. Contains the expected fencing token.
3516    FencingTokenMismatch(FencingToken),
3517    #[error("sequence number mismatch, expected: {0}")]
3518    /// Sequence number did not match. Contains the expected sequence number.
3519    SeqNumMismatch(u64),
3520}
3521
3522impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3523    fn from(value: api::stream::AppendConditionFailed) -> Self {
3524        match value {
3525            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3526                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3527            }
3528            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3529                AppendConditionFailed::SeqNumMismatch(seq)
3530            }
3531        }
3532    }
3533}
3534
3535#[derive(Debug, Clone, thiserror::Error)]
3536/// Errors from S2 operations.
3537pub enum S2Error {
3538    #[error("{0}")]
3539    /// Client-side error.
3540    Client(String),
3541    #[error(transparent)]
3542    /// Validation error.
3543    Validation(#[from] ValidationError),
3544    #[error("{0}")]
3545    /// Append condition check failed. Contains the failure reason.
3546    AppendConditionFailed(AppendConditionFailed),
3547    #[error("read from an unwritten position. current tail: {0}")]
3548    /// Read from an unwritten position. Contains the current tail.
3549    ReadUnwritten(StreamPosition),
3550    #[error("{0}")]
3551    /// Other server-side error.
3552    Server(ErrorResponse),
3553}
3554
3555impl From<ApiError> for S2Error {
3556    fn from(err: ApiError) -> Self {
3557        match err {
3558            ApiError::ReadUnwritten(tail_response) => {
3559                Self::ReadUnwritten(tail_response.tail.into())
3560            }
3561            ApiError::AppendConditionFailed(condition_failed) => {
3562                Self::AppendConditionFailed(condition_failed.into())
3563            }
3564            ApiError::Server(_, response) => Self::Server(response.into()),
3565            other => Self::Client(other.to_string()),
3566        }
3567    }
3568}
3569
3570#[derive(Debug, Clone, thiserror::Error)]
3571#[error("{code}: {message}")]
3572#[non_exhaustive]
3573/// Error response from S2 server.
3574pub struct ErrorResponse {
3575    /// Error code.
3576    pub code: String,
3577    /// Error message.
3578    pub message: String,
3579}
3580
3581impl From<ApiErrorResponse> for ErrorResponse {
3582    fn from(response: ApiErrorResponse) -> Self {
3583        Self {
3584            code: response.code,
3585            message: response.message,
3586        }
3587    }
3588}
3589
3590fn idempotency_token() -> String {
3591    uuid::Uuid::new_v4().simple().to_string()
3592}