s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::{Rng, distributions::Uniform};
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13
/// Error related to conversion from one type to another.
///
/// Wraps a human-readable message; its `Display` output is that message verbatim.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20    fn from(value: T) -> Self {
21        Self(value.into())
22    }
23}
24
/// Metered size of the object in bytes.
///
/// Bytes are calculated using the "metered bytes" formula:
///
/// ```python
/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.name) + len(h.value)) for h in record.headers) + len(record.body)
/// ```
pub trait MeteredBytes {
    /// Return the metered bytes of the object.
    fn metered_bytes(&self) -> u64;
}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38    fn metered_bytes(&self) -> u64 {
39        self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40    }
41}
42
/// Implement [`MeteredBytes`] for a record-like type exposing `headers`
/// (items with `name` and `value`) and `body` fields.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                // Two bytes of overhead are charged per header.
                let per_header_overhead = 2 * self.headers.len();
                let header_payload: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                // Fixed 8 byte base cost, then headers, then the body.
                let total = 8 + per_header_overhead + header_payload + self.body.len();
                total as u64
            }
        }
    };
}
60
// Scope of a basin. Item-level rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BasinScope {
    // Default variant; its string form is "unspecified".
    #[default]
    Unspecified,
    // String form is "aws:us-east-1".
    AwsUsEast1,
}
68
69impl From<BasinScope> for api::BasinScope {
70    fn from(value: BasinScope) -> Self {
71        match value {
72            BasinScope::Unspecified => Self::Unspecified,
73            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
74        }
75    }
76}
77
78impl From<api::BasinScope> for BasinScope {
79    fn from(value: api::BasinScope) -> Self {
80        match value {
81            api::BasinScope::Unspecified => Self::Unspecified,
82            api::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
83        }
84    }
85}
86
87impl FromStr for BasinScope {
88    type Err = ConvertError;
89    fn from_str(value: &str) -> Result<Self, Self::Err> {
90        match value {
91            "unspecified" => Ok(Self::Unspecified),
92            "aws:us-east-1" => Ok(Self::AwsUsEast1),
93            _ => Err("invalid basin scope value".into()),
94        }
95    }
96}
97
98impl From<BasinScope> for i32 {
99    fn from(value: BasinScope) -> Self {
100        api::BasinScope::from(value).into()
101    }
102}
103
104impl TryFrom<i32> for BasinScope {
105    type Error = ConvertError;
106    fn try_from(value: i32) -> Result<Self, Self::Error> {
107        api::BasinScope::try_from(value)
108            .map(Into::into)
109            .map_err(|_| "invalid basin scope value".into())
110    }
111}
112
// Request to create a basin. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateBasinRequest {
    // Name of the basin to create.
    pub basin: BasinName,
    // Optional basin configuration; `None` unless set via `with_config`.
    pub config: Option<BasinConfig>,
    // Basin scope; `new` initialises it to `BasinScope::Unspecified`.
    pub scope: BasinScope,
}
120
121impl CreateBasinRequest {
122    /// Create a new request with basin name.
123    pub fn new(basin: BasinName) -> Self {
124        Self {
125            basin,
126            config: None,
127            scope: BasinScope::Unspecified,
128        }
129    }
130
131    /// Overwrite basin configuration.
132    pub fn with_config(self, config: BasinConfig) -> Self {
133        Self {
134            config: Some(config),
135            ..self
136        }
137    }
138
139    /// Overwrite basin scope.
140    pub fn with_scope(self, scope: BasinScope) -> Self {
141        Self { scope, ..self }
142    }
143}
144
145impl From<CreateBasinRequest> for api::CreateBasinRequest {
146    fn from(value: CreateBasinRequest) -> Self {
147        let CreateBasinRequest {
148            basin,
149            config,
150            scope,
151        } = value;
152        Self {
153            basin: basin.0,
154            config: config.map(Into::into),
155            scope: scope.into(),
156        }
157    }
158}
159
// Basin configuration. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    // Optional stream configuration carried by the basin.
    pub default_stream_config: Option<StreamConfig>,
    // Flag forwarded unchanged to/from the API message.
    pub create_stream_on_append: bool,
}
166
167impl From<BasinConfig> for api::BasinConfig {
168    fn from(value: BasinConfig) -> Self {
169        let BasinConfig {
170            default_stream_config,
171            create_stream_on_append,
172        } = value;
173        Self {
174            default_stream_config: default_stream_config.map(Into::into),
175            create_stream_on_append,
176        }
177    }
178}
179
180impl TryFrom<api::BasinConfig> for BasinConfig {
181    type Error = ConvertError;
182    fn try_from(value: api::BasinConfig) -> Result<Self, Self::Error> {
183        let api::BasinConfig {
184            default_stream_config,
185            create_stream_on_append,
186        } = value;
187        Ok(Self {
188            default_stream_config: default_stream_config.map(TryInto::try_into).transpose()?,
189            create_stream_on_append,
190        })
191    }
192}
193
// Stream configuration. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    // Storage class; defaults to `StorageClass::Unspecified`.
    pub storage_class: StorageClass,
    // Optional retention policy.
    pub retention_policy: Option<RetentionPolicy>,
    // Optional flag forwarded unchanged to/from the API message.
    pub require_client_timestamps: Option<bool>,
}
201
202impl StreamConfig {
203    /// Create a new request.
204    pub fn new() -> Self {
205        Self::default()
206    }
207
208    /// Overwrite storage class.
209    pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self {
210        Self {
211            storage_class: storage_class.into(),
212            ..self
213        }
214    }
215
216    /// Overwrite retention policy.
217    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
218        Self {
219            retention_policy: Some(retention_policy),
220            ..self
221        }
222    }
223
224    /// Overwrite `require_client_timestamps`.
225    pub fn with_require_client_timestamps(self, require_client_timestamps: bool) -> Self {
226        Self {
227            require_client_timestamps: Some(require_client_timestamps),
228            ..self
229        }
230    }
231}
232
233impl From<StreamConfig> for api::StreamConfig {
234    fn from(value: StreamConfig) -> Self {
235        let StreamConfig {
236            storage_class,
237            retention_policy,
238            require_client_timestamps,
239        } = value;
240        Self {
241            storage_class: storage_class.into(),
242            retention_policy: retention_policy.map(Into::into),
243            require_client_timestamps,
244        }
245    }
246}
247
248impl TryFrom<api::StreamConfig> for StreamConfig {
249    type Error = ConvertError;
250    fn try_from(value: api::StreamConfig) -> Result<Self, Self::Error> {
251        let api::StreamConfig {
252            storage_class,
253            retention_policy,
254            require_client_timestamps,
255        } = value;
256        Ok(Self {
257            storage_class: storage_class.try_into()?,
258            retention_policy: retention_policy.map(Into::into),
259            require_client_timestamps,
260        })
261    }
262}
263
// Storage class of a stream. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StorageClass {
    // Default variant; its string form is "unspecified".
    #[default]
    Unspecified,
    // String form is "standard".
    Standard,
    // String form is "express".
    Express,
}
272
273impl From<StorageClass> for api::StorageClass {
274    fn from(value: StorageClass) -> Self {
275        match value {
276            StorageClass::Unspecified => Self::Unspecified,
277            StorageClass::Standard => Self::Standard,
278            StorageClass::Express => Self::Express,
279        }
280    }
281}
282
283impl From<api::StorageClass> for StorageClass {
284    fn from(value: api::StorageClass) -> Self {
285        match value {
286            api::StorageClass::Unspecified => Self::Unspecified,
287            api::StorageClass::Standard => Self::Standard,
288            api::StorageClass::Express => Self::Express,
289        }
290    }
291}
292
293impl FromStr for StorageClass {
294    type Err = ConvertError;
295    fn from_str(value: &str) -> Result<Self, Self::Err> {
296        match value {
297            "unspecified" => Ok(Self::Unspecified),
298            "standard" => Ok(Self::Standard),
299            "express" => Ok(Self::Express),
300            _ => Err("invalid storage class value".into()),
301        }
302    }
303}
304
305impl From<StorageClass> for i32 {
306    fn from(value: StorageClass) -> Self {
307        api::StorageClass::from(value).into()
308    }
309}
310
311impl TryFrom<i32> for StorageClass {
312    type Error = ConvertError;
313    fn try_from(value: i32) -> Result<Self, Self::Error> {
314        api::StorageClass::try_from(value)
315            .map(Into::into)
316            .map_err(|_| "invalid storage class value".into())
317    }
318}
319
// Retention policy of a stream. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    // Age-based retention; converted to whole seconds when sent to the API.
    Age(Duration),
}
325
326impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
327    fn from(value: RetentionPolicy) -> Self {
328        match value {
329            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
330        }
331    }
332}
333
334impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
335    fn from(value: api::stream_config::RetentionPolicy) -> Self {
336        match value {
337            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
338        }
339    }
340}
341
// State of a basin. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    // Displayed as "unspecified".
    Unspecified,
    // Displayed as "active".
    Active,
    // Displayed as "creating".
    Creating,
    // Displayed as "deleting".
    Deleting,
}
350
351impl From<BasinState> for api::BasinState {
352    fn from(value: BasinState) -> Self {
353        match value {
354            BasinState::Unspecified => Self::Unspecified,
355            BasinState::Active => Self::Active,
356            BasinState::Creating => Self::Creating,
357            BasinState::Deleting => Self::Deleting,
358        }
359    }
360}
361
362impl From<api::BasinState> for BasinState {
363    fn from(value: api::BasinState) -> Self {
364        match value {
365            api::BasinState::Unspecified => Self::Unspecified,
366            api::BasinState::Active => Self::Active,
367            api::BasinState::Creating => Self::Creating,
368            api::BasinState::Deleting => Self::Deleting,
369        }
370    }
371}
372
373impl From<BasinState> for i32 {
374    fn from(value: BasinState) -> Self {
375        api::BasinState::from(value).into()
376    }
377}
378
379impl TryFrom<i32> for BasinState {
380    type Error = ConvertError;
381    fn try_from(value: i32) -> Result<Self, Self::Error> {
382        api::BasinState::try_from(value)
383            .map(Into::into)
384            .map_err(|_| "invalid basin status value".into())
385    }
386}
387
388impl std::fmt::Display for BasinState {
389    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390        match self {
391            BasinState::Unspecified => write!(f, "unspecified"),
392            BasinState::Active => write!(f, "active"),
393            BasinState::Creating => write!(f, "creating"),
394            BasinState::Deleting => write!(f, "deleting"),
395        }
396    }
397}
398
// Information about a basin. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    // Basin name, kept as a plain string.
    pub name: String,
    // Basin scope.
    pub scope: BasinScope,
    // Basin state.
    pub state: BasinState,
}
406
407impl From<BasinInfo> for api::BasinInfo {
408    fn from(value: BasinInfo) -> Self {
409        let BasinInfo { name, scope, state } = value;
410        Self {
411            name,
412            scope: scope.into(),
413            state: state.into(),
414        }
415    }
416}
417
418impl TryFrom<api::BasinInfo> for BasinInfo {
419    type Error = ConvertError;
420    fn try_from(value: api::BasinInfo) -> Result<Self, Self::Error> {
421        let api::BasinInfo { name, scope, state } = value;
422        Ok(Self {
423            name,
424            scope: scope.try_into()?,
425            state: state.try_into()?,
426        })
427    }
428}
429
430impl TryFrom<api::CreateBasinResponse> for BasinInfo {
431    type Error = ConvertError;
432    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
433        let api::CreateBasinResponse { info } = value;
434        let info = info.ok_or("missing basin info")?;
435        info.try_into()
436    }
437}
438
// Request to list streams. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListStreamsRequest {
    // Prefix filter; empty by default.
    pub prefix: String,
    // Start-after cursor; empty by default.
    pub start_after: String,
    // Optional limit; `None` by default.
    pub limit: Option<usize>,
}
446
447impl ListStreamsRequest {
448    /// Create a new request.
449    pub fn new() -> Self {
450        Self::default()
451    }
452
453    /// Overwrite prefix.
454    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
455        Self {
456            prefix: prefix.into(),
457            ..self
458        }
459    }
460
461    /// Overwrite start after.
462    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
463        Self {
464            start_after: start_after.into(),
465            ..self
466        }
467    }
468
469    /// Overwrite limit.
470    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
471        Self {
472            limit: limit.into(),
473            ..self
474        }
475    }
476}
477
478impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
479    type Error = ConvertError;
480    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
481        let ListStreamsRequest {
482            prefix,
483            start_after,
484            limit,
485        } = value;
486        Ok(Self {
487            prefix,
488            start_after,
489            limit: limit.map(|n| n as u64),
490        })
491    }
492}
493
// Information about a stream. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    // Stream name.
    pub name: String,
    // Creation time as reported by the API; presumably seconds since epoch — TODO confirm.
    pub created_at: u32,
    // Deletion time, if any; same unit as `created_at`.
    pub deleted_at: Option<u32>,
}
501
502impl From<api::StreamInfo> for StreamInfo {
503    fn from(value: api::StreamInfo) -> Self {
504        Self {
505            name: value.name,
506            created_at: value.created_at,
507            deleted_at: value.deleted_at,
508        }
509    }
510}
511
512impl TryFrom<api::CreateStreamResponse> for StreamInfo {
513    type Error = ConvertError;
514
515    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
516        let api::CreateStreamResponse { info } = value;
517        let info = info.ok_or("missing stream info")?;
518        Ok(info.into())
519    }
520}
521
// Response of listing streams. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    // Streams returned by the API, in the order received.
    pub streams: Vec<StreamInfo>,
    // Flag copied unchanged from the API response.
    pub has_more: bool,
}
528
529impl From<api::ListStreamsResponse> for ListStreamsResponse {
530    fn from(value: api::ListStreamsResponse) -> Self {
531        let api::ListStreamsResponse { streams, has_more } = value;
532        let streams = streams.into_iter().map(Into::into).collect();
533        Self { streams, has_more }
534    }
535}
536
537impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
538    type Error = ConvertError;
539    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
540        let api::GetBasinConfigResponse { config } = value;
541        let config = config.ok_or("missing basin config")?;
542        config.try_into()
543    }
544}
545
546impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
547    type Error = ConvertError;
548    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
549        let api::GetStreamConfigResponse { config } = value;
550        let config = config.ok_or("missing stream config")?;
551        config.try_into()
552    }
553}
554
// Request to create a stream. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateStreamRequest {
    // Name of the stream to create.
    pub stream: String,
    // Optional stream configuration; `None` unless set via `with_config`.
    pub config: Option<StreamConfig>,
}
561
562impl CreateStreamRequest {
563    /// Create a new request with stream name.
564    pub fn new(stream: impl Into<String>) -> Self {
565        Self {
566            stream: stream.into(),
567            config: None,
568        }
569    }
570
571    /// Overwrite stream config.
572    pub fn with_config(self, config: StreamConfig) -> Self {
573        Self {
574            config: Some(config),
575            ..self
576        }
577    }
578}
579
580impl From<CreateStreamRequest> for api::CreateStreamRequest {
581    fn from(value: CreateStreamRequest) -> Self {
582        let CreateStreamRequest { stream, config } = value;
583        Self {
584            stream,
585            config: config.map(Into::into),
586        }
587    }
588}
589
// Request to list basins. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListBasinsRequest {
    // Prefix filter; empty by default.
    pub prefix: String,
    // Start-after cursor; empty by default.
    pub start_after: String,
    // Optional limit; `None` by default.
    pub limit: Option<usize>,
}
597
598impl ListBasinsRequest {
599    /// Create a new request.
600    pub fn new() -> Self {
601        Self::default()
602    }
603
604    /// Overwrite prefix.
605    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
606        Self {
607            prefix: prefix.into(),
608            ..self
609        }
610    }
611
612    /// Overwrite start after.
613    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
614        Self {
615            start_after: start_after.into(),
616            ..self
617        }
618    }
619
620    /// Overwrite limit.
621    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
622        Self {
623            limit: limit.into(),
624            ..self
625        }
626    }
627}
628
629impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
630    type Error = ConvertError;
631    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
632        let ListBasinsRequest {
633            prefix,
634            start_after,
635            limit,
636        } = value;
637        Ok(Self {
638            prefix,
639            start_after,
640            limit: limit
641                .map(TryInto::try_into)
642                .transpose()
643                .map_err(|_| "request limit does not fit into u64 bounds")?,
644        })
645    }
646}
647
// Response of listing basins. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    // Basins returned by the API, in the order received.
    pub basins: Vec<BasinInfo>,
    // Flag copied unchanged from the API response.
    pub has_more: bool,
}
654
655impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
656    type Error = ConvertError;
657    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
658        let api::ListBasinsResponse { basins, has_more } = value;
659        Ok(Self {
660            basins: basins
661                .into_iter()
662                .map(TryInto::try_into)
663                .collect::<Result<Vec<BasinInfo>, ConvertError>>()?,
664            has_more,
665        })
666    }
667}
668
// Request to delete a basin. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteBasinRequest {
    // Name of the basin to delete.
    pub basin: BasinName,
    /// Delete basin if it exists else do nothing.
    // Not part of the API message built from this request; presumably honoured by the
    // client — TODO confirm.
    pub if_exists: bool,
}
676
677impl DeleteBasinRequest {
678    /// Create a new request.
679    pub fn new(basin: BasinName) -> Self {
680        Self {
681            basin,
682            if_exists: false,
683        }
684    }
685
686    /// Overwrite the if exists parameter.
687    pub fn with_if_exists(self, if_exists: bool) -> Self {
688        Self { if_exists, ..self }
689    }
690}
691
692impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
693    fn from(value: DeleteBasinRequest) -> Self {
694        let DeleteBasinRequest { basin, .. } = value;
695        Self { basin: basin.0 }
696    }
697}
698
// Request to delete a stream. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteStreamRequest {
    // Name of the stream to delete.
    pub stream: String,
    /// Delete stream if it exists else do nothing.
    // Not part of the API message built from this request; presumably honoured by the
    // client — TODO confirm.
    pub if_exists: bool,
}
706
707impl DeleteStreamRequest {
708    /// Create a new request.
709    pub fn new(stream: impl Into<String>) -> Self {
710        Self {
711            stream: stream.into(),
712            if_exists: false,
713        }
714    }
715
716    /// Overwrite the if exists parameter.
717    pub fn with_if_exists(self, if_exists: bool) -> Self {
718        Self { if_exists, ..self }
719    }
720}
721
722impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
723    fn from(value: DeleteStreamRequest) -> Self {
724        let DeleteStreamRequest { stream, .. } = value;
725        Self { stream }
726    }
727}
728
// Request to reconfigure a basin. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureBasinRequest {
    // Name of the basin to reconfigure.
    pub basin: BasinName,
    // Optional new configuration; `None` unless set via `with_config`.
    pub config: Option<BasinConfig>,
    // Optional field-mask paths; sent as a `prost_types::FieldMask` when present.
    pub mask: Option<Vec<String>>,
}
736
737impl ReconfigureBasinRequest {
738    /// Create a new request with basin name.
739    pub fn new(basin: BasinName) -> Self {
740        Self {
741            basin,
742            config: None,
743            mask: None,
744        }
745    }
746
747    /// Overwrite basin config.
748    pub fn with_config(self, config: BasinConfig) -> Self {
749        Self {
750            config: Some(config),
751            ..self
752        }
753    }
754
755    /// Overwrite field mask.
756    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
757        Self {
758            mask: Some(mask.into()),
759            ..self
760        }
761    }
762}
763
764impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
765    fn from(value: ReconfigureBasinRequest) -> Self {
766        let ReconfigureBasinRequest {
767            basin,
768            config,
769            mask,
770        } = value;
771        Self {
772            basin: basin.0,
773            config: config.map(Into::into),
774            mask: mask.map(|paths| prost_types::FieldMask { paths }),
775        }
776    }
777}
778
779impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
780    type Error = ConvertError;
781    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
782        let api::ReconfigureBasinResponse { config } = value;
783        let config = config.ok_or("missing basin config")?;
784        config.try_into()
785    }
786}
787
// Request to reconfigure a stream. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureStreamRequest {
    // Name of the stream to reconfigure.
    pub stream: String,
    // Optional new configuration; `None` unless set via `with_config`.
    pub config: Option<StreamConfig>,
    // Optional field-mask paths; sent as a `prost_types::FieldMask` when present.
    pub mask: Option<Vec<String>>,
}
795
796impl ReconfigureStreamRequest {
797    /// Create a new request with stream name.
798    pub fn new(stream: impl Into<String>) -> Self {
799        Self {
800            stream: stream.into(),
801            config: None,
802            mask: None,
803        }
804    }
805
806    /// Overwrite stream config.
807    pub fn with_config(self, config: StreamConfig) -> Self {
808        Self {
809            config: Some(config),
810            ..self
811        }
812    }
813
814    /// Overwrite field mask.
815    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
816        Self {
817            mask: Some(mask.into()),
818            ..self
819        }
820    }
821}
822
823impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
824    fn from(value: ReconfigureStreamRequest) -> Self {
825        let ReconfigureStreamRequest {
826            stream,
827            config,
828            mask,
829        } = value;
830        Self {
831            stream,
832            config: config.map(Into::into),
833            mask: mask.map(|paths| prost_types::FieldMask { paths }),
834        }
835    }
836}
837
838impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
839    type Error = ConvertError;
840    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
841        let api::ReconfigureStreamResponse { config } = value;
842        let config = config.ok_or("missing stream config")?;
843        config.try_into()
844    }
845}
846
847impl From<api::CheckTailResponse> for u64 {
848    fn from(value: api::CheckTailResponse) -> Self {
849        let api::CheckTailResponse { next_seq_num } = value;
850        next_seq_num
851    }
852}
853
// Name/value pair attached to a record. Rustdoc is presumably injected by `#[sync_docs]`
// — TODO confirm.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    // Header name; left empty by `Header::from_value`.
    pub name: Bytes,
    // Header value.
    pub value: Bytes,
}
860
861impl Header {
862    /// Create a new header from name and value.
863    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
864        Self {
865            name: name.into(),
866            value: value.into(),
867        }
868    }
869
870    /// Create a new header from value.
871    pub fn from_value(value: impl Into<Bytes>) -> Self {
872        Self {
873            name: Bytes::new(),
874            value: value.into(),
875        }
876    }
877}
878
879impl From<Header> for api::Header {
880    fn from(value: Header) -> Self {
881        let Header { name, value } = value;
882        Self { name, value }
883    }
884}
885
886impl From<api::Header> for Header {
887    fn from(value: api::Header) -> Self {
888        let api::Header { name, value } = value;
889        Self { name, value }
890    }
891}
892
/// A fencing token can be enforced on append requests.
///
/// Must not be more than 16 bytes. The default value is the empty token.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FencingToken(Bytes);
898
899impl FencingToken {
900    const MAX_BYTES: usize = 16;
901
902    /// Try creating a new fencing token from bytes.
903    pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
904        let bytes = bytes.into();
905        if bytes.len() > Self::MAX_BYTES {
906            Err(format!(
907                "Size of a fencing token cannot exceed {} bytes",
908                Self::MAX_BYTES
909            )
910            .into())
911        } else {
912            Ok(Self(bytes))
913        }
914    }
915
916    /// Generate a random fencing token with `n` bytes.
917    pub fn generate(n: usize) -> Result<Self, ConvertError> {
918        Self::new(
919            rand::thread_rng()
920                .sample_iter(&Uniform::new_inclusive(0, u8::MAX))
921                .take(n)
922                .collect::<Bytes>(),
923        )
924    }
925}
926
927impl AsRef<Bytes> for FencingToken {
928    fn as_ref(&self) -> &Bytes {
929        &self.0
930    }
931}
932
933impl AsRef<[u8]> for FencingToken {
934    fn as_ref(&self) -> &[u8] {
935        &self.0
936    }
937}
938
939impl Deref for FencingToken {
940    type Target = [u8];
941
942    fn deref(&self) -> &Self::Target {
943        &self.0
944    }
945}
946
947impl From<FencingToken> for Bytes {
948    fn from(value: FencingToken) -> Self {
949        value.0
950    }
951}
952
953impl From<FencingToken> for Vec<u8> {
954    fn from(value: FencingToken) -> Self {
955        value.0.into()
956    }
957}
958
959impl TryFrom<Bytes> for FencingToken {
960    type Error = ConvertError;
961
962    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
963        Self::new(value)
964    }
965}
966
967impl TryFrom<Vec<u8>> for FencingToken {
968    type Error = ConvertError;
969
970    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
971        Self::new(value)
972    }
973}
974
/// Command to send through a `CommandRecord`.
#[derive(Debug, Clone)]
pub enum Command {
    /// Enforce a fencing token.
    ///
    /// Fencing is strongly consistent, and subsequent appends that specify a
    /// fencing token will be rejected if it does not match.
    Fence {
        /// Fencing token to enforce.
        ///
        /// Set empty to clear the token.
        fencing_token: FencingToken,
    },
    /// Request a trim till the sequence number.
    ///
    /// Trimming is eventually consistent, and trimmed records may be visible
    /// for a brief period.
    Trim {
        /// Trim point.
        ///
        /// This sequence number is only allowed to advance, and any regression
        /// will be ignored.
        seq_num: u64,
    },
}
1000
/// A command record is a special kind of [`AppendRecord`] that can be used to
/// send command messages.
///
/// Such a record is signalled by a sole header with empty name. The header
/// value represents the operation and record body acts as the payload.
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// Command kind.
    pub command: Command,
    /// Timestamp for the record; `None` unless set via `with_timestamp`.
    pub timestamp: Option<u64>,
}
1013
1014impl CommandRecord {
1015    const FENCE: &[u8] = b"fence";
1016    const TRIM: &[u8] = b"trim";
1017
1018    /// Create a new fence command record.
1019    pub fn fence(fencing_token: FencingToken) -> Self {
1020        Self {
1021            command: Command::Fence { fencing_token },
1022            timestamp: None,
1023        }
1024    }
1025
1026    /// Create a new trim command record.
1027    pub fn trim(seq_num: impl Into<u64>) -> Self {
1028        Self {
1029            command: Command::Trim {
1030                seq_num: seq_num.into(),
1031            },
1032            timestamp: None,
1033        }
1034    }
1035
1036    /// Overwrite timestamp.
1037    pub fn with_timestamp(self, timestamp: u64) -> Self {
1038        Self {
1039            timestamp: Some(timestamp),
1040            ..self
1041        }
1042    }
1043}
1044
// Record to append to a stream. Rustdoc is presumably injected by `#[sync_docs]` — TODO confirm.
// Fields are private so that every constructor can enforce the metered size limit.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    timestamp: Option<u64>,
    headers: Vec<Header>,
    body: Bytes,
    // Test-only override of the size limit checked during validation.
    #[cfg(test)]
    max_bytes: u64,
}
1054
// Generates the `MeteredBytes` implementation from the record's headers and body.
metered_impl!(AppendRecord);
1056
1057impl AppendRecord {
1058    const MAX_BYTES: u64 = MIB_BYTES;
1059
1060    fn validated(self) -> Result<Self, ConvertError> {
1061        #[cfg(test)]
1062        let max_bytes = self.max_bytes;
1063        #[cfg(not(test))]
1064        let max_bytes = Self::MAX_BYTES;
1065
1066        if self.metered_bytes() > max_bytes {
1067            Err("AppendRecord should have metered size less than 1 MiB".into())
1068        } else {
1069            Ok(self)
1070        }
1071    }
1072
1073    /// Try creating a new append record with body.
1074    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1075        Self {
1076            timestamp: None,
1077            headers: Vec::new(),
1078            body: body.into(),
1079            #[cfg(test)]
1080            max_bytes: Self::MAX_BYTES,
1081        }
1082        .validated()
1083    }
1084
1085    #[cfg(test)]
1086    pub(crate) fn with_max_bytes(
1087        max_bytes: u64,
1088        body: impl Into<Bytes>,
1089    ) -> Result<Self, ConvertError> {
1090        Self {
1091            timestamp: None,
1092            headers: Vec::new(),
1093            body: body.into(),
1094            max_bytes,
1095        }
1096        .validated()
1097    }
1098
1099    /// Overwrite headers.
1100    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1101        Self {
1102            headers: headers.into(),
1103            ..self
1104        }
1105        .validated()
1106    }
1107
1108    /// Overwrite timestamp.
1109    pub fn with_timestamp(self, timestamp: u64) -> Self {
1110        Self {
1111            timestamp: Some(timestamp),
1112            ..self
1113        }
1114    }
1115
1116    /// Body of the record.
1117    pub fn body(&self) -> &[u8] {
1118        &self.body
1119    }
1120
1121    /// Headers of the record.
1122    pub fn headers(&self) -> &[Header] {
1123        &self.headers
1124    }
1125
1126    /// Timestamp for the record.
1127    pub fn timestamp(&self) -> Option<u64> {
1128        self.timestamp
1129    }
1130
1131    /// Consume the record and return parts.
1132    pub fn into_parts(self) -> AppendRecordParts {
1133        AppendRecordParts {
1134            timestamp: self.timestamp,
1135            headers: self.headers,
1136            body: self.body,
1137        }
1138    }
1139
1140    /// Try creating the record from parts.
1141    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1142        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1143        if let Some(timestamp) = parts.timestamp {
1144            Ok(record.with_timestamp(timestamp))
1145        } else {
1146            Ok(record)
1147        }
1148    }
1149}
1150
1151impl From<AppendRecord> for api::AppendRecord {
1152    fn from(value: AppendRecord) -> Self {
1153        Self {
1154            timestamp: value.timestamp,
1155            headers: value.headers.into_iter().map(Into::into).collect(),
1156            body: value.body,
1157        }
1158    }
1159}
1160
1161impl From<CommandRecord> for AppendRecord {
1162    fn from(value: CommandRecord) -> Self {
1163        let (header_value, body) = match value.command {
1164            Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1165            Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1166        };
1167        AppendRecordParts {
1168            timestamp: value.timestamp,
1169            headers: vec![Header::from_value(header_value)],
1170            body: body.into(),
1171        }
1172        .try_into()
1173        .expect("command record is a valid append record")
1174    }
1175}
1176
// Plain, field-accessible counterpart of `AppendRecord`. Turning parts back into
// an `AppendRecord` goes through `AppendRecord::try_from_parts`, which can fail
// with a `ConvertError`.
// NOTE(review): item docs are presumably supplied by `#[sync_docs]` from the API
// type named in the attribute argument — confirm.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    pub timestamp: Option<u64>,
    pub headers: Vec<Header>,
    pub body: Bytes,
}
1184
1185impl From<AppendRecord> for AppendRecordParts {
1186    fn from(value: AppendRecord) -> Self {
1187        value.into_parts()
1188    }
1189}
1190
1191impl TryFrom<AppendRecordParts> for AppendRecord {
1192    type Error = ConvertError;
1193
1194    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1195        Self::try_from_parts(value)
1196    }
1197}
1198
/// A collection of append records that can be sent together in a batch.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    // Records accepted so far, in insertion order.
    records: Vec<AppendRecord>,
    // Running sum of the metered bytes of `records`; grows on every successful push.
    metered_bytes: u64,
    // Upper bound on `records.len()`, fixed at construction.
    max_capacity: usize,
    // Byte limit override that exists only in test builds; other builds use
    // `Self::MAX_BYTES`.
    #[cfg(test)]
    max_bytes: u64,
}
1208
1209impl PartialEq for AppendRecordBatch {
1210    fn eq(&self, other: &Self) -> bool {
1211        if self.records.eq(&other.records) {
1212            assert_eq!(self.metered_bytes, other.metered_bytes);
1213            true
1214        } else {
1215            false
1216        }
1217    }
1218}
1219
// Marker impl; the comparison itself is the manual `PartialEq` impl for this type.
impl Eq for AppendRecordBatch {}
1221
1222impl Default for AppendRecordBatch {
1223    fn default() -> Self {
1224        Self::new()
1225    }
1226}
1227
1228impl AppendRecordBatch {
1229    /// Maximum number of records that a batch can hold.
1230    ///
1231    /// A record batch cannot be created with a bigger capacity.
1232    pub const MAX_CAPACITY: usize = 1000;
1233
1234    /// Maximum metered bytes of the batch.
1235    pub const MAX_BYTES: u64 = MIB_BYTES;
1236
1237    /// Create an empty record batch.
1238    pub fn new() -> Self {
1239        Self::with_max_capacity(Self::MAX_CAPACITY)
1240    }
1241
1242    /// Create an empty record batch with custom max capacity.
1243    ///
1244    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1245    pub fn with_max_capacity(max_capacity: usize) -> Self {
1246        assert!(
1247            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1248            "Batch capacity must be between 1 and 1000"
1249        );
1250
1251        Self {
1252            records: Vec::with_capacity(max_capacity),
1253            metered_bytes: 0,
1254            max_capacity,
1255            #[cfg(test)]
1256            max_bytes: Self::MAX_BYTES,
1257        }
1258    }
1259
1260    #[cfg(test)]
1261    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1262        #[cfg(test)]
1263        assert!(
1264            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1265            "Batch size must be between 1 byte and 1 MiB"
1266        );
1267
1268        Self {
1269            max_bytes,
1270            ..Self::with_max_capacity(max_capacity)
1271        }
1272    }
1273
1274    /// Try creating a record batch from an iterator.
1275    ///
1276    /// If all the items of the iterator cannot be drained into the batch, the
1277    /// error returned contains a batch containing all records it could fit
1278    /// along-with the left over items from the iterator.
1279    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1280    where
1281        R: Into<AppendRecord>,
1282        T: IntoIterator<Item = R>,
1283    {
1284        let mut records = Self::new();
1285        let mut pending = Vec::new();
1286
1287        let mut iter = iter.into_iter();
1288
1289        for record in iter.by_ref() {
1290            if let Err(record) = records.push(record) {
1291                pending.push(record);
1292                break;
1293            }
1294        }
1295
1296        if pending.is_empty() {
1297            Ok(records)
1298        } else {
1299            pending.extend(iter.map(Into::into));
1300            Err((records, pending))
1301        }
1302    }
1303
1304    /// Returns true if the batch contains no records.
1305    pub fn is_empty(&self) -> bool {
1306        if self.records.is_empty() {
1307            assert_eq!(self.metered_bytes, 0);
1308            true
1309        } else {
1310            false
1311        }
1312    }
1313
1314    /// Returns the number of records contained in the batch.
1315    pub fn len(&self) -> usize {
1316        self.records.len()
1317    }
1318
1319    #[cfg(test)]
1320    fn max_bytes(&self) -> u64 {
1321        self.max_bytes
1322    }
1323
1324    #[cfg(not(test))]
1325    fn max_bytes(&self) -> u64 {
1326        Self::MAX_BYTES
1327    }
1328
1329    /// Returns true if the batch cannot fit any more records.
1330    pub fn is_full(&self) -> bool {
1331        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1332    }
1333
1334    /// Try to append a new record into the batch.
1335    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1336        assert!(self.records.len() <= self.max_capacity);
1337        assert!(self.metered_bytes <= self.max_bytes());
1338
1339        let record = record.into();
1340        let record_size = record.metered_bytes();
1341        if self.records.len() >= self.max_capacity
1342            || self.metered_bytes + record_size > self.max_bytes()
1343        {
1344            Err(record)
1345        } else {
1346            self.records.push(record);
1347            self.metered_bytes += record_size;
1348            Ok(())
1349        }
1350    }
1351}
1352
1353impl MeteredBytes for AppendRecordBatch {
1354    fn metered_bytes(&self) -> u64 {
1355        self.metered_bytes
1356    }
1357}
1358
1359impl IntoIterator for AppendRecordBatch {
1360    type Item = AppendRecord;
1361    type IntoIter = std::vec::IntoIter<Self::Item>;
1362
1363    fn into_iter(self) -> Self::IntoIter {
1364        self.records.into_iter()
1365    }
1366}
1367
1368impl<'a> IntoIterator for &'a AppendRecordBatch {
1369    type Item = &'a AppendRecord;
1370    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1371
1372    fn into_iter(self) -> Self::IntoIter {
1373        self.records.iter()
1374    }
1375}
1376
1377impl AsRef<[AppendRecord]> for AppendRecordBatch {
1378    fn as_ref(&self) -> &[AppendRecord] {
1379        &self.records
1380    }
1381}
1382
// Input for an append: a batch of records plus two optional fields.
// `Default` yields an empty batch with both optional fields set to `None`.
// NOTE(review): item and field docs are presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    pub records: AppendRecordBatch,
    pub match_seq_num: Option<u64>,
    pub fencing_token: Option<FencingToken>,
}
1390
1391impl MeteredBytes for AppendInput {
1392    fn metered_bytes(&self) -> u64 {
1393        self.records.metered_bytes()
1394    }
1395}
1396
1397impl AppendInput {
1398    /// Create a new append input from record batch.
1399    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1400        Self {
1401            records: records.into(),
1402            match_seq_num: None,
1403            fencing_token: None,
1404        }
1405    }
1406
1407    /// Overwrite match sequence number.
1408    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1409        Self {
1410            match_seq_num: Some(match_seq_num.into()),
1411            ..self
1412        }
1413    }
1414
1415    /// Overwrite fencing token.
1416    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1417        Self {
1418            fencing_token: Some(fencing_token),
1419            ..self
1420        }
1421    }
1422
1423    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1424        let Self {
1425            records,
1426            match_seq_num,
1427            fencing_token,
1428        } = self;
1429
1430        api::AppendInput {
1431            stream: stream.into(),
1432            records: records.into_iter().map(Into::into).collect(),
1433            match_seq_num,
1434            fencing_token: fencing_token.map(|f| f.0),
1435        }
1436    }
1437}
1438
// Result of an append; carries the same three fields as `api::AppendOutput`,
// copied one-to-one by the `From<api::AppendOutput>` impl.
// NOTE(review): item and field docs are presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AppendOutput {
    pub start_seq_num: u64,
    pub end_seq_num: u64,
    pub next_seq_num: u64,
}
1446
1447impl From<api::AppendOutput> for AppendOutput {
1448    fn from(value: api::AppendOutput) -> Self {
1449        let api::AppendOutput {
1450            start_seq_num,
1451            end_seq_num,
1452            next_seq_num,
1453        } = value;
1454        Self {
1455            start_seq_num,
1456            end_seq_num,
1457            next_seq_num,
1458        }
1459    }
1460}
1461
1462impl TryFrom<api::AppendResponse> for AppendOutput {
1463    type Error = ConvertError;
1464    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1465        let api::AppendResponse { output } = value;
1466        let output = output.ok_or("missing append output")?;
1467        Ok(output.into())
1468    }
1469}
1470
1471impl TryFrom<api::AppendSessionResponse> for AppendOutput {
1472    type Error = ConvertError;
1473    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1474        let api::AppendSessionResponse { output } = value;
1475        let output = output.ok_or("missing append output")?;
1476        Ok(output.into())
1477    }
1478}
1479
// Optional caps on a read. `Default` leaves both as `None`.
// For unary reads, `ReadRequest::try_into_api_type` rejects a `count` above 1000
// or `bytes` above 1 MiB.
// NOTE(review): item and field docs are presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    pub count: Option<u64>,
    pub bytes: Option<u64>,
}
1486
// Parameters of a unary read. `Default` starts at sequence number 0 with no limits.
// NOTE(review): item and field docs are presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    pub start_seq_num: u64,
    pub limit: ReadLimit,
}
1493
1494impl ReadRequest {
1495    /// Create a new request with start sequence number.
1496    pub fn new(start_seq_num: u64) -> Self {
1497        Self {
1498            start_seq_num,
1499            ..Default::default()
1500        }
1501    }
1502
1503    /// Overwrite limit.
1504    pub fn with_limit(self, limit: ReadLimit) -> Self {
1505        Self { limit, ..self }
1506    }
1507}
1508
1509impl ReadRequest {
1510    pub(crate) fn try_into_api_type(
1511        self,
1512        stream: impl Into<String>,
1513    ) -> Result<api::ReadRequest, ConvertError> {
1514        let Self {
1515            start_seq_num,
1516            limit,
1517        } = self;
1518
1519        let limit = if limit.count > Some(1000) {
1520            Err("read limit: count must not exceed 1000 for unary request")
1521        } else if limit.bytes > Some(MIB_BYTES) {
1522            Err("read limit: bytes must not exceed 1MiB for unary request")
1523        } else {
1524            Ok(api::ReadLimit {
1525                count: limit.count,
1526                bytes: limit.bytes,
1527            })
1528        }?;
1529
1530        Ok(api::ReadRequest {
1531            stream: stream.into(),
1532            start_seq_num,
1533            limit: Some(limit),
1534        })
1535    }
1536}
1537
// A record as read back from a stream; built from `api::SequencedRecord` with the
// headers converted element-wise and the other fields carried over unchanged.
// NOTE(review): item and field docs are presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    pub seq_num: u64,
    pub timestamp: u64,
    pub headers: Vec<Header>,
    pub body: Bytes,
}
1546
// Implements `MeteredBytes` for `SequencedRecord` via the `metered_impl!` macro:
// 8 + 2 * (number of headers) + sum of header name and value lengths + body length.
metered_impl!(SequencedRecord);
1548
1549impl From<api::SequencedRecord> for SequencedRecord {
1550    fn from(value: api::SequencedRecord) -> Self {
1551        let api::SequencedRecord {
1552            seq_num,
1553            timestamp,
1554            headers,
1555            body,
1556        } = value;
1557        Self {
1558            seq_num,
1559            timestamp,
1560            headers: headers.into_iter().map(Into::into).collect(),
1561            body,
1562        }
1563    }
1564}
1565
1566impl SequencedRecord {
1567    /// Try representing the sequenced record as a command record.
1568    pub fn as_command_record(&self) -> Option<CommandRecord> {
1569        if self.headers.len() != 1 {
1570            return None;
1571        }
1572
1573        let header = self.headers.first().expect("pre-validated length");
1574
1575        if !header.name.is_empty() {
1576            return None;
1577        }
1578
1579        match header.value.as_ref() {
1580            CommandRecord::FENCE => {
1581                let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1582                Some(CommandRecord {
1583                    command: Command::Fence { fencing_token },
1584                    timestamp: Some(self.timestamp),
1585                })
1586            }
1587            CommandRecord::TRIM => {
1588                let body: &[u8] = &self.body;
1589                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1590                Some(CommandRecord {
1591                    command: Command::Trim { seq_num },
1592                    timestamp: Some(self.timestamp),
1593                })
1594            }
1595            _ => None,
1596        }
1597    }
1598}
1599
// A batch of sequenced records; its metered size is the sum over `records`.
// NOTE(review): item and field docs are presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    pub records: Vec<SequencedRecord>,
}
1605
1606impl MeteredBytes for SequencedRecordBatch {
1607    fn metered_bytes(&self) -> u64 {
1608        self.records.metered_bytes()
1609    }
1610}
1611
1612impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1613    fn from(value: api::SequencedRecordBatch) -> Self {
1614        let api::SequencedRecordBatch { records } = value;
1615        Self {
1616            records: records.into_iter().map(Into::into).collect(),
1617        }
1618    }
1619}
1620
// Output of a read; each variant corresponds to the variant of the same name in
// `api::read_output::Output` (see the `From` impl for that type).
// NOTE(review): item and variant docs are presumably injected by `#[sync_docs]` from
// the API type named in the attribute argument — confirm.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    Batch(SequencedRecordBatch),
    FirstSeqNum(u64),
    NextSeqNum(u64),
}
1628
1629impl From<api::read_output::Output> for ReadOutput {
1630    fn from(value: api::read_output::Output) -> Self {
1631        match value {
1632            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1633            api::read_output::Output::FirstSeqNum(first_seq_num) => {
1634                Self::FirstSeqNum(first_seq_num)
1635            }
1636            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1637        }
1638    }
1639}
1640
1641impl TryFrom<api::ReadOutput> for ReadOutput {
1642    type Error = ConvertError;
1643    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1644        let api::ReadOutput { output } = value;
1645        let output = output.ok_or("missing read output")?;
1646        Ok(output.into())
1647    }
1648}
1649
1650impl TryFrom<api::ReadResponse> for ReadOutput {
1651    type Error = ConvertError;
1652    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1653        let api::ReadResponse { output } = value;
1654        let output = output.ok_or("missing output in read response")?;
1655        output.try_into()
1656    }
1657}
1658
// Parameters of a read session. `Default` starts at sequence number 0 with no limits.
// NOTE(review): item and field docs are presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    pub start_seq_num: u64,
    pub limit: ReadLimit,
}
1665
1666impl ReadSessionRequest {
1667    /// Create a new request with start sequene number.
1668    pub fn new(start_seq_num: u64) -> Self {
1669        Self {
1670            start_seq_num,
1671            ..Default::default()
1672        }
1673    }
1674
1675    /// Overwrite limit.
1676    pub fn with_limit(self, limit: ReadLimit) -> Self {
1677        Self { limit, ..self }
1678    }
1679
1680    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1681        let Self {
1682            start_seq_num,
1683            limit,
1684        } = self;
1685        api::ReadSessionRequest {
1686            stream: stream.into(),
1687            start_seq_num,
1688            limit: Some(api::ReadLimit {
1689                count: limit.count,
1690                bytes: limit.bytes,
1691            }),
1692            heartbeats: false,
1693        }
1694    }
1695}
1696
1697impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1698    type Error = ConvertError;
1699    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1700        let api::ReadSessionResponse { output } = value;
1701        let output = output.ok_or("missing output in read session response")?;
1702        output.try_into()
1703    }
1704}
1705
/// Name of a basin.
///
/// Must be between 8 and 48 characters in length. Must comprise lowercase
/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
///
/// These rules are checked by the `TryFrom<String>` and `FromStr` implementations.
/// The length is measured with `String::len`, i.e. in bytes; for names that pass
/// the character check this equals the character count, since all allowed
/// characters are ASCII.
#[derive(Debug, Clone)]
pub struct BasinName(String);
1712
1713impl AsRef<str> for BasinName {
1714    fn as_ref(&self) -> &str {
1715        &self.0
1716    }
1717}
1718
1719impl Deref for BasinName {
1720    type Target = str;
1721    fn deref(&self) -> &Self::Target {
1722        &self.0
1723    }
1724}
1725
1726impl TryFrom<String> for BasinName {
1727    type Error = ConvertError;
1728
1729    fn try_from(name: String) -> Result<Self, Self::Error> {
1730        if name.len() < 8 || name.len() > 48 {
1731            return Err("Basin name must be between 8 and 48 characters in length".into());
1732        }
1733
1734        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1735        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1736            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1737                .expect("Failed to compile basin name regex")
1738        });
1739
1740        if !regex.is_match(&name) {
1741            return Err(
1742                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1743                It cannot begin or end with a hyphen."
1744                    .into(),
1745            );
1746        }
1747
1748        Ok(Self(name))
1749    }
1750}
1751
1752impl FromStr for BasinName {
1753    type Err = ConvertError;
1754
1755    fn from_str(s: &str) -> Result<Self, Self::Err> {
1756        s.to_string().try_into()
1757    }
1758}
1759
1760impl std::fmt::Display for BasinName {
1761    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1762        f.write_str(&self.0)
1763    }
1764}