Skip to main content

s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::RngExt;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Validation error.
23pub use s2_common::types::ValidationError;
24/// Access token ID.
25///
26/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
27pub use s2_common::types::access::AccessTokenId;
28/// See [`ListAccessTokensInput::prefix`].
29pub use s2_common::types::access::AccessTokenIdPrefix;
30/// See [`ListAccessTokensInput::start_after`].
31pub use s2_common::types::access::AccessTokenIdStartAfter;
32/// Basin name.
33///
34/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
35/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
36pub use s2_common::types::basin::BasinName;
37/// See [`ListBasinsInput::prefix`].
38pub use s2_common::types::basin::BasinNamePrefix;
39/// See [`ListBasinsInput::start_after`].
40pub use s2_common::types::basin::BasinNameStartAfter;
41/// Stream name.
42///
43/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
44pub use s2_common::types::stream::StreamName;
45/// See [`ListStreamsInput::prefix`].
46pub use s2_common::types::stream::StreamNamePrefix;
47/// See [`ListStreamsInput::start_after`].
48pub use s2_common::types::stream::StreamNameStartAfter;
49
/// Number of bytes in one mebibyte (1 MiB).
pub(crate) const ONE_MIB: u32 = 1024 * 1024;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57/// An RFC 3339 datetime.
58///
59/// It can be created in either of the following ways:
60/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
61/// - Convert from [`time::OffsetDateTime`] using [`TryFrom`]/[`TryInto`].
62#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl TryFrom<time::OffsetDateTime> for S2DateTime {
66    type Error = ValidationError;
67
68    fn try_from(dt: time::OffsetDateTime) -> Result<Self, Self::Error> {
69        dt.format(&time::format_description::well_known::Rfc3339)
70            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))?;
71        Ok(Self(dt))
72    }
73}
74
75impl From<S2DateTime> for time::OffsetDateTime {
76    fn from(dt: S2DateTime) -> Self {
77        dt.0
78    }
79}
80
81impl FromStr for S2DateTime {
82    type Err = ValidationError;
83
84    fn from_str(s: &str) -> Result<Self, Self::Err> {
85        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
86            .map(Self)
87            .map_err(|e| ValidationError(format!("not a valid RFC 3339 datetime: {e}")))
88    }
89}
90
91impl fmt::Display for S2DateTime {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        write!(
94            f,
95            "{}",
96            self.0
97                .format(&time::format_description::well_known::Rfc3339)
98                .expect("RFC3339 formatting should not fail for S2DateTime")
99        )
100    }
101}
102
/// Authority for connecting to an S2 basin.
///
/// Distinguishes whether requests for a basin are routed via DNS or sent to a
/// fixed cell.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum BasinAuthority {
    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
    // NOTE(review): produced from endpoints with a `{basin}.` placeholder — the
    // basin name presumably becomes a subdomain of this authority; confirm at use site.
    ParentZone(Authority),
    /// Direct cell authority. Basin is expected to be hosted by this cell.
    Direct(Authority),
}
111
112/// Account endpoint.
113#[derive(Debug, Clone)]
114pub struct AccountEndpoint {
115    scheme: Scheme,
116    authority: Authority,
117}
118
119impl AccountEndpoint {
120    /// Create a new [`AccountEndpoint`] with the given endpoint.
121    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
122        endpoint.parse()
123    }
124}
125
126impl FromStr for AccountEndpoint {
127    type Err = ValidationError;
128
129    fn from_str(s: &str) -> Result<Self, Self::Err> {
130        let (scheme, authority) = match s.find("://") {
131            Some(idx) => {
132                let scheme: Scheme = s[..idx]
133                    .parse()
134                    .map_err(|_| "invalid account endpoint scheme".to_string())?;
135                (scheme, &s[idx + 3..])
136            }
137            None => (Scheme::HTTPS, s),
138        };
139        Ok(Self {
140            scheme,
141            authority: authority
142                .parse()
143                .map_err(|e| format!("invalid account endpoint authority: {e}"))?,
144        })
145    }
146}
147
148/// Basin endpoint.
149#[derive(Debug, Clone)]
150pub struct BasinEndpoint {
151    scheme: Scheme,
152    authority: BasinAuthority,
153}
154
155impl BasinEndpoint {
156    /// Create a new [`BasinEndpoint`] with the given endpoint.
157    pub fn new(endpoint: &str) -> Result<Self, ValidationError> {
158        endpoint.parse()
159    }
160}
161
162impl FromStr for BasinEndpoint {
163    type Err = ValidationError;
164
165    fn from_str(s: &str) -> Result<Self, Self::Err> {
166        let (scheme, authority) = match s.find("://") {
167            Some(idx) => {
168                let scheme: Scheme = s[..idx]
169                    .parse()
170                    .map_err(|_| "invalid basin endpoint scheme".to_string())?;
171                (scheme, &s[idx + 3..])
172            }
173            None => (Scheme::HTTPS, s),
174        };
175        let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
176            BasinAuthority::ParentZone(
177                authority
178                    .parse()
179                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
180            )
181        } else {
182            BasinAuthority::Direct(
183                authority
184                    .parse()
185                    .map_err(|e| format!("invalid basin endpoint authority: {e}"))?,
186            )
187        };
188        Ok(Self { scheme, authority })
189    }
190}
191
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Endpoints for the S2 environment.
pub struct S2Endpoints {
    // Single scheme shared by both endpoints; constructors reject mismatched schemes.
    pub(crate) scheme: Scheme,
    // Authority for account-level operations.
    pub(crate) account_authority: Authority,
    // Authority (direct cell or DNS parent zone) for basin-level operations.
    pub(crate) basin_authority: BasinAuthority,
}
200
201impl S2Endpoints {
202    /// Create a new [`S2Endpoints`] with the given account and basin endpoints.
203    pub fn new(
204        account_endpoint: AccountEndpoint,
205        basin_endpoint: BasinEndpoint,
206    ) -> Result<Self, ValidationError> {
207        if account_endpoint.scheme != basin_endpoint.scheme {
208            return Err("account and basin endpoints must have the same scheme".into());
209        }
210        Ok(Self {
211            scheme: account_endpoint.scheme,
212            account_authority: account_endpoint.authority,
213            basin_authority: basin_endpoint.authority,
214        })
215    }
216
217    /// Create a new [`S2Endpoints`] from environment variables.
218    ///
219    /// The following environment variables are expected to be set:
220    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
221    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
222    pub fn from_env() -> Result<Self, ValidationError> {
223        let account_endpoint: AccountEndpoint = match std::env::var("S2_ACCOUNT_ENDPOINT") {
224            Ok(endpoint) => endpoint.parse()?,
225            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
226            Err(VarError::NotUnicode(_)) => {
227                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
228            }
229        };
230
231        let basin_endpoint: BasinEndpoint = match std::env::var("S2_BASIN_ENDPOINT") {
232            Ok(endpoint) => endpoint.parse()?,
233            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
234            Err(VarError::NotUnicode(_)) => {
235                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
236            }
237        };
238
239        if account_endpoint.scheme != basin_endpoint.scheme {
240            return Err(
241                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
242            );
243        }
244
245        Ok(Self {
246            scheme: account_endpoint.scheme,
247            account_authority: account_endpoint.authority,
248            basin_authority: basin_endpoint.authority,
249        })
250    }
251
252    pub(crate) fn for_aws() -> Self {
253        Self {
254            scheme: Scheme::HTTPS,
255            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
256            basin_authority: BasinAuthority::ParentZone(
257                "b.aws.s2.dev".try_into().expect("valid authority"),
258            ),
259        }
260    }
261}
262
263#[derive(Debug, Clone, Copy)]
264/// Compression algorithm for request and response bodies.
265pub enum Compression {
266    /// No compression.
267    None,
268    /// Gzip compression.
269    Gzip,
270    /// Zstd compression.
271    Zstd,
272}
273
274impl From<Compression> for CompressionAlgorithm {
275    fn from(value: Compression) -> Self {
276        match value {
277            Compression::None => CompressionAlgorithm::None,
278            Compression::Gzip => CompressionAlgorithm::Gzip,
279            Compression::Zstd => CompressionAlgorithm::Zstd,
280        }
281    }
282}
283
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// Retry policy for [`append`](crate::S2Stream::append) and
/// [`append_session`](crate::S2Stream::append_session) operations.
pub enum AppendRetryPolicy {
    /// Retry all appends. Use when duplicate records on the stream are acceptable.
    All,
    /// Only retry appends that include [`match_seq_num`](AppendInput::match_seq_num).
    NoSideEffects,
}

impl AppendRetryPolicy {
    /// Whether `input` may be retried under this policy.
    ///
    /// Under `NoSideEffects` only appends that set `match_seq_num` qualify —
    /// presumably because the sequence-number precondition prevents a retry
    /// from writing duplicates (matches the variant doc above).
    pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
        match self {
            Self::All => true,
            Self::NoSideEffects => input.match_seq_num.is_some(),
        }
    }
}
303
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for retrying requests in case of transient failures.
///
/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
/// ```text
/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
///     jitter = rand[0, base_delay]
///     delay  = base_delay + jitter
/// ```
pub struct RetryConfig {
    /// Total number of attempts including the initial try. A value of `1` means no retries.
    ///
    /// Defaults to `3`.
    pub max_attempts: NonZeroU32,
    /// Minimum base delay for retries.
    ///
    /// Defaults to `100ms`.
    pub min_base_delay: Duration,
    /// Maximum base delay for retries.
    ///
    /// Defaults to `1s`.
    pub max_base_delay: Duration,
    /// Retry policy for [`append`](crate::S2Stream::append) and
    /// [`append_session`](crate::S2Stream::append_session) operations.
    ///
    /// Defaults to `All`.
    pub append_retry_policy: AppendRetryPolicy,
}

impl Default for RetryConfig {
    /// Defaults documented on the fields: 3 attempts, 100ms–1s base delay,
    /// retry all appends.
    fn default() -> Self {
        Self {
            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
            min_base_delay: Duration::from_millis(100),
            max_base_delay: Duration::from_secs(1),
            append_retry_policy: AppendRetryPolicy::All,
        }
    }
}
344
345impl RetryConfig {
346    /// Create a new [`RetryConfig`] with default settings.
347    pub fn new() -> Self {
348        Self::default()
349    }
350
351    pub(crate) fn max_retries(&self) -> u32 {
352        self.max_attempts.get() - 1
353    }
354
355    /// Set the total number of attempts including the initial try.
356    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
357        Self {
358            max_attempts,
359            ..self
360        }
361    }
362
363    /// Set the minimum base delay for retries.
364    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
365        Self {
366            min_base_delay,
367            ..self
368        }
369    }
370
371    /// Set the maximum base delay for retries.
372    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
373        Self {
374            max_base_delay,
375            ..self
376        }
377    }
378
379    /// Set the retry policy for [`append`](crate::S2Stream::append) and
380    /// [`append_session`](crate::S2Stream::append_session) operations.
381    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
382        Self {
383            append_retry_policy,
384            ..self
385        }
386    }
387}
388
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Configuration for [`S2`](crate::S2).
pub struct S2Config {
    // Bearer token used to authenticate requests; wrapped in `SecretString`
    // so it is not exposed via `Debug`.
    pub(crate) access_token: SecretString,
    // Endpoints to connect to; defaults to the AWS environment.
    pub(crate) endpoints: S2Endpoints,
    // Timeout for establishing a connection (default 3s).
    pub(crate) connection_timeout: Duration,
    // Per-request timeout (default 5s).
    pub(crate) request_timeout: Duration,
    // Retry behavior for transient failures.
    pub(crate) retry: RetryConfig,
    // Compression applied to request/response bodies (default none).
    pub(crate) compression: Compression,
    // `User-Agent` header sent with every request.
    pub(crate) user_agent: HeaderValue,
    // Disables TLS certificate verification; for testing only.
    pub(crate) insecure_skip_cert_verification: bool,
}
402
403impl S2Config {
404    /// Create a new [`S2Config`] with the given access token and default settings.
405    pub fn new(access_token: impl Into<String>) -> Self {
406        Self {
407            access_token: access_token.into().into(),
408            endpoints: S2Endpoints::for_aws(),
409            connection_timeout: Duration::from_secs(3),
410            request_timeout: Duration::from_secs(5),
411            retry: RetryConfig::new(),
412            compression: Compression::None,
413            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
414                .parse()
415                .expect("valid user agent"),
416            insecure_skip_cert_verification: false,
417        }
418    }
419
420    /// Set the S2 endpoints to connect to.
421    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
422        Self { endpoints, ..self }
423    }
424
425    /// Set the timeout for establishing a connection to the server.
426    ///
427    /// Defaults to `3s`.
428    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
429        Self {
430            connection_timeout,
431            ..self
432        }
433    }
434
435    /// Set the timeout for requests.
436    ///
437    /// Defaults to `5s`.
438    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
439        Self {
440            request_timeout,
441            ..self
442        }
443    }
444
445    /// Set the retry configuration for requests.
446    ///
447    /// See [`RetryConfig`] for defaults.
448    pub fn with_retry(self, retry: RetryConfig) -> Self {
449        Self { retry, ..self }
450    }
451
452    /// Set the compression algorithm for requests and responses.
453    ///
454    /// Defaults to no compression.
455    pub fn with_compression(self, compression: Compression) -> Self {
456        Self {
457            compression,
458            ..self
459        }
460    }
461
462    /// Skip TLS certificate verification (insecure).
463    ///
464    /// This is useful for connecting to endpoints with self-signed certificates
465    /// or certificates that don't match the hostname (similar to `curl -k`).
466    ///
467    /// # Warning
468    ///
469    /// This disables certificate verification and should only be used for
470    /// testing or development purposes. **Never use this in production.**
471    ///
472    /// Defaults to `false`.
473    pub fn with_insecure_skip_cert_verification(self, skip: bool) -> Self {
474        Self {
475            insecure_skip_cert_verification: skip,
476            ..self
477        }
478    }
479
480    #[doc(hidden)]
481    #[cfg(feature = "_hidden")]
482    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
483        let user_agent = user_agent
484            .into()
485            .parse()
486            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
487        Ok(Self { user_agent, ..self })
488    }
489}
490
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// A page of values.
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether there are more pages.
    pub has_more: bool,
}

impl<T> Page<T> {
    /// Build a page from anything convertible into a `Vec`, plus a flag
    /// indicating whether further pages exist.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values = values.into();
        Self { values, has_more }
    }
}
509
510#[derive(Debug, Clone, Copy, PartialEq, Eq)]
511/// Storage class for recent appends.
512pub enum StorageClass {
513    /// Standard storage class that offers append latencies under `500ms`.
514    Standard,
515    /// Express storage class that offers append latencies under `50ms`.
516    Express,
517}
518
519impl From<api::config::StorageClass> for StorageClass {
520    fn from(value: api::config::StorageClass) -> Self {
521        match value {
522            api::config::StorageClass::Standard => StorageClass::Standard,
523            api::config::StorageClass::Express => StorageClass::Express,
524        }
525    }
526}
527
528impl From<StorageClass> for api::config::StorageClass {
529    fn from(value: StorageClass) -> Self {
530        match value {
531            StorageClass::Standard => api::config::StorageClass::Standard,
532            StorageClass::Express => api::config::StorageClass::Express,
533        }
534    }
535}
536
537#[derive(Debug, Clone, Copy, PartialEq, Eq)]
538/// Retention policy for records in a stream.
539pub enum RetentionPolicy {
540    /// Age in seconds. Records older than this age are automatically trimmed.
541    Age(u64),
542    /// Records are retained indefinitely unless explicitly trimmed.
543    Infinite,
544}
545
546impl From<api::config::RetentionPolicy> for RetentionPolicy {
547    fn from(value: api::config::RetentionPolicy) -> Self {
548        match value {
549            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
550            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
551        }
552    }
553}
554
555impl From<RetentionPolicy> for api::config::RetentionPolicy {
556    fn from(value: RetentionPolicy) -> Self {
557        match value {
558            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
559            RetentionPolicy::Infinite => {
560                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
561            }
562        }
563    }
564}
565
566#[derive(Debug, Clone, Copy, PartialEq, Eq)]
567/// Timestamping mode for appends that influences how timestamps are handled.
568pub enum TimestampingMode {
569    /// Prefer client-specified timestamp if present otherwise use arrival time.
570    ClientPrefer,
571    /// Require a client-specified timestamp and reject the append if it is missing.
572    ClientRequire,
573    /// Use the arrival time and ignore any client-specified timestamp.
574    Arrival,
575}
576
577impl From<api::config::TimestampingMode> for TimestampingMode {
578    fn from(value: api::config::TimestampingMode) -> Self {
579        match value {
580            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
581            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
582            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
583        }
584    }
585}
586
587impl From<TimestampingMode> for api::config::TimestampingMode {
588    fn from(value: TimestampingMode) -> Self {
589        match value {
590            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
591            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
592            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
593        }
594    }
595}
596
597#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
598#[non_exhaustive]
599/// Configuration for timestamping behavior.
600pub struct TimestampingConfig {
601    /// Timestamping mode for appends that influences how timestamps are handled.
602    ///
603    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
604    pub mode: Option<TimestampingMode>,
605    /// Whether client-specified timestamps are allowed to exceed the arrival time.
606    ///
607    /// Defaults to `false` (client timestamps are capped at the arrival time).
608    pub uncapped: bool,
609}
610
611impl TimestampingConfig {
612    /// Create a new [`TimestampingConfig`] with default settings.
613    pub fn new() -> Self {
614        Self::default()
615    }
616
617    /// Set the timestamping mode for appends that influences how timestamps are handled.
618    pub fn with_mode(self, mode: TimestampingMode) -> Self {
619        Self {
620            mode: Some(mode),
621            ..self
622        }
623    }
624
625    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
626    pub fn with_uncapped(self, uncapped: bool) -> Self {
627        Self { uncapped, ..self }
628    }
629}
630
631impl From<api::config::TimestampingConfig> for TimestampingConfig {
632    fn from(value: api::config::TimestampingConfig) -> Self {
633        Self {
634            mode: value.mode.map(Into::into),
635            uncapped: value.uncapped.unwrap_or_default(),
636        }
637    }
638}
639
640impl From<TimestampingConfig> for api::config::TimestampingConfig {
641    fn from(value: TimestampingConfig) -> Self {
642        Self {
643            mode: value.mode.map(Into::into),
644            uncapped: Some(value.uncapped),
645        }
646    }
647}
648
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
/// Configuration for automatically deleting a stream when it becomes empty.
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the minimum age in seconds before an empty stream can be deleted.
    ///
    /// Sub-second precision of `min_age` is truncated.
    pub fn with_min_age(self, min_age: Duration) -> Self {
        let min_age_secs = min_age.as_secs();
        Self { min_age_secs }
    }
}
672
673impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
674    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
675        Self {
676            min_age_secs: value.min_age_secs,
677        }
678    }
679}
680
681impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
682    fn from(value: DeleteOnEmptyConfig) -> Self {
683        Self {
684            min_age_secs: value.min_age_secs,
685        }
686    }
687}
688
689#[derive(Debug, Clone, Default, PartialEq, Eq)]
690#[non_exhaustive]
691/// Configuration for a stream.
692pub struct StreamConfig {
693    /// Storage class for the stream.
694    ///
695    /// Defaults to [`Express`](StorageClass::Express).
696    pub storage_class: Option<StorageClass>,
697    /// Retention policy for records in the stream.
698    ///
699    /// Defaults to `7 days` of retention.
700    pub retention_policy: Option<RetentionPolicy>,
701    /// Configuration for timestamping behavior.
702    ///
703    /// See [`TimestampingConfig`] for defaults.
704    pub timestamping: Option<TimestampingConfig>,
705    /// Configuration for automatically deleting the stream when it becomes empty.
706    ///
707    /// See [`DeleteOnEmptyConfig`] for defaults.
708    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
709}
710
711impl StreamConfig {
712    /// Create a new [`StreamConfig`] with default settings.
713    pub fn new() -> Self {
714        Self::default()
715    }
716
717    /// Set the storage class for the stream.
718    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
719        Self {
720            storage_class: Some(storage_class),
721            ..self
722        }
723    }
724
725    /// Set the retention policy for records in the stream.
726    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
727        Self {
728            retention_policy: Some(retention_policy),
729            ..self
730        }
731    }
732
733    /// Set the configuration for timestamping behavior.
734    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
735        Self {
736            timestamping: Some(timestamping),
737            ..self
738        }
739    }
740
741    /// Set the configuration for automatically deleting the stream when it becomes empty.
742    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
743        Self {
744            delete_on_empty: Some(delete_on_empty),
745            ..self
746        }
747    }
748}
749
750impl From<api::config::StreamConfig> for StreamConfig {
751    fn from(value: api::config::StreamConfig) -> Self {
752        Self {
753            storage_class: value.storage_class.map(Into::into),
754            retention_policy: value.retention_policy.map(Into::into),
755            timestamping: value.timestamping.map(Into::into),
756            delete_on_empty: value.delete_on_empty.map(Into::into),
757        }
758    }
759}
760
761impl From<StreamConfig> for api::config::StreamConfig {
762    fn from(value: StreamConfig) -> Self {
763        Self {
764            storage_class: value.storage_class.map(Into::into),
765            retention_policy: value.retention_policy.map(Into::into),
766            timestamping: value.timestamping.map(Into::into),
767            delete_on_empty: value.delete_on_empty.map(Into::into),
768        }
769    }
770}
771
772#[derive(Debug, Clone, Default)]
773#[non_exhaustive]
774/// Configuration for a basin.
775pub struct BasinConfig {
776    /// Default configuration for all streams in the basin.
777    ///
778    /// See [`StreamConfig`] for defaults.
779    pub default_stream_config: Option<StreamConfig>,
780    /// Whether to create stream on append if it doesn't exist using default stream configuration.
781    ///
782    /// Defaults to `false`.
783    pub create_stream_on_append: bool,
784    /// Whether to create stream on read if it doesn't exist using default stream configuration.
785    ///
786    /// Defaults to `false`.
787    pub create_stream_on_read: bool,
788}
789
790impl BasinConfig {
791    /// Create a new [`BasinConfig`] with default settings.
792    pub fn new() -> Self {
793        Self::default()
794    }
795
796    /// Set the default configuration for all streams in the basin.
797    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
798        Self {
799            default_stream_config: Some(config),
800            ..self
801        }
802    }
803
804    /// Set whether to create stream on append if it doesn't exist using default stream
805    /// configuration.
806    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
807        Self {
808            create_stream_on_append,
809            ..self
810        }
811    }
812
813    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
814    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
815        Self {
816            create_stream_on_read,
817            ..self
818        }
819    }
820}
821
822impl From<api::config::BasinConfig> for BasinConfig {
823    fn from(value: api::config::BasinConfig) -> Self {
824        Self {
825            default_stream_config: value.default_stream_config.map(Into::into),
826            create_stream_on_append: value.create_stream_on_append,
827            create_stream_on_read: value.create_stream_on_read,
828        }
829    }
830}
831
832impl From<BasinConfig> for api::config::BasinConfig {
833    fn from(value: BasinConfig) -> Self {
834        Self {
835            default_stream_config: value.default_stream_config.map(Into::into),
836            create_stream_on_append: value.create_stream_on_append,
837            create_stream_on_read: value.create_stream_on_read,
838        }
839    }
840}
841
842#[derive(Debug, Clone, PartialEq, Eq)]
843/// Scope of a basin.
844pub enum BasinScope {
845    /// AWS `us-east-1` region.
846    AwsUsEast1,
847}
848
849impl From<api::basin::BasinScope> for BasinScope {
850    fn from(value: api::basin::BasinScope) -> Self {
851        match value {
852            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
853        }
854    }
855}
856
857impl From<BasinScope> for api::basin::BasinScope {
858    fn from(value: BasinScope) -> Self {
859        match value {
860            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
861        }
862    }
863}
864
865#[derive(Debug, Clone)]
866#[non_exhaustive]
867/// Input for [`create_basin`](crate::S2::create_basin) operation.
868pub struct CreateBasinInput {
869    /// Basin name.
870    pub name: BasinName,
871    /// Configuration for the basin.
872    ///
873    /// See [`BasinConfig`] for defaults.
874    pub config: Option<BasinConfig>,
875    /// Scope of the basin.
876    ///
877    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
878    pub scope: Option<BasinScope>,
879    idempotency_token: String,
880}
881
882impl CreateBasinInput {
883    /// Create a new [`CreateBasinInput`] with the given basin name.
884    pub fn new(name: BasinName) -> Self {
885        Self {
886            name,
887            config: None,
888            scope: None,
889            idempotency_token: idempotency_token(),
890        }
891    }
892
893    /// Set the configuration for the basin.
894    pub fn with_config(self, config: BasinConfig) -> Self {
895        Self {
896            config: Some(config),
897            ..self
898        }
899    }
900
901    /// Set the scope of the basin.
902    pub fn with_scope(self, scope: BasinScope) -> Self {
903        Self {
904            scope: Some(scope),
905            ..self
906        }
907    }
908}
909
910impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
911    fn from(value: CreateBasinInput) -> Self {
912        (
913            api::basin::CreateBasinRequest {
914                basin: value.name,
915                config: value.config.map(Into::into),
916                scope: value.scope.map(Into::into),
917            },
918            value.idempotency_token,
919        )
920    }
921}
922
923#[derive(Debug, Clone, Default)]
924#[non_exhaustive]
925/// Input for [`list_basins`](crate::S2::list_basins) operation.
926pub struct ListBasinsInput {
927    /// Filter basins whose names begin with this value.
928    ///
929    /// Defaults to `""`.
930    pub prefix: BasinNamePrefix,
931    /// Filter basins whose names are lexicographically greater than this value.
932    ///
933    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
934    ///
935    /// Defaults to `""`.
936    pub start_after: BasinNameStartAfter,
937    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
938    ///
939    /// Defaults to `1000`.
940    pub limit: Option<usize>,
941}
942
943impl ListBasinsInput {
944    /// Create a new [`ListBasinsInput`] with default values.
945    pub fn new() -> Self {
946        Self::default()
947    }
948
949    /// Set the prefix used to filter basins whose names begin with this value.
950    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
951        Self { prefix, ..self }
952    }
953
954    /// Set the value used to filter basins whose names are lexicographically greater than this
955    /// value.
956    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
957        Self {
958            start_after,
959            ..self
960        }
961    }
962
963    /// Set the limit on number of basins to return in a page.
964    pub fn with_limit(self, limit: usize) -> Self {
965        Self {
966            limit: Some(limit),
967            ..self
968        }
969    }
970}
971
972impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
973    fn from(value: ListBasinsInput) -> Self {
974        Self {
975            prefix: Some(value.prefix),
976            start_after: Some(value.start_after),
977            limit: value.limit,
978        }
979    }
980}
981
982#[derive(Debug, Clone, Default)]
983/// Input for [`S2::list_all_basins`](crate::S2::list_all_basins).
984pub struct ListAllBasinsInput {
985    /// Filter basins whose names begin with this value.
986    ///
987    /// Defaults to `""`.
988    pub prefix: BasinNamePrefix,
989    /// Filter basins whose names are lexicographically greater than this value.
990    ///
991    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
992    ///
993    /// Defaults to `""`.
994    pub start_after: BasinNameStartAfter,
995    /// Whether to include basins that are being deleted.
996    ///
997    /// Defaults to `false`.
998    pub include_deleted: bool,
999}
1000
1001impl ListAllBasinsInput {
1002    /// Create a new [`ListAllBasinsInput`] with default values.
1003    pub fn new() -> Self {
1004        Self::default()
1005    }
1006
1007    /// Set the prefix used to filter basins whose names begin with this value.
1008    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
1009        Self { prefix, ..self }
1010    }
1011
1012    /// Set the value used to filter basins whose names are lexicographically greater than this
1013    /// value.
1014    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
1015        Self {
1016            start_after,
1017            ..self
1018        }
1019    }
1020
1021    /// Set whether to include basins that are being deleted.
1022    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
1023        Self {
1024            include_deleted,
1025            ..self
1026        }
1027    }
1028}
1029
/// Current state of a basin.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BasinState {
    /// The basin is active.
    Active,
    /// The basin is being created.
    Creating,
    /// The basin is being deleted.
    Deleting,
}
1040
1041impl From<api::basin::BasinState> for BasinState {
1042    fn from(value: api::basin::BasinState) -> Self {
1043        match value {
1044            api::basin::BasinState::Active => BasinState::Active,
1045            api::basin::BasinState::Creating => BasinState::Creating,
1046            api::basin::BasinState::Deleting => BasinState::Deleting,
1047        }
1048    }
1049}
1050
1051#[derive(Debug, Clone, PartialEq, Eq)]
1052#[non_exhaustive]
1053/// Basin information.
1054pub struct BasinInfo {
1055    /// Basin name.
1056    pub name: BasinName,
1057    /// Scope of the basin.
1058    pub scope: Option<BasinScope>,
1059    /// Current state of the basin.
1060    pub state: BasinState,
1061}
1062
1063impl From<api::basin::BasinInfo> for BasinInfo {
1064    fn from(value: api::basin::BasinInfo) -> Self {
1065        Self {
1066            name: value.name,
1067            scope: value.scope.map(Into::into),
1068            state: value.state.into(),
1069        }
1070    }
1071}
1072
1073#[derive(Debug, Clone)]
1074#[non_exhaustive]
1075/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
1076pub struct DeleteBasinInput {
1077    /// Basin name.
1078    pub name: BasinName,
1079    /// Whether to ignore `Not Found` error if the basin doesn't exist.
1080    pub ignore_not_found: bool,
1081}
1082
1083impl DeleteBasinInput {
1084    /// Create a new [`DeleteBasinInput`] with the given basin name.
1085    pub fn new(name: BasinName) -> Self {
1086        Self {
1087            name,
1088            ignore_not_found: false,
1089        }
1090    }
1091
1092    /// Set whether to ignore `Not Found` error if the basin is not existing.
1093    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1094        Self {
1095            ignore_not_found,
1096            ..self
1097        }
1098    }
1099}
1100
1101#[derive(Debug, Clone, Default)]
1102#[non_exhaustive]
1103/// Reconfiguration for [`TimestampingConfig`].
1104pub struct TimestampingReconfiguration {
1105    /// Override for the existing [`mode`](TimestampingConfig::mode).
1106    pub mode: Maybe<Option<TimestampingMode>>,
1107    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
1108    pub uncapped: Maybe<Option<bool>>,
1109}
1110
1111impl TimestampingReconfiguration {
1112    /// Create a new [`TimestampingReconfiguration`].
1113    pub fn new() -> Self {
1114        Self::default()
1115    }
1116
1117    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1118    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1119        Self {
1120            mode: Maybe::Specified(Some(mode)),
1121            ..self
1122        }
1123    }
1124
1125    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1126    pub fn with_uncapped(self, uncapped: bool) -> Self {
1127        Self {
1128            uncapped: Maybe::Specified(Some(uncapped)),
1129            ..self
1130        }
1131    }
1132}
1133
1134impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1135    fn from(value: TimestampingReconfiguration) -> Self {
1136        Self {
1137            mode: value.mode.map(|m| m.map(Into::into)),
1138            uncapped: value.uncapped,
1139        }
1140    }
1141}
1142
1143#[derive(Debug, Clone, Default)]
1144#[non_exhaustive]
1145/// Reconfiguration for [`DeleteOnEmptyConfig`].
1146pub struct DeleteOnEmptyReconfiguration {
1147    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1148    pub min_age_secs: Maybe<Option<u64>>,
1149}
1150
1151impl DeleteOnEmptyReconfiguration {
1152    /// Create a new [`DeleteOnEmptyReconfiguration`].
1153    pub fn new() -> Self {
1154        Self::default()
1155    }
1156
1157    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1158    pub fn with_min_age(self, min_age: Duration) -> Self {
1159        Self {
1160            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1161        }
1162    }
1163}
1164
1165impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1166    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1167        Self {
1168            min_age_secs: value.min_age_secs,
1169        }
1170    }
1171}
1172
1173#[derive(Debug, Clone, Default)]
1174#[non_exhaustive]
1175/// Reconfiguration for [`StreamConfig`].
1176pub struct StreamReconfiguration {
1177    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
1178    pub storage_class: Maybe<Option<StorageClass>>,
1179    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
1180    pub retention_policy: Maybe<Option<RetentionPolicy>>,
1181    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
1182    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
1183    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1184    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
1185}
1186
1187impl StreamReconfiguration {
1188    /// Create a new [`StreamReconfiguration`].
1189    pub fn new() -> Self {
1190        Self::default()
1191    }
1192
1193    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1194    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1195        Self {
1196            storage_class: Maybe::Specified(Some(storage_class)),
1197            ..self
1198        }
1199    }
1200
1201    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1202    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1203        Self {
1204            retention_policy: Maybe::Specified(Some(retention_policy)),
1205            ..self
1206        }
1207    }
1208
1209    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1210    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1211        Self {
1212            timestamping: Maybe::Specified(Some(timestamping)),
1213            ..self
1214        }
1215    }
1216
1217    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1218    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1219        Self {
1220            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1221            ..self
1222        }
1223    }
1224}
1225
1226impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1227    fn from(value: StreamReconfiguration) -> Self {
1228        Self {
1229            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1230            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1231            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1232            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1233        }
1234    }
1235}
1236
1237#[derive(Debug, Clone, Default)]
1238#[non_exhaustive]
1239/// Reconfiguration for [`BasinConfig`].
1240pub struct BasinReconfiguration {
1241    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
1242    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
1243    /// Override for the existing
1244    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1245    pub create_stream_on_append: Maybe<bool>,
1246    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1247    pub create_stream_on_read: Maybe<bool>,
1248}
1249
1250impl BasinReconfiguration {
1251    /// Create a new [`BasinReconfiguration`].
1252    pub fn new() -> Self {
1253        Self::default()
1254    }
1255
1256    /// Set the override for the existing
1257    /// [`default_stream_config`](BasinConfig::default_stream_config).
1258    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1259        Self {
1260            default_stream_config: Maybe::Specified(Some(config)),
1261            ..self
1262        }
1263    }
1264
1265    /// Set the override for the existing
1266    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1267    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1268        Self {
1269            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1270            ..self
1271        }
1272    }
1273
1274    /// Set the override for the existing
1275    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1276    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1277        Self {
1278            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1279            ..self
1280        }
1281    }
1282}
1283
1284impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1285    fn from(value: BasinReconfiguration) -> Self {
1286        Self {
1287            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1288            create_stream_on_append: value.create_stream_on_append,
1289            create_stream_on_read: value.create_stream_on_read,
1290        }
1291    }
1292}
1293
1294#[derive(Debug, Clone)]
1295#[non_exhaustive]
1296/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
1297pub struct ReconfigureBasinInput {
1298    /// Basin name.
1299    pub name: BasinName,
1300    /// Reconfiguration for [`BasinConfig`].
1301    pub config: BasinReconfiguration,
1302}
1303
1304impl ReconfigureBasinInput {
1305    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1306    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1307        Self { name, config }
1308    }
1309}
1310
1311#[derive(Debug, Clone, Default)]
1312#[non_exhaustive]
1313/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
1314pub struct ListAccessTokensInput {
1315    /// Filter access tokens whose IDs begin with this value.
1316    ///
1317    /// Defaults to `""`.
1318    pub prefix: AccessTokenIdPrefix,
1319    /// Filter access tokens whose IDs are lexicographically greater than this value.
1320    ///
1321    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
1322    ///
1323    /// Defaults to `""`.
1324    pub start_after: AccessTokenIdStartAfter,
1325    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
1326    ///
1327    /// Defaults to `1000`.
1328    pub limit: Option<usize>,
1329}
1330
1331impl ListAccessTokensInput {
1332    /// Create a new [`ListAccessTokensInput`] with default values.
1333    pub fn new() -> Self {
1334        Self::default()
1335    }
1336
1337    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1338    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1339        Self { prefix, ..self }
1340    }
1341
1342    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1343    /// value.
1344    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1345        Self {
1346            start_after,
1347            ..self
1348        }
1349    }
1350
1351    /// Set the limit on number of access tokens to return in a page.
1352    pub fn with_limit(self, limit: usize) -> Self {
1353        Self {
1354            limit: Some(limit),
1355            ..self
1356        }
1357    }
1358}
1359
1360impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1361    fn from(value: ListAccessTokensInput) -> Self {
1362        Self {
1363            prefix: Some(value.prefix),
1364            start_after: Some(value.start_after),
1365            limit: value.limit,
1366        }
1367    }
1368}
1369
1370#[derive(Debug, Clone, Default)]
1371/// Input for [`S2::list_all_access_tokens`](crate::S2::list_all_access_tokens).
1372pub struct ListAllAccessTokensInput {
1373    /// Filter access tokens whose IDs begin with this value.
1374    ///
1375    /// Defaults to `""`.
1376    pub prefix: AccessTokenIdPrefix,
1377    /// Filter access tokens whose IDs are lexicographically greater than this value.
1378    ///
1379    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
1380    ///
1381    /// Defaults to `""`.
1382    pub start_after: AccessTokenIdStartAfter,
1383}
1384
1385impl ListAllAccessTokensInput {
1386    /// Create a new [`ListAllAccessTokensInput`] with default values.
1387    pub fn new() -> Self {
1388        Self::default()
1389    }
1390
1391    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1392    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1393        Self { prefix, ..self }
1394    }
1395
1396    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1397    /// this value.
1398    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1399        Self {
1400            start_after,
1401            ..self
1402        }
1403    }
1404}
1405
1406#[derive(Debug, Clone)]
1407#[non_exhaustive]
1408/// Access token information.
1409pub struct AccessTokenInfo {
1410    /// Access token ID.
1411    pub id: AccessTokenId,
1412    /// Expiration time.
1413    pub expires_at: S2DateTime,
1414    /// Whether to automatically prefix stream names during creation and strip the prefix during
1415    /// listing.
1416    pub auto_prefix_streams: bool,
1417    /// Scope of the access token.
1418    pub scope: AccessTokenScope,
1419}
1420
1421impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1422    type Error = ValidationError;
1423
1424    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1425        let expires_at = value
1426            .expires_at
1427            .map(S2DateTime::try_from)
1428            .transpose()?
1429            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1430        Ok(Self {
1431            id: value.id,
1432            expires_at,
1433            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1434            scope: value.scope.into(),
1435        })
1436    }
1437}
1438
1439#[derive(Debug, Clone)]
1440/// Pattern for matching basins.
1441///
1442/// See [`AccessTokenScope::basins`].
1443pub enum BasinMatcher {
1444    /// Match no basins.
1445    None,
1446    /// Match exactly this basin.
1447    Exact(BasinName),
1448    /// Match all basins with this prefix.
1449    Prefix(BasinNamePrefix),
1450}
1451
1452#[derive(Debug, Clone)]
1453/// Pattern for matching streams.
1454///
1455/// See [`AccessTokenScope::streams`].
1456pub enum StreamMatcher {
1457    /// Match no streams.
1458    None,
1459    /// Match exactly this stream.
1460    Exact(StreamName),
1461    /// Match all streams with this prefix.
1462    Prefix(StreamNamePrefix),
1463}
1464
1465#[derive(Debug, Clone)]
1466/// Pattern for matching access tokens.
1467///
1468/// See [`AccessTokenScope::access_tokens`].
1469pub enum AccessTokenMatcher {
1470    /// Match no access tokens.
1471    None,
1472    /// Match exactly this access token.
1473    Exact(AccessTokenId),
1474    /// Match all access tokens with this prefix.
1475    Prefix(AccessTokenIdPrefix),
1476}
1477
/// Permissions indicating allowed operations.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ReadWritePermissions {
    /// Read permission.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Write permission.
    ///
    /// Defaults to `false`.
    pub write: bool,
}
1491
1492impl ReadWritePermissions {
1493    /// Create a new [`ReadWritePermissions`] with default values.
1494    pub fn new() -> Self {
1495        Self::default()
1496    }
1497
1498    /// Create read-only permissions.
1499    pub fn read_only() -> Self {
1500        Self {
1501            read: true,
1502            write: false,
1503        }
1504    }
1505
1506    /// Create write-only permissions.
1507    pub fn write_only() -> Self {
1508        Self {
1509            read: false,
1510            write: true,
1511        }
1512    }
1513
1514    /// Create read-write permissions.
1515    pub fn read_write() -> Self {
1516        Self {
1517            read: true,
1518            write: true,
1519        }
1520    }
1521}
1522
1523impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1524    fn from(value: ReadWritePermissions) -> Self {
1525        Self {
1526            read: Some(value.read),
1527            write: Some(value.write),
1528        }
1529    }
1530}
1531
1532impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1533    fn from(value: api::access::ReadWritePermissions) -> Self {
1534        Self {
1535            read: value.read.unwrap_or_default(),
1536            write: value.write.unwrap_or_default(),
1537        }
1538    }
1539}
1540
1541#[derive(Debug, Clone, Default)]
1542#[non_exhaustive]
1543/// Permissions at the operation group level.
1544///
1545/// See [`AccessTokenScope::op_group_perms`].
1546pub struct OperationGroupPermissions {
1547    /// Account-level access permissions.
1548    ///
1549    /// Defaults to `None`.
1550    pub account: Option<ReadWritePermissions>,
1551    /// Basin-level access permissions.
1552    ///
1553    /// Defaults to `None`.
1554    pub basin: Option<ReadWritePermissions>,
1555    /// Stream-level access permissions.
1556    ///
1557    /// Defaults to `None`.
1558    pub stream: Option<ReadWritePermissions>,
1559}
1560
1561impl OperationGroupPermissions {
1562    /// Create a new [`OperationGroupPermissions`] with default values.
1563    pub fn new() -> Self {
1564        Self::default()
1565    }
1566
1567    /// Create read-only permissions for all groups.
1568    pub fn read_only_all() -> Self {
1569        Self {
1570            account: Some(ReadWritePermissions::read_only()),
1571            basin: Some(ReadWritePermissions::read_only()),
1572            stream: Some(ReadWritePermissions::read_only()),
1573        }
1574    }
1575
1576    /// Create write-only permissions for all groups.
1577    pub fn write_only_all() -> Self {
1578        Self {
1579            account: Some(ReadWritePermissions::write_only()),
1580            basin: Some(ReadWritePermissions::write_only()),
1581            stream: Some(ReadWritePermissions::write_only()),
1582        }
1583    }
1584
1585    /// Create read-write permissions for all groups.
1586    pub fn read_write_all() -> Self {
1587        Self {
1588            account: Some(ReadWritePermissions::read_write()),
1589            basin: Some(ReadWritePermissions::read_write()),
1590            stream: Some(ReadWritePermissions::read_write()),
1591        }
1592    }
1593
1594    /// Set account-level access permissions.
1595    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1596        Self {
1597            account: Some(account),
1598            ..self
1599        }
1600    }
1601
1602    /// Set basin-level access permissions.
1603    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1604        Self {
1605            basin: Some(basin),
1606            ..self
1607        }
1608    }
1609
1610    /// Set stream-level access permissions.
1611    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1612        Self {
1613            stream: Some(stream),
1614            ..self
1615        }
1616    }
1617}
1618
1619impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1620    fn from(value: OperationGroupPermissions) -> Self {
1621        Self {
1622            account: value.account.map(Into::into),
1623            basin: value.basin.map(Into::into),
1624            stream: value.stream.map(Into::into),
1625        }
1626    }
1627}
1628
1629impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1630    fn from(value: api::access::PermittedOperationGroups) -> Self {
1631        Self {
1632            account: value.account.map(Into::into),
1633            basin: value.basin.map(Into::into),
1634            stream: value.stream.map(Into::into),
1635        }
1636    }
1637}
1638
/// Individual operation that can be permitted.
///
/// See [`AccessTokenScope::ops`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics.
    GetAccountMetrics,
    /// Get basin metrics.
    GetBasinMetrics,
    /// Get stream metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
}
1687
1688impl From<Operation> for api::access::Operation {
1689    fn from(value: Operation) -> Self {
1690        match value {
1691            Operation::ListBasins => api::access::Operation::ListBasins,
1692            Operation::CreateBasin => api::access::Operation::CreateBasin,
1693            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1694            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1695            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1696            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1697            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1698            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1699            Operation::ListStreams => api::access::Operation::ListStreams,
1700            Operation::CreateStream => api::access::Operation::CreateStream,
1701            Operation::DeleteStream => api::access::Operation::DeleteStream,
1702            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1703            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1704            Operation::CheckTail => api::access::Operation::CheckTail,
1705            Operation::Append => api::access::Operation::Append,
1706            Operation::Read => api::access::Operation::Read,
1707            Operation::Trim => api::access::Operation::Trim,
1708            Operation::Fence => api::access::Operation::Fence,
1709            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1710            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1711            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1712        }
1713    }
1714}
1715
1716impl From<api::access::Operation> for Operation {
1717    fn from(value: api::access::Operation) -> Self {
1718        match value {
1719            api::access::Operation::ListBasins => Operation::ListBasins,
1720            api::access::Operation::CreateBasin => Operation::CreateBasin,
1721            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1722            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1723            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1724            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1725            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1726            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1727            api::access::Operation::ListStreams => Operation::ListStreams,
1728            api::access::Operation::CreateStream => Operation::CreateStream,
1729            api::access::Operation::DeleteStream => Operation::DeleteStream,
1730            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1731            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1732            api::access::Operation::CheckTail => Operation::CheckTail,
1733            api::access::Operation::Append => Operation::Append,
1734            api::access::Operation::Read => Operation::Read,
1735            api::access::Operation::Trim => Operation::Trim,
1736            api::access::Operation::Fence => Operation::Fence,
1737            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1738            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1739            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1740        }
1741    }
1742}
1743
1744#[derive(Debug, Clone)]
1745#[non_exhaustive]
1746/// Scope of an access token.
1747///
1748/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
1749/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
1750/// final set must not be empty.
1751///
1752/// See [`IssueAccessTokenInput::scope`].
1753pub struct AccessTokenScopeInput {
1754    basins: Option<BasinMatcher>,
1755    streams: Option<StreamMatcher>,
1756    access_tokens: Option<AccessTokenMatcher>,
1757    op_group_perms: Option<OperationGroupPermissions>,
1758    ops: HashSet<Operation>,
1759}
1760
1761impl AccessTokenScopeInput {
1762    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1763    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1764        Self {
1765            basins: None,
1766            streams: None,
1767            access_tokens: None,
1768            op_group_perms: None,
1769            ops: ops.into_iter().collect(),
1770        }
1771    }
1772
1773    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1774    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1775        Self {
1776            basins: None,
1777            streams: None,
1778            access_tokens: None,
1779            op_group_perms: Some(op_group_perms),
1780            ops: HashSet::default(),
1781        }
1782    }
1783
1784    /// Set the permitted operations.
1785    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1786        Self {
1787            ops: ops.into_iter().collect(),
1788            ..self
1789        }
1790    }
1791
1792    /// Set the access permissions at the operation group level.
1793    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1794        Self {
1795            op_group_perms: Some(op_group_perms),
1796            ..self
1797        }
1798    }
1799
1800    /// Set the permitted basins.
1801    ///
1802    /// Defaults to no basins.
1803    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1804        Self {
1805            basins: Some(basins),
1806            ..self
1807        }
1808    }
1809
1810    /// Set the permitted streams.
1811    ///
1812    /// Defaults to no streams.
1813    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1814        Self {
1815            streams: Some(streams),
1816            ..self
1817        }
1818    }
1819
1820    /// Set the permitted access tokens.
1821    ///
1822    /// Defaults to no access tokens.
1823    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1824        Self {
1825            access_tokens: Some(access_tokens),
1826            ..self
1827        }
1828    }
1829}
1830
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token.
///
/// Read-side counterpart of [`AccessTokenScopeInput`]; values are produced from
/// API responses via the `From<api::access::AccessTokenScope>` impl below.
pub struct AccessTokenScope {
    /// Permitted basins.
    pub basins: Option<BasinMatcher>,
    /// Permitted streams.
    pub streams: Option<StreamMatcher>,
    /// Permitted access tokens.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions at the operation group level.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Permitted operations.
    ///
    /// Empty when the API response did not list explicit operations.
    pub ops: HashSet<Operation>,
}
1846
1847impl From<api::access::AccessTokenScope> for AccessTokenScope {
1848    fn from(value: api::access::AccessTokenScope) -> Self {
1849        Self {
1850            basins: value.basins.map(|rs| match rs {
1851                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1852                    BasinMatcher::Exact(e)
1853                }
1854                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1855                    BasinMatcher::None
1856                }
1857                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1858            }),
1859            streams: value.streams.map(|rs| match rs {
1860                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1861                    StreamMatcher::Exact(e)
1862                }
1863                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1864                    StreamMatcher::None
1865                }
1866                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1867            }),
1868            access_tokens: value.access_tokens.map(|rs| match rs {
1869                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1870                    AccessTokenMatcher::Exact(e)
1871                }
1872                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1873                    AccessTokenMatcher::None
1874                }
1875                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1876            }),
1877            op_group_perms: value.op_groups.map(Into::into),
1878            ops: value
1879                .ops
1880                .map(|ops| ops.into_iter().map(Into::into).collect())
1881                .unwrap_or_default(),
1882        }
1883    }
1884}
1885
1886impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1887    fn from(value: AccessTokenScopeInput) -> Self {
1888        Self {
1889            basins: value.basins.map(|rs| match rs {
1890                BasinMatcher::None => {
1891                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1892                }
1893                BasinMatcher::Exact(e) => {
1894                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1895                }
1896                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1897            }),
1898            streams: value.streams.map(|rs| match rs {
1899                StreamMatcher::None => {
1900                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1901                }
1902                StreamMatcher::Exact(e) => {
1903                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1904                }
1905                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1906            }),
1907            access_tokens: value.access_tokens.map(|rs| match rs {
1908                AccessTokenMatcher::None => {
1909                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1910                }
1911                AccessTokenMatcher::Exact(e) => {
1912                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1913                }
1914                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1915            }),
1916            op_groups: value.op_group_perms.map(Into::into),
1917            ops: if value.ops.is_empty() {
1918                None
1919            } else {
1920                Some(value.ops.into_iter().map(Into::into).collect())
1921            },
1922        }
1923    }
1924}
1925
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`issue_access_token`](crate::S2::issue_access_token).
///
/// Construct via [`IssueAccessTokenInput::new`] and the `with_*` builders.
pub struct IssueAccessTokenInput {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    ///
    /// Defaults to the expiration time of requestor's access token passed via
    /// [`S2Config`](S2Config::new).
    pub expires_at: Option<S2DateTime>,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    ///
    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
    /// prefix.
    ///
    /// Defaults to `false`.
    pub auto_prefix_streams: bool,
    /// Scope of the token.
    pub scope: AccessTokenScopeInput,
}
1948
1949impl IssueAccessTokenInput {
1950    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
1951    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1952        Self {
1953            id,
1954            expires_at: None,
1955            auto_prefix_streams: false,
1956            scope,
1957        }
1958    }
1959
1960    /// Set the expiration time.
1961    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1962        Self {
1963            expires_at: Some(expires_at),
1964            ..self
1965        }
1966    }
1967
1968    /// Set whether to automatically prefix stream names during creation and strip the prefix during
1969    /// listing.
1970    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1971        Self {
1972            auto_prefix_streams,
1973            ..self
1974        }
1975    }
1976}
1977
1978impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1979    fn from(value: IssueAccessTokenInput) -> Self {
1980        Self {
1981            id: value.id,
1982            expires_at: value.expires_at.map(Into::into),
1983            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1984            scope: value.scope.into(),
1985        }
1986    }
1987}
1988
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Interval to accumulate over for timeseries metric sets.
///
/// See [`TimeRangeAndInterval::with_interval`] and [`AccumulationMetric::interval`].
pub enum TimeseriesInterval {
    /// Minute.
    Minute,
    /// Hour.
    Hour,
    /// Day.
    Day,
}
1999
2000impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
2001    fn from(value: TimeseriesInterval) -> Self {
2002        match value {
2003            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
2004            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
2005            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
2006        }
2007    }
2008}
2009
2010impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
2011    fn from(value: api::metrics::TimeseriesInterval) -> Self {
2012        match value {
2013            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
2014            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
2015            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
2016        }
2017    }
2018}
2019
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds.
pub struct TimeRange {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
}

impl TimeRange {
    /// Create a new [`TimeRange`] covering `start` (inclusive) to `end` (exclusive).
    pub fn new(start: u32, end: u32) -> Self {
        TimeRange { start, end }
    }
}
2036
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds and accumulation interval.
///
/// Extends [`TimeRange`] with an optional accumulation interval.
pub struct TimeRangeAndInterval {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
    /// Interval to accumulate over for timeseries metric sets.
    ///
    /// Default is dependent on the requested metric set.
    pub interval: Option<TimeseriesInterval>,
}
2050
2051impl TimeRangeAndInterval {
2052    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2053    pub fn new(start: u32, end: u32) -> Self {
2054        Self {
2055            start,
2056            end,
2057            interval: None,
2058        }
2059    }
2060
2061    /// Set the interval to accumulate over for timeseries metric sets.
2062    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2063        Self {
2064            interval: Some(interval),
2065            ..self
2066        }
2067    }
2068}
2069
#[derive(Debug, Clone, Copy)]
/// Account metric set to return.
///
/// See [`GetAccountMetricsInput`].
pub enum AccountMetricSet {
    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
    /// specified time range.
    ActiveBasins(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per account operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    AccountOps(TimeRangeAndInterval),
}
2084
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
///
/// Construct via [`GetAccountMetricsInput::new`].
pub struct GetAccountMetricsInput {
    /// Metric set to return.
    pub set: AccountMetricSet,
}
2092
2093impl GetAccountMetricsInput {
2094    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2095    pub fn new(set: AccountMetricSet) -> Self {
2096        Self { set }
2097    }
2098}
2099
2100impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2101    fn from(value: GetAccountMetricsInput) -> Self {
2102        let (set, start, end, interval) = match value.set {
2103            AccountMetricSet::ActiveBasins(args) => (
2104                api::metrics::AccountMetricSet::ActiveBasins,
2105                args.start,
2106                args.end,
2107                None,
2108            ),
2109            AccountMetricSet::AccountOps(args) => (
2110                api::metrics::AccountMetricSet::AccountOps,
2111                args.start,
2112                args.end,
2113                args.interval,
2114            ),
2115        };
2116        Self {
2117            set,
2118            start: Some(start),
2119            end: Some(end),
2120            interval: interval.map(Into::into),
2121        }
2122    }
2123}
2124
#[derive(Debug, Clone, Copy)]
/// Basin metric set to return.
///
/// See [`GetBasinMetricsInput`].
pub enum BasinMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
    /// in the basin, with one observed value for each hour over the requested time range.
    Storage(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
    ///
    /// Each metric represents a timeseries of the number of append operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendOps(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
    ///
    /// Each metric represents a timeseries of the number of read operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadOps(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadThroughput(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendThroughput(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per basin operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    BasinOps(TimeRangeAndInterval),
}
2169
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
///
/// Construct via [`GetBasinMetricsInput::new`].
pub struct GetBasinMetricsInput {
    /// Basin name.
    pub name: BasinName,
    /// Metric set to return.
    pub set: BasinMetricSet,
}
2179
2180impl GetBasinMetricsInput {
2181    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2182    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2183        Self { name, set }
2184    }
2185}
2186
2187impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2188    fn from(value: GetBasinMetricsInput) -> Self {
2189        let (set, start, end, interval) = match value.set {
2190            BasinMetricSet::Storage(args) => (
2191                api::metrics::BasinMetricSet::Storage,
2192                args.start,
2193                args.end,
2194                None,
2195            ),
2196            BasinMetricSet::AppendOps(args) => (
2197                api::metrics::BasinMetricSet::AppendOps,
2198                args.start,
2199                args.end,
2200                args.interval,
2201            ),
2202            BasinMetricSet::ReadOps(args) => (
2203                api::metrics::BasinMetricSet::ReadOps,
2204                args.start,
2205                args.end,
2206                args.interval,
2207            ),
2208            BasinMetricSet::ReadThroughput(args) => (
2209                api::metrics::BasinMetricSet::ReadThroughput,
2210                args.start,
2211                args.end,
2212                args.interval,
2213            ),
2214            BasinMetricSet::AppendThroughput(args) => (
2215                api::metrics::BasinMetricSet::AppendThroughput,
2216                args.start,
2217                args.end,
2218                args.interval,
2219            ),
2220            BasinMetricSet::BasinOps(args) => (
2221                api::metrics::BasinMetricSet::BasinOps,
2222                args.start,
2223                args.end,
2224                args.interval,
2225            ),
2226        };
2227        (
2228            value.name,
2229            api::metrics::BasinMetricSetRequest {
2230                set,
2231                start: Some(start),
2232                end: Some(end),
2233                interval: interval.map(Into::into),
2234            },
2235        )
2236    }
2237}
2238
#[derive(Debug, Clone, Copy)]
/// Stream metric set to return.
///
/// See [`GetStreamMetricsInput`].
pub enum StreamMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
    /// with one observed value for each minute over the requested time range.
    Storage(TimeRange),
}
2246
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
///
/// Construct via [`GetStreamMetricsInput::new`].
pub struct GetStreamMetricsInput {
    /// Basin name.
    pub basin_name: BasinName,
    /// Stream name.
    pub stream_name: StreamName,
    /// Metric set to return.
    pub set: StreamMetricSet,
}
2258
2259impl GetStreamMetricsInput {
2260    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2261    /// set.
2262    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2263        Self {
2264            basin_name,
2265            stream_name,
2266            set,
2267        }
2268    }
2269}
2270
2271impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2272    fn from(value: GetStreamMetricsInput) -> Self {
2273        let (set, start, end, interval) = match value.set {
2274            StreamMetricSet::Storage(args) => (
2275                api::metrics::StreamMetricSet::Storage,
2276                args.start,
2277                args.end,
2278                None,
2279            ),
2280        };
2281        (
2282            value.basin_name,
2283            value.stream_name,
2284            api::metrics::StreamMetricSetRequest {
2285                set,
2286                start: Some(start),
2287                end: Some(end),
2288                interval,
2289            },
2290        )
2291    }
2292}
2293
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Unit in which metric values are measured.
///
/// See e.g. [`ScalarMetric::unit`].
pub enum MetricUnit {
    /// Size in bytes.
    Bytes,
    /// Number of operations.
    Operations,
}
2302
2303impl From<api::metrics::MetricUnit> for MetricUnit {
2304    fn from(value: api::metrics::MetricUnit) -> Self {
2305        match value {
2306            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2307            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2308        }
2309    }
2310}
2311
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Single named value.
///
/// Returned inside [`Metric::Scalar`].
pub struct ScalarMetric {
    /// Metric name.
    pub name: String,
    /// Unit for the metric value.
    pub unit: MetricUnit,
    /// Metric value.
    pub value: f64,
}
2323
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
/// specified interval.
///
/// Returned inside [`Metric::Accumulation`].
pub struct AccumulationMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the accumulated values.
    pub unit: MetricUnit,
    /// The interval at which datapoints are accumulated.
    pub interval: TimeseriesInterval,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
    /// one `interval`.
    pub values: Vec<(u32, f64)>,
}
2340
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
///
/// Returned inside [`Metric::Gauge`].
pub struct GaugeMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the instantaneous values.
    pub unit: MetricUnit,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
    /// instant of the `timestamp` (in Unix epoch seconds).
    pub values: Vec<(u32, f64)>,
}
2353
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Set of string labels.
///
/// Returned inside [`Metric::Label`].
pub struct LabelMetric {
    /// Label name.
    pub name: String,
    /// Label values.
    pub values: Vec<String>,
}
2363
#[derive(Debug, Clone)]
/// Individual metric in a returned metric set.
///
/// Produced from API responses via `From<api::metrics::Metric>`.
pub enum Metric {
    /// Single named value.
    Scalar(ScalarMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
    /// specified interval.
    Accumulation(AccumulationMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
    Gauge(GaugeMetric),
    /// Set of string labels.
    Label(LabelMetric),
}
2377
2378impl From<api::metrics::Metric> for Metric {
2379    fn from(value: api::metrics::Metric) -> Self {
2380        match value {
2381            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2382                name: sm.name.into(),
2383                unit: sm.unit.into(),
2384                value: sm.value,
2385            }),
2386            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2387                name: am.name.into(),
2388                unit: am.unit.into(),
2389                interval: am.interval.into(),
2390                values: am.values,
2391            }),
2392            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2393                name: gm.name.into(),
2394                unit: gm.unit.into(),
2395                values: gm.values,
2396            }),
2397            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2398                name: lm.name.into(),
2399                values: lm.values,
2400            }),
2401        }
2402    }
2403}
2404
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
///
/// Construct via [`ListStreamsInput::new`] and the `with_*` builders.
pub struct ListStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    /// Number of streams to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
2424
2425impl ListStreamsInput {
2426    /// Create a new [`ListStreamsInput`] with default values.
2427    pub fn new() -> Self {
2428        Self::default()
2429    }
2430
2431    /// Set the prefix used to filter streams whose names begin with this value.
2432    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2433        Self { prefix, ..self }
2434    }
2435
2436    /// Set the value used to filter streams whose names are lexicographically greater than this
2437    /// value.
2438    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2439        Self {
2440            start_after,
2441            ..self
2442        }
2443    }
2444
2445    /// Set the limit on number of streams to return in a page.
2446    pub fn with_limit(self, limit: usize) -> Self {
2447        Self {
2448            limit: Some(limit),
2449            ..self
2450        }
2451    }
2452}
2453
2454impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2455    fn from(value: ListStreamsInput) -> Self {
2456        Self {
2457            prefix: Some(value.prefix),
2458            start_after: Some(value.start_after),
2459            limit: value.limit,
2460        }
2461    }
2462}
2463
#[derive(Debug, Clone, Default)]
/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
///
/// Construct via [`ListAllStreamsInput::new`] and the `with_*` builders.
// NOTE(review): unlike `ListStreamsInput`, this struct is not marked
// `#[non_exhaustive]` — confirm whether that asymmetry is intentional.
pub struct ListAllStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    /// Whether to include streams that are being deleted.
    ///
    /// Defaults to `false`.
    pub include_deleted: bool,
}
2482
2483impl ListAllStreamsInput {
2484    /// Create a new [`ListAllStreamsInput`] with default values.
2485    pub fn new() -> Self {
2486        Self::default()
2487    }
2488
2489    /// Set the prefix used to filter streams whose names begin with this value.
2490    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2491        Self { prefix, ..self }
2492    }
2493
2494    /// Set the value used to filter streams whose names are lexicographically greater than this
2495    /// value.
2496    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2497        Self {
2498            start_after,
2499            ..self
2500        }
2501    }
2502
2503    /// Set whether to include streams that are being deleted.
2504    pub fn with_include_deleted(self, include_deleted: bool) -> Self {
2505        Self {
2506            include_deleted,
2507            ..self
2508        }
2509    }
2510}
2511
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Stream information.
///
/// Produced from API responses via `TryFrom<api::stream::StreamInfo>`.
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time if the stream is being deleted.
    pub deleted_at: Option<S2DateTime>,
}
2523
2524impl TryFrom<api::stream::StreamInfo> for StreamInfo {
2525    type Error = ValidationError;
2526
2527    fn try_from(value: api::stream::StreamInfo) -> Result<Self, Self::Error> {
2528        Ok(Self {
2529            name: value.name,
2530            created_at: value.created_at.try_into()?,
2531            deleted_at: value.deleted_at.map(S2DateTime::try_from).transpose()?,
2532        })
2533    }
2534}
2535
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
///
/// Construct via [`CreateStreamInput::new`].
pub struct CreateStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Configuration for the stream.
    ///
    /// See [`StreamConfig`] for defaults.
    pub config: Option<StreamConfig>,
    // Set once in `new` via `idempotency_token()` and sent alongside the
    // request; presumably used to deduplicate retried creates — TODO confirm.
    idempotency_token: String,
}
2548
2549impl CreateStreamInput {
2550    /// Create a new [`CreateStreamInput`] with the given stream name.
2551    pub fn new(name: StreamName) -> Self {
2552        Self {
2553            name,
2554            config: None,
2555            idempotency_token: idempotency_token(),
2556        }
2557    }
2558
2559    /// Set the configuration for the stream.
2560    pub fn with_config(self, config: StreamConfig) -> Self {
2561        Self {
2562            config: Some(config),
2563            ..self
2564        }
2565    }
2566}
2567
2568impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2569    fn from(value: CreateStreamInput) -> Self {
2570        (
2571            api::stream::CreateStreamRequest {
2572                stream: value.name,
2573                config: value.config.map(Into::into),
2574            },
2575            value.idempotency_token,
2576        )
2577    }
2578}
2579
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
///
/// Construct via [`DeleteStreamInput::new`].
pub struct DeleteStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Whether to ignore `Not Found` error if the stream doesn't exist.
    pub ignore_not_found: bool,
}
2589
2590impl DeleteStreamInput {
2591    /// Create a new [`DeleteStreamInput`] with the given stream name.
2592    pub fn new(name: StreamName) -> Self {
2593        Self {
2594            name,
2595            ignore_not_found: false,
2596        }
2597    }
2598
2599    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2600    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2601        Self {
2602            ignore_not_found,
2603            ..self
2604        }
2605    }
2606}
2607
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
///
/// Construct via [`ReconfigureStreamInput::new`].
pub struct ReconfigureStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Reconfiguration for [`StreamConfig`].
    pub config: StreamReconfiguration,
}
2617
2618impl ReconfigureStreamInput {
2619    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2620    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2621        Self { name, config }
2622    }
2623}
2624
#[derive(Debug, Clone, PartialEq, Eq)]
/// Token for fencing appends to a stream.
///
/// **Note:** It must not exceed 36 bytes in length.
///
/// Construct via `str::parse` (length-validated) or [`FencingToken::generate`].
///
/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
pub struct FencingToken(String);
2632
2633impl FencingToken {
2634    /// Generate a random alphanumeric fencing token of `n` bytes.
2635    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2636        rand::rng()
2637            .sample_iter(&rand::distr::Alphanumeric)
2638            .take(n)
2639            .map(char::from)
2640            .collect::<String>()
2641            .parse()
2642    }
2643}
2644
2645impl FromStr for FencingToken {
2646    type Err = ValidationError;
2647
2648    fn from_str(s: &str) -> Result<Self, Self::Err> {
2649        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2650            return Err(ValidationError(format!(
2651                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2652            )));
2653        }
2654        Ok(FencingToken(s.to_string()))
2655    }
2656}
2657
2658impl std::fmt::Display for FencingToken {
2659    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2660        write!(f, "{}", self.0)
2661    }
2662}
2663
2664impl Deref for FencingToken {
2665    type Target = str;
2666
2667    fn deref(&self) -> &Self::Target {
2668        &self.0
2669    }
2670}
2671
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// A position in a stream.
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
    /// User-specified timestamps are passed through as-is.
    pub timestamp: u64,
}

impl std::fmt::Display for StreamPosition {
    /// Renders as `seq_num=<n>, timestamp=<t>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { seq_num, timestamp } = self;
        write!(f, "seq_num={seq_num}, timestamp={timestamp}")
    }
}
2688
2689impl From<api::stream::proto::StreamPosition> for StreamPosition {
2690    fn from(value: api::stream::proto::StreamPosition) -> Self {
2691        Self {
2692            seq_num: value.seq_num,
2693            timestamp: value.timestamp,
2694        }
2695    }
2696}
2697
2698impl From<api::stream::StreamPosition> for StreamPosition {
2699    fn from(value: api::stream::StreamPosition) -> Self {
2700        Self {
2701            seq_num: value.seq_num,
2702            timestamp: value.timestamp,
2703        }
2704    }
2705}
2706
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// A name-value pair.
///
/// Both name and value are arbitrary bytes; they are not required to be UTF-8.
pub struct Header {
    /// Name.
    pub name: Bytes,
    /// Value.
    pub value: Bytes,
}
2716
2717impl Header {
2718    /// Create a new [`Header`] with the given name and value.
2719    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2720        Self {
2721            name: name.into(),
2722            value: value.into(),
2723        }
2724    }
2725}
2726
2727impl From<Header> for api::stream::proto::Header {
2728    fn from(value: Header) -> Self {
2729        Self {
2730            name: value.name,
2731            value: value.value,
2732        }
2733    }
2734}
2735
2736impl From<api::stream::proto::Header> for Header {
2737    fn from(value: api::stream::proto::Header) -> Self {
2738        Self {
2739            name: value.name,
2740            value: value.value,
2741        }
2742    }
2743}
2744
#[derive(Debug, Clone, PartialEq)]
/// A record to append.
///
/// Constructed via [`AppendRecord::new`]; fields are private so that every
/// instance has been size-validated against [`RECORD_BATCH_MAX`].
pub struct AppendRecord {
    // Record payload.
    body: Bytes,
    // Name-value pairs attached to the record.
    headers: Vec<Header>,
    // Optional timestamp; precise semantics depend on `StreamConfig::timestamping`.
    timestamp: Option<u64>,
}
2752
2753impl AppendRecord {
2754    fn validate(self) -> Result<Self, ValidationError> {
2755        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2756            Err(ValidationError(format!(
2757                "metered_bytes: {} exceeds {}",
2758                self.metered_bytes(),
2759                RECORD_BATCH_MAX.bytes
2760            )))
2761        } else {
2762            Ok(self)
2763        }
2764    }
2765
2766    /// Create a new [`AppendRecord`] with the given record body.
2767    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2768        let record = Self {
2769            body: body.into(),
2770            headers: Vec::default(),
2771            timestamp: None,
2772        };
2773        record.validate()
2774    }
2775
2776    /// Set the headers for this record.
2777    pub fn with_headers(
2778        self,
2779        headers: impl IntoIterator<Item = Header>,
2780    ) -> Result<Self, ValidationError> {
2781        let record = Self {
2782            headers: headers.into_iter().collect(),
2783            ..self
2784        };
2785        record.validate()
2786    }
2787
2788    /// Set the timestamp for this record.
2789    ///
2790    /// Precise semantics depend on [`StreamConfig::timestamping`].
2791    pub fn with_timestamp(self, timestamp: u64) -> Self {
2792        Self {
2793            timestamp: Some(timestamp),
2794            ..self
2795        }
2796    }
2797
2798    /// Get the body of this record.
2799    pub fn body(&self) -> &[u8] {
2800        &self.body
2801    }
2802
2803    /// Get the headers of this record.
2804    pub fn headers(&self) -> &[Header] {
2805        &self.headers
2806    }
2807
2808    /// Get the timestamp of this record.
2809    pub fn timestamp(&self) -> Option<u64> {
2810        self.timestamp
2811    }
2812}
2813
2814impl From<AppendRecord> for api::stream::proto::AppendRecord {
2815    fn from(value: AppendRecord) -> Self {
2816        Self {
2817            timestamp: value.timestamp,
2818            headers: value.headers.into_iter().map(Into::into).collect(),
2819            body: value.body,
2820        }
2821    }
2822}
2823
/// Metered byte size calculation.
///
/// Used to enforce the [`RECORD_BATCH_MAX`]`.bytes` limit on records and
/// batches.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
pub trait MeteredBytes {
    /// Returns the metered byte size.
    fn metered_bytes(&self) -> usize;
}
2834
2835macro_rules! metered_bytes_impl {
2836    ($ty:ty) => {
2837        impl MeteredBytes for $ty {
2838            fn metered_bytes(&self) -> usize {
2839                8 + (2 * self.headers.len())
2840                    + self
2841                        .headers
2842                        .iter()
2843                        .map(|h| h.name.len() + h.value.len())
2844                        .sum::<usize>()
2845                    + self.body.len()
2846            }
2847        }
2848    };
2849}
2850
2851metered_bytes_impl!(AppendRecord);
2852
#[derive(Debug, Clone)]
/// A batch of records to append atomically.
///
/// **Note:** It must contain at least `1` record and no more than `1000`.
/// The total size of the batch must not exceed `1MiB` in metered bytes.
///
/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
/// that takes care of the abovementioned constraints.
pub struct AppendRecordBatch {
    // Records in append order.
    records: Vec<AppendRecord>,
    // Cached total metered size of `records`, kept in sync by `push` and
    // `try_from_iter`.
    metered_bytes: usize,
}
2866
2867impl AppendRecordBatch {
2868    pub(crate) fn with_capacity(capacity: usize) -> Self {
2869        Self {
2870            records: Vec::with_capacity(capacity),
2871            metered_bytes: 0,
2872        }
2873    }
2874
2875    pub(crate) fn push(&mut self, record: AppendRecord) {
2876        self.metered_bytes += record.metered_bytes();
2877        self.records.push(record);
2878    }
2879
2880    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
2881    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2882    where
2883        I: IntoIterator<Item = AppendRecord>,
2884    {
2885        let mut records = Vec::new();
2886        let mut metered_bytes = 0;
2887
2888        for record in iter {
2889            metered_bytes += record.metered_bytes();
2890            records.push(record);
2891
2892            if metered_bytes > RECORD_BATCH_MAX.bytes {
2893                return Err(ValidationError(format!(
2894                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
2895                    RECORD_BATCH_MAX.bytes
2896                )));
2897            }
2898
2899            if records.len() > RECORD_BATCH_MAX.count {
2900                return Err(ValidationError(format!(
2901                    "number of records in the batch exceeds {}",
2902                    RECORD_BATCH_MAX.count
2903                )));
2904            }
2905        }
2906
2907        if records.is_empty() {
2908            return Err(ValidationError("batch is empty".into()));
2909        }
2910
2911        Ok(Self {
2912            records,
2913            metered_bytes,
2914        })
2915    }
2916}
2917
2918impl Deref for AppendRecordBatch {
2919    type Target = [AppendRecord];
2920
2921    fn deref(&self) -> &Self::Target {
2922        &self.records
2923    }
2924}
2925
2926impl MeteredBytes for AppendRecordBatch {
2927    fn metered_bytes(&self) -> usize {
2928        self.metered_bytes
2929    }
2930}
2931
#[derive(Debug, Clone)]
/// Command to signal an operation.
///
/// See [`CommandRecord`] for how commands are encoded as records.
pub enum Command {
    /// Fence operation.
    Fence {
        /// Fencing token.
        fencing_token: FencingToken,
    },
    /// Trim operation.
    Trim {
        /// Trim point: the desired earliest sequence number for the stream.
        trim_point: u64,
    },
}
2946
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Command record for signaling operations to the service.
///
/// Construct via [`CommandRecord::fence`] or [`CommandRecord::trim`]; the
/// `From<CommandRecord>` conversion into [`AppendRecord`] handles the wire
/// encoding.
///
/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
pub struct CommandRecord {
    /// Command to signal an operation.
    pub command: Command,
    /// Timestamp for this record.
    pub timestamp: Option<u64>,
}
2958
2959impl CommandRecord {
2960    const FENCE: &[u8] = b"fence";
2961    const TRIM: &[u8] = b"trim";
2962
2963    /// Create a fence command record with the given fencing token.
2964    ///
2965    /// Fencing is strongly consistent, and subsequent appends that specify a
2966    /// fencing token will fail if it does not match.
2967    pub fn fence(fencing_token: FencingToken) -> Self {
2968        Self {
2969            command: Command::Fence { fencing_token },
2970            timestamp: None,
2971        }
2972    }
2973
2974    /// Create a trim command record with the given trim point.
2975    ///
2976    /// Trim point is the desired earliest sequence number for the stream.
2977    ///
2978    /// Trimming is eventually consistent, and trimmed records may be visible
2979    /// for a brief period.
2980    pub fn trim(trim_point: u64) -> Self {
2981        Self {
2982            command: Command::Trim { trim_point },
2983            timestamp: None,
2984        }
2985    }
2986
2987    /// Set the timestamp for this record.
2988    pub fn with_timestamp(self, timestamp: u64) -> Self {
2989        Self {
2990            timestamp: Some(timestamp),
2991            ..self
2992        }
2993    }
2994}
2995
2996impl From<CommandRecord> for AppendRecord {
2997    fn from(value: CommandRecord) -> Self {
2998        let (header_value, body) = match value.command {
2999            Command::Fence { fencing_token } => (
3000                CommandRecord::FENCE,
3001                Bytes::copy_from_slice(fencing_token.as_bytes()),
3002            ),
3003            Command::Trim { trim_point } => (
3004                CommandRecord::TRIM,
3005                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
3006            ),
3007        };
3008        Self {
3009            body,
3010            headers: vec![Header::new("", header_value)],
3011            timestamp: value.timestamp,
3012        }
3013    }
3014}
3015
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`append`](crate::S2Stream::append) operation and
/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
///
/// Construct with [`AppendInput::new`] and refine with the `with_*` builder
/// methods.
pub struct AppendInput {
    /// Batch of records to append atomically.
    pub records: AppendRecordBatch,
    /// Expected sequence number for the first record in the batch.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
    pub match_seq_num: Option<u64>,
    /// Fencing token to match against the stream's current fencing token.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched,
    /// the append fails. A stream defaults to `""` as its fencing token.
    pub fencing_token: Option<FencingToken>,
}
3033
3034impl AppendInput {
3035    /// Create a new [`AppendInput`] with the given batch of records.
3036    pub fn new(records: AppendRecordBatch) -> Self {
3037        Self {
3038            records,
3039            match_seq_num: None,
3040            fencing_token: None,
3041        }
3042    }
3043
3044    /// Set the expected sequence number for the first record in the batch.
3045    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
3046        Self {
3047            match_seq_num: Some(match_seq_num),
3048            ..self
3049        }
3050    }
3051
3052    /// Set the fencing token to match against the stream's current fencing token.
3053    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3054        Self {
3055            fencing_token: Some(fencing_token),
3056            ..self
3057        }
3058    }
3059}
3060
3061impl From<AppendInput> for api::stream::proto::AppendInput {
3062    fn from(value: AppendInput) -> Self {
3063        Self {
3064            records: value.records.iter().cloned().map(Into::into).collect(),
3065            match_seq_num: value.match_seq_num,
3066            fencing_token: value.fencing_token.map(|t| t.to_string()),
3067        }
3068    }
3069}
3070
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Acknowledgement for an [`AppendInput`].
///
/// All positions are pairs of sequence number and timestamp; see
/// [`StreamPosition`].
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
    /// that was appended.
    ///
    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
    /// appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record on the stream.
    ///
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
3089
3090impl From<api::stream::proto::AppendAck> for AppendAck {
3091    fn from(value: api::stream::proto::AppendAck) -> Self {
3092        Self {
3093            start: value.start.unwrap_or_default().into(),
3094            end: value.end.unwrap_or_default().into(),
3095            tail: value.tail.unwrap_or_default().into(),
3096        }
3097    }
3098}
3099
#[derive(Debug, Clone, Copy)]
/// Starting position for reading from a stream.
pub enum ReadFrom {
    /// Read from this sequence number.
    SeqNum(u64),
    /// Read from this timestamp.
    Timestamp(u64),
    /// Read from N records before the tail.
    TailOffset(u64),
}

impl Default for ReadFrom {
    /// Defaults to the beginning of the stream (sequence number `0`).
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}
3116
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
/// Where to start reading.
///
/// Construct with [`ReadStart::new`] and refine with the `with_*` builder
/// methods.
pub struct ReadStart {
    /// Starting position.
    ///
    /// Defaults to reading from sequence number `0`.
    pub from: ReadFrom,
    /// Whether to start from tail if the requested starting position is beyond it.
    ///
    /// Defaults to `false` (errors if position is beyond tail).
    pub clamp_to_tail: bool,
}
3130
3131impl ReadStart {
3132    /// Create a new [`ReadStart`] with default values.
3133    pub fn new() -> Self {
3134        Self::default()
3135    }
3136
3137    /// Set the starting position.
3138    pub fn with_from(self, from: ReadFrom) -> Self {
3139        Self { from, ..self }
3140    }
3141
3142    /// Set whether to start from tail if the requested starting position is beyond it.
3143    pub fn with_clamp_to_tail(self, clamp_to_tail: bool) -> Self {
3144        Self {
3145            clamp_to_tail,
3146            ..self
3147        }
3148    }
3149}
3150
3151impl From<ReadStart> for api::stream::ReadStart {
3152    fn from(value: ReadStart) -> Self {
3153        let (seq_num, timestamp, tail_offset) = match value.from {
3154            ReadFrom::SeqNum(n) => (Some(n), None, None),
3155            ReadFrom::Timestamp(t) => (None, Some(t), None),
3156            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3157        };
3158        Self {
3159            seq_num,
3160            timestamp,
3161            tail_offset,
3162            clamp: if value.clamp_to_tail {
3163                Some(true)
3164            } else {
3165                None
3166            },
3167        }
3168    }
3169}
3170
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Limits on how much to read.
pub struct ReadLimits {
    /// Limit on number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Limit on total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Create a new [`ReadLimits`] with default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the limit on number of records.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Set the limit on total metered bytes of records.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3207
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// When to stop reading.
///
/// Construct with [`ReadStop::new`] and refine with the `with_*` builder
/// methods.
pub struct ReadStop {
    /// Limits on how much to read.
    ///
    /// See [`ReadLimits`] for defaults.
    pub limits: ReadLimits,
    /// Timestamp at which to stop (exclusive).
    ///
    /// Defaults to `None`.
    pub until: Option<RangeTo<u64>>,
    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
    /// seconds for [`read`](crate::S2Stream::read).
    ///
    /// Defaults to:
    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
    ///   is specified.
    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
    ///   `until` is specified.
    pub wait: Option<u32>,
}
3231
3232impl ReadStop {
3233    /// Create a new [`ReadStop`] with default values.
3234    pub fn new() -> Self {
3235        Self::default()
3236    }
3237
3238    /// Set the limits on how much to read.
3239    pub fn with_limits(self, limits: ReadLimits) -> Self {
3240        Self { limits, ..self }
3241    }
3242
3243    /// Set the timestamp at which to stop (exclusive).
3244    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3245        Self {
3246            until: Some(until),
3247            ..self
3248        }
3249    }
3250
3251    /// Set the duration in seconds to wait for new records before stopping.
3252    pub fn with_wait(self, wait: u32) -> Self {
3253        Self {
3254            wait: Some(wait),
3255            ..self
3256        }
3257    }
3258}
3259
3260impl From<ReadStop> for api::stream::ReadEnd {
3261    fn from(value: ReadStop) -> Self {
3262        Self {
3263            count: value.limits.count,
3264            bytes: value.limits.bytes,
3265            until: value.until.map(|r| r.end),
3266            wait: value.wait,
3267        }
3268    }
3269}
3270
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
/// operations.
///
/// Construct with [`ReadInput::new`] and refine with the `with_*` builder
/// methods.
pub struct ReadInput {
    /// Where to start reading.
    ///
    /// See [`ReadStart`] for defaults.
    pub start: ReadStart,
    /// When to stop reading.
    ///
    /// See [`ReadStop`] for defaults.
    pub stop: ReadStop,
    /// Whether to filter out command records from the stream when reading.
    ///
    /// Defaults to `false`.
    pub ignore_command_records: bool,
}
3289
3290impl ReadInput {
3291    /// Create a new [`ReadInput`] with default values.
3292    pub fn new() -> Self {
3293        Self::default()
3294    }
3295
3296    /// Set where to start reading.
3297    pub fn with_start(self, start: ReadStart) -> Self {
3298        Self { start, ..self }
3299    }
3300
3301    /// Set when to stop reading.
3302    pub fn with_stop(self, stop: ReadStop) -> Self {
3303        Self { stop, ..self }
3304    }
3305
3306    /// Set whether to filter out command records from the stream when reading.
3307    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3308        Self {
3309            ignore_command_records,
3310            ..self
3311        }
3312    }
3313}
3314
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Record that is durably sequenced on a stream.
///
/// A record with exactly one header whose name is empty is a command record;
/// see [`SequencedRecord::is_command_record`].
pub struct SequencedRecord {
    /// Sequence number assigned to this record.
    pub seq_num: u64,
    /// Body of this record.
    pub body: Bytes,
    /// Headers for this record.
    pub headers: Vec<Header>,
    /// Timestamp for this record.
    pub timestamp: u64,
}
3328
3329impl SequencedRecord {
3330    /// Whether this is a command record.
3331    pub fn is_command_record(&self) -> bool {
3332        self.headers.len() == 1 && *self.headers[0].name == *b""
3333    }
3334}
3335
3336impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
3337    fn from(value: api::stream::proto::SequencedRecord) -> Self {
3338        Self {
3339            seq_num: value.seq_num,
3340            body: value.body,
3341            headers: value.headers.into_iter().map(Into::into).collect(),
3342            timestamp: value.timestamp,
3343        }
3344    }
3345}
3346
3347metered_bytes_impl!(SequencedRecord);
3348
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
/// [`read_session`](crate::S2Stream::read_session).
pub struct ReadBatch {
    /// Records that are durably sequenced on the stream.
    ///
    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
    /// - the [`stop condition`](ReadInput::stop) was already met, or
    /// - all records in the batch were command records and
    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
    pub records: Vec<SequencedRecord>,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record.
    ///
    /// It will only be present when reading recent records.
    pub tail: Option<StreamPosition>,
}
3367
3368impl ReadBatch {
3369    pub(crate) fn from_api(
3370        batch: api::stream::proto::ReadBatch,
3371        ignore_command_records: bool,
3372    ) -> Self {
3373        Self {
3374            records: batch
3375                .records
3376                .into_iter()
3377                .map(Into::into)
3378                .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3379                .collect(),
3380            tail: batch.tail.map(Into::into),
3381        }
3382    }
3383}
3384
/// A pinned, boxed [`Stream`](futures::Stream) yielding `Result<T, S2Error>` items.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3387
#[derive(Debug, Clone, thiserror::Error)]
/// Why an append condition check failed.
///
/// Surfaced via [`S2Error::AppendConditionFailed`].
pub enum AppendConditionFailed {
    #[error("fencing token mismatch, expected: {0}")]
    /// Fencing token did not match. Contains the expected fencing token.
    FencingTokenMismatch(FencingToken),
    #[error("sequence number mismatch, expected: {0}")]
    /// Sequence number did not match. Contains the expected sequence number.
    SeqNumMismatch(u64),
}
3398
3399impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3400    fn from(value: api::stream::AppendConditionFailed) -> Self {
3401        match value {
3402            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3403                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3404            }
3405            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3406                AppendConditionFailed::SeqNumMismatch(seq)
3407            }
3408        }
3409    }
3410}
3411
#[derive(Debug, Clone, thiserror::Error)]
/// Errors from S2 operations.
///
/// API-layer errors are mapped into this type via `From<ApiError>`.
pub enum S2Error {
    #[error("{0}")]
    /// Client-side error.
    Client(String),
    #[error(transparent)]
    /// Validation error.
    Validation(#[from] ValidationError),
    #[error("{0}")]
    /// Append condition check failed. Contains the failure reason.
    AppendConditionFailed(AppendConditionFailed),
    #[error("read from an unwritten position. current tail: {0}")]
    /// Read from an unwritten position. Contains the current tail.
    ReadUnwritten(StreamPosition),
    #[error("{0}")]
    /// Other server-side error.
    Server(ErrorResponse),
}
3431
3432impl From<ApiError> for S2Error {
3433    fn from(err: ApiError) -> Self {
3434        match err {
3435            ApiError::ReadUnwritten(tail_response) => {
3436                Self::ReadUnwritten(tail_response.tail.into())
3437            }
3438            ApiError::AppendConditionFailed(condition_failed) => {
3439                Self::AppendConditionFailed(condition_failed.into())
3440            }
3441            ApiError::Server(_, response) => Self::Server(response.into()),
3442            other => Self::Client(other.to_string()),
3443        }
3444    }
3445}
3446
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
/// Error response from S2 server.
///
/// Displays as `code: message`.
pub struct ErrorResponse {
    /// Error code.
    pub code: String,
    /// Error message.
    pub message: String,
}
3457
3458impl From<ApiErrorResponse> for ErrorResponse {
3459    fn from(response: ApiErrorResponse) -> Self {
3460        Self {
3461            code: response.code,
3462            message: response.message,
3463        }
3464    }
3465}
3466
3467fn idempotency_token() -> String {
3468    uuid::Uuid::new_v4().simple().to_string()
3469}