s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::{Rng, distributions::Uniform};
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
12pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13
14/// Error related to conversion from one type to another.
15#[derive(Debug, Clone, thiserror::Error)]
16#[error("{0}")]
17pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20    fn from(value: T) -> Self {
21        Self(value.into())
22    }
23}
24
25/// Metered size of the object in bytes.
26///
27/// Bytes are calculated using the "metered bytes" formula:
28///
29/// ```python
30/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.key) + len(h.value)) for h in record.headers) + len(record.body)
31/// ```
32pub trait MeteredBytes {
33    /// Return the metered bytes of the object.
34    fn metered_bytes(&self) -> u64;
35}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38    fn metered_bytes(&self) -> u64 {
39        self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40    }
41}
42
43macro_rules! metered_impl {
44    ($ty:ty) => {
45        impl MeteredBytes for $ty {
46            fn metered_bytes(&self) -> u64 {
47                let bytes = 8
48                    + (2 * self.headers.len())
49                    + self
50                        .headers
51                        .iter()
52                        .map(|h| h.name.len() + h.value.len())
53                        .sum::<usize>()
54                    + self.body.len();
55                bytes as u64
56            }
57        }
58    };
59}
60
61#[sync_docs]
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
63pub enum BasinScope {
64    #[default]
65    Unspecified,
66    AwsUsEast1,
67}
68
69impl From<BasinScope> for api::BasinScope {
70    fn from(value: BasinScope) -> Self {
71        match value {
72            BasinScope::Unspecified => Self::Unspecified,
73            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
74        }
75    }
76}
77
78impl From<api::BasinScope> for BasinScope {
79    fn from(value: api::BasinScope) -> Self {
80        match value {
81            api::BasinScope::Unspecified => Self::Unspecified,
82            api::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
83        }
84    }
85}
86
87impl FromStr for BasinScope {
88    type Err = ConvertError;
89    fn from_str(value: &str) -> Result<Self, Self::Err> {
90        match value {
91            "unspecified" => Ok(Self::Unspecified),
92            "aws:us-east-1" => Ok(Self::AwsUsEast1),
93            _ => Err("invalid basin scope value".into()),
94        }
95    }
96}
97
98impl From<BasinScope> for i32 {
99    fn from(value: BasinScope) -> Self {
100        api::BasinScope::from(value).into()
101    }
102}
103
104impl TryFrom<i32> for BasinScope {
105    type Error = ConvertError;
106    fn try_from(value: i32) -> Result<Self, Self::Error> {
107        api::BasinScope::try_from(value)
108            .map(Into::into)
109            .map_err(|_| "invalid basin scope value".into())
110    }
111}
112
113#[sync_docs]
114#[derive(Debug, Clone)]
115pub struct CreateBasinRequest {
116    pub basin: BasinName,
117    pub config: Option<BasinConfig>,
118    pub scope: BasinScope,
119}
120
121impl CreateBasinRequest {
122    /// Create a new request with basin name.
123    pub fn new(basin: BasinName) -> Self {
124        Self {
125            basin,
126            config: None,
127            scope: BasinScope::Unspecified,
128        }
129    }
130
131    /// Overwrite basin configuration.
132    pub fn with_config(self, config: BasinConfig) -> Self {
133        Self {
134            config: Some(config),
135            ..self
136        }
137    }
138
139    /// Overwrite basin scope.
140    pub fn with_scope(self, scope: BasinScope) -> Self {
141        Self { scope, ..self }
142    }
143}
144
145impl From<CreateBasinRequest> for api::CreateBasinRequest {
146    fn from(value: CreateBasinRequest) -> Self {
147        let CreateBasinRequest {
148            basin,
149            config,
150            scope,
151        } = value;
152        Self {
153            basin: basin.0,
154            config: config.map(Into::into),
155            scope: scope.into(),
156        }
157    }
158}
159
160#[sync_docs]
161#[derive(Debug, Clone, Default)]
162pub struct BasinConfig {
163    pub default_stream_config: Option<StreamConfig>,
164    pub create_stream_on_append: bool,
165}
166
167impl From<BasinConfig> for api::BasinConfig {
168    fn from(value: BasinConfig) -> Self {
169        let BasinConfig {
170            default_stream_config,
171            create_stream_on_append,
172        } = value;
173        Self {
174            default_stream_config: default_stream_config.map(Into::into),
175            create_stream_on_append,
176        }
177    }
178}
179
180impl TryFrom<api::BasinConfig> for BasinConfig {
181    type Error = ConvertError;
182    fn try_from(value: api::BasinConfig) -> Result<Self, Self::Error> {
183        let api::BasinConfig {
184            default_stream_config,
185            create_stream_on_append,
186        } = value;
187        Ok(Self {
188            default_stream_config: default_stream_config.map(TryInto::try_into).transpose()?,
189            create_stream_on_append,
190        })
191    }
192}
193
194#[sync_docs]
195#[derive(Debug, Clone, Default)]
196pub struct StreamConfig {
197    pub storage_class: StorageClass,
198    pub retention_policy: Option<RetentionPolicy>,
199}
200
201impl StreamConfig {
202    /// Create a new request.
203    pub fn new() -> Self {
204        Self::default()
205    }
206
207    /// Overwrite storage class.
208    pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self {
209        Self {
210            storage_class: storage_class.into(),
211            ..self
212        }
213    }
214
215    /// Overwrite retention policy.
216    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
217        Self {
218            retention_policy: Some(retention_policy),
219            ..self
220        }
221    }
222}
223
224impl From<StreamConfig> for api::StreamConfig {
225    fn from(value: StreamConfig) -> Self {
226        let StreamConfig {
227            storage_class,
228            retention_policy,
229        } = value;
230        Self {
231            storage_class: storage_class.into(),
232            retention_policy: retention_policy.map(Into::into),
233        }
234    }
235}
236
237impl TryFrom<api::StreamConfig> for StreamConfig {
238    type Error = ConvertError;
239    fn try_from(value: api::StreamConfig) -> Result<Self, Self::Error> {
240        let api::StreamConfig {
241            storage_class,
242            retention_policy,
243        } = value;
244        Ok(Self {
245            storage_class: storage_class.try_into()?,
246            retention_policy: retention_policy.map(Into::into),
247        })
248    }
249}
250
251#[sync_docs]
252#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
253pub enum StorageClass {
254    #[default]
255    Unspecified,
256    Standard,
257    Express,
258}
259
260impl From<StorageClass> for api::StorageClass {
261    fn from(value: StorageClass) -> Self {
262        match value {
263            StorageClass::Unspecified => Self::Unspecified,
264            StorageClass::Standard => Self::Standard,
265            StorageClass::Express => Self::Express,
266        }
267    }
268}
269
270impl From<api::StorageClass> for StorageClass {
271    fn from(value: api::StorageClass) -> Self {
272        match value {
273            api::StorageClass::Unspecified => Self::Unspecified,
274            api::StorageClass::Standard => Self::Standard,
275            api::StorageClass::Express => Self::Express,
276        }
277    }
278}
279
280impl FromStr for StorageClass {
281    type Err = ConvertError;
282    fn from_str(value: &str) -> Result<Self, Self::Err> {
283        match value {
284            "unspecified" => Ok(Self::Unspecified),
285            "standard" => Ok(Self::Standard),
286            "express" => Ok(Self::Express),
287            _ => Err("invalid storage class value".into()),
288        }
289    }
290}
291
292impl From<StorageClass> for i32 {
293    fn from(value: StorageClass) -> Self {
294        api::StorageClass::from(value).into()
295    }
296}
297
298impl TryFrom<i32> for StorageClass {
299    type Error = ConvertError;
300    fn try_from(value: i32) -> Result<Self, Self::Error> {
301        api::StorageClass::try_from(value)
302            .map(Into::into)
303            .map_err(|_| "invalid storage class value".into())
304    }
305}
306
307#[sync_docs(Age = "Age")]
308#[derive(Debug, Clone)]
309pub enum RetentionPolicy {
310    Age(Duration),
311}
312
313impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
314    fn from(value: RetentionPolicy) -> Self {
315        match value {
316            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
317        }
318    }
319}
320
321impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
322    fn from(value: api::stream_config::RetentionPolicy) -> Self {
323        match value {
324            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
325        }
326    }
327}
328
329#[sync_docs]
330#[derive(Debug, Clone, Copy, PartialEq, Eq)]
331pub enum BasinState {
332    Unspecified,
333    Active,
334    Creating,
335    Deleting,
336}
337
338impl From<BasinState> for api::BasinState {
339    fn from(value: BasinState) -> Self {
340        match value {
341            BasinState::Unspecified => Self::Unspecified,
342            BasinState::Active => Self::Active,
343            BasinState::Creating => Self::Creating,
344            BasinState::Deleting => Self::Deleting,
345        }
346    }
347}
348
349impl From<api::BasinState> for BasinState {
350    fn from(value: api::BasinState) -> Self {
351        match value {
352            api::BasinState::Unspecified => Self::Unspecified,
353            api::BasinState::Active => Self::Active,
354            api::BasinState::Creating => Self::Creating,
355            api::BasinState::Deleting => Self::Deleting,
356        }
357    }
358}
359
360impl From<BasinState> for i32 {
361    fn from(value: BasinState) -> Self {
362        api::BasinState::from(value).into()
363    }
364}
365
366impl TryFrom<i32> for BasinState {
367    type Error = ConvertError;
368    fn try_from(value: i32) -> Result<Self, Self::Error> {
369        api::BasinState::try_from(value)
370            .map(Into::into)
371            .map_err(|_| "invalid basin status value".into())
372    }
373}
374
375impl std::fmt::Display for BasinState {
376    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
377        match self {
378            BasinState::Unspecified => write!(f, "unspecified"),
379            BasinState::Active => write!(f, "active"),
380            BasinState::Creating => write!(f, "creating"),
381            BasinState::Deleting => write!(f, "deleting"),
382        }
383    }
384}
385
386#[sync_docs]
387#[derive(Debug, Clone)]
388pub struct BasinInfo {
389    pub name: String,
390    pub scope: BasinScope,
391    pub state: BasinState,
392}
393
394impl From<BasinInfo> for api::BasinInfo {
395    fn from(value: BasinInfo) -> Self {
396        let BasinInfo { name, scope, state } = value;
397        Self {
398            name,
399            scope: scope.into(),
400            state: state.into(),
401        }
402    }
403}
404
405impl TryFrom<api::BasinInfo> for BasinInfo {
406    type Error = ConvertError;
407    fn try_from(value: api::BasinInfo) -> Result<Self, Self::Error> {
408        let api::BasinInfo { name, scope, state } = value;
409        Ok(Self {
410            name,
411            scope: scope.try_into()?,
412            state: state.try_into()?,
413        })
414    }
415}
416
417impl TryFrom<api::CreateBasinResponse> for BasinInfo {
418    type Error = ConvertError;
419    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
420        let api::CreateBasinResponse { info } = value;
421        let info = info.ok_or("missing basin info")?;
422        info.try_into()
423    }
424}
425
426#[sync_docs]
427#[derive(Debug, Clone, Default)]
428pub struct ListStreamsRequest {
429    pub prefix: String,
430    pub start_after: String,
431    pub limit: Option<usize>,
432}
433
434impl ListStreamsRequest {
435    /// Create a new request.
436    pub fn new() -> Self {
437        Self::default()
438    }
439
440    /// Overwrite prefix.
441    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
442        Self {
443            prefix: prefix.into(),
444            ..self
445        }
446    }
447
448    /// Overwrite start after.
449    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
450        Self {
451            start_after: start_after.into(),
452            ..self
453        }
454    }
455
456    /// Overwrite limit.
457    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
458        Self {
459            limit: limit.into(),
460            ..self
461        }
462    }
463}
464
465impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
466    type Error = ConvertError;
467    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
468        let ListStreamsRequest {
469            prefix,
470            start_after,
471            limit,
472        } = value;
473        Ok(Self {
474            prefix,
475            start_after,
476            limit: limit.map(|n| n as u64),
477        })
478    }
479}
480
481#[sync_docs]
482#[derive(Debug, Clone)]
483pub struct StreamInfo {
484    pub name: String,
485    pub created_at: u32,
486    pub deleted_at: Option<u32>,
487}
488
489impl From<api::StreamInfo> for StreamInfo {
490    fn from(value: api::StreamInfo) -> Self {
491        Self {
492            name: value.name,
493            created_at: value.created_at,
494            deleted_at: value.deleted_at,
495        }
496    }
497}
498
499impl TryFrom<api::CreateStreamResponse> for StreamInfo {
500    type Error = ConvertError;
501
502    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
503        let api::CreateStreamResponse { info } = value;
504        let info = info.ok_or("missing stream info")?;
505        Ok(info.into())
506    }
507}
508
509#[sync_docs]
510#[derive(Debug, Clone)]
511pub struct ListStreamsResponse {
512    pub streams: Vec<StreamInfo>,
513    pub has_more: bool,
514}
515
516impl From<api::ListStreamsResponse> for ListStreamsResponse {
517    fn from(value: api::ListStreamsResponse) -> Self {
518        let api::ListStreamsResponse { streams, has_more } = value;
519        let streams = streams.into_iter().map(Into::into).collect();
520        Self { streams, has_more }
521    }
522}
523
524impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
525    type Error = ConvertError;
526    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
527        let api::GetBasinConfigResponse { config } = value;
528        let config = config.ok_or("missing basin config")?;
529        config.try_into()
530    }
531}
532
533impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
534    type Error = ConvertError;
535    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
536        let api::GetStreamConfigResponse { config } = value;
537        let config = config.ok_or("missing stream config")?;
538        config.try_into()
539    }
540}
541
542#[sync_docs]
543#[derive(Debug, Clone)]
544pub struct CreateStreamRequest {
545    pub stream: String,
546    pub config: Option<StreamConfig>,
547}
548
549impl CreateStreamRequest {
550    /// Create a new request with stream name.
551    pub fn new(stream: impl Into<String>) -> Self {
552        Self {
553            stream: stream.into(),
554            config: None,
555        }
556    }
557
558    /// Overwrite stream config.
559    pub fn with_config(self, config: StreamConfig) -> Self {
560        Self {
561            config: Some(config),
562            ..self
563        }
564    }
565}
566
567impl From<CreateStreamRequest> for api::CreateStreamRequest {
568    fn from(value: CreateStreamRequest) -> Self {
569        let CreateStreamRequest { stream, config } = value;
570        Self {
571            stream,
572            config: config.map(Into::into),
573        }
574    }
575}
576
577#[sync_docs]
578#[derive(Debug, Clone, Default)]
579pub struct ListBasinsRequest {
580    pub prefix: String,
581    pub start_after: String,
582    pub limit: Option<usize>,
583}
584
585impl ListBasinsRequest {
586    /// Create a new request.
587    pub fn new() -> Self {
588        Self::default()
589    }
590
591    /// Overwrite prefix.
592    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
593        Self {
594            prefix: prefix.into(),
595            ..self
596        }
597    }
598
599    /// Overwrite start after.
600    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
601        Self {
602            start_after: start_after.into(),
603            ..self
604        }
605    }
606
607    /// Overwrite limit.
608    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
609        Self {
610            limit: limit.into(),
611            ..self
612        }
613    }
614}
615
616impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
617    type Error = ConvertError;
618    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
619        let ListBasinsRequest {
620            prefix,
621            start_after,
622            limit,
623        } = value;
624        Ok(Self {
625            prefix,
626            start_after,
627            limit: limit
628                .map(TryInto::try_into)
629                .transpose()
630                .map_err(|_| "request limit does not fit into u64 bounds")?,
631        })
632    }
633}
634
635#[sync_docs]
636#[derive(Debug, Clone)]
637pub struct ListBasinsResponse {
638    pub basins: Vec<BasinInfo>,
639    pub has_more: bool,
640}
641
642impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
643    type Error = ConvertError;
644    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
645        let api::ListBasinsResponse { basins, has_more } = value;
646        Ok(Self {
647            basins: basins
648                .into_iter()
649                .map(TryInto::try_into)
650                .collect::<Result<Vec<BasinInfo>, ConvertError>>()?,
651            has_more,
652        })
653    }
654}
655
656#[sync_docs]
657#[derive(Debug, Clone)]
658pub struct DeleteBasinRequest {
659    pub basin: BasinName,
660    /// Delete basin if it exists else do nothing.
661    pub if_exists: bool,
662}
663
664impl DeleteBasinRequest {
665    /// Create a new request.
666    pub fn new(basin: BasinName) -> Self {
667        Self {
668            basin,
669            if_exists: false,
670        }
671    }
672
673    /// Overwrite the if exists parameter.
674    pub fn with_if_exists(self, if_exists: bool) -> Self {
675        Self { if_exists, ..self }
676    }
677}
678
679impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
680    fn from(value: DeleteBasinRequest) -> Self {
681        let DeleteBasinRequest { basin, .. } = value;
682        Self { basin: basin.0 }
683    }
684}
685
686#[sync_docs]
687#[derive(Debug, Clone)]
688pub struct DeleteStreamRequest {
689    pub stream: String,
690    /// Delete stream if it exists else do nothing.
691    pub if_exists: bool,
692}
693
694impl DeleteStreamRequest {
695    /// Create a new request.
696    pub fn new(stream: impl Into<String>) -> Self {
697        Self {
698            stream: stream.into(),
699            if_exists: false,
700        }
701    }
702
703    /// Overwrite the if exists parameter.
704    pub fn with_if_exists(self, if_exists: bool) -> Self {
705        Self { if_exists, ..self }
706    }
707}
708
709impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
710    fn from(value: DeleteStreamRequest) -> Self {
711        let DeleteStreamRequest { stream, .. } = value;
712        Self { stream }
713    }
714}
715
716#[sync_docs]
717#[derive(Debug, Clone)]
718pub struct ReconfigureBasinRequest {
719    pub basin: BasinName,
720    pub config: Option<BasinConfig>,
721    pub mask: Option<Vec<String>>,
722}
723
724impl ReconfigureBasinRequest {
725    /// Create a new request with basin name.
726    pub fn new(basin: BasinName) -> Self {
727        Self {
728            basin,
729            config: None,
730            mask: None,
731        }
732    }
733
734    /// Overwrite basin config.
735    pub fn with_config(self, config: BasinConfig) -> Self {
736        Self {
737            config: Some(config),
738            ..self
739        }
740    }
741
742    /// Overwrite field mask.
743    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
744        Self {
745            mask: Some(mask.into()),
746            ..self
747        }
748    }
749}
750
751impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
752    fn from(value: ReconfigureBasinRequest) -> Self {
753        let ReconfigureBasinRequest {
754            basin,
755            config,
756            mask,
757        } = value;
758        Self {
759            basin: basin.0,
760            config: config.map(Into::into),
761            mask: mask.map(|paths| prost_types::FieldMask { paths }),
762        }
763    }
764}
765
766impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
767    type Error = ConvertError;
768    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
769        let api::ReconfigureBasinResponse { config } = value;
770        let config = config.ok_or("missing basin config")?;
771        config.try_into()
772    }
773}
774
775#[sync_docs]
776#[derive(Debug, Clone)]
777pub struct ReconfigureStreamRequest {
778    pub stream: String,
779    pub config: Option<StreamConfig>,
780    pub mask: Option<Vec<String>>,
781}
782
783impl ReconfigureStreamRequest {
784    /// Create a new request with stream name.
785    pub fn new(stream: impl Into<String>) -> Self {
786        Self {
787            stream: stream.into(),
788            config: None,
789            mask: None,
790        }
791    }
792
793    /// Overwrite stream config.
794    pub fn with_config(self, config: StreamConfig) -> Self {
795        Self {
796            config: Some(config),
797            ..self
798        }
799    }
800
801    /// Overwrite field mask.
802    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
803        Self {
804            mask: Some(mask.into()),
805            ..self
806        }
807    }
808}
809
810impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
811    fn from(value: ReconfigureStreamRequest) -> Self {
812        let ReconfigureStreamRequest {
813            stream,
814            config,
815            mask,
816        } = value;
817        Self {
818            stream,
819            config: config.map(Into::into),
820            mask: mask.map(|paths| prost_types::FieldMask { paths }),
821        }
822    }
823}
824
825impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
826    type Error = ConvertError;
827    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
828        let api::ReconfigureStreamResponse { config } = value;
829        let config = config.ok_or("missing stream config")?;
830        config.try_into()
831    }
832}
833
834impl From<api::CheckTailResponse> for u64 {
835    fn from(value: api::CheckTailResponse) -> Self {
836        let api::CheckTailResponse { next_seq_num } = value;
837        next_seq_num
838    }
839}
840
841#[sync_docs]
842#[derive(Debug, Clone, PartialEq, Eq)]
843pub struct Header {
844    pub name: Bytes,
845    pub value: Bytes,
846}
847
848impl Header {
849    /// Create a new header from name and value.
850    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
851        Self {
852            name: name.into(),
853            value: value.into(),
854        }
855    }
856
857    /// Create a new header from value.
858    pub fn from_value(value: impl Into<Bytes>) -> Self {
859        Self {
860            name: Bytes::new(),
861            value: value.into(),
862        }
863    }
864}
865
866impl From<Header> for api::Header {
867    fn from(value: Header) -> Self {
868        let Header { name, value } = value;
869        Self { name, value }
870    }
871}
872
873impl From<api::Header> for Header {
874    fn from(value: api::Header) -> Self {
875        let api::Header { name, value } = value;
876        Self { name, value }
877    }
878}
879
880/// A fencing token can be enforced on append requests.
881///
882/// Must not be more than 16 bytes.
883#[derive(Debug, Clone, Default, PartialEq, Eq)]
884pub struct FencingToken(Bytes);
885
886impl FencingToken {
887    const MAX_BYTES: usize = 16;
888
889    /// Try creating a new fencing token from bytes.
890    pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
891        let bytes = bytes.into();
892        if bytes.len() > Self::MAX_BYTES {
893            Err(format!(
894                "Size of a fencing token cannot exceed {} bytes",
895                Self::MAX_BYTES
896            )
897            .into())
898        } else {
899            Ok(Self(bytes))
900        }
901    }
902
903    /// Generate a random fencing token with `n` bytes.
904    pub fn generate(n: usize) -> Result<Self, ConvertError> {
905        Self::new(
906            rand::thread_rng()
907                .sample_iter(&Uniform::new_inclusive(0, u8::MAX))
908                .take(n)
909                .collect::<Bytes>(),
910        )
911    }
912}
913
914impl AsRef<Bytes> for FencingToken {
915    fn as_ref(&self) -> &Bytes {
916        &self.0
917    }
918}
919
920impl AsRef<[u8]> for FencingToken {
921    fn as_ref(&self) -> &[u8] {
922        &self.0
923    }
924}
925
926impl Deref for FencingToken {
927    type Target = [u8];
928
929    fn deref(&self) -> &Self::Target {
930        &self.0
931    }
932}
933
934impl From<FencingToken> for Bytes {
935    fn from(value: FencingToken) -> Self {
936        value.0
937    }
938}
939
940impl From<FencingToken> for Vec<u8> {
941    fn from(value: FencingToken) -> Self {
942        value.0.into()
943    }
944}
945
946impl TryFrom<Bytes> for FencingToken {
947    type Error = ConvertError;
948
949    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
950        Self::new(value)
951    }
952}
953
954impl TryFrom<Vec<u8>> for FencingToken {
955    type Error = ConvertError;
956
957    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
958        Self::new(value)
959    }
960}
961
962/// A command record is a special kind of [`AppendRecord`] that can be used to
963/// send command messages.
964///
965/// Such a record is signalled by a sole header with empty name. The header
966/// value represents the operation and record body acts as the payload.
967#[derive(Debug, Clone)]
968pub enum CommandRecord {
969    /// Enforce a fencing token.
970    ///
971    /// Fencing is strongly consistent, and subsequent appends that specify a
972    /// fencing token will be rejected if it does not match.
973    Fence {
974        /// Fencing token to enforce.
975        ///
976        /// Set empty to clear the token.
977        fencing_token: FencingToken,
978    },
979    /// Request a trim till the sequence number.
980    ///
981    /// Trimming is eventually consistent, and trimmed records may be visible
982    /// for a brief period
983    Trim {
984        /// Trim point.
985        ///
986        /// This sequence number is only allowed to advance, and any regression
987        /// will be ignored.
988        seq_num: u64,
989    },
990}
991
992impl CommandRecord {
993    const FENCE: &[u8] = b"fence";
994    const TRIM: &[u8] = b"trim";
995
996    /// Create a new fence command record.
997    pub fn fence(fencing_token: FencingToken) -> Self {
998        Self::Fence { fencing_token }
999    }
1000
1001    /// Create a new trim command record.
1002    pub fn trim(seq_num: impl Into<u64>) -> Self {
1003        Self::Trim {
1004            seq_num: seq_num.into(),
1005        }
1006    }
1007}
1008
1009#[sync_docs]
1010#[derive(Debug, Clone, PartialEq, Eq)]
1011pub struct AppendRecord {
1012    headers: Vec<Header>,
1013    body: Bytes,
1014    #[cfg(test)]
1015    max_bytes: u64,
1016}
1017
1018metered_impl!(AppendRecord);
1019
1020impl AppendRecord {
1021    const MAX_BYTES: u64 = MIB_BYTES;
1022
1023    fn validated(self) -> Result<Self, ConvertError> {
1024        #[cfg(test)]
1025        let max_bytes = self.max_bytes;
1026        #[cfg(not(test))]
1027        let max_bytes = Self::MAX_BYTES;
1028
1029        if self.metered_bytes() > max_bytes {
1030            Err("AppendRecord should have metered size less than 1 MiB".into())
1031        } else {
1032            Ok(self)
1033        }
1034    }
1035
1036    /// Try creating a new append record with body.
1037    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1038        Self {
1039            headers: Vec::new(),
1040            body: body.into(),
1041            #[cfg(test)]
1042            max_bytes: Self::MAX_BYTES,
1043        }
1044        .validated()
1045    }
1046
1047    #[cfg(test)]
1048    pub(crate) fn with_max_bytes(
1049        max_bytes: u64,
1050        body: impl Into<Bytes>,
1051    ) -> Result<Self, ConvertError> {
1052        Self {
1053            headers: Vec::new(),
1054            body: body.into(),
1055            max_bytes,
1056        }
1057        .validated()
1058    }
1059
1060    /// Overwrite headers.
1061    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1062        Self {
1063            headers: headers.into(),
1064            ..self
1065        }
1066        .validated()
1067    }
1068
1069    /// Body of the record.
1070    pub fn body(&self) -> &[u8] {
1071        &self.body
1072    }
1073
1074    /// Headers of the record.
1075    pub fn headers(&self) -> &[Header] {
1076        &self.headers
1077    }
1078
1079    /// Consume the record and return parts.
1080    pub fn into_parts(self) -> AppendRecordParts {
1081        AppendRecordParts {
1082            headers: self.headers,
1083            body: self.body,
1084        }
1085    }
1086
1087    /// Try creating the record from parts.
1088    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1089        Self::new(parts.body)?.with_headers(parts.headers)
1090    }
1091}
1092
1093impl From<AppendRecord> for api::AppendRecord {
1094    fn from(value: AppendRecord) -> Self {
1095        Self {
1096            headers: value.headers.into_iter().map(Into::into).collect(),
1097            body: value.body,
1098        }
1099    }
1100}
1101
1102impl From<CommandRecord> for AppendRecord {
1103    fn from(value: CommandRecord) -> Self {
1104        let (header_value, body) = match value {
1105            CommandRecord::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1106            CommandRecord::Trim { seq_num } => {
1107                (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec())
1108            }
1109        };
1110        AppendRecordParts {
1111            headers: vec![Header::from_value(header_value)],
1112            body: body.into(),
1113        }
1114        .try_into()
1115        .expect("command record is a valid append record")
1116    }
1117}
1118
1119#[sync_docs(AppendRecordParts = "AppendRecord")]
1120#[derive(Debug, Clone)]
1121pub struct AppendRecordParts {
1122    pub headers: Vec<Header>,
1123    pub body: Bytes,
1124}
1125
1126impl From<AppendRecord> for AppendRecordParts {
1127    fn from(value: AppendRecord) -> Self {
1128        value.into_parts()
1129    }
1130}
1131
1132impl TryFrom<AppendRecordParts> for AppendRecord {
1133    type Error = ConvertError;
1134
1135    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1136        Self::try_from_parts(value)
1137    }
1138}
1139
1140/// A collection of append records that can be sent together in a batch.
1141#[derive(Debug, Clone)]
1142pub struct AppendRecordBatch {
1143    records: Vec<AppendRecord>,
1144    metered_bytes: u64,
1145    max_capacity: usize,
1146    #[cfg(test)]
1147    max_bytes: u64,
1148}
1149
1150impl PartialEq for AppendRecordBatch {
1151    fn eq(&self, other: &Self) -> bool {
1152        if self.records.eq(&other.records) {
1153            assert_eq!(self.metered_bytes, other.metered_bytes);
1154            true
1155        } else {
1156            false
1157        }
1158    }
1159}
1160
1161impl Eq for AppendRecordBatch {}
1162
1163impl Default for AppendRecordBatch {
1164    fn default() -> Self {
1165        Self::new()
1166    }
1167}
1168
1169impl AppendRecordBatch {
1170    /// Maximum number of records that a batch can hold.
1171    ///
1172    /// A record batch cannot be created with a bigger capacity.
1173    pub const MAX_CAPACITY: usize = 1000;
1174
1175    /// Maximum metered bytes of the batch.
1176    pub const MAX_BYTES: u64 = MIB_BYTES;
1177
1178    /// Create an empty record batch.
1179    pub fn new() -> Self {
1180        Self::with_max_capacity(Self::MAX_CAPACITY)
1181    }
1182
1183    /// Create an empty record batch with custom max capacity.
1184    ///
1185    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1186    pub fn with_max_capacity(max_capacity: usize) -> Self {
1187        assert!(
1188            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1189            "Batch capacity must be between 1 and 1000"
1190        );
1191
1192        Self {
1193            records: Vec::with_capacity(max_capacity),
1194            metered_bytes: 0,
1195            max_capacity,
1196            #[cfg(test)]
1197            max_bytes: Self::MAX_BYTES,
1198        }
1199    }
1200
1201    #[cfg(test)]
1202    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1203        #[cfg(test)]
1204        assert!(
1205            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1206            "Batch size must be between 1 byte and 1 MiB"
1207        );
1208
1209        Self {
1210            max_bytes,
1211            ..Self::with_max_capacity(max_capacity)
1212        }
1213    }
1214
1215    /// Try creating a record batch from an iterator.
1216    ///
1217    /// If all the items of the iterator cannot be drained into the batch, the
1218    /// error returned contains a batch containing all records it could fit
1219    /// along-with the left over items from the iterator.
1220    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1221    where
1222        R: Into<AppendRecord>,
1223        T: IntoIterator<Item = R>,
1224    {
1225        let mut records = Self::new();
1226        let mut pending = Vec::new();
1227
1228        let mut iter = iter.into_iter();
1229
1230        for record in iter.by_ref() {
1231            if let Err(record) = records.push(record) {
1232                pending.push(record);
1233                break;
1234            }
1235        }
1236
1237        if pending.is_empty() {
1238            Ok(records)
1239        } else {
1240            pending.extend(iter.map(Into::into));
1241            Err((records, pending))
1242        }
1243    }
1244
1245    /// Returns true if the batch contains no records.
1246    pub fn is_empty(&self) -> bool {
1247        if self.records.is_empty() {
1248            assert_eq!(self.metered_bytes, 0);
1249            true
1250        } else {
1251            false
1252        }
1253    }
1254
1255    /// Returns the number of records contained in the batch.
1256    pub fn len(&self) -> usize {
1257        self.records.len()
1258    }
1259
1260    #[cfg(test)]
1261    fn max_bytes(&self) -> u64 {
1262        self.max_bytes
1263    }
1264
1265    #[cfg(not(test))]
1266    fn max_bytes(&self) -> u64 {
1267        Self::MAX_BYTES
1268    }
1269
1270    /// Returns true if the batch cannot fit any more records.
1271    pub fn is_full(&self) -> bool {
1272        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1273    }
1274
1275    /// Try to append a new record into the batch.
1276    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1277        assert!(self.records.len() <= self.max_capacity);
1278        assert!(self.metered_bytes <= self.max_bytes());
1279
1280        let record = record.into();
1281        let record_size = record.metered_bytes();
1282        if self.records.len() >= self.max_capacity
1283            || self.metered_bytes + record_size > self.max_bytes()
1284        {
1285            Err(record)
1286        } else {
1287            self.records.push(record);
1288            self.metered_bytes += record_size;
1289            Ok(())
1290        }
1291    }
1292}
1293
1294impl MeteredBytes for AppendRecordBatch {
1295    fn metered_bytes(&self) -> u64 {
1296        self.metered_bytes
1297    }
1298}
1299
1300impl IntoIterator for AppendRecordBatch {
1301    type Item = AppendRecord;
1302    type IntoIter = std::vec::IntoIter<Self::Item>;
1303
1304    fn into_iter(self) -> Self::IntoIter {
1305        self.records.into_iter()
1306    }
1307}
1308
1309impl<'a> IntoIterator for &'a AppendRecordBatch {
1310    type Item = &'a AppendRecord;
1311    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1312
1313    fn into_iter(self) -> Self::IntoIter {
1314        self.records.iter()
1315    }
1316}
1317
1318impl AsRef<[AppendRecord]> for AppendRecordBatch {
1319    fn as_ref(&self) -> &[AppendRecord] {
1320        &self.records
1321    }
1322}
1323
1324#[sync_docs]
1325#[derive(Debug, Default, Clone)]
1326pub struct AppendInput {
1327    pub records: AppendRecordBatch,
1328    pub match_seq_num: Option<u64>,
1329    pub fencing_token: Option<FencingToken>,
1330}
1331
1332impl MeteredBytes for AppendInput {
1333    fn metered_bytes(&self) -> u64 {
1334        self.records.metered_bytes()
1335    }
1336}
1337
1338impl AppendInput {
1339    /// Create a new append input from record batch.
1340    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1341        Self {
1342            records: records.into(),
1343            match_seq_num: None,
1344            fencing_token: None,
1345        }
1346    }
1347
1348    /// Overwrite match sequence number.
1349    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1350        Self {
1351            match_seq_num: Some(match_seq_num.into()),
1352            ..self
1353        }
1354    }
1355
1356    /// Overwrite fencing token.
1357    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1358        Self {
1359            fencing_token: Some(fencing_token),
1360            ..self
1361        }
1362    }
1363
1364    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1365        let Self {
1366            records,
1367            match_seq_num,
1368            fencing_token,
1369        } = self;
1370
1371        api::AppendInput {
1372            stream: stream.into(),
1373            records: records.into_iter().map(Into::into).collect(),
1374            match_seq_num,
1375            fencing_token: fencing_token.map(|f| f.0),
1376        }
1377    }
1378}
1379
1380#[sync_docs]
1381#[derive(Debug, Clone)]
1382pub struct AppendOutput {
1383    pub start_seq_num: u64,
1384    pub end_seq_num: u64,
1385    pub next_seq_num: u64,
1386}
1387
1388impl From<api::AppendOutput> for AppendOutput {
1389    fn from(value: api::AppendOutput) -> Self {
1390        let api::AppendOutput {
1391            start_seq_num,
1392            end_seq_num,
1393            next_seq_num,
1394        } = value;
1395        Self {
1396            start_seq_num,
1397            end_seq_num,
1398            next_seq_num,
1399        }
1400    }
1401}
1402
1403impl TryFrom<api::AppendResponse> for AppendOutput {
1404    type Error = ConvertError;
1405    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1406        let api::AppendResponse { output } = value;
1407        let output = output.ok_or("missing append output")?;
1408        Ok(output.into())
1409    }
1410}
1411
1412impl TryFrom<api::AppendSessionResponse> for AppendOutput {
1413    type Error = ConvertError;
1414    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1415        let api::AppendSessionResponse { output } = value;
1416        let output = output.ok_or("missing append output")?;
1417        Ok(output.into())
1418    }
1419}
1420
1421#[sync_docs]
1422#[derive(Debug, Clone, Default)]
1423pub struct ReadLimit {
1424    pub count: Option<u64>,
1425    pub bytes: Option<u64>,
1426}
1427
1428#[sync_docs]
1429#[derive(Debug, Clone, Default)]
1430pub struct ReadRequest {
1431    pub start_seq_num: u64,
1432    pub limit: ReadLimit,
1433}
1434
1435impl ReadRequest {
1436    /// Create a new request with start sequence number.
1437    pub fn new(start_seq_num: u64) -> Self {
1438        Self {
1439            start_seq_num,
1440            ..Default::default()
1441        }
1442    }
1443
1444    /// Overwrite limit.
1445    pub fn with_limit(self, limit: ReadLimit) -> Self {
1446        Self { limit, ..self }
1447    }
1448}
1449
1450impl ReadRequest {
1451    pub(crate) fn try_into_api_type(
1452        self,
1453        stream: impl Into<String>,
1454    ) -> Result<api::ReadRequest, ConvertError> {
1455        let Self {
1456            start_seq_num,
1457            limit,
1458        } = self;
1459
1460        let limit = if limit.count > Some(1000) {
1461            Err("read limit: count must not exceed 1000 for unary request")
1462        } else if limit.bytes > Some(MIB_BYTES) {
1463            Err("read limit: bytes must not exceed 1MiB for unary request")
1464        } else {
1465            Ok(api::ReadLimit {
1466                count: limit.count,
1467                bytes: limit.bytes,
1468            })
1469        }?;
1470
1471        Ok(api::ReadRequest {
1472            stream: stream.into(),
1473            start_seq_num,
1474            limit: Some(limit),
1475        })
1476    }
1477}
1478
1479#[sync_docs]
1480#[derive(Debug, Clone)]
1481pub struct SequencedRecord {
1482    pub seq_num: u64,
1483    pub headers: Vec<Header>,
1484    pub body: Bytes,
1485}
1486
1487metered_impl!(SequencedRecord);
1488
1489impl From<api::SequencedRecord> for SequencedRecord {
1490    fn from(value: api::SequencedRecord) -> Self {
1491        let api::SequencedRecord {
1492            seq_num,
1493            headers,
1494            body,
1495        } = value;
1496        Self {
1497            seq_num,
1498            headers: headers.into_iter().map(Into::into).collect(),
1499            body,
1500        }
1501    }
1502}
1503
1504impl SequencedRecord {
1505    /// Try representing the sequenced record as a command record.
1506    pub fn as_command_record(&self) -> Option<CommandRecord> {
1507        if self.headers.len() != 1 {
1508            return None;
1509        }
1510
1511        let header = self.headers.first().expect("pre-validated length");
1512
1513        if !header.name.is_empty() {
1514            return None;
1515        }
1516
1517        match header.value.as_ref() {
1518            CommandRecord::FENCE => {
1519                let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1520                Some(CommandRecord::Fence { fencing_token })
1521            }
1522            CommandRecord::TRIM => {
1523                let body: &[u8] = &self.body;
1524                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1525                Some(CommandRecord::Trim { seq_num })
1526            }
1527            _ => None,
1528        }
1529    }
1530}
1531
1532#[sync_docs]
1533#[derive(Debug, Clone)]
1534pub struct SequencedRecordBatch {
1535    pub records: Vec<SequencedRecord>,
1536}
1537
1538impl MeteredBytes for SequencedRecordBatch {
1539    fn metered_bytes(&self) -> u64 {
1540        self.records.metered_bytes()
1541    }
1542}
1543
1544impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1545    fn from(value: api::SequencedRecordBatch) -> Self {
1546        let api::SequencedRecordBatch { records } = value;
1547        Self {
1548            records: records.into_iter().map(Into::into).collect(),
1549        }
1550    }
1551}
1552
1553#[sync_docs(ReadOutput = "Output")]
1554#[derive(Debug, Clone)]
1555pub enum ReadOutput {
1556    Batch(SequencedRecordBatch),
1557    FirstSeqNum(u64),
1558    NextSeqNum(u64),
1559}
1560
1561impl From<api::read_output::Output> for ReadOutput {
1562    fn from(value: api::read_output::Output) -> Self {
1563        match value {
1564            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1565            api::read_output::Output::FirstSeqNum(first_seq_num) => {
1566                Self::FirstSeqNum(first_seq_num)
1567            }
1568            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1569        }
1570    }
1571}
1572
1573impl TryFrom<api::ReadOutput> for ReadOutput {
1574    type Error = ConvertError;
1575    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1576        let api::ReadOutput { output } = value;
1577        let output = output.ok_or("missing read output")?;
1578        Ok(output.into())
1579    }
1580}
1581
1582impl TryFrom<api::ReadResponse> for ReadOutput {
1583    type Error = ConvertError;
1584    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1585        let api::ReadResponse { output } = value;
1586        let output = output.ok_or("missing output in read response")?;
1587        output.try_into()
1588    }
1589}
1590
1591#[sync_docs]
1592#[derive(Debug, Clone, Default)]
1593pub struct ReadSessionRequest {
1594    pub start_seq_num: u64,
1595    pub limit: ReadLimit,
1596}
1597
1598impl ReadSessionRequest {
1599    /// Create a new request with start sequene number.
1600    pub fn new(start_seq_num: u64) -> Self {
1601        Self {
1602            start_seq_num,
1603            ..Default::default()
1604        }
1605    }
1606
1607    /// Overwrite limit.
1608    pub fn with_limit(self, limit: ReadLimit) -> Self {
1609        Self { limit, ..self }
1610    }
1611
1612    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1613        let Self {
1614            start_seq_num,
1615            limit,
1616        } = self;
1617        api::ReadSessionRequest {
1618            stream: stream.into(),
1619            start_seq_num,
1620            limit: Some(api::ReadLimit {
1621                count: limit.count,
1622                bytes: limit.bytes,
1623            }),
1624            heartbeats: false,
1625        }
1626    }
1627}
1628
1629impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1630    type Error = ConvertError;
1631    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1632        let api::ReadSessionResponse { output } = value;
1633        let output = output.ok_or("missing output in read session response")?;
1634        output.try_into()
1635    }
1636}
1637
1638/// Name of a basin.
1639///
1640/// Must be between 8 and 48 characters in length. Must comprise lowercase
1641/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
1642#[derive(Debug, Clone)]
1643pub struct BasinName(String);
1644
1645impl AsRef<str> for BasinName {
1646    fn as_ref(&self) -> &str {
1647        &self.0
1648    }
1649}
1650
1651impl Deref for BasinName {
1652    type Target = str;
1653    fn deref(&self) -> &Self::Target {
1654        &self.0
1655    }
1656}
1657
1658impl TryFrom<String> for BasinName {
1659    type Error = ConvertError;
1660
1661    fn try_from(name: String) -> Result<Self, Self::Error> {
1662        if name.len() < 8 || name.len() > 48 {
1663            return Err("Basin name must be between 8 and 48 characters in length".into());
1664        }
1665
1666        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1667        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1668            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1669                .expect("Failed to compile basin name regex")
1670        });
1671
1672        if !regex.is_match(&name) {
1673            return Err(
1674                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1675                It cannot begin or end with a hyphen."
1676                    .into(),
1677            );
1678        }
1679
1680        Ok(Self(name))
1681    }
1682}
1683
1684impl FromStr for BasinName {
1685    type Err = ConvertError;
1686
1687    fn from_str(s: &str) -> Result<Self, Self::Err> {
1688        s.to_string().try_into()
1689    }
1690}
1691
1692impl std::fmt::Display for BasinName {
1693    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1694        f.write_str(&self.0)
1695    }
1696}