s2_sdk/
types.rs

1//! Types relevant to [`S2`](crate::S2), [`S2Basin`](crate::S2Basin), and
2//! [`S2Stream`](crate::S2Stream).
3use std::{
4    collections::HashSet,
5    env::VarError,
6    fmt,
7    num::NonZeroU32,
8    ops::{Deref, RangeTo},
9    pin::Pin,
10    str::FromStr,
11    time::Duration,
12};
13
14use bytes::Bytes;
15use http::{
16    header::HeaderValue,
17    uri::{Authority, Scheme},
18};
19use rand::Rng;
20use s2_api::{v1 as api, v1::stream::s2s::CompressionAlgorithm};
21pub use s2_common::caps::RECORD_BATCH_MAX;
22/// Validation error.
23pub use s2_common::types::ValidationError;
24/// Access token ID.
25///
26/// **Note:** It must be unique to the account and between 1 and 96 bytes in length.
27pub use s2_common::types::access::AccessTokenId;
28/// See [`ListAccessTokensInput::prefix`].
29pub use s2_common::types::access::AccessTokenIdPrefix;
30/// See [`ListAccessTokensInput::start_after`].
31pub use s2_common::types::access::AccessTokenIdStartAfter;
32/// Basin name.
33///
34/// **Note:** It must be globally unique and between 8 and 48 bytes in length. It can only
35/// comprise lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
36pub use s2_common::types::basin::BasinName;
37/// See [`ListBasinsInput::prefix`].
38pub use s2_common::types::basin::BasinNamePrefix;
39/// See [`ListBasinsInput::start_after`].
40pub use s2_common::types::basin::BasinNameStartAfter;
41/// Stream name.
42///
43/// **Note:** It must be unique to the basin and between 1 and 512 bytes in length.
44pub use s2_common::types::stream::StreamName;
45/// See [`ListStreamsInput::prefix`].
46pub use s2_common::types::stream::StreamNamePrefix;
47/// See [`ListStreamsInput::start_after`].
48pub use s2_common::types::stream::StreamNameStartAfter;
49
/// One mebibyte (2^20 bytes), used for size limits expressed in MiB.
pub(crate) const ONE_MIB: u32 = 1 << 20;
51
52use s2_common::{maybe::Maybe, record::MAX_FENCING_TOKEN_LENGTH};
53use secrecy::SecretString;
54
55use crate::api::{ApiError, ApiErrorResponse};
56
57/// An RFC 3339 datetime.
58///
59/// It can be created in either of the following ways:
60/// - Parse an RFC 3339 datetime string using [`FromStr`] or [`str::parse`].
61/// - Convert from [`time::OffsetDateTime`] using [`From`]/[`Into`].
62#[derive(Debug, Clone, Copy, PartialEq)]
63pub struct S2DateTime(time::OffsetDateTime);
64
65impl From<time::OffsetDateTime> for S2DateTime {
66    fn from(dt: time::OffsetDateTime) -> Self {
67        Self(dt)
68    }
69}
70
71impl From<S2DateTime> for time::OffsetDateTime {
72    fn from(dt: S2DateTime) -> Self {
73        dt.0
74    }
75}
76
77impl FromStr for S2DateTime {
78    type Err = ValidationError;
79
80    fn from_str(s: &str) -> Result<Self, Self::Err> {
81        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339)
82            .map(Self)
83            .map_err(|e| ValidationError(format!("invalid datetime: {e}")))
84    }
85}
86
87impl fmt::Display for S2DateTime {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        write!(
90            f,
91            "{}",
92            self.0
93                .format(&time::format_description::well_known::Rfc3339)
94                .expect("RFC3339 formatting should not fail for S2DateTime")
95        )
96    }
97}
98
/// Authority for connecting to an S2 basin.
///
/// Selects how the SDK resolves the host that serves a particular basin.
#[derive(Debug, Clone, PartialEq)]
pub enum BasinAuthority {
    /// Parent zone for basins. DNS is used to route to the correct cell for the basin.
    ParentZone(Authority),
    /// Direct cell authority. Basin is expected to be hosted by this cell.
    Direct(Authority),
}

#[derive(Debug, Clone)]
#[non_exhaustive]
/// Endpoints for the S2 environment.
///
/// Construct with [`S2Endpoints::new`] or [`S2Endpoints::from_env`].
pub struct S2Endpoints {
    /// URI scheme.
    ///
    /// Defaults to `https`.
    pub scheme: Scheme,
    /// Account authority.
    pub account_authority: Authority,
    /// Basin authority.
    pub basin_authority: BasinAuthority,
}
121
122impl S2Endpoints {
123    /// Create a new [`S2Endpoints`] with the given account and basin authorities.
124    pub fn new(account_authority: Authority, basin_authority: BasinAuthority) -> Self {
125        Self {
126            scheme: Scheme::HTTPS,
127            account_authority,
128            basin_authority,
129        }
130    }
131
132    /// Create a new [`S2Endpoints`] from environment variables.
133    ///
134    /// The following environment variables are expected to be set:
135    /// - `S2_ACCOUNT_ENDPOINT` - Account-level endpoint.
136    /// - `S2_BASIN_ENDPOINT` - Basin-level endpoint.
137    pub fn from_env() -> Result<Self, ValidationError> {
138        fn parse_account_endpoint(s: &str) -> Result<(Scheme, Authority), ValidationError> {
139            let (scheme, authority) = match s.find("://") {
140                Some(idx) => {
141                    let scheme: Scheme = s[..idx]
142                        .parse()
143                        .map_err(|_| "invalid S2_ACCOUNT_ENDPOINT scheme".to_string())?;
144                    (scheme, &s[idx + 3..])
145                }
146                None => (Scheme::HTTPS, s),
147            };
148            Ok((
149                scheme,
150                authority
151                    .parse()
152                    .map_err(|e| format!("invalid S2_ACCOUNT_ENDPOINT authority: {e}"))?,
153            ))
154        }
155
156        fn parse_basin_endpoint(s: &str) -> Result<(Scheme, BasinAuthority), ValidationError> {
157            let (scheme, authority) = match s.find("://") {
158                Some(idx) => {
159                    let scheme: Scheme = s[..idx]
160                        .parse()
161                        .map_err(|_| "invalid S2_BASIN_ENDPOINT scheme".to_string())?;
162                    (scheme, &s[idx + 3..])
163                }
164                None => (Scheme::HTTPS, s),
165            };
166            let authority = if let Some(authority) = authority.strip_prefix("{basin}.") {
167                BasinAuthority::ParentZone(
168                    authority
169                        .parse()
170                        .map_err(|e| format!("invalid S2_BASIN_ENDPOINT authority: {e}"))?,
171                )
172            } else {
173                BasinAuthority::Direct(
174                    authority
175                        .parse()
176                        .map_err(|e| format!("invalid S2_BASIN_ENDPOINT authority: {e}"))?,
177                )
178            };
179            Ok((scheme, authority))
180        }
181
182        let (account_scheme, account_authority) = match std::env::var("S2_ACCOUNT_ENDPOINT") {
183            Ok(s) => parse_account_endpoint(&s)?,
184            Err(VarError::NotPresent) => return Err("S2_ACCOUNT_ENDPOINT env var not set".into()),
185            Err(VarError::NotUnicode(_)) => {
186                return Err("S2_ACCOUNT_ENDPOINT is not valid unicode".into());
187            }
188        };
189
190        let (basin_scheme, basin_authority) = match std::env::var("S2_BASIN_ENDPOINT") {
191            Ok(s) => parse_basin_endpoint(&s)?,
192            Err(VarError::NotPresent) => return Err("S2_BASIN_ENDPOINT env var not set".into()),
193            Err(VarError::NotUnicode(_)) => {
194                return Err("S2_BASIN_ENDPOINT is not valid unicode".into());
195            }
196        };
197
198        if account_scheme != basin_scheme {
199            return Err(
200                "S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT must have the same scheme".into(),
201            );
202        }
203
204        Ok(S2Endpoints::new(account_authority, basin_authority).with_scheme(account_scheme))
205    }
206
207    /// Set the URI scheme.
208    pub fn with_scheme(self, scheme: Scheme) -> Self {
209        Self { scheme, ..self }
210    }
211
212    pub(crate) fn for_aws() -> Self {
213        Self {
214            scheme: Scheme::HTTPS,
215            account_authority: "aws.s2.dev".try_into().expect("valid authority"),
216            basin_authority: BasinAuthority::ParentZone(
217                "b.aws.s2.dev".try_into().expect("valid authority"),
218            ),
219        }
220    }
221}
222
223#[derive(Debug, Clone, Copy)]
224/// Compression algorithm for request and response bodies.
225pub enum Compression {
226    /// No compression.
227    None,
228    /// Gzip compression.
229    Gzip,
230    /// Zstd compression.
231    Zstd,
232}
233
234impl From<Compression> for CompressionAlgorithm {
235    fn from(value: Compression) -> Self {
236        match value {
237            Compression::None => CompressionAlgorithm::None,
238            Compression::Gzip => CompressionAlgorithm::Gzip,
239            Compression::Zstd => CompressionAlgorithm::Zstd,
240        }
241    }
242}
243
244#[derive(Debug, Clone, Copy, PartialEq)]
245#[non_exhaustive]
246/// Retry policy for [`append`](crate::S2Stream::append) and
247/// [`append_session`](crate::S2Stream::append_session) operations.
248pub enum AppendRetryPolicy {
249    /// Retry all appends. Use when duplicate records on the stream are acceptable.
250    All,
251    /// Only retry appends that include [`match_seq_num`](AppendInput::match_seq_num).
252    NoSideEffects,
253}
254
255impl AppendRetryPolicy {
256    pub(crate) fn is_compliant(&self, input: &AppendInput) -> bool {
257        match self {
258            Self::All => true,
259            Self::NoSideEffects => input.match_seq_num.is_some(),
260        }
261    }
262}
263
264#[derive(Debug, Clone)]
265#[non_exhaustive]
266/// Configuration for retrying requests in case of transient failures.
267///
268/// Exponential backoff with jitter is the retry strategy. Below is the pseudocode for the strategy:
269/// ```text
270/// base_delay = min(min_base_delay · 2ⁿ, max_base_delay)    (n = retry attempt, starting from 0)
271///     jitter = rand[0, base_delay]
272///     delay  = base_delay + jitter
273/// ````
274pub struct RetryConfig {
275    /// Total number of attempts including the initial try. A value of `1` means no retries.
276    ///
277    /// Defaults to `3`.
278    pub max_attempts: NonZeroU32,
279    /// Minimum base delay for retries.
280    ///
281    /// Defaults to `100ms`.
282    pub min_base_delay: Duration,
283    /// Maximum base delay for retries.
284    ///
285    /// Defaults to `1s`.
286    pub max_base_delay: Duration,
287    /// Retry policy for [`append`](crate::S2Stream::append) and
288    /// [`append_session`](crate::S2Stream::append_session) operations.
289    ///
290    /// Defaults to `All`.
291    pub append_retry_policy: AppendRetryPolicy,
292}
293
294impl Default for RetryConfig {
295    fn default() -> Self {
296        Self {
297            max_attempts: NonZeroU32::new(3).expect("valid non-zero u32"),
298            min_base_delay: Duration::from_millis(100),
299            max_base_delay: Duration::from_secs(1),
300            append_retry_policy: AppendRetryPolicy::All,
301        }
302    }
303}
304
305impl RetryConfig {
306    /// Create a new [`RetryConfig`] with default settings.
307    pub fn new() -> Self {
308        Self::default()
309    }
310
311    pub(crate) fn max_retries(&self) -> u32 {
312        self.max_attempts.get() - 1
313    }
314
315    /// Set the total number of attempts including the initial try.
316    pub fn with_max_attempts(self, max_attempts: NonZeroU32) -> Self {
317        Self {
318            max_attempts,
319            ..self
320        }
321    }
322
323    /// Set the minimum base delay for retries.
324    pub fn with_min_base_delay(self, min_base_delay: Duration) -> Self {
325        Self {
326            min_base_delay,
327            ..self
328        }
329    }
330
331    /// Set the maximum base delay for retries.
332    pub fn with_max_base_delay(self, max_base_delay: Duration) -> Self {
333        Self {
334            max_base_delay,
335            ..self
336        }
337    }
338
339    /// Set the retry policy for [`append`](crate::S2Stream::append) and
340    /// [`append_session`](crate::S2Stream::append_session) operations.
341    pub fn with_append_retry_policy(self, append_retry_policy: AppendRetryPolicy) -> Self {
342        Self {
343            append_retry_policy,
344            ..self
345        }
346    }
347}
348
349#[derive(Debug, Clone)]
350#[non_exhaustive]
351/// Configuration for [`S2`](crate::S2).
352pub struct S2Config {
353    pub(crate) access_token: SecretString,
354    pub(crate) endpoints: S2Endpoints,
355    pub(crate) connection_timeout: Duration,
356    pub(crate) request_timeout: Duration,
357    pub(crate) retry: RetryConfig,
358    pub(crate) compression: Compression,
359    pub(crate) user_agent: HeaderValue,
360}
361
362impl S2Config {
363    /// Create a new [`S2Config`] with the given access token and default settings.
364    pub fn new(access_token: impl Into<String>) -> Self {
365        Self {
366            access_token: access_token.into().into(),
367            endpoints: S2Endpoints::for_aws(),
368            connection_timeout: Duration::from_secs(3),
369            request_timeout: Duration::from_secs(5),
370            retry: RetryConfig::new(),
371            compression: Compression::None,
372            user_agent: concat!("s2-sdk-rust/", env!("CARGO_PKG_VERSION"))
373                .parse()
374                .expect("valid user agent"),
375        }
376    }
377
378    /// Set the S2 endpoints to connect to.
379    pub fn with_endpoints(self, endpoints: S2Endpoints) -> Self {
380        Self { endpoints, ..self }
381    }
382
383    /// Set the timeout for establishing a connection to the server.
384    ///
385    /// Defaults to `3s`.
386    pub fn with_connection_timeout(self, connection_timeout: Duration) -> Self {
387        Self {
388            connection_timeout,
389            ..self
390        }
391    }
392
393    /// Set the timeout for requests.
394    ///
395    /// Defaults to `5s`.
396    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
397        Self {
398            request_timeout,
399            ..self
400        }
401    }
402
403    /// Set the retry configuration for requests.
404    ///
405    /// See [`RetryConfig`] for defaults.
406    pub fn with_retry(self, retry: RetryConfig) -> Self {
407        Self { retry, ..self }
408    }
409
410    /// Set the compression algorithm for requests and responses.
411    ///
412    /// Defaults to no compression.
413    pub fn with_compression(self, compression: Compression) -> Self {
414        Self {
415            compression,
416            ..self
417        }
418    }
419
420    #[doc(hidden)]
421    #[cfg(feature = "_hidden")]
422    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Result<Self, ValidationError> {
423        let user_agent = user_agent
424            .into()
425            .parse()
426            .map_err(|e| ValidationError(format!("invalid user agent: {e}")))?;
427        Ok(Self { user_agent, ..self })
428    }
429}
430
/// A page of values.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Page<T> {
    /// Values in this page.
    pub values: Vec<T>,
    /// Whether there are more pages.
    pub has_more: bool,
}

impl<T> Page<T> {
    // Internal constructor; accepts anything convertible into a `Vec<T>`.
    pub(crate) fn new(values: impl Into<Vec<T>>, has_more: bool) -> Self {
        let values = values.into();
        Self { values, has_more }
    }
}
449
450#[derive(Debug, Clone, Copy, PartialEq, Eq)]
451/// Storage class for recent appends.
452pub enum StorageClass {
453    /// Standard storage class that offers append latencies under `500ms`.
454    Standard,
455    /// Express storage class that offers append latencies under `50ms`.
456    Express,
457}
458
459impl From<api::config::StorageClass> for StorageClass {
460    fn from(value: api::config::StorageClass) -> Self {
461        match value {
462            api::config::StorageClass::Standard => StorageClass::Standard,
463            api::config::StorageClass::Express => StorageClass::Express,
464        }
465    }
466}
467
468impl From<StorageClass> for api::config::StorageClass {
469    fn from(value: StorageClass) -> Self {
470        match value {
471            StorageClass::Standard => api::config::StorageClass::Standard,
472            StorageClass::Express => api::config::StorageClass::Express,
473        }
474    }
475}
476
477#[derive(Debug, Clone, Copy, PartialEq, Eq)]
478/// Retention policy for records in a stream.
479pub enum RetentionPolicy {
480    /// Age in seconds. Records older than this age are automatically trimmed.
481    Age(u64),
482    /// Records are retained indefinitely unless explicitly trimmed.
483    Infinite,
484}
485
486impl From<api::config::RetentionPolicy> for RetentionPolicy {
487    fn from(value: api::config::RetentionPolicy) -> Self {
488        match value {
489            api::config::RetentionPolicy::Age(secs) => RetentionPolicy::Age(secs),
490            api::config::RetentionPolicy::Infinite(_) => RetentionPolicy::Infinite,
491        }
492    }
493}
494
495impl From<RetentionPolicy> for api::config::RetentionPolicy {
496    fn from(value: RetentionPolicy) -> Self {
497        match value {
498            RetentionPolicy::Age(secs) => api::config::RetentionPolicy::Age(secs),
499            RetentionPolicy::Infinite => {
500                api::config::RetentionPolicy::Infinite(api::config::InfiniteRetention {})
501            }
502        }
503    }
504}
505
506#[derive(Debug, Clone, Copy, PartialEq, Eq)]
507/// Timestamping mode for appends that influences how timestamps are handled.
508pub enum TimestampingMode {
509    /// Prefer client-specified timestamp if present otherwise use arrival time.
510    ClientPrefer,
511    /// Require a client-specified timestamp and reject the append if it is missing.
512    ClientRequire,
513    /// Use the arrival time and ignore any client-specified timestamp.
514    Arrival,
515}
516
517impl From<api::config::TimestampingMode> for TimestampingMode {
518    fn from(value: api::config::TimestampingMode) -> Self {
519        match value {
520            api::config::TimestampingMode::ClientPrefer => TimestampingMode::ClientPrefer,
521            api::config::TimestampingMode::ClientRequire => TimestampingMode::ClientRequire,
522            api::config::TimestampingMode::Arrival => TimestampingMode::Arrival,
523        }
524    }
525}
526
527impl From<TimestampingMode> for api::config::TimestampingMode {
528    fn from(value: TimestampingMode) -> Self {
529        match value {
530            TimestampingMode::ClientPrefer => api::config::TimestampingMode::ClientPrefer,
531            TimestampingMode::ClientRequire => api::config::TimestampingMode::ClientRequire,
532            TimestampingMode::Arrival => api::config::TimestampingMode::Arrival,
533        }
534    }
535}
536
537#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
538#[non_exhaustive]
539/// Configuration for timestamping behavior.
540pub struct TimestampingConfig {
541    /// Timestamping mode for appends that influences how timestamps are handled.
542    ///
543    /// Defaults to [`ClientPrefer`](TimestampingMode::ClientPrefer).
544    pub mode: Option<TimestampingMode>,
545    /// Whether client-specified timestamps are allowed to exceed the arrival time.
546    ///
547    /// Defaults to `false` (client timestamps are capped at the arrival time).
548    pub uncapped: bool,
549}
550
551impl TimestampingConfig {
552    /// Create a new [`TimestampingConfig`] with default settings.
553    pub fn new() -> Self {
554        Self::default()
555    }
556
557    /// Set the timestamping mode for appends that influences how timestamps are handled.
558    pub fn with_mode(self, mode: TimestampingMode) -> Self {
559        Self {
560            mode: Some(mode),
561            ..self
562        }
563    }
564
565    /// Set whether client-specified timestamps are allowed to exceed the arrival time.
566    pub fn with_uncapped(self, uncapped: bool) -> Self {
567        Self { uncapped, ..self }
568    }
569}
570
571impl From<api::config::TimestampingConfig> for TimestampingConfig {
572    fn from(value: api::config::TimestampingConfig) -> Self {
573        Self {
574            mode: value.mode.map(Into::into),
575            uncapped: value.uncapped.unwrap_or_default(),
576        }
577    }
578}
579
580impl From<TimestampingConfig> for api::config::TimestampingConfig {
581    fn from(value: TimestampingConfig) -> Self {
582        Self {
583            mode: value.mode.map(Into::into),
584            uncapped: Some(value.uncapped),
585        }
586    }
587}
588
/// Configuration for automatically deleting a stream when it becomes empty.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct DeleteOnEmptyConfig {
    /// Minimum age in seconds before an empty stream can be deleted.
    ///
    /// Defaults to `0` (disables automatic deletion).
    pub min_age_secs: u64,
}

impl DeleteOnEmptyConfig {
    /// Create a new [`DeleteOnEmptyConfig`] with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the minimum age before an empty stream can be deleted.
    ///
    /// Note: the value is stored in whole seconds, so any sub-second
    /// component of `min_age` is truncated.
    pub fn with_min_age(mut self, min_age: Duration) -> Self {
        self.min_age_secs = min_age.as_secs();
        self
    }
}
612
613impl From<api::config::DeleteOnEmptyConfig> for DeleteOnEmptyConfig {
614    fn from(value: api::config::DeleteOnEmptyConfig) -> Self {
615        Self {
616            min_age_secs: value.min_age_secs,
617        }
618    }
619}
620
621impl From<DeleteOnEmptyConfig> for api::config::DeleteOnEmptyConfig {
622    fn from(value: DeleteOnEmptyConfig) -> Self {
623        Self {
624            min_age_secs: value.min_age_secs,
625        }
626    }
627}
628
629#[derive(Debug, Clone, Default, PartialEq, Eq)]
630#[non_exhaustive]
631/// Configuration for a stream.
632pub struct StreamConfig {
633    /// Storage class for the stream.
634    ///
635    /// Defaults to [`Express`](StorageClass::Express).
636    pub storage_class: Option<StorageClass>,
637    /// Retention policy for records in the stream.
638    ///
639    /// Defaults to `7 days` of retention.
640    pub retention_policy: Option<RetentionPolicy>,
641    /// Configuration for timestamping behavior.
642    ///
643    /// See [`TimestampingConfig`] for defaults.
644    pub timestamping: Option<TimestampingConfig>,
645    /// Configuration for automatically deleting the stream when it becomes empty.
646    ///
647    /// See [`DeleteOnEmptyConfig`] for defaults.
648    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
649}
650
651impl StreamConfig {
652    /// Create a new [`StreamConfig`] with default settings.
653    pub fn new() -> Self {
654        Self::default()
655    }
656
657    /// Set the storage class for the stream.
658    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
659        Self {
660            storage_class: Some(storage_class),
661            ..self
662        }
663    }
664
665    /// Set the retention policy for records in the stream.
666    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
667        Self {
668            retention_policy: Some(retention_policy),
669            ..self
670        }
671    }
672
673    /// Set the configuration for timestamping behavior.
674    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
675        Self {
676            timestamping: Some(timestamping),
677            ..self
678        }
679    }
680
681    /// Set the configuration for automatically deleting the stream when it becomes empty.
682    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
683        Self {
684            delete_on_empty: Some(delete_on_empty),
685            ..self
686        }
687    }
688}
689
690impl From<api::config::StreamConfig> for StreamConfig {
691    fn from(value: api::config::StreamConfig) -> Self {
692        Self {
693            storage_class: value.storage_class.map(Into::into),
694            retention_policy: value.retention_policy.map(Into::into),
695            timestamping: value.timestamping.map(Into::into),
696            delete_on_empty: value.delete_on_empty.map(Into::into),
697        }
698    }
699}
700
701impl From<StreamConfig> for api::config::StreamConfig {
702    fn from(value: StreamConfig) -> Self {
703        Self {
704            storage_class: value.storage_class.map(Into::into),
705            retention_policy: value.retention_policy.map(Into::into),
706            timestamping: value.timestamping.map(Into::into),
707            delete_on_empty: value.delete_on_empty.map(Into::into),
708        }
709    }
710}
711
712#[derive(Debug, Clone, Default)]
713#[non_exhaustive]
714/// Configuration for a basin.
715pub struct BasinConfig {
716    /// Default configuration for all streams in the basin.
717    ///
718    /// See [`StreamConfig`] for defaults.
719    pub default_stream_config: Option<StreamConfig>,
720    /// Whether to create stream on append if it doesn't exist using default stream configuration.
721    ///
722    /// Defaults to `false`.
723    pub create_stream_on_append: bool,
724    /// Whether to create stream on read if it doesn't exist using default stream configuration.
725    ///
726    /// Defaults to `false`.
727    pub create_stream_on_read: bool,
728}
729
730impl BasinConfig {
731    /// Create a new [`BasinConfig`] with default settings.
732    pub fn new() -> Self {
733        Self::default()
734    }
735
736    /// Set the default configuration for all streams in the basin.
737    pub fn with_default_stream_config(self, config: StreamConfig) -> Self {
738        Self {
739            default_stream_config: Some(config),
740            ..self
741        }
742    }
743
744    /// Set whether to create stream on append if it doesn't exist using default stream
745    /// configuration.
746    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
747        Self {
748            create_stream_on_append,
749            ..self
750        }
751    }
752
753    /// Set whether to create stream on read if it doesn't exist using default stream configuration.
754    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
755        Self {
756            create_stream_on_read,
757            ..self
758        }
759    }
760}
761
762impl From<api::config::BasinConfig> for BasinConfig {
763    fn from(value: api::config::BasinConfig) -> Self {
764        Self {
765            default_stream_config: value.default_stream_config.map(Into::into),
766            create_stream_on_append: value.create_stream_on_append,
767            create_stream_on_read: value.create_stream_on_read,
768        }
769    }
770}
771
772impl From<BasinConfig> for api::config::BasinConfig {
773    fn from(value: BasinConfig) -> Self {
774        Self {
775            default_stream_config: value.default_stream_config.map(Into::into),
776            create_stream_on_append: value.create_stream_on_append,
777            create_stream_on_read: value.create_stream_on_read,
778        }
779    }
780}
781
782#[derive(Debug, Clone, PartialEq, Eq)]
783/// Scope of a basin.
784pub enum BasinScope {
785    /// AWS `us-east-1` region.
786    AwsUsEast1,
787}
788
789impl From<api::basin::BasinScope> for BasinScope {
790    fn from(value: api::basin::BasinScope) -> Self {
791        match value {
792            api::basin::BasinScope::AwsUsEast1 => BasinScope::AwsUsEast1,
793        }
794    }
795}
796
797impl From<BasinScope> for api::basin::BasinScope {
798    fn from(value: BasinScope) -> Self {
799        match value {
800            BasinScope::AwsUsEast1 => api::basin::BasinScope::AwsUsEast1,
801        }
802    }
803}
804
805#[derive(Debug, Clone)]
806#[non_exhaustive]
807/// Input for [`create_basin`](crate::S2::create_basin) operation.
808pub struct CreateBasinInput {
809    /// Basin name.
810    pub name: BasinName,
811    /// Configuration for the basin.
812    ///
813    /// See [`BasinConfig`] for defaults.
814    pub config: Option<BasinConfig>,
815    /// Scope of the basin.
816    ///
817    /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1).
818    pub scope: Option<BasinScope>,
819    /// Idempotency token for the operation.
820    ///
821    /// When a request is retried, the same token is reused,
822    /// causing the server to return the original response instead of
823    /// performing the operation again.
824    ///
825    /// Defaults to a random UUID.
826    pub idempotency_token: String,
827}
828
829impl CreateBasinInput {
830    /// Create a new [`CreateBasinInput`] with the given basin name.
831    pub fn new(name: BasinName) -> Self {
832        Self {
833            name,
834            config: None,
835            scope: None,
836            idempotency_token: idempotency_token(),
837        }
838    }
839
840    /// Set the configuration for the basin.
841    pub fn with_config(self, config: BasinConfig) -> Self {
842        Self {
843            config: Some(config),
844            ..self
845        }
846    }
847
848    /// Set the scope of the basin.
849    pub fn with_scope(self, scope: BasinScope) -> Self {
850        Self {
851            scope: Some(scope),
852            ..self
853        }
854    }
855
856    /// Set the idempotency token for the operation.
857    pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
858        Self {
859            idempotency_token: idempotency_token.into(),
860            ..self
861        }
862    }
863}
864
865impl From<CreateBasinInput> for (api::basin::CreateBasinRequest, String) {
866    fn from(value: CreateBasinInput) -> Self {
867        (
868            api::basin::CreateBasinRequest {
869                basin: value.name,
870                config: value.config.map(Into::into),
871                scope: value.scope.map(Into::into),
872            },
873            value.idempotency_token,
874        )
875    }
876}
877
878#[derive(Debug, Clone, Default)]
879#[non_exhaustive]
880/// Input for [`list_basins`](crate::S2::list_basins) operation.
881pub struct ListBasinsInput {
882    /// Filter basins whose names begin with this value.
883    ///
884    /// Defaults to `""`.
885    pub prefix: BasinNamePrefix,
886    /// Filter basins whose names are lexicographically greater than this value.
887    ///
888    /// **Note:** It must be greater than or equal to [`prefix`](ListBasinsInput::prefix).
889    ///
890    /// Defaults to `""`.
891    pub start_after: BasinNameStartAfter,
892    /// Number of basins to return in a page. Will be clamped to a maximum of `1000`.
893    ///
894    /// Defaults to `1000`.
895    pub limit: Option<usize>,
896}
897
898impl ListBasinsInput {
899    /// Create a new [`ListBasinsInput`] with default values.
900    pub fn new() -> Self {
901        Self::default()
902    }
903
904    /// Set the prefix used to filter basins whose names begin with this value.
905    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
906        Self { prefix, ..self }
907    }
908
909    /// Set the value used to filter basins whose names are lexicographically greater than this
910    /// value.
911    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
912        Self {
913            start_after,
914            ..self
915        }
916    }
917
918    /// Set the limit on number of basins to return in a page.
919    pub fn with_limit(self, limit: usize) -> Self {
920        Self {
921            limit: Some(limit),
922            ..self
923        }
924    }
925}
926
927impl From<ListBasinsInput> for api::basin::ListBasinsRequest {
928    fn from(value: ListBasinsInput) -> Self {
929        Self {
930            prefix: Some(value.prefix),
931            start_after: Some(value.start_after),
932            limit: value.limit,
933        }
934    }
935}
936
#[derive(Debug, Clone, Default)]
/// Input for [`S2::list_all_basins`](crate::S2::list_all_basins).
///
/// Unlike [`ListBasinsInput`], this input has no page limit since all basins are listed.
pub struct ListAllBasinsInput {
    /// Filter basins whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: BasinNamePrefix,
    /// Filter basins whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllBasinsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: BasinNameStartAfter,
    /// Whether to ignore basins that have been requested for deletion but not yet deleted.
    ///
    /// Defaults to `false`.
    pub ignore_pending_deletions: bool,
}
955
956impl ListAllBasinsInput {
957    /// Create a new [`ListAllBasinsInput`] with default values.
958    pub fn new() -> Self {
959        Self::default()
960    }
961
962    /// Set the prefix used to filter basins whose names begin with this value.
963    pub fn with_prefix(self, prefix: BasinNamePrefix) -> Self {
964        Self { prefix, ..self }
965    }
966
967    /// Set the value used to filter basins whose names are lexicographically greater than this
968    /// value.
969    pub fn with_start_after(self, start_after: BasinNameStartAfter) -> Self {
970        Self {
971            start_after,
972            ..self
973        }
974    }
975
976    /// Set whether to ignore basins that have been requested for deletion but not yet deleted.
977    pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
978        Self {
979            ignore_pending_deletions,
980            ..self
981        }
982    }
983}
984
#[derive(Debug, Clone, PartialEq, Eq)]
/// Current state of a basin.
pub enum BasinState {
    /// The basin is active.
    Active,
    /// The basin is in the process of being created.
    Creating,
    /// The basin is in the process of being deleted.
    Deleting,
}
995
996impl From<api::basin::BasinState> for BasinState {
997    fn from(value: api::basin::BasinState) -> Self {
998        match value {
999            api::basin::BasinState::Active => BasinState::Active,
1000            api::basin::BasinState::Creating => BasinState::Creating,
1001            api::basin::BasinState::Deleting => BasinState::Deleting,
1002        }
1003    }
1004}
1005
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
/// Basin information.
pub struct BasinInfo {
    /// Basin name.
    pub name: BasinName,
    /// Scope of the basin, if one was reported.
    pub scope: Option<BasinScope>,
    /// Current state of the basin.
    pub state: BasinState,
}
1017
1018impl From<api::basin::BasinInfo> for BasinInfo {
1019    fn from(value: api::basin::BasinInfo) -> Self {
1020        Self {
1021            name: value.name,
1022            scope: value.scope.map(Into::into),
1023            state: value.state.into(),
1024        }
1025    }
1026}
1027
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`delete_basin`](crate::S2::delete_basin) operation.
pub struct DeleteBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Whether to ignore `Not Found` error if the basin doesn't exist.
    ///
    /// Defaults to `false` when constructed via [`DeleteBasinInput::new`].
    pub ignore_not_found: bool,
}
1037
1038impl DeleteBasinInput {
1039    /// Create a new [`DeleteBasinInput`] with the given basin name.
1040    pub fn new(name: BasinName) -> Self {
1041        Self {
1042            name,
1043            ignore_not_found: false,
1044        }
1045    }
1046
1047    /// Set whether to ignore `Not Found` error if the basin is not existing.
1048    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
1049        Self {
1050            ignore_not_found,
1051            ..self
1052        }
1053    }
1054}
1055
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`TimestampingConfig`].
///
/// The `with_*` methods mark a field as [`Maybe::Specified`]; untouched fields are left at
/// their default and do not override the existing setting.
pub struct TimestampingReconfiguration {
    /// Override for the existing [`mode`](TimestampingConfig::mode).
    pub mode: Maybe<Option<TimestampingMode>>,
    /// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
    pub uncapped: Maybe<Option<bool>>,
}
1065
1066impl TimestampingReconfiguration {
1067    /// Create a new [`TimestampingReconfiguration`].
1068    pub fn new() -> Self {
1069        Self::default()
1070    }
1071
1072    /// Set the override for the existing [`mode`](TimestampingConfig::mode).
1073    pub fn with_mode(self, mode: TimestampingMode) -> Self {
1074        Self {
1075            mode: Maybe::Specified(Some(mode)),
1076            ..self
1077        }
1078    }
1079
1080    /// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
1081    pub fn with_uncapped(self, uncapped: bool) -> Self {
1082        Self {
1083            uncapped: Maybe::Specified(Some(uncapped)),
1084            ..self
1085        }
1086    }
1087}
1088
1089impl From<TimestampingReconfiguration> for api::config::TimestampingReconfiguration {
1090    fn from(value: TimestampingReconfiguration) -> Self {
1091        Self {
1092            mode: value.mode.map(|m| m.map(Into::into)),
1093            uncapped: value.uncapped,
1094        }
1095    }
1096}
1097
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`DeleteOnEmptyConfig`].
///
/// A field left at its default does not override the existing setting.
pub struct DeleteOnEmptyReconfiguration {
    /// Override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
    pub min_age_secs: Maybe<Option<u64>>,
}
1105
1106impl DeleteOnEmptyReconfiguration {
1107    /// Create a new [`DeleteOnEmptyReconfiguration`].
1108    pub fn new() -> Self {
1109        Self::default()
1110    }
1111
1112    /// Set the override for the existing [`min_age_secs`](DeleteOnEmptyConfig::min_age_secs).
1113    pub fn with_min_age(self, min_age: Duration) -> Self {
1114        Self {
1115            min_age_secs: Maybe::Specified(Some(min_age.as_secs())),
1116        }
1117    }
1118}
1119
1120impl From<DeleteOnEmptyReconfiguration> for api::config::DeleteOnEmptyReconfiguration {
1121    fn from(value: DeleteOnEmptyReconfiguration) -> Self {
1122        Self {
1123            min_age_secs: value.min_age_secs,
1124        }
1125    }
1126}
1127
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`StreamConfig`].
///
/// Fields left at their default do not override the existing configuration; use the `with_*`
/// methods to mark specific overrides.
pub struct StreamReconfiguration {
    /// Override for the existing [`storage_class`](StreamConfig::storage_class).
    pub storage_class: Maybe<Option<StorageClass>>,
    /// Override for the existing [`retention_policy`](StreamConfig::retention_policy).
    pub retention_policy: Maybe<Option<RetentionPolicy>>,
    /// Override for the existing [`timestamping`](StreamConfig::timestamping).
    pub timestamping: Maybe<Option<TimestampingReconfiguration>>,
    /// Override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
    pub delete_on_empty: Maybe<Option<DeleteOnEmptyReconfiguration>>,
}
1141
1142impl StreamReconfiguration {
1143    /// Create a new [`StreamReconfiguration`].
1144    pub fn new() -> Self {
1145        Self::default()
1146    }
1147
1148    /// Set the override for the existing [`storage_class`](StreamConfig::storage_class).
1149    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
1150        Self {
1151            storage_class: Maybe::Specified(Some(storage_class)),
1152            ..self
1153        }
1154    }
1155
1156    /// Set the override for the existing [`retention_policy`](StreamConfig::retention_policy).
1157    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
1158        Self {
1159            retention_policy: Maybe::Specified(Some(retention_policy)),
1160            ..self
1161        }
1162    }
1163
1164    /// Set the override for the existing [`timestamping`](StreamConfig::timestamping).
1165    pub fn with_timestamping(self, timestamping: TimestampingReconfiguration) -> Self {
1166        Self {
1167            timestamping: Maybe::Specified(Some(timestamping)),
1168            ..self
1169        }
1170    }
1171
1172    /// Set the override for the existing [`delete_on_empty`](StreamConfig::delete_on_empty).
1173    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyReconfiguration) -> Self {
1174        Self {
1175            delete_on_empty: Maybe::Specified(Some(delete_on_empty)),
1176            ..self
1177        }
1178    }
1179}
1180
1181impl From<StreamReconfiguration> for api::config::StreamReconfiguration {
1182    fn from(value: StreamReconfiguration) -> Self {
1183        Self {
1184            storage_class: value.storage_class.map(|m| m.map(Into::into)),
1185            retention_policy: value.retention_policy.map(|m| m.map(Into::into)),
1186            timestamping: value.timestamping.map(|m| m.map(Into::into)),
1187            delete_on_empty: value.delete_on_empty.map(|m| m.map(Into::into)),
1188        }
1189    }
1190}
1191
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Reconfiguration for [`BasinConfig`].
///
/// Fields left at their default do not override the existing configuration.
pub struct BasinReconfiguration {
    /// Override for the existing [`default_stream_config`](BasinConfig::default_stream_config).
    pub default_stream_config: Maybe<Option<StreamReconfiguration>>,
    /// Override for the existing
    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
    pub create_stream_on_append: Maybe<bool>,
    /// Override for the existing [`create_stream_on_read`](BasinConfig::create_stream_on_read).
    pub create_stream_on_read: Maybe<bool>,
}
1204
1205impl BasinReconfiguration {
1206    /// Create a new [`BasinReconfiguration`].
1207    pub fn new() -> Self {
1208        Self::default()
1209    }
1210
1211    /// Set the override for the existing
1212    /// [`default_stream_config`](BasinConfig::default_stream_config).
1213    pub fn with_default_stream_config(self, config: StreamReconfiguration) -> Self {
1214        Self {
1215            default_stream_config: Maybe::Specified(Some(config)),
1216            ..self
1217        }
1218    }
1219
1220    /// Set the override for the existing
1221    /// [`create_stream_on_append`](BasinConfig::create_stream_on_append).
1222    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
1223        Self {
1224            create_stream_on_append: Maybe::Specified(create_stream_on_append),
1225            ..self
1226        }
1227    }
1228
1229    /// Set the override for the existing
1230    /// [`create_stream_on_read`](BasinConfig::create_stream_on_read).
1231    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
1232        Self {
1233            create_stream_on_read: Maybe::Specified(create_stream_on_read),
1234            ..self
1235        }
1236    }
1237}
1238
1239impl From<BasinReconfiguration> for api::config::BasinReconfiguration {
1240    fn from(value: BasinReconfiguration) -> Self {
1241        Self {
1242            default_stream_config: value.default_stream_config.map(|m| m.map(Into::into)),
1243            create_stream_on_append: value.create_stream_on_append,
1244            create_stream_on_read: value.create_stream_on_read,
1245        }
1246    }
1247}
1248
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_basin`](crate::S2::reconfigure_basin) operation.
pub struct ReconfigureBasinInput {
    /// Basin name.
    pub name: BasinName,
    /// Reconfiguration for [`BasinConfig`].
    pub config: BasinReconfiguration,
}
1258
1259impl ReconfigureBasinInput {
1260    /// Create a new [`ReconfigureBasinInput`] with the given basin name and reconfiguration.
1261    pub fn new(name: BasinName, config: BasinReconfiguration) -> Self {
1262        Self { name, config }
1263    }
1264}
1265
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_access_tokens`](crate::S2::list_access_tokens) operation.
///
/// Construct via [`ListAccessTokensInput::new`] (or [`Default`]) and customize with the
/// `with_*` builder methods.
pub struct ListAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
    /// Number of access tokens to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
1285
1286impl ListAccessTokensInput {
1287    /// Create a new [`ListAccessTokensInput`] with default values.
1288    pub fn new() -> Self {
1289        Self::default()
1290    }
1291
1292    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1293    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1294        Self { prefix, ..self }
1295    }
1296
1297    /// Set the value used to filter access tokens whose IDs are lexicographically greater than this
1298    /// value.
1299    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1300        Self {
1301            start_after,
1302            ..self
1303        }
1304    }
1305
1306    /// Set the limit on number of access tokens to return in a page.
1307    pub fn with_limit(self, limit: usize) -> Self {
1308        Self {
1309            limit: Some(limit),
1310            ..self
1311        }
1312    }
1313}
1314
1315impl From<ListAccessTokensInput> for api::access::ListAccessTokensRequest {
1316    fn from(value: ListAccessTokensInput) -> Self {
1317        Self {
1318            prefix: Some(value.prefix),
1319            start_after: Some(value.start_after),
1320            limit: value.limit,
1321        }
1322    }
1323}
1324
#[derive(Debug, Clone, Default)]
/// Input for [`S2::list_all_access_tokens`](crate::S2::list_all_access_tokens).
///
/// Unlike [`ListAccessTokensInput`], this input has no page limit since all tokens are listed.
pub struct ListAllAccessTokensInput {
    /// Filter access tokens whose IDs begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: AccessTokenIdPrefix,
    /// Filter access tokens whose IDs are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllAccessTokensInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: AccessTokenIdStartAfter,
}
1339
1340impl ListAllAccessTokensInput {
1341    /// Create a new [`ListAllAccessTokensInput`] with default values.
1342    pub fn new() -> Self {
1343        Self::default()
1344    }
1345
1346    /// Set the prefix used to filter access tokens whose IDs begin with this value.
1347    pub fn with_prefix(self, prefix: AccessTokenIdPrefix) -> Self {
1348        Self { prefix, ..self }
1349    }
1350
1351    /// Set the value used to filter access tokens whose IDs are lexicographically greater than
1352    /// this value.
1353    pub fn with_start_after(self, start_after: AccessTokenIdStartAfter) -> Self {
1354        Self {
1355            start_after,
1356            ..self
1357        }
1358    }
1359}
1360
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Access token information.
///
/// Converted from API responses via `TryFrom`; the conversion fails if the expiration time
/// is missing.
pub struct AccessTokenInfo {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    pub expires_at: S2DateTime,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    pub auto_prefix_streams: bool,
    /// Scope of the access token.
    pub scope: AccessTokenScope,
}
1375
1376impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
1377    type Error = ValidationError;
1378
1379    fn try_from(value: api::access::AccessTokenInfo) -> Result<Self, Self::Error> {
1380        let expires_at = value
1381            .expires_at
1382            .map(Into::into)
1383            .ok_or_else(|| ValidationError::from("missing expires_at"))?;
1384        Ok(Self {
1385            id: value.id,
1386            expires_at,
1387            auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
1388            scope: value.scope.into(),
1389        })
1390    }
1391}
1392
#[derive(Debug, Clone)]
/// Pattern for matching basins.
///
/// See [`AccessTokenScope::basins`]. See also [`StreamMatcher`] and [`AccessTokenMatcher`].
pub enum BasinMatcher {
    /// Match no basins.
    None,
    /// Match exactly this basin.
    Exact(BasinName),
    /// Match all basins with this prefix.
    Prefix(BasinNamePrefix),
}
1405
#[derive(Debug, Clone)]
/// Pattern for matching streams.
///
/// See [`AccessTokenScope::streams`]. See also [`BasinMatcher`] and [`AccessTokenMatcher`].
pub enum StreamMatcher {
    /// Match no streams.
    None,
    /// Match exactly this stream.
    Exact(StreamName),
    /// Match all streams with this prefix.
    Prefix(StreamNamePrefix),
}
1418
#[derive(Debug, Clone)]
/// Pattern for matching access tokens.
///
/// See [`AccessTokenScope::access_tokens`]. See also [`BasinMatcher`] and [`StreamMatcher`].
pub enum AccessTokenMatcher {
    /// Match no access tokens.
    None,
    /// Match exactly this access token.
    Exact(AccessTokenId),
    /// Match all access tokens with this prefix.
    Prefix(AccessTokenIdPrefix),
}
1431
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions indicating allowed operations.
///
/// The default grants neither read nor write.
pub struct ReadWritePermissions {
    /// Read permission.
    ///
    /// Defaults to `false`.
    pub read: bool,
    /// Write permission.
    ///
    /// Defaults to `false`.
    pub write: bool,
}
1445
1446impl ReadWritePermissions {
1447    /// Create a new [`ReadWritePermissions`] with default values.
1448    pub fn new() -> Self {
1449        Self::default()
1450    }
1451
1452    /// Create read-only permissions.
1453    pub fn read_only() -> Self {
1454        Self {
1455            read: true,
1456            write: false,
1457        }
1458    }
1459
1460    /// Create write-only permissions.
1461    pub fn write_only() -> Self {
1462        Self {
1463            read: false,
1464            write: true,
1465        }
1466    }
1467
1468    /// Create read-write permissions.
1469    pub fn read_write() -> Self {
1470        Self {
1471            read: true,
1472            write: true,
1473        }
1474    }
1475}
1476
1477impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
1478    fn from(value: ReadWritePermissions) -> Self {
1479        Self {
1480            read: Some(value.read),
1481            write: Some(value.write),
1482        }
1483    }
1484}
1485
1486impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
1487    fn from(value: api::access::ReadWritePermissions) -> Self {
1488        Self {
1489            read: value.read.unwrap_or_default(),
1490            write: value.write.unwrap_or_default(),
1491        }
1492    }
1493}
1494
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Permissions at the operation group level.
///
/// See [`AccessTokenScope::op_group_perms`].
///
/// The default grants no permissions for any group.
pub struct OperationGroupPermissions {
    /// Account-level access permissions.
    ///
    /// Defaults to `None`.
    pub account: Option<ReadWritePermissions>,
    /// Basin-level access permissions.
    ///
    /// Defaults to `None`.
    pub basin: Option<ReadWritePermissions>,
    /// Stream-level access permissions.
    ///
    /// Defaults to `None`.
    pub stream: Option<ReadWritePermissions>,
}
1514
1515impl OperationGroupPermissions {
1516    /// Create a new [`OperationGroupPermissions`] with default values.
1517    pub fn new() -> Self {
1518        Self::default()
1519    }
1520
1521    /// Create read-only permissions for all groups.
1522    pub fn read_only_all() -> Self {
1523        Self {
1524            account: Some(ReadWritePermissions::read_only()),
1525            basin: Some(ReadWritePermissions::read_only()),
1526            stream: Some(ReadWritePermissions::read_only()),
1527        }
1528    }
1529
1530    /// Create write-only permissions for all groups.
1531    pub fn write_only_all() -> Self {
1532        Self {
1533            account: Some(ReadWritePermissions::write_only()),
1534            basin: Some(ReadWritePermissions::write_only()),
1535            stream: Some(ReadWritePermissions::write_only()),
1536        }
1537    }
1538
1539    /// Create read-write permissions for all groups.
1540    pub fn read_write_all() -> Self {
1541        Self {
1542            account: Some(ReadWritePermissions::read_write()),
1543            basin: Some(ReadWritePermissions::read_write()),
1544            stream: Some(ReadWritePermissions::read_write()),
1545        }
1546    }
1547
1548    /// Set account-level access permissions.
1549    pub fn with_account(self, account: ReadWritePermissions) -> Self {
1550        Self {
1551            account: Some(account),
1552            ..self
1553        }
1554    }
1555
1556    /// Set basin-level access permissions.
1557    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
1558        Self {
1559            basin: Some(basin),
1560            ..self
1561        }
1562    }
1563
1564    /// Set stream-level access permissions.
1565    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
1566        Self {
1567            stream: Some(stream),
1568            ..self
1569        }
1570    }
1571}
1572
1573impl From<OperationGroupPermissions> for api::access::PermittedOperationGroups {
1574    fn from(value: OperationGroupPermissions) -> Self {
1575        Self {
1576            account: value.account.map(Into::into),
1577            basin: value.basin.map(Into::into),
1578            stream: value.stream.map(Into::into),
1579        }
1580    }
1581}
1582
1583impl From<api::access::PermittedOperationGroups> for OperationGroupPermissions {
1584    fn from(value: api::access::PermittedOperationGroups) -> Self {
1585        Self {
1586            account: value.account.map(Into::into),
1587            basin: value.basin.map(Into::into),
1588            stream: value.stream.map(Into::into),
1589        }
1590    }
1591}
1592
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Individual operation that can be permitted.
///
/// Each variant maps one-to-one onto an API operation (see the `From` conversions below).
///
/// See [`AccessTokenScope::ops`].
pub enum Operation {
    /// List basins.
    ListBasins,
    /// Create a basin.
    CreateBasin,
    /// Get basin configuration.
    GetBasinConfig,
    /// Delete a basin.
    DeleteBasin,
    /// Reconfigure a basin.
    ReconfigureBasin,
    /// List access tokens.
    ListAccessTokens,
    /// Issue an access token.
    IssueAccessToken,
    /// Revoke an access token.
    RevokeAccessToken,
    /// Get account metrics.
    GetAccountMetrics,
    /// Get basin metrics.
    GetBasinMetrics,
    /// Get stream metrics.
    GetStreamMetrics,
    /// List streams.
    ListStreams,
    /// Create a stream.
    CreateStream,
    /// Get stream configuration.
    GetStreamConfig,
    /// Delete a stream.
    DeleteStream,
    /// Reconfigure a stream.
    ReconfigureStream,
    /// Check the tail of a stream.
    CheckTail,
    /// Append records to a stream.
    Append,
    /// Read records from a stream.
    Read,
    /// Trim records on a stream.
    Trim,
    /// Set the fencing token on a stream.
    Fence,
}
1641
1642impl From<Operation> for api::access::Operation {
1643    fn from(value: Operation) -> Self {
1644        match value {
1645            Operation::ListBasins => api::access::Operation::ListBasins,
1646            Operation::CreateBasin => api::access::Operation::CreateBasin,
1647            Operation::DeleteBasin => api::access::Operation::DeleteBasin,
1648            Operation::ReconfigureBasin => api::access::Operation::ReconfigureBasin,
1649            Operation::GetBasinConfig => api::access::Operation::GetBasinConfig,
1650            Operation::IssueAccessToken => api::access::Operation::IssueAccessToken,
1651            Operation::RevokeAccessToken => api::access::Operation::RevokeAccessToken,
1652            Operation::ListAccessTokens => api::access::Operation::ListAccessTokens,
1653            Operation::ListStreams => api::access::Operation::ListStreams,
1654            Operation::CreateStream => api::access::Operation::CreateStream,
1655            Operation::DeleteStream => api::access::Operation::DeleteStream,
1656            Operation::GetStreamConfig => api::access::Operation::GetStreamConfig,
1657            Operation::ReconfigureStream => api::access::Operation::ReconfigureStream,
1658            Operation::CheckTail => api::access::Operation::CheckTail,
1659            Operation::Append => api::access::Operation::Append,
1660            Operation::Read => api::access::Operation::Read,
1661            Operation::Trim => api::access::Operation::Trim,
1662            Operation::Fence => api::access::Operation::Fence,
1663            Operation::GetAccountMetrics => api::access::Operation::AccountMetrics,
1664            Operation::GetBasinMetrics => api::access::Operation::BasinMetrics,
1665            Operation::GetStreamMetrics => api::access::Operation::StreamMetrics,
1666        }
1667    }
1668}
1669
1670impl From<api::access::Operation> for Operation {
1671    fn from(value: api::access::Operation) -> Self {
1672        match value {
1673            api::access::Operation::ListBasins => Operation::ListBasins,
1674            api::access::Operation::CreateBasin => Operation::CreateBasin,
1675            api::access::Operation::DeleteBasin => Operation::DeleteBasin,
1676            api::access::Operation::ReconfigureBasin => Operation::ReconfigureBasin,
1677            api::access::Operation::GetBasinConfig => Operation::GetBasinConfig,
1678            api::access::Operation::IssueAccessToken => Operation::IssueAccessToken,
1679            api::access::Operation::RevokeAccessToken => Operation::RevokeAccessToken,
1680            api::access::Operation::ListAccessTokens => Operation::ListAccessTokens,
1681            api::access::Operation::ListStreams => Operation::ListStreams,
1682            api::access::Operation::CreateStream => Operation::CreateStream,
1683            api::access::Operation::DeleteStream => Operation::DeleteStream,
1684            api::access::Operation::GetStreamConfig => Operation::GetStreamConfig,
1685            api::access::Operation::ReconfigureStream => Operation::ReconfigureStream,
1686            api::access::Operation::CheckTail => Operation::CheckTail,
1687            api::access::Operation::Append => Operation::Append,
1688            api::access::Operation::Read => Operation::Read,
1689            api::access::Operation::Trim => Operation::Trim,
1690            api::access::Operation::Fence => Operation::Fence,
1691            api::access::Operation::AccountMetrics => Operation::GetAccountMetrics,
1692            api::access::Operation::BasinMetrics => Operation::GetBasinMetrics,
1693            api::access::Operation::StreamMetrics => Operation::GetStreamMetrics,
1694        }
1695    }
1696}
1697
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token, as specified when issuing the token.
///
/// **Note:** The final set of permitted operations is the union of [`ops`](AccessTokenScope::ops)
/// and the operations permitted by [`op_group_perms`](AccessTokenScope::op_group_perms). Also, the
/// final set must not be empty.
///
/// See [`IssueAccessTokenInput::scope`].
pub struct AccessTokenScopeInput {
    /// Permitted basins; `None` means no basins.
    basins: Option<BasinMatcher>,
    /// Permitted streams; `None` means no streams.
    streams: Option<StreamMatcher>,
    /// Permitted access tokens; `None` means no access tokens.
    access_tokens: Option<AccessTokenMatcher>,
    /// Permissions at the operation group level.
    op_group_perms: Option<OperationGroupPermissions>,
    /// Individually permitted operations.
    ops: HashSet<Operation>,
}
1714
1715impl AccessTokenScopeInput {
1716    /// Create a new [`AccessTokenScopeInput`] with the given permitted operations.
1717    pub fn from_ops(ops: impl IntoIterator<Item = Operation>) -> Self {
1718        Self {
1719            basins: None,
1720            streams: None,
1721            access_tokens: None,
1722            op_group_perms: None,
1723            ops: ops.into_iter().collect(),
1724        }
1725    }
1726
1727    /// Create a new [`AccessTokenScopeInput`] with the given operation group permissions.
1728    pub fn from_op_group_perms(op_group_perms: OperationGroupPermissions) -> Self {
1729        Self {
1730            basins: None,
1731            streams: None,
1732            access_tokens: None,
1733            op_group_perms: Some(op_group_perms),
1734            ops: HashSet::default(),
1735        }
1736    }
1737
1738    /// Set the permitted operations.
1739    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
1740        Self {
1741            ops: ops.into_iter().collect(),
1742            ..self
1743        }
1744    }
1745
1746    /// Set the access permissions at the operation group level.
1747    pub fn with_op_group_perms(self, op_group_perms: OperationGroupPermissions) -> Self {
1748        Self {
1749            op_group_perms: Some(op_group_perms),
1750            ..self
1751        }
1752    }
1753
1754    /// Set the permitted basins.
1755    ///
1756    /// Defaults to no basins.
1757    pub fn with_basins(self, basins: BasinMatcher) -> Self {
1758        Self {
1759            basins: Some(basins),
1760            ..self
1761        }
1762    }
1763
1764    /// Set the permitted streams.
1765    ///
1766    /// Defaults to no streams.
1767    pub fn with_streams(self, streams: StreamMatcher) -> Self {
1768        Self {
1769            streams: Some(streams),
1770            ..self
1771        }
1772    }
1773
1774    /// Set the permitted access tokens.
1775    ///
1776    /// Defaults to no access tokens.
1777    pub fn with_access_tokens(self, access_tokens: AccessTokenMatcher) -> Self {
1778        Self {
1779            access_tokens: Some(access_tokens),
1780            ..self
1781        }
1782    }
1783}
1784
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Scope of an access token.
///
/// See [`AccessTokenInfo::scope`].
pub struct AccessTokenScope {
    /// Permitted basins.
    pub basins: Option<BasinMatcher>,
    /// Permitted streams.
    pub streams: Option<StreamMatcher>,
    /// Permitted access tokens.
    pub access_tokens: Option<AccessTokenMatcher>,
    /// Permissions at the operation group level.
    pub op_group_perms: Option<OperationGroupPermissions>,
    /// Permitted operations.
    pub ops: HashSet<Operation>,
}
1800
1801impl From<api::access::AccessTokenScope> for AccessTokenScope {
1802    fn from(value: api::access::AccessTokenScope) -> Self {
1803        Self {
1804            basins: value.basins.map(|rs| match rs {
1805                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1806                    BasinMatcher::Exact(e)
1807                }
1808                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1809                    BasinMatcher::None
1810                }
1811                api::access::ResourceSet::Prefix(p) => BasinMatcher::Prefix(p),
1812            }),
1813            streams: value.streams.map(|rs| match rs {
1814                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1815                    StreamMatcher::Exact(e)
1816                }
1817                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1818                    StreamMatcher::None
1819                }
1820                api::access::ResourceSet::Prefix(p) => StreamMatcher::Prefix(p),
1821            }),
1822            access_tokens: value.access_tokens.map(|rs| match rs {
1823                api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e)) => {
1824                    AccessTokenMatcher::Exact(e)
1825                }
1826                api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty) => {
1827                    AccessTokenMatcher::None
1828                }
1829                api::access::ResourceSet::Prefix(p) => AccessTokenMatcher::Prefix(p),
1830            }),
1831            op_group_perms: value.op_groups.map(Into::into),
1832            ops: value
1833                .ops
1834                .map(|ops| ops.into_iter().map(Into::into).collect())
1835                .unwrap_or_default(),
1836        }
1837    }
1838}
1839
1840impl From<AccessTokenScopeInput> for api::access::AccessTokenScope {
1841    fn from(value: AccessTokenScopeInput) -> Self {
1842        Self {
1843            basins: value.basins.map(|rs| match rs {
1844                BasinMatcher::None => {
1845                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1846                }
1847                BasinMatcher::Exact(e) => {
1848                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1849                }
1850                BasinMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1851            }),
1852            streams: value.streams.map(|rs| match rs {
1853                StreamMatcher::None => {
1854                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1855                }
1856                StreamMatcher::Exact(e) => {
1857                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1858                }
1859                StreamMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1860            }),
1861            access_tokens: value.access_tokens.map(|rs| match rs {
1862                AccessTokenMatcher::None => {
1863                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::Empty)
1864                }
1865                AccessTokenMatcher::Exact(e) => {
1866                    api::access::ResourceSet::Exact(api::access::MaybeEmpty::NonEmpty(e))
1867                }
1868                AccessTokenMatcher::Prefix(p) => api::access::ResourceSet::Prefix(p),
1869            }),
1870            op_groups: value.op_group_perms.map(Into::into),
1871            ops: if value.ops.is_empty() {
1872                None
1873            } else {
1874                Some(value.ops.into_iter().map(Into::into).collect())
1875            },
1876        }
1877    }
1878}
1879
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`issue_access_token`](crate::S2::issue_access_token).
///
/// Construct via [`IssueAccessTokenInput::new`] and customize with the
/// `with_*` builder methods.
pub struct IssueAccessTokenInput {
    /// Access token ID.
    pub id: AccessTokenId,
    /// Expiration time.
    ///
    /// Defaults to the expiration time of requestor's access token passed via
    /// [`S2Config`](S2Config::new).
    pub expires_at: Option<S2DateTime>,
    /// Whether to automatically prefix stream names during creation and strip the prefix during
    /// listing.
    ///
    /// **Note:** [`scope.streams`](AccessTokenScopeInput::with_streams) must be set with the
    /// prefix.
    ///
    /// Defaults to `false`.
    pub auto_prefix_streams: bool,
    /// Scope of the token.
    pub scope: AccessTokenScopeInput,
}
1902
1903impl IssueAccessTokenInput {
1904    /// Create a new [`IssueAccessTokenInput`] with the given id and scope.
1905    pub fn new(id: AccessTokenId, scope: AccessTokenScopeInput) -> Self {
1906        Self {
1907            id,
1908            expires_at: None,
1909            auto_prefix_streams: false,
1910            scope,
1911        }
1912    }
1913
1914    /// Set the expiration time.
1915    pub fn with_expires_at(self, expires_at: S2DateTime) -> Self {
1916        Self {
1917            expires_at: Some(expires_at),
1918            ..self
1919        }
1920    }
1921
1922    /// Set whether to automatically prefix stream names during creation and strip the prefix during
1923    /// listing.
1924    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1925        Self {
1926            auto_prefix_streams,
1927            ..self
1928        }
1929    }
1930}
1931
1932impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
1933    fn from(value: IssueAccessTokenInput) -> Self {
1934        Self {
1935            id: value.id,
1936            expires_at: value.expires_at.map(Into::into),
1937            auto_prefix_streams: value.auto_prefix_streams.then_some(true),
1938            scope: value.scope.into(),
1939        }
1940    }
1941}
1942
#[derive(Debug, Clone, Copy)]
/// Interval to accumulate over for timeseries metric sets.
///
/// Converts losslessly to and from [`api::metrics::TimeseriesInterval`].
pub enum TimeseriesInterval {
    /// Minute.
    Minute,
    /// Hour.
    Hour,
    /// Day.
    Day,
}
1953
1954impl From<TimeseriesInterval> for api::metrics::TimeseriesInterval {
1955    fn from(value: TimeseriesInterval) -> Self {
1956        match value {
1957            TimeseriesInterval::Minute => api::metrics::TimeseriesInterval::Minute,
1958            TimeseriesInterval::Hour => api::metrics::TimeseriesInterval::Hour,
1959            TimeseriesInterval::Day => api::metrics::TimeseriesInterval::Day,
1960        }
1961    }
1962}
1963
1964impl From<api::metrics::TimeseriesInterval> for TimeseriesInterval {
1965    fn from(value: api::metrics::TimeseriesInterval) -> Self {
1966        match value {
1967            api::metrics::TimeseriesInterval::Minute => TimeseriesInterval::Minute,
1968            api::metrics::TimeseriesInterval::Hour => TimeseriesInterval::Hour,
1969            api::metrics::TimeseriesInterval::Day => TimeseriesInterval::Day,
1970        }
1971    }
1972}
1973
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds.
///
/// Represents the half-open interval `[start, end)`.
pub struct TimeRange {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
}
1983
1984impl TimeRange {
1985    /// Create a new [`TimeRange`] with the given start and end timestamps.
1986    pub fn new(start: u32, end: u32) -> Self {
1987        Self { start, end }
1988    }
1989}
1990
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
/// Time range as Unix epoch seconds and accumulation interval.
///
/// Like [`TimeRange`], represents the half-open interval `[start, end)`,
/// with an optional [`TimeseriesInterval`] for accumulation.
pub struct TimeRangeAndInterval {
    /// Start timestamp (inclusive).
    pub start: u32,
    /// End timestamp (exclusive).
    pub end: u32,
    /// Interval to accumulate over for timeseries metric sets.
    ///
    /// Default is dependent on the requested metric set.
    pub interval: Option<TimeseriesInterval>,
}
2004
2005impl TimeRangeAndInterval {
2006    /// Create a new [`TimeRangeAndInterval`] with the given start and end timestamps.
2007    pub fn new(start: u32, end: u32) -> Self {
2008        Self {
2009            start,
2010            end,
2011            interval: None,
2012        }
2013    }
2014
2015    /// Set the interval to accumulate over for timeseries metric sets.
2016    pub fn with_interval(self, interval: TimeseriesInterval) -> Self {
2017        Self {
2018            interval: Some(interval),
2019            ..self
2020        }
2021    }
2022}
2023
#[derive(Debug, Clone, Copy)]
/// Account metric set to return.
///
/// Each variant carries the time range (and, where applicable, accumulation
/// interval) to query.
pub enum AccountMetricSet {
    /// Returns a [`LabelMetric`] representing all basins which had at least one stream within the
    /// specified time range.
    ActiveBasins(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per account operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    AccountOps(TimeRangeAndInterval),
}
2038
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_account_metrics`](crate::S2::get_account_metrics) operation.
///
/// Construct via [`GetAccountMetricsInput::new`].
pub struct GetAccountMetricsInput {
    /// Metric set to return.
    pub set: AccountMetricSet,
}
2046
2047impl GetAccountMetricsInput {
2048    /// Create a new [`GetAccountMetricsInput`] with the given account metric set.
2049    pub fn new(set: AccountMetricSet) -> Self {
2050        Self { set }
2051    }
2052}
2053
2054impl From<GetAccountMetricsInput> for api::metrics::AccountMetricSetRequest {
2055    fn from(value: GetAccountMetricsInput) -> Self {
2056        let (set, start, end, interval) = match value.set {
2057            AccountMetricSet::ActiveBasins(args) => (
2058                api::metrics::AccountMetricSet::ActiveBasins,
2059                args.start,
2060                args.end,
2061                None,
2062            ),
2063            AccountMetricSet::AccountOps(args) => (
2064                api::metrics::AccountMetricSet::AccountOps,
2065                args.start,
2066                args.end,
2067                args.interval,
2068            ),
2069        };
2070        Self {
2071            set,
2072            start: Some(start),
2073            end: Some(end),
2074            interval: interval.map(Into::into),
2075        }
2076    }
2077}
2078
#[derive(Debug, Clone, Copy)]
/// Basin metric set to return.
///
/// Each variant carries the time range (and, where applicable, accumulation
/// interval) to query.
pub enum BasinMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes across all streams
    /// in the basin, with one observed value for each hour over the requested time range.
    Storage(TimeRange),
    /// Returns [`AccumulationMetric`]s, one per storage class (standard, express).
    ///
    /// Each metric represents a timeseries of the number of append operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendOps(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per read type (unary, streaming).
    ///
    /// Each metric represents a timeseries of the number of read operations across all streams
    /// in the basin, with one accumulated value per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadOps(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total read bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    ReadThroughput(TimeRangeAndInterval),
    /// Returns an [`AccumulationMetric`] representing a timeseries of total appended bytes
    /// across all streams in the basin, with one accumulated value per interval
    /// over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to
    /// [`minute`](TimeseriesInterval::Minute).
    AppendThroughput(TimeRangeAndInterval),
    /// Returns [`AccumulationMetric`]s, one per basin operation type.
    ///
    /// Each metric represents a timeseries of the number of operations, with one accumulated value
    /// per interval over the requested time range.
    ///
    /// [`interval`](TimeRangeAndInterval::interval) defaults to [`hour`](TimeseriesInterval::Hour).
    BasinOps(TimeRangeAndInterval),
}
2123
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_basin_metrics`](crate::S2::get_basin_metrics) operation.
///
/// Construct via [`GetBasinMetricsInput::new`].
pub struct GetBasinMetricsInput {
    /// Basin name.
    pub name: BasinName,
    /// Metric set to return.
    pub set: BasinMetricSet,
}
2133
2134impl GetBasinMetricsInput {
2135    /// Create a new [`GetBasinMetricsInput`] with the given basin name and metric set.
2136    pub fn new(name: BasinName, set: BasinMetricSet) -> Self {
2137        Self { name, set }
2138    }
2139}
2140
2141impl From<GetBasinMetricsInput> for (BasinName, api::metrics::BasinMetricSetRequest) {
2142    fn from(value: GetBasinMetricsInput) -> Self {
2143        let (set, start, end, interval) = match value.set {
2144            BasinMetricSet::Storage(args) => (
2145                api::metrics::BasinMetricSet::Storage,
2146                args.start,
2147                args.end,
2148                None,
2149            ),
2150            BasinMetricSet::AppendOps(args) => (
2151                api::metrics::BasinMetricSet::AppendOps,
2152                args.start,
2153                args.end,
2154                args.interval,
2155            ),
2156            BasinMetricSet::ReadOps(args) => (
2157                api::metrics::BasinMetricSet::ReadOps,
2158                args.start,
2159                args.end,
2160                args.interval,
2161            ),
2162            BasinMetricSet::ReadThroughput(args) => (
2163                api::metrics::BasinMetricSet::ReadThroughput,
2164                args.start,
2165                args.end,
2166                args.interval,
2167            ),
2168            BasinMetricSet::AppendThroughput(args) => (
2169                api::metrics::BasinMetricSet::AppendThroughput,
2170                args.start,
2171                args.end,
2172                args.interval,
2173            ),
2174            BasinMetricSet::BasinOps(args) => (
2175                api::metrics::BasinMetricSet::BasinOps,
2176                args.start,
2177                args.end,
2178                args.interval,
2179            ),
2180        };
2181        (
2182            value.name,
2183            api::metrics::BasinMetricSetRequest {
2184                set,
2185                start: Some(start),
2186                end: Some(end),
2187                interval: interval.map(Into::into),
2188            },
2189        )
2190    }
2191}
2192
#[derive(Debug, Clone, Copy)]
/// Stream metric set to return.
///
/// Each variant carries the time range to query.
pub enum StreamMetricSet {
    /// Returns a [`GaugeMetric`] representing a timeseries of total stored bytes for the stream,
    /// with one observed value for each minute over the requested time range.
    Storage(TimeRange),
}
2200
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`get_stream_metrics`](crate::S2::get_stream_metrics) operation.
///
/// Construct via [`GetStreamMetricsInput::new`].
pub struct GetStreamMetricsInput {
    /// Basin name.
    pub basin_name: BasinName,
    /// Stream name.
    pub stream_name: StreamName,
    /// Metric set to return.
    pub set: StreamMetricSet,
}
2212
2213impl GetStreamMetricsInput {
2214    /// Create a new [`GetStreamMetricsInput`] with the given basin name, stream name and metric
2215    /// set.
2216    pub fn new(basin_name: BasinName, stream_name: StreamName, set: StreamMetricSet) -> Self {
2217        Self {
2218            basin_name,
2219            stream_name,
2220            set,
2221        }
2222    }
2223}
2224
2225impl From<GetStreamMetricsInput> for (BasinName, StreamName, api::metrics::StreamMetricSetRequest) {
2226    fn from(value: GetStreamMetricsInput) -> Self {
2227        let (set, start, end, interval) = match value.set {
2228            StreamMetricSet::Storage(args) => (
2229                api::metrics::StreamMetricSet::Storage,
2230                args.start,
2231                args.end,
2232                None,
2233            ),
2234        };
2235        (
2236            value.basin_name,
2237            value.stream_name,
2238            api::metrics::StreamMetricSetRequest {
2239                set,
2240                start: Some(start),
2241                end: Some(end),
2242                interval,
2243            },
2244        )
2245    }
2246}
2247
#[derive(Debug, Clone, Copy)]
/// Unit in which metric values are measured.
///
/// Converted from [`api::metrics::MetricUnit`].
pub enum MetricUnit {
    /// Size in bytes.
    Bytes,
    /// Number of operations.
    Operations,
}
2256
2257impl From<api::metrics::MetricUnit> for MetricUnit {
2258    fn from(value: api::metrics::MetricUnit) -> Self {
2259        match value {
2260            api::metrics::MetricUnit::Bytes => MetricUnit::Bytes,
2261            api::metrics::MetricUnit::Operations => MetricUnit::Operations,
2262        }
2263    }
2264}
2265
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Single named value.
///
/// See [`Metric::Scalar`].
pub struct ScalarMetric {
    /// Metric name.
    pub name: String,
    /// Unit for the metric value.
    pub unit: MetricUnit,
    /// Metric value.
    pub value: f64,
}
2277
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
/// specified interval.
///
/// See [`Metric::Accumulation`].
pub struct AccumulationMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the accumulated values.
    pub unit: MetricUnit,
    /// The interval at which datapoints are accumulated.
    pub interval: TimeseriesInterval,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the accumulated
    /// `value` for the time period starting at the `timestamp` (in Unix epoch seconds), spanning
    /// one `interval`.
    pub values: Vec<(u32, f64)>,
}
2294
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
///
/// See [`Metric::Gauge`].
pub struct GaugeMetric {
    /// Timeseries name.
    pub name: String,
    /// Unit for the instantaneous values.
    pub unit: MetricUnit,
    /// Series of `(timestamp, value)` datapoints. Each datapoint represents the `value` at the
    /// instant of the `timestamp` (in Unix epoch seconds).
    pub values: Vec<(u32, f64)>,
}
2307
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Set of string labels.
///
/// See [`Metric::Label`].
pub struct LabelMetric {
    /// Label name.
    pub name: String,
    /// Label values.
    pub values: Vec<String>,
}
2317
#[derive(Debug, Clone)]
/// Individual metric in a returned metric set.
///
/// Converted from [`api::metrics::Metric`]; the variant mirrors the shape of
/// the data the service returned.
pub enum Metric {
    /// Single named value.
    Scalar(ScalarMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an accumulation over a
    /// specified interval.
    Accumulation(AccumulationMetric),
    /// Named series of `(timestamp, value)` datapoints, each representing an instantaneous value.
    Gauge(GaugeMetric),
    /// Set of string labels.
    Label(LabelMetric),
}
2331
2332impl From<api::metrics::Metric> for Metric {
2333    fn from(value: api::metrics::Metric) -> Self {
2334        match value {
2335            api::metrics::Metric::Scalar(sm) => Metric::Scalar(ScalarMetric {
2336                name: sm.name.into(),
2337                unit: sm.unit.into(),
2338                value: sm.value,
2339            }),
2340            api::metrics::Metric::Accumulation(am) => Metric::Accumulation(AccumulationMetric {
2341                name: am.name.into(),
2342                unit: am.unit.into(),
2343                interval: am.interval.into(),
2344                values: am.values,
2345            }),
2346            api::metrics::Metric::Gauge(gm) => Metric::Gauge(GaugeMetric {
2347                name: gm.name.into(),
2348                unit: gm.unit.into(),
2349                values: gm.values,
2350            }),
2351            api::metrics::Metric::Label(lm) => Metric::Label(LabelMetric {
2352                name: lm.name.into(),
2353                values: lm.values,
2354            }),
2355        }
2356    }
2357}
2358
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`list_streams`](crate::S2Basin::list_streams) operation.
///
/// Construct via [`ListStreamsInput::new`] (or `Default`) and customize with
/// the `with_*` builder methods.
pub struct ListStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    ///  Number of streams to return in a page. Will be clamped to a maximum of `1000`.
    ///
    /// Defaults to `1000`.
    pub limit: Option<usize>,
}
2378
2379impl ListStreamsInput {
2380    /// Create a new [`ListStreamsInput`] with default values.
2381    pub fn new() -> Self {
2382        Self::default()
2383    }
2384
2385    /// Set the prefix used to filter streams whose names begin with this value.
2386    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2387        Self { prefix, ..self }
2388    }
2389
2390    /// Set the value used to filter streams whose names are lexicographically greater than this
2391    /// value.
2392    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2393        Self {
2394            start_after,
2395            ..self
2396        }
2397    }
2398
2399    /// Set the limit on number of streams to return in a page.
2400    pub fn with_limit(self, limit: usize) -> Self {
2401        Self {
2402            limit: Some(limit),
2403            ..self
2404        }
2405    }
2406}
2407
2408impl From<ListStreamsInput> for api::stream::ListStreamsRequest {
2409    fn from(value: ListStreamsInput) -> Self {
2410        Self {
2411            prefix: Some(value.prefix),
2412            start_after: Some(value.start_after),
2413            limit: value.limit,
2414        }
2415    }
2416}
2417
#[derive(Debug, Clone, Default)]
/// Input for [`S2Basin::list_all_streams`](crate::S2Basin::list_all_streams).
///
/// NOTE(review): unlike the other input structs in this module, this one is not
/// `#[non_exhaustive]`; adding the attribute now would break external struct
/// literals, so the inconsistency is flagged rather than changed.
pub struct ListAllStreamsInput {
    /// Filter streams whose names begin with this value.
    ///
    /// Defaults to `""`.
    pub prefix: StreamNamePrefix,
    /// Filter streams whose names are lexicographically greater than this value.
    ///
    /// **Note:** It must be greater than or equal to [`prefix`](ListAllStreamsInput::prefix).
    ///
    /// Defaults to `""`.
    pub start_after: StreamNameStartAfter,
    /// Whether to ignore streams that have been requested for deletion but not yet deleted.
    ///
    /// Defaults to `false`.
    pub ignore_pending_deletions: bool,
}
2436
2437impl ListAllStreamsInput {
2438    /// Create a new [`ListAllStreamsInput`] with default values.
2439    pub fn new() -> Self {
2440        Self::default()
2441    }
2442
2443    /// Set the prefix used to filter streams whose names begin with this value.
2444    pub fn with_prefix(self, prefix: StreamNamePrefix) -> Self {
2445        Self { prefix, ..self }
2446    }
2447
2448    /// Set the value used to filter streams whose names are lexicographically greater than this
2449    /// value.
2450    pub fn with_start_after(self, start_after: StreamNameStartAfter) -> Self {
2451        Self {
2452            start_after,
2453            ..self
2454        }
2455    }
2456
2457    /// Set whether to ignore streams that have been requested for deletion but not yet deleted.
2458    pub fn with_ignore_pending_deletions(self, ignore_pending_deletions: bool) -> Self {
2459        Self {
2460            ignore_pending_deletions,
2461            ..self
2462        }
2463    }
2464}
2465
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Stream information.
///
/// Converted from [`api::stream::StreamInfo`].
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time.
    pub created_at: S2DateTime,
    /// Deletion time if the stream is being deleted.
    pub deleted_at: Option<S2DateTime>,
}
2477
2478impl From<api::stream::StreamInfo> for StreamInfo {
2479    fn from(value: api::stream::StreamInfo) -> Self {
2480        Self {
2481            name: value.name,
2482            created_at: value.created_at.into(),
2483            deleted_at: value.deleted_at.map(Into::into),
2484        }
2485    }
2486}
2487
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`create_stream`](crate::S2Basin::create_stream) operation.
///
/// Construct via [`CreateStreamInput::new`] and customize with the `with_*`
/// builder methods.
pub struct CreateStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Configuration for the stream.
    ///
    /// See [`StreamConfig`] for defaults.
    pub config: Option<StreamConfig>,
    /// Idempotency token for the operation.
    ///
    /// When a request is retried, the same token is reused,
    /// causing the server to return the original response instead of
    /// performing the operation again.
    ///
    /// Defaults to a random UUID.
    pub idempotency_token: String,
}
2507
2508impl CreateStreamInput {
2509    /// Create a new [`CreateStreamInput`] with the given stream name.
2510    pub fn new(name: StreamName) -> Self {
2511        Self {
2512            name,
2513            config: None,
2514            idempotency_token: idempotency_token(),
2515        }
2516    }
2517
2518    /// Set the configuration for the stream.
2519    pub fn with_config(self, config: StreamConfig) -> Self {
2520        Self {
2521            config: Some(config),
2522            ..self
2523        }
2524    }
2525
2526    /// Set the idempotency token for the operation.
2527    pub fn with_idempotency_token(self, idempotency_token: impl Into<String>) -> Self {
2528        Self {
2529            idempotency_token: idempotency_token.into(),
2530            ..self
2531        }
2532    }
2533}
2534
2535impl From<CreateStreamInput> for (api::stream::CreateStreamRequest, String) {
2536    fn from(value: CreateStreamInput) -> Self {
2537        (
2538            api::stream::CreateStreamRequest {
2539                stream: value.name,
2540                config: value.config.map(Into::into),
2541            },
2542            value.idempotency_token,
2543        )
2544    }
2545}
2546
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.
///
/// Construct via [`DeleteStreamInput::new`].
pub struct DeleteStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Whether to ignore `Not Found` error if the stream doesn't exist.
    pub ignore_not_found: bool,
}
2556
2557impl DeleteStreamInput {
2558    /// Create a new [`DeleteStreamInput`] with the given stream name.
2559    pub fn new(name: StreamName) -> Self {
2560        Self {
2561            name,
2562            ignore_not_found: false,
2563        }
2564    }
2565
2566    /// Set whether to ignore `Not Found` error if the stream doesn't exist.
2567    pub fn with_ignore_not_found(self, ignore_not_found: bool) -> Self {
2568        Self {
2569            ignore_not_found,
2570            ..self
2571        }
2572    }
2573}
2574
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`reconfigure_stream`](crate::S2Basin::reconfigure_stream) operation.
///
/// Construct via [`ReconfigureStreamInput::new`].
pub struct ReconfigureStreamInput {
    /// Stream name.
    pub name: StreamName,
    /// Reconfiguration for [`StreamConfig`].
    pub config: StreamReconfiguration,
}
2584
2585impl ReconfigureStreamInput {
2586    /// Create a new [`ReconfigureStreamInput`] with the given stream name and reconfiguration.
2587    pub fn new(name: StreamName, config: StreamReconfiguration) -> Self {
2588        Self { name, config }
2589    }
2590}
2591
#[derive(Debug, Clone, PartialEq, Eq)]
/// Token for fencing appends to a stream.
///
/// **Note:** It must not exceed 36 bytes in length.
///
/// Validated construction goes through [`FromStr`] (`"…".parse()`) or
/// [`FencingToken::generate`]; the inner string is exposed via [`Deref`] and
/// [`fmt::Display`].
///
/// See [`CommandRecord::fence`] and [`AppendInput::fencing_token`].
pub struct FencingToken(String);
2599
2600impl FencingToken {
2601    /// Generate a random alphanumeric fencing token of `n` bytes.
2602    pub fn generate(n: usize) -> Result<Self, ValidationError> {
2603        rand::rng()
2604            .sample_iter(&rand::distr::Alphanumeric)
2605            .take(n)
2606            .map(char::from)
2607            .collect::<String>()
2608            .parse()
2609    }
2610}
2611
2612impl FromStr for FencingToken {
2613    type Err = ValidationError;
2614
2615    fn from_str(s: &str) -> Result<Self, Self::Err> {
2616        if s.len() > MAX_FENCING_TOKEN_LENGTH {
2617            return Err(ValidationError(format!(
2618                "fencing token exceeds {MAX_FENCING_TOKEN_LENGTH} bytes in length",
2619            )));
2620        }
2621        Ok(FencingToken(s.to_string()))
2622    }
2623}
2624
2625impl std::fmt::Display for FencingToken {
2626    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2627        write!(f, "{}", self.0)
2628    }
2629}
2630
2631impl Deref for FencingToken {
2632    type Target = str;
2633
2634    fn deref(&self) -> &Self::Target {
2635        &self.0
2636    }
2637}
2638
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
/// A position in a stream.
///
/// Converted from the wire-level `api::stream::StreamPosition` /
/// `api::stream::proto::StreamPosition`.
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp. When assigned by the service, represents milliseconds since Unix epoch.
    /// User-specified timestamps are passed through as-is.
    pub timestamp: u64,
}
2649
2650impl std::fmt::Display for StreamPosition {
2651    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2652        write!(f, "seq_num={}, timestamp={}", self.seq_num, self.timestamp)
2653    }
2654}
2655
2656impl From<api::stream::proto::StreamPosition> for StreamPosition {
2657    fn from(value: api::stream::proto::StreamPosition) -> Self {
2658        Self {
2659            seq_num: value.seq_num,
2660            timestamp: value.timestamp,
2661        }
2662    }
2663}
2664
2665impl From<api::stream::StreamPosition> for StreamPosition {
2666    fn from(value: api::stream::StreamPosition) -> Self {
2667        Self {
2668            seq_num: value.seq_num,
2669            timestamp: value.timestamp,
2670        }
2671    }
2672}
2673
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// A name-value pair.
///
/// Attached to records; converts to and from `api::stream::proto::Header`.
pub struct Header {
    /// Name.
    pub name: Bytes,
    /// Value.
    pub value: Bytes,
}
2683
2684impl Header {
2685    /// Create a new [`Header`] with the given name and value.
2686    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
2687        Self {
2688            name: name.into(),
2689            value: value.into(),
2690        }
2691    }
2692}
2693
2694impl From<Header> for api::stream::proto::Header {
2695    fn from(value: Header) -> Self {
2696        Self {
2697            name: value.name,
2698            value: value.value,
2699        }
2700    }
2701}
2702
2703impl From<api::stream::proto::Header> for Header {
2704    fn from(value: api::stream::proto::Header) -> Self {
2705        Self {
2706            name: value.name,
2707            value: value.value,
2708        }
2709    }
2710}
2711
#[derive(Debug, Clone, PartialEq)]
/// A record to append.
///
/// Fields are private; construct via [`AppendRecord::new`] and the `with_*`
/// builder methods, which validate the metered size against
/// [`RECORD_BATCH_MAX`].
pub struct AppendRecord {
    // Record payload.
    body: Bytes,
    // Optional name-value metadata pairs.
    headers: Vec<Header>,
    // Optional user-specified timestamp; semantics depend on the stream's
    // timestamping configuration.
    timestamp: Option<u64>,
}
2719
2720impl AppendRecord {
2721    fn validate(self) -> Result<Self, ValidationError> {
2722        if self.metered_bytes() > RECORD_BATCH_MAX.bytes {
2723            Err(ValidationError(format!(
2724                "metered_bytes: {} exceeds {}",
2725                self.metered_bytes(),
2726                RECORD_BATCH_MAX.bytes
2727            )))
2728        } else {
2729            Ok(self)
2730        }
2731    }
2732
2733    /// Create a new [`AppendRecord`] with the given record body.
2734    pub fn new(body: impl Into<Bytes>) -> Result<Self, ValidationError> {
2735        let record = Self {
2736            body: body.into(),
2737            headers: Vec::default(),
2738            timestamp: None,
2739        };
2740        record.validate()
2741    }
2742
2743    /// Set the headers for this record.
2744    pub fn with_headers(
2745        self,
2746        headers: impl IntoIterator<Item = Header>,
2747    ) -> Result<Self, ValidationError> {
2748        let record = Self {
2749            headers: headers.into_iter().collect(),
2750            ..self
2751        };
2752        record.validate()
2753    }
2754
2755    /// Set the timestamp for this record.
2756    ///
2757    /// Precise semantics depend on [`StreamConfig::timestamping`].
2758    pub fn with_timestamp(self, timestamp: u64) -> Self {
2759        Self {
2760            timestamp: Some(timestamp),
2761            ..self
2762        }
2763    }
2764}
2765
2766impl From<AppendRecord> for api::stream::proto::AppendRecord {
2767    fn from(value: AppendRecord) -> Self {
2768        Self {
2769            timestamp: value.timestamp,
2770            headers: value.headers.into_iter().map(Into::into).collect(),
2771            body: value.body,
2772        }
2773    }
2774}
2775
/// Metered byte size calculation.
///
/// Implemented for individual records and for batches of records.
///
/// Formula for a record:
/// ```text
/// 8 + 2 * len(headers) + sum(len(h.name) + len(h.value) for h in headers) + len(body)
/// ```
pub trait MeteredBytes {
    /// Returns the metered byte size.
    fn metered_bytes(&self) -> usize;
}
2786
2787macro_rules! metered_bytes_impl {
2788    ($ty:ty) => {
2789        impl MeteredBytes for $ty {
2790            fn metered_bytes(&self) -> usize {
2791                8 + (2 * self.headers.len())
2792                    + self
2793                        .headers
2794                        .iter()
2795                        .map(|h| h.name.len() + h.value.len())
2796                        .sum::<usize>()
2797                    + self.body.len()
2798            }
2799        }
2800    };
2801}
2802
2803metered_bytes_impl!(AppendRecord);
2804
#[derive(Debug, Clone)]
/// A batch of records to append atomically.
///
/// **Note:** It must contain at least `1` record and no more than `1000`.
/// The total size of the batch must not exceed `1MiB` in metered bytes.
///
/// See [`AppendRecordBatches`](crate::batching::AppendRecordBatches) and
/// [`AppendInputs`](crate::batching::AppendInputs) for convenient and automatic batching of records
/// that takes care of the abovementioned constraints.
pub struct AppendRecordBatch {
    // Records in append order.
    records: Vec<AppendRecord>,
    // Cached sum of `metered_bytes()` over `records`, kept in sync by `push`.
    metered_bytes: usize,
}
2818
2819impl AppendRecordBatch {
2820    pub(crate) fn with_capacity(capacity: usize) -> Self {
2821        Self {
2822            records: Vec::with_capacity(capacity),
2823            metered_bytes: 0,
2824        }
2825    }
2826
2827    pub(crate) fn push(&mut self, record: AppendRecord) {
2828        self.metered_bytes += record.metered_bytes();
2829        self.records.push(record);
2830    }
2831
2832    /// Try to create an [`AppendRecordBatch`] from an iterator of [`AppendRecord`]s.
2833    pub fn try_from_iter<I>(iter: I) -> Result<Self, ValidationError>
2834    where
2835        I: IntoIterator<Item = AppendRecord>,
2836    {
2837        let mut records = Vec::new();
2838        let mut metered_bytes = 0;
2839
2840        for record in iter {
2841            metered_bytes += record.metered_bytes();
2842            records.push(record);
2843
2844            if metered_bytes > RECORD_BATCH_MAX.bytes {
2845                return Err(ValidationError(format!(
2846                    "batch size in metered bytes ({metered_bytes}) exceeds {}",
2847                    RECORD_BATCH_MAX.bytes
2848                )));
2849            }
2850
2851            if records.len() > RECORD_BATCH_MAX.count {
2852                return Err(ValidationError(format!(
2853                    "number of records in the batch exceeds {}",
2854                    RECORD_BATCH_MAX.count
2855                )));
2856            }
2857        }
2858
2859        if records.is_empty() {
2860            return Err(ValidationError("batch is empty".into()));
2861        }
2862
2863        Ok(Self {
2864            records,
2865            metered_bytes,
2866        })
2867    }
2868}
2869
2870impl Deref for AppendRecordBatch {
2871    type Target = [AppendRecord];
2872
2873    fn deref(&self) -> &Self::Target {
2874        &self.records
2875    }
2876}
2877
impl MeteredBytes for AppendRecordBatch {
    /// Returns the cached total, maintained incrementally as records are added.
    fn metered_bytes(&self) -> usize {
        self.metered_bytes
    }
}
2883
#[derive(Debug, Clone)]
/// Command to signal an operation.
///
/// Carried by a [`CommandRecord`].
pub enum Command {
    /// Fence operation.
    Fence {
        /// Fencing token.
        fencing_token: FencingToken,
    },
    /// Trim operation.
    Trim {
        /// Trim point (desired earliest sequence number for the stream).
        trim_point: u64,
    },
}
2898
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Command record for signaling operations to the service.
///
/// Construct via [`CommandRecord::fence`] or [`CommandRecord::trim`].
///
/// See [here](https://s2.dev/docs/rest/records/overview#command-records) for more information.
pub struct CommandRecord {
    /// Command to signal an operation.
    pub command: Command,
    /// Timestamp for this record.
    pub timestamp: Option<u64>,
}
2910
2911impl CommandRecord {
2912    const FENCE: &[u8] = b"fence";
2913    const TRIM: &[u8] = b"trim";
2914
2915    /// Create a fence command record with the given fencing token.
2916    ///
2917    /// Fencing is strongly consistent, and subsequent appends that specify a
2918    /// fencing token will fail if it does not match.
2919    pub fn fence(fencing_token: FencingToken) -> Self {
2920        Self {
2921            command: Command::Fence { fencing_token },
2922            timestamp: None,
2923        }
2924    }
2925
2926    /// Create a trim command record with the given trim point.
2927    ///
2928    /// Trim point is the desired earliest sequence number for the stream.
2929    ///
2930    /// Trimming is eventually consistent, and trimmed records may be visible
2931    /// for a brief period.
2932    pub fn trim(trim_point: u64) -> Self {
2933        Self {
2934            command: Command::Trim { trim_point },
2935            timestamp: None,
2936        }
2937    }
2938
2939    /// Set the timestamp for this record.
2940    pub fn with_timestamp(self, timestamp: u64) -> Self {
2941        Self {
2942            timestamp: Some(timestamp),
2943            ..self
2944        }
2945    }
2946}
2947
2948impl From<CommandRecord> for AppendRecord {
2949    fn from(value: CommandRecord) -> Self {
2950        let (header_value, body) = match value.command {
2951            Command::Fence { fencing_token } => (
2952                CommandRecord::FENCE,
2953                Bytes::copy_from_slice(fencing_token.as_bytes()),
2954            ),
2955            Command::Trim { trim_point } => (
2956                CommandRecord::TRIM,
2957                Bytes::copy_from_slice(&trim_point.to_be_bytes()),
2958            ),
2959        };
2960        Self {
2961            body,
2962            headers: vec![Header::new("", header_value)],
2963            timestamp: value.timestamp,
2964        }
2965    }
2966}
2967
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Input for [`append`](crate::S2Stream::append) operation and
/// [`AppendSession::submit`](crate::append_session::AppendSession::submit).
///
/// Construct via [`AppendInput::new`] and the `with_*` builders.
pub struct AppendInput {
    /// Batch of records to append atomically.
    pub records: AppendRecordBatch,
    /// Expected sequence number for the first record in the batch.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched, the append fails.
    pub match_seq_num: Option<u64>,
    /// Fencing token to match against the stream's current fencing token.
    ///
    /// If unspecified, no matching is performed. If specified and mismatched,
    /// the append fails. A stream defaults to `""` as its fencing token.
    pub fencing_token: Option<FencingToken>,
}
2985
2986impl AppendInput {
2987    /// Create a new [`AppendInput`] with the given batch of records.
2988    pub fn new(records: AppendRecordBatch) -> Self {
2989        Self {
2990            records,
2991            match_seq_num: None,
2992            fencing_token: None,
2993        }
2994    }
2995
2996    /// Set the expected sequence number for the first record in the batch.
2997    pub fn with_match_seq_num(self, match_seq_num: u64) -> Self {
2998        Self {
2999            match_seq_num: Some(match_seq_num),
3000            ..self
3001        }
3002    }
3003
3004    /// Set the fencing token to match against the stream's current fencing token.
3005    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
3006        Self {
3007            fencing_token: Some(fencing_token),
3008            ..self
3009        }
3010    }
3011}
3012
3013impl From<AppendInput> for api::stream::proto::AppendInput {
3014    fn from(value: AppendInput) -> Self {
3015        Self {
3016            records: value.records.iter().cloned().map(Into::into).collect(),
3017            match_seq_num: value.match_seq_num,
3018            fencing_token: value.fencing_token.map(|t| t.to_string()),
3019        }
3020    }
3021}
3022
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
/// Acknowledgement for an [`AppendInput`].
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended + 1, and timestamp of the last record
    /// that was appended.
    ///
    /// The difference between `end.seq_num` and `start.seq_num` will be the number of records
    /// appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record on the stream.
    ///
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
3041
3042impl From<api::stream::proto::AppendAck> for AppendAck {
3043    fn from(value: api::stream::proto::AppendAck) -> Self {
3044        Self {
3045            start: value.start.unwrap_or_default().into(),
3046            end: value.end.unwrap_or_default().into(),
3047            tail: value.tail.unwrap_or_default().into(),
3048        }
3049    }
3050}
3051
#[derive(Debug, Clone, Copy)]
/// Starting position for reading from a stream.
pub enum ReadFrom {
    /// Read from this sequence number.
    SeqNum(u64),
    /// Read from this timestamp.
    Timestamp(u64),
    /// Read from N records before the tail.
    TailOffset(u64),
}

impl Default for ReadFrom {
    /// The default start is the very beginning of the stream: sequence number `0`.
    fn default() -> Self {
        ReadFrom::SeqNum(0)
    }
}

#[derive(Debug, Default, Clone)]
#[non_exhaustive]
/// Where to start reading.
pub struct ReadStart {
    /// Starting position.
    ///
    /// Defaults to reading from sequence number `0`.
    pub from: ReadFrom,
    /// Whether to start from tail if the requested starting position is beyond it.
    ///
    /// Defaults to `false` (errors if position is beyond tail).
    pub clamp_to_tail: bool,
}

impl ReadStart {
    /// Create a new [`ReadStart`] with default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the starting position.
    pub fn with_from(mut self, from: ReadFrom) -> Self {
        self.from = from;
        self
    }

    /// Set whether to start from tail if the requested starting position is beyond it.
    pub fn with_clamp_to_tail(mut self, clamp_to_tail: bool) -> Self {
        self.clamp_to_tail = clamp_to_tail;
        self
    }
}
3102
3103impl From<ReadStart> for api::stream::ReadStart {
3104    fn from(value: ReadStart) -> Self {
3105        let (seq_num, timestamp, tail_offset) = match value.from {
3106            ReadFrom::SeqNum(n) => (Some(n), None, None),
3107            ReadFrom::Timestamp(t) => (None, Some(t), None),
3108            ReadFrom::TailOffset(o) => (None, None, Some(o)),
3109        };
3110        Self {
3111            seq_num,
3112            timestamp,
3113            tail_offset,
3114            clamp: if value.clamp_to_tail {
3115                Some(true)
3116            } else {
3117                None
3118            },
3119        }
3120    }
3121}
3122
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Limits on how much to read.
pub struct ReadLimits {
    /// Limit on number of records.
    ///
    /// Defaults to `1000` for non-streaming read.
    pub count: Option<usize>,
    /// Limit on total metered bytes of records.
    ///
    /// Defaults to `1MiB` for non-streaming read.
    pub bytes: Option<usize>,
}

impl ReadLimits {
    /// Create a new [`ReadLimits`] with default values (no explicit limits).
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the limit on number of records.
    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    /// Set the limit on total metered bytes of records.
    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }
}
3159
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// When to stop reading.
///
/// Reading stops when any configured condition is met.
pub struct ReadStop {
    /// Limits on how much to read.
    pub limits: ReadLimits,
    /// Timestamp at which to stop (exclusive).
    pub until: Option<RangeTo<u64>>,
    /// Duration in seconds to wait for new records before stopping. Will be clamped to `60`
    /// seconds for [`read`](crate::S2Stream::read).
    ///
    /// Defaults to:
    /// - `0` (no wait) for [`read`](crate::S2Stream::read).
    /// - `0` (no wait) for [`read_session`](crate::S2Stream::read_session) if `limits` or `until`
    ///   is specified.
    /// - Infinite wait for [`read_session`](crate::S2Stream::read_session) if neither `limits` nor
    ///   `until` is specified.
    pub wait: Option<u32>,
}
3179
3180impl ReadStop {
3181    /// Create a new [`ReadStop`] with default values.
3182    pub fn new() -> Self {
3183        Self::default()
3184    }
3185
3186    /// Set the limits on how much to read.
3187    pub fn with_limits(self, limits: ReadLimits) -> Self {
3188        Self { limits, ..self }
3189    }
3190
3191    /// Set the timestamp at which to stop (exclusive).
3192    pub fn with_until(self, until: RangeTo<u64>) -> Self {
3193        Self {
3194            until: Some(until),
3195            ..self
3196        }
3197    }
3198
3199    /// Set the duration in seconds to wait for new records before stopping.
3200    pub fn with_wait(self, wait: u32) -> Self {
3201        Self {
3202            wait: Some(wait),
3203            ..self
3204        }
3205    }
3206}
3207
3208impl From<ReadStop> for api::stream::ReadEnd {
3209    fn from(value: ReadStop) -> Self {
3210        Self {
3211            count: value.limits.count,
3212            bytes: value.limits.bytes,
3213            until: value.until.map(|r| r.end),
3214            wait: value.wait,
3215        }
3216    }
3217}
3218
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
/// Input for [`read`](crate::S2Stream::read) and [`read_session`](crate::S2Stream::read_session)
/// operations.
///
/// Construct via [`ReadInput::new`] and the `with_*` builders.
pub struct ReadInput {
    /// Where to start reading.
    pub start: ReadStart,
    /// When to stop reading.
    pub stop: ReadStop,
    /// Whether to filter out command records from the stream when reading.
    ///
    /// Defaults to `false`.
    pub ignore_command_records: bool,
}
3233
3234impl ReadInput {
3235    /// Create a new [`ReadInput`] with default values.
3236    pub fn new() -> Self {
3237        Self::default()
3238    }
3239
3240    /// Set where to start reading.
3241    pub fn with_start(self, start: ReadStart) -> Self {
3242        Self { start, ..self }
3243    }
3244
3245    /// Set when to stop reading.
3246    pub fn with_stop(self, stop: ReadStop) -> Self {
3247        Self { stop, ..self }
3248    }
3249
3250    /// Set whether to filter out command records from the stream when reading.
3251    pub fn with_ignore_command_records(self, ignore_command_records: bool) -> Self {
3252        Self {
3253            ignore_command_records,
3254            ..self
3255        }
3256    }
3257}
3258
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Record that is durably sequenced on a stream.
pub struct SequencedRecord {
    /// Sequence number assigned to this record.
    pub seq_num: u64,
    /// Body of this record.
    pub body: Bytes,
    /// Headers for this record.
    pub headers: Vec<Header>,
    /// Timestamp for this record.
    pub timestamp: u64,
}
3272
3273impl SequencedRecord {
3274    /// Whether this is a command record.
3275    pub fn is_command_record(&self) -> bool {
3276        self.headers.len() == 1 && *self.headers[0].name == *b""
3277    }
3278}
3279
/// Convert the protobuf wire record into the SDK type.
impl From<api::stream::proto::SequencedRecord> for SequencedRecord {
    fn from(value: api::stream::proto::SequencedRecord) -> Self {
        Self {
            seq_num: value.seq_num,
            body: value.body,
            headers: value.headers.into_iter().map(Into::into).collect(),
            timestamp: value.timestamp,
        }
    }
}

// Sequenced records use the same metered-size formula as append records.
metered_bytes_impl!(SequencedRecord);
3292
#[derive(Debug, Clone)]
#[non_exhaustive]
/// Batch of records returned by [`read`](crate::S2Stream::read) or streamed by
/// [`read_session`](crate::S2Stream::read_session).
pub struct ReadBatch {
    /// Records that are durably sequenced on the stream.
    ///
    /// It can be empty only for a [`read`](crate::S2Stream::read) operation when:
    /// - the [`stop condition`](ReadInput::stop) was already met, or
    /// - all records in the batch were command records and
    ///   [`ignore_command_records`](ReadInput::ignore_command_records) was set to `true`.
    pub records: Vec<SequencedRecord>,
    /// Sequence number that will be assigned to the next record on the stream, and timestamp of
    /// the last record.
    ///
    /// It will only be present when reading recent records.
    pub tail: Option<StreamPosition>,
}
3311
3312impl ReadBatch {
3313    pub(crate) fn from_api(
3314        batch: api::stream::proto::ReadBatch,
3315        ignore_command_records: bool,
3316    ) -> Self {
3317        Self {
3318            records: batch
3319                .records
3320                .into_iter()
3321                .map(Into::into)
3322                .filter(|sr: &SequencedRecord| !ignore_command_records || !sr.is_command_record())
3323                .collect(),
3324            tail: batch.tail.map(Into::into),
3325        }
3326    }
3327}
3328
/// A [`Stream`](futures::Stream) of values of type `Result<T, S2Error>`.
///
/// The stream is boxed, pinned, and `Send`.
pub type Streaming<T> = Pin<Box<dyn Send + futures::Stream<Item = Result<T, S2Error>>>>;
3331
#[derive(Debug, Clone, thiserror::Error)]
/// Why an append condition check failed.
///
/// Returned inside [`S2Error::AppendConditionFailed`].
pub enum AppendConditionFailed {
    #[error("fencing token mismatch, expected: {0}")]
    /// Fencing token did not match. Contains the expected fencing token.
    FencingTokenMismatch(FencingToken),
    #[error("sequence number mismatch, expected: {0}")]
    /// Sequence number did not match. Contains the expected sequence number.
    SeqNumMismatch(u64),
}
3342
3343impl From<api::stream::AppendConditionFailed> for AppendConditionFailed {
3344    fn from(value: api::stream::AppendConditionFailed) -> Self {
3345        match value {
3346            api::stream::AppendConditionFailed::FencingTokenMismatch(token) => {
3347                AppendConditionFailed::FencingTokenMismatch(FencingToken(token.to_string()))
3348            }
3349            api::stream::AppendConditionFailed::SeqNumMismatch(seq) => {
3350                AppendConditionFailed::SeqNumMismatch(seq)
3351            }
3352        }
3353    }
3354}
3355
#[derive(Debug, Clone, thiserror::Error)]
/// Errors from S2 operations.
///
/// Produced from lower-level [`ApiError`]s via `From`.
pub enum S2Error {
    #[error("{0}")]
    /// Client-side error.
    Client(String),
    #[error(transparent)]
    /// Validation error.
    Validation(#[from] ValidationError),
    #[error("{0}")]
    /// Append condition check failed. Contains the failure reason.
    AppendConditionFailed(AppendConditionFailed),
    #[error("read from an unwritten position. current tail: {0}")]
    /// Read from an unwritten position. Contains the current tail.
    ReadUnwritten(StreamPosition),
    #[error("{0}")]
    /// Other server-side error.
    Server(ErrorResponse),
}
3375
3376impl From<ApiError> for S2Error {
3377    fn from(err: ApiError) -> Self {
3378        match err {
3379            ApiError::ReadUnwritten(tail_response) => {
3380                Self::ReadUnwritten(tail_response.tail.into())
3381            }
3382            ApiError::AppendConditionFailed(condition_failed) => {
3383                Self::AppendConditionFailed(condition_failed.into())
3384            }
3385            ApiError::Server(_, response) => Self::Server(response.into()),
3386            other => Self::Client(other.to_string()),
3387        }
3388    }
3389}
3390
#[derive(Debug, Clone, thiserror::Error)]
#[error("{code}: {message}")]
#[non_exhaustive]
/// Error response from S2 server.
///
/// Displayed as `code: message`.
pub struct ErrorResponse {
    /// Error code.
    pub code: String,
    /// Error message.
    pub message: String,
}
3401
/// Lift the transport-level error payload into the public [`ErrorResponse`].
impl From<ApiErrorResponse> for ErrorResponse {
    fn from(response: ApiErrorResponse) -> Self {
        Self {
            code: response.code,
            message: response.message,
        }
    }
}
3410
/// Generate a fresh idempotency token: a random UUIDv4 in simple
/// (hyphen-less, lowercase hex) form.
fn idempotency_token() -> String {
    uuid::Uuid::new_v4().simple().to_string()
}