s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::{Rng, distributions::Uniform};
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13
/// Error related to conversion from one type to another.
///
/// Wraps a human-readable message; the `Display` output is that message
/// verbatim.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20    fn from(value: T) -> Self {
21        Self(value.into())
22    }
23}
24
/// Metered size of the object in bytes.
///
/// Bytes are calculated using the "metered bytes" formula:
///
/// ```python
/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.name) + len(h.value)) for h in record.headers) + len(record.body)
/// ```
///
/// A `Vec<T>` of metered items reports the sum over its elements.
pub trait MeteredBytes {
    /// Return the metered bytes of the object.
    fn metered_bytes(&self) -> u64;
}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38    fn metered_bytes(&self) -> u64 {
39        self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40    }
41}
42
// Implements `MeteredBytes` for a record-like type that exposes `headers`
// (items with `name` and `value`) and a `body`, each supporting `len()`.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                // Combined length of every header's name and value.
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                // Fixed 8-byte overhead, plus 2 bytes per header, plus payload.
                let total = 8 + (2 * self.headers.len()) + header_bytes + self.body.len();
                total as u64
            }
        }
    };
}
60
// SDK-side counterpart of `api::BasinScope`; `Unspecified` is the `Default`.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BasinScope {
    #[default]
    Unspecified,
    // Parsed from the string "aws:us-east-1" by the `FromStr` impl.
    AwsUsEast1,
}
68
69impl From<BasinScope> for api::BasinScope {
70    fn from(value: BasinScope) -> Self {
71        match value {
72            BasinScope::Unspecified => Self::Unspecified,
73            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
74        }
75    }
76}
77
78impl From<api::BasinScope> for BasinScope {
79    fn from(value: api::BasinScope) -> Self {
80        match value {
81            api::BasinScope::Unspecified => Self::Unspecified,
82            api::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
83        }
84    }
85}
86
87impl FromStr for BasinScope {
88    type Err = ConvertError;
89    fn from_str(value: &str) -> Result<Self, Self::Err> {
90        match value {
91            "unspecified" => Ok(Self::Unspecified),
92            "aws:us-east-1" => Ok(Self::AwsUsEast1),
93            _ => Err("invalid basin scope value".into()),
94        }
95    }
96}
97
98impl From<BasinScope> for i32 {
99    fn from(value: BasinScope) -> Self {
100        api::BasinScope::from(value).into()
101    }
102}
103
104impl TryFrom<i32> for BasinScope {
105    type Error = ConvertError;
106    fn try_from(value: i32) -> Result<Self, Self::Error> {
107        api::BasinScope::try_from(value)
108            .map(Into::into)
109            .map_err(|_| "invalid basin scope value".into())
110    }
111}
112
// Parameters for creating a basin; converted into `api::CreateBasinRequest`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateBasinRequest {
    pub basin: BasinName,
    // `None` leaves the configuration unset in the outgoing API message.
    pub config: Option<BasinConfig>,
    pub scope: BasinScope,
}
120
121impl CreateBasinRequest {
122    /// Create a new request with basin name.
123    pub fn new(basin: BasinName) -> Self {
124        Self {
125            basin,
126            config: None,
127            scope: BasinScope::Unspecified,
128        }
129    }
130
131    /// Overwrite basin configuration.
132    pub fn with_config(self, config: BasinConfig) -> Self {
133        Self {
134            config: Some(config),
135            ..self
136        }
137    }
138
139    /// Overwrite basin scope.
140    pub fn with_scope(self, scope: BasinScope) -> Self {
141        Self { scope, ..self }
142    }
143}
144
145impl From<CreateBasinRequest> for api::CreateBasinRequest {
146    fn from(value: CreateBasinRequest) -> Self {
147        let CreateBasinRequest {
148            basin,
149            config,
150            scope,
151        } = value;
152        Self {
153            basin: basin.0,
154            config: config.map(Into::into),
155            scope: scope.into(),
156        }
157    }
158}
159
// Basin-level configuration; converts to and from `api::BasinConfig`.
// `Default` yields no default stream config and both flags `false`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    pub default_stream_config: Option<StreamConfig>,
    pub create_stream_on_append: bool,
    pub create_stream_on_read: bool,
}
167
168impl From<BasinConfig> for api::BasinConfig {
169    fn from(value: BasinConfig) -> Self {
170        let BasinConfig {
171            default_stream_config,
172            create_stream_on_append,
173            create_stream_on_read,
174        } = value;
175        Self {
176            default_stream_config: default_stream_config.map(Into::into),
177            create_stream_on_append,
178            create_stream_on_read,
179        }
180    }
181}
182
183impl TryFrom<api::BasinConfig> for BasinConfig {
184    type Error = ConvertError;
185    fn try_from(value: api::BasinConfig) -> Result<Self, Self::Error> {
186        let api::BasinConfig {
187            default_stream_config,
188            create_stream_on_append,
189            create_stream_on_read,
190        } = value;
191        Ok(Self {
192            default_stream_config: default_stream_config.map(TryInto::try_into).transpose()?,
193            create_stream_on_append,
194            create_stream_on_read,
195        })
196    }
197}
198
// Stream-level configuration; converts to and from `api::StreamConfig`.
// `Default` yields the default storage class with both options unset.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    pub storage_class: StorageClass,
    pub retention_policy: Option<RetentionPolicy>,
    pub require_client_timestamps: Option<bool>,
}
206
207impl StreamConfig {
208    /// Create a new request.
209    pub fn new() -> Self {
210        Self::default()
211    }
212
213    /// Overwrite storage class.
214    pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self {
215        Self {
216            storage_class: storage_class.into(),
217            ..self
218        }
219    }
220
221    /// Overwrite retention policy.
222    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
223        Self {
224            retention_policy: Some(retention_policy),
225            ..self
226        }
227    }
228
229    /// Overwrite `require_client_timestamps`.
230    pub fn with_require_client_timestamps(self, require_client_timestamps: bool) -> Self {
231        Self {
232            require_client_timestamps: Some(require_client_timestamps),
233            ..self
234        }
235    }
236}
237
238impl From<StreamConfig> for api::StreamConfig {
239    fn from(value: StreamConfig) -> Self {
240        let StreamConfig {
241            storage_class,
242            retention_policy,
243            require_client_timestamps,
244        } = value;
245        Self {
246            storage_class: storage_class.into(),
247            retention_policy: retention_policy.map(Into::into),
248            require_client_timestamps,
249        }
250    }
251}
252
253impl TryFrom<api::StreamConfig> for StreamConfig {
254    type Error = ConvertError;
255    fn try_from(value: api::StreamConfig) -> Result<Self, Self::Error> {
256        let api::StreamConfig {
257            storage_class,
258            retention_policy,
259            require_client_timestamps,
260        } = value;
261        Ok(Self {
262            storage_class: storage_class.try_into()?,
263            retention_policy: retention_policy.map(Into::into),
264            require_client_timestamps,
265        })
266    }
267}
268
// SDK-side counterpart of `api::StorageClass`; `Unspecified` is the `Default`.
// The `FromStr` impl accepts the lowercase variant names.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StorageClass {
    #[default]
    Unspecified,
    Standard,
    Express,
}
277
278impl From<StorageClass> for api::StorageClass {
279    fn from(value: StorageClass) -> Self {
280        match value {
281            StorageClass::Unspecified => Self::Unspecified,
282            StorageClass::Standard => Self::Standard,
283            StorageClass::Express => Self::Express,
284        }
285    }
286}
287
288impl From<api::StorageClass> for StorageClass {
289    fn from(value: api::StorageClass) -> Self {
290        match value {
291            api::StorageClass::Unspecified => Self::Unspecified,
292            api::StorageClass::Standard => Self::Standard,
293            api::StorageClass::Express => Self::Express,
294        }
295    }
296}
297
298impl FromStr for StorageClass {
299    type Err = ConvertError;
300    fn from_str(value: &str) -> Result<Self, Self::Err> {
301        match value {
302            "unspecified" => Ok(Self::Unspecified),
303            "standard" => Ok(Self::Standard),
304            "express" => Ok(Self::Express),
305            _ => Err("invalid storage class value".into()),
306        }
307    }
308}
309
310impl From<StorageClass> for i32 {
311    fn from(value: StorageClass) -> Self {
312        api::StorageClass::from(value).into()
313    }
314}
315
316impl TryFrom<i32> for StorageClass {
317    type Error = ConvertError;
318    fn try_from(value: i32) -> Result<Self, Self::Error> {
319        api::StorageClass::try_from(value)
320            .map(Into::into)
321            .map_err(|_| "invalid storage class value".into())
322    }
323}
324
// Retention policy for a stream. The conversion to the API type sends the
// age as whole seconds (`Duration::as_secs`), so any sub-second part is dropped.
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    Age(Duration),
}
330
331impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
332    fn from(value: RetentionPolicy) -> Self {
333        match value {
334            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
335        }
336    }
337}
338
339impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
340    fn from(value: api::stream_config::RetentionPolicy) -> Self {
341        match value {
342            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
343        }
344    }
345}
346
// SDK-side counterpart of `api::BasinState`. The `Display` impl prints the
// lowercase variant name.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    Unspecified,
    Active,
    Creating,
    Deleting,
}
355
356impl From<BasinState> for api::BasinState {
357    fn from(value: BasinState) -> Self {
358        match value {
359            BasinState::Unspecified => Self::Unspecified,
360            BasinState::Active => Self::Active,
361            BasinState::Creating => Self::Creating,
362            BasinState::Deleting => Self::Deleting,
363        }
364    }
365}
366
367impl From<api::BasinState> for BasinState {
368    fn from(value: api::BasinState) -> Self {
369        match value {
370            api::BasinState::Unspecified => Self::Unspecified,
371            api::BasinState::Active => Self::Active,
372            api::BasinState::Creating => Self::Creating,
373            api::BasinState::Deleting => Self::Deleting,
374        }
375    }
376}
377
378impl From<BasinState> for i32 {
379    fn from(value: BasinState) -> Self {
380        api::BasinState::from(value).into()
381    }
382}
383
384impl TryFrom<i32> for BasinState {
385    type Error = ConvertError;
386    fn try_from(value: i32) -> Result<Self, Self::Error> {
387        api::BasinState::try_from(value)
388            .map(Into::into)
389            .map_err(|_| "invalid basin status value".into())
390    }
391}
392
393impl std::fmt::Display for BasinState {
394    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395        match self {
396            BasinState::Unspecified => write!(f, "unspecified"),
397            BasinState::Active => write!(f, "active"),
398            BasinState::Creating => write!(f, "creating"),
399            BasinState::Deleting => write!(f, "deleting"),
400        }
401    }
402}
403
// Descriptive information about a basin; converts to and from `api::BasinInfo`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    pub name: String,
    pub scope: BasinScope,
    pub state: BasinState,
}
411
412impl From<BasinInfo> for api::BasinInfo {
413    fn from(value: BasinInfo) -> Self {
414        let BasinInfo { name, scope, state } = value;
415        Self {
416            name,
417            scope: scope.into(),
418            state: state.into(),
419        }
420    }
421}
422
423impl TryFrom<api::BasinInfo> for BasinInfo {
424    type Error = ConvertError;
425    fn try_from(value: api::BasinInfo) -> Result<Self, Self::Error> {
426        let api::BasinInfo { name, scope, state } = value;
427        Ok(Self {
428            name,
429            scope: scope.try_into()?,
430            state: state.try_into()?,
431        })
432    }
433}
434
435impl TryFrom<api::CreateBasinResponse> for BasinInfo {
436    type Error = ConvertError;
437    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
438        let api::CreateBasinResponse { info } = value;
439        let info = info.ok_or("missing basin info")?;
440        info.try_into()
441    }
442}
443
// Parameters for listing streams. `Default` yields empty `prefix` and
// `start_after` strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListStreamsRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
451
452impl ListStreamsRequest {
453    /// Create a new request.
454    pub fn new() -> Self {
455        Self::default()
456    }
457
458    /// Overwrite prefix.
459    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
460        Self {
461            prefix: prefix.into(),
462            ..self
463        }
464    }
465
466    /// Overwrite start after.
467    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
468        Self {
469            start_after: start_after.into(),
470            ..self
471        }
472    }
473
474    /// Overwrite limit.
475    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
476        Self {
477            limit: limit.into(),
478            ..self
479        }
480    }
481}
482
483impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
484    type Error = ConvertError;
485    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
486        let ListStreamsRequest {
487            prefix,
488            start_after,
489            limit,
490        } = value;
491        Ok(Self {
492            prefix,
493            start_after,
494            limit: limit.map(|n| n as u64),
495        })
496    }
497}
498
// Descriptive information about a stream, copied field-for-field from
// `api::StreamInfo`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    pub name: String,
    // NOTE(review): unit/epoch of the timestamps is not visible here — confirm
    // against the API definition.
    pub created_at: u32,
    pub deleted_at: Option<u32>,
}
506
507impl From<api::StreamInfo> for StreamInfo {
508    fn from(value: api::StreamInfo) -> Self {
509        Self {
510            name: value.name,
511            created_at: value.created_at,
512            deleted_at: value.deleted_at,
513        }
514    }
515}
516
517impl TryFrom<api::CreateStreamResponse> for StreamInfo {
518    type Error = ConvertError;
519
520    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
521        let api::CreateStreamResponse { info } = value;
522        let info = info.ok_or("missing stream info")?;
523        Ok(info.into())
524    }
525}
526
// Result of listing streams; built from `api::ListStreamsResponse`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    pub streams: Vec<StreamInfo>,
    pub has_more: bool,
}
533
534impl From<api::ListStreamsResponse> for ListStreamsResponse {
535    fn from(value: api::ListStreamsResponse) -> Self {
536        let api::ListStreamsResponse { streams, has_more } = value;
537        let streams = streams.into_iter().map(Into::into).collect();
538        Self { streams, has_more }
539    }
540}
541
542impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
543    type Error = ConvertError;
544    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
545        let api::GetBasinConfigResponse { config } = value;
546        let config = config.ok_or("missing basin config")?;
547        config.try_into()
548    }
549}
550
551impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
552    type Error = ConvertError;
553    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
554        let api::GetStreamConfigResponse { config } = value;
555        let config = config.ok_or("missing stream config")?;
556        config.try_into()
557    }
558}
559
// Parameters for creating a stream; converted into `api::CreateStreamRequest`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateStreamRequest {
    pub stream: String,
    // `None` leaves the configuration unset in the outgoing API message.
    pub config: Option<StreamConfig>,
}
566
567impl CreateStreamRequest {
568    /// Create a new request with stream name.
569    pub fn new(stream: impl Into<String>) -> Self {
570        Self {
571            stream: stream.into(),
572            config: None,
573        }
574    }
575
576    /// Overwrite stream config.
577    pub fn with_config(self, config: StreamConfig) -> Self {
578        Self {
579            config: Some(config),
580            ..self
581        }
582    }
583}
584
585impl From<CreateStreamRequest> for api::CreateStreamRequest {
586    fn from(value: CreateStreamRequest) -> Self {
587        let CreateStreamRequest { stream, config } = value;
588        Self {
589            stream,
590            config: config.map(Into::into),
591        }
592    }
593}
594
// Parameters for listing basins. `Default` yields empty `prefix` and
// `start_after` strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListBasinsRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
602
603impl ListBasinsRequest {
604    /// Create a new request.
605    pub fn new() -> Self {
606        Self::default()
607    }
608
609    /// Overwrite prefix.
610    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
611        Self {
612            prefix: prefix.into(),
613            ..self
614        }
615    }
616
617    /// Overwrite start after.
618    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
619        Self {
620            start_after: start_after.into(),
621            ..self
622        }
623    }
624
625    /// Overwrite limit.
626    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
627        Self {
628            limit: limit.into(),
629            ..self
630        }
631    }
632}
633
634impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
635    type Error = ConvertError;
636    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
637        let ListBasinsRequest {
638            prefix,
639            start_after,
640            limit,
641        } = value;
642        Ok(Self {
643            prefix,
644            start_after,
645            limit: limit
646                .map(TryInto::try_into)
647                .transpose()
648                .map_err(|_| "request limit does not fit into u64 bounds")?,
649        })
650    }
651}
652
// Result of listing basins; built from `api::ListBasinsResponse`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    pub basins: Vec<BasinInfo>,
    pub has_more: bool,
}
659
660impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
661    type Error = ConvertError;
662    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
663        let api::ListBasinsResponse { basins, has_more } = value;
664        Ok(Self {
665            basins: basins
666                .into_iter()
667                .map(TryInto::try_into)
668                .collect::<Result<Vec<BasinInfo>, ConvertError>>()?,
669            has_more,
670        })
671    }
672}
673
// Parameters for deleting a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteBasinRequest {
    pub basin: BasinName,
    /// Delete basin if it exists else do nothing.
    // NOTE(review): this flag is not copied into `api::DeleteBasinRequest`;
    // presumably it is acted on by the caller of the RPC — confirm.
    pub if_exists: bool,
}
681
682impl DeleteBasinRequest {
683    /// Create a new request.
684    pub fn new(basin: BasinName) -> Self {
685        Self {
686            basin,
687            if_exists: false,
688        }
689    }
690
691    /// Overwrite the if exists parameter.
692    pub fn with_if_exists(self, if_exists: bool) -> Self {
693        Self { if_exists, ..self }
694    }
695}
696
697impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
698    fn from(value: DeleteBasinRequest) -> Self {
699        let DeleteBasinRequest { basin, .. } = value;
700        Self { basin: basin.0 }
701    }
702}
703
// Parameters for deleting a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteStreamRequest {
    pub stream: String,
    /// Delete stream if it exists else do nothing.
    // NOTE(review): this flag is not copied into `api::DeleteStreamRequest`;
    // presumably it is acted on by the caller of the RPC — confirm.
    pub if_exists: bool,
}
711
712impl DeleteStreamRequest {
713    /// Create a new request.
714    pub fn new(stream: impl Into<String>) -> Self {
715        Self {
716            stream: stream.into(),
717            if_exists: false,
718        }
719    }
720
721    /// Overwrite the if exists parameter.
722    pub fn with_if_exists(self, if_exists: bool) -> Self {
723        Self { if_exists, ..self }
724    }
725}
726
727impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
728    fn from(value: DeleteStreamRequest) -> Self {
729        let DeleteStreamRequest { stream, .. } = value;
730        Self { stream }
731    }
732}
733
// Parameters for reconfiguring a basin; converted into
// `api::ReconfigureBasinRequest`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureBasinRequest {
    pub basin: BasinName,
    pub config: Option<BasinConfig>,
    // When set, the paths are sent as a `prost_types::FieldMask`.
    pub mask: Option<Vec<String>>,
}
741
742impl ReconfigureBasinRequest {
743    /// Create a new request with basin name.
744    pub fn new(basin: BasinName) -> Self {
745        Self {
746            basin,
747            config: None,
748            mask: None,
749        }
750    }
751
752    /// Overwrite basin config.
753    pub fn with_config(self, config: BasinConfig) -> Self {
754        Self {
755            config: Some(config),
756            ..self
757        }
758    }
759
760    /// Overwrite field mask.
761    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
762        Self {
763            mask: Some(mask.into()),
764            ..self
765        }
766    }
767}
768
769impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
770    fn from(value: ReconfigureBasinRequest) -> Self {
771        let ReconfigureBasinRequest {
772            basin,
773            config,
774            mask,
775        } = value;
776        Self {
777            basin: basin.0,
778            config: config.map(Into::into),
779            mask: mask.map(|paths| prost_types::FieldMask { paths }),
780        }
781    }
782}
783
784impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
785    type Error = ConvertError;
786    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
787        let api::ReconfigureBasinResponse { config } = value;
788        let config = config.ok_or("missing basin config")?;
789        config.try_into()
790    }
791}
792
// Parameters for reconfiguring a stream; converted into
// `api::ReconfigureStreamRequest`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureStreamRequest {
    pub stream: String,
    pub config: Option<StreamConfig>,
    // When set, the paths are sent as a `prost_types::FieldMask`.
    pub mask: Option<Vec<String>>,
}
800
801impl ReconfigureStreamRequest {
802    /// Create a new request with stream name.
803    pub fn new(stream: impl Into<String>) -> Self {
804        Self {
805            stream: stream.into(),
806            config: None,
807            mask: None,
808        }
809    }
810
811    /// Overwrite stream config.
812    pub fn with_config(self, config: StreamConfig) -> Self {
813        Self {
814            config: Some(config),
815            ..self
816        }
817    }
818
819    /// Overwrite field mask.
820    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
821        Self {
822            mask: Some(mask.into()),
823            ..self
824        }
825    }
826}
827
828impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
829    fn from(value: ReconfigureStreamRequest) -> Self {
830        let ReconfigureStreamRequest {
831            stream,
832            config,
833            mask,
834        } = value;
835        Self {
836            stream,
837            config: config.map(Into::into),
838            mask: mask.map(|paths| prost_types::FieldMask { paths }),
839        }
840    }
841}
842
843impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
844    type Error = ConvertError;
845    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
846        let api::ReconfigureStreamResponse { config } = value;
847        let config = config.ok_or("missing stream config")?;
848        config.try_into()
849    }
850}
851
852impl From<api::CheckTailResponse> for u64 {
853    fn from(value: api::CheckTailResponse) -> Self {
854        let api::CheckTailResponse { next_seq_num } = value;
855        next_seq_num
856    }
857}
858
// Name/value pair attached to a record; both parts are raw bytes.
// `Header::from_value` builds a header whose name is empty.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    pub name: Bytes,
    pub value: Bytes,
}
865
866impl Header {
867    /// Create a new header from name and value.
868    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
869        Self {
870            name: name.into(),
871            value: value.into(),
872        }
873    }
874
875    /// Create a new header from value.
876    pub fn from_value(value: impl Into<Bytes>) -> Self {
877        Self {
878            name: Bytes::new(),
879            value: value.into(),
880        }
881    }
882}
883
884impl From<Header> for api::Header {
885    fn from(value: Header) -> Self {
886        let Header { name, value } = value;
887        Self { name, value }
888    }
889}
890
891impl From<api::Header> for Header {
892    fn from(value: api::Header) -> Self {
893        let api::Header { name, value } = value;
894        Self { name, value }
895    }
896}
897
/// A fencing token can be enforced on append requests.
///
/// Must not be more than 16 bytes. The length is checked by
/// [`FencingToken::new`], which every fallible conversion into this type
/// goes through.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FencingToken(Bytes);
903
904impl FencingToken {
905    const MAX_BYTES: usize = 16;
906
907    /// Try creating a new fencing token from bytes.
908    pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
909        let bytes = bytes.into();
910        if bytes.len() > Self::MAX_BYTES {
911            Err(format!(
912                "Size of a fencing token cannot exceed {} bytes",
913                Self::MAX_BYTES
914            )
915            .into())
916        } else {
917            Ok(Self(bytes))
918        }
919    }
920
921    /// Generate a random fencing token with `n` bytes.
922    pub fn generate(n: usize) -> Result<Self, ConvertError> {
923        Self::new(
924            rand::thread_rng()
925                .sample_iter(&Uniform::new_inclusive(0, u8::MAX))
926                .take(n)
927                .collect::<Bytes>(),
928        )
929    }
930}
931
932impl AsRef<Bytes> for FencingToken {
933    fn as_ref(&self) -> &Bytes {
934        &self.0
935    }
936}
937
938impl AsRef<[u8]> for FencingToken {
939    fn as_ref(&self) -> &[u8] {
940        &self.0
941    }
942}
943
944impl Deref for FencingToken {
945    type Target = [u8];
946
947    fn deref(&self) -> &Self::Target {
948        &self.0
949    }
950}
951
952impl From<FencingToken> for Bytes {
953    fn from(value: FencingToken) -> Self {
954        value.0
955    }
956}
957
958impl From<FencingToken> for Vec<u8> {
959    fn from(value: FencingToken) -> Self {
960        value.0.into()
961    }
962}
963
964impl TryFrom<Bytes> for FencingToken {
965    type Error = ConvertError;
966
967    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
968        Self::new(value)
969    }
970}
971
972impl TryFrom<Vec<u8>> for FencingToken {
973    type Error = ConvertError;
974
975    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
976        Self::new(value)
977    }
978}
979
/// Command to send through a `CommandRecord`.
#[derive(Debug, Clone)]
pub enum Command {
    /// Enforce a fencing token.
    ///
    /// Fencing is strongly consistent, and subsequent appends that specify a
    /// fencing token will be rejected if it does not match.
    Fence {
        /// Fencing token to enforce.
        ///
        /// Set empty to clear the token.
        fencing_token: FencingToken,
    },
    /// Request a trim till the sequence number.
    ///
    /// Trimming is eventually consistent, and trimmed records may be visible
    /// for a brief period.
    Trim {
        /// Trim point.
        ///
        /// This sequence number is only allowed to advance, and any regression
        /// will be ignored.
        seq_num: u64,
    },
}
1005
/// A command record is a special kind of [`AppendRecord`] that can be used to
/// send command messages.
///
/// Such a record is signalled by a sole header with empty name. The header
/// value represents the operation and record body acts as the payload.
///
/// Use [`CommandRecord::fence`] or [`CommandRecord::trim`] to construct one.
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// Command kind.
    pub command: Command,
    /// Timestamp for the record.
    pub timestamp: Option<u64>,
}
1018
1019impl CommandRecord {
1020    const FENCE: &[u8] = b"fence";
1021    const TRIM: &[u8] = b"trim";
1022
1023    /// Create a new fence command record.
1024    pub fn fence(fencing_token: FencingToken) -> Self {
1025        Self {
1026            command: Command::Fence { fencing_token },
1027            timestamp: None,
1028        }
1029    }
1030
1031    /// Create a new trim command record.
1032    pub fn trim(seq_num: impl Into<u64>) -> Self {
1033        Self {
1034            command: Command::Trim {
1035                seq_num: seq_num.into(),
1036            },
1037            timestamp: None,
1038        }
1039    }
1040
1041    /// Overwrite timestamp.
1042    pub fn with_timestamp(self, timestamp: u64) -> Self {
1043        Self {
1044            timestamp: Some(timestamp),
1045            ..self
1046        }
1047    }
1048}
1049
// Record to be appended. Fields are private; the constructors and
// `with_headers` check the metered size, and the parts are exposed
// through accessors.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    timestamp: Option<u64>,
    headers: Vec<Header>,
    body: Bytes,
    // Test-only override of the size limit checked by `validated`.
    #[cfg(test)]
    max_bytes: u64,
}

// Metered size follows the `MeteredBytes` formula over `headers` and `body`.
metered_impl!(AppendRecord);
1061
1062impl AppendRecord {
1063    const MAX_BYTES: u64 = MIB_BYTES;
1064
1065    fn validated(self) -> Result<Self, ConvertError> {
1066        #[cfg(test)]
1067        let max_bytes = self.max_bytes;
1068        #[cfg(not(test))]
1069        let max_bytes = Self::MAX_BYTES;
1070
1071        if self.metered_bytes() > max_bytes {
1072            Err("AppendRecord should have metered size less than 1 MiB".into())
1073        } else {
1074            Ok(self)
1075        }
1076    }
1077
1078    /// Try creating a new append record with body.
1079    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1080        Self {
1081            timestamp: None,
1082            headers: Vec::new(),
1083            body: body.into(),
1084            #[cfg(test)]
1085            max_bytes: Self::MAX_BYTES,
1086        }
1087        .validated()
1088    }
1089
1090    #[cfg(test)]
1091    pub(crate) fn with_max_bytes(
1092        max_bytes: u64,
1093        body: impl Into<Bytes>,
1094    ) -> Result<Self, ConvertError> {
1095        Self {
1096            timestamp: None,
1097            headers: Vec::new(),
1098            body: body.into(),
1099            max_bytes,
1100        }
1101        .validated()
1102    }
1103
1104    /// Overwrite headers.
1105    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1106        Self {
1107            headers: headers.into(),
1108            ..self
1109        }
1110        .validated()
1111    }
1112
1113    /// Overwrite timestamp.
1114    pub fn with_timestamp(self, timestamp: u64) -> Self {
1115        Self {
1116            timestamp: Some(timestamp),
1117            ..self
1118        }
1119    }
1120
1121    /// Body of the record.
1122    pub fn body(&self) -> &[u8] {
1123        &self.body
1124    }
1125
1126    /// Headers of the record.
1127    pub fn headers(&self) -> &[Header] {
1128        &self.headers
1129    }
1130
1131    /// Timestamp for the record.
1132    pub fn timestamp(&self) -> Option<u64> {
1133        self.timestamp
1134    }
1135
1136    /// Consume the record and return parts.
1137    pub fn into_parts(self) -> AppendRecordParts {
1138        AppendRecordParts {
1139            timestamp: self.timestamp,
1140            headers: self.headers,
1141            body: self.body,
1142        }
1143    }
1144
1145    /// Try creating the record from parts.
1146    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1147        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1148        if let Some(timestamp) = parts.timestamp {
1149            Ok(record.with_timestamp(timestamp))
1150        } else {
1151            Ok(record)
1152        }
1153    }
1154}
1155
1156impl From<AppendRecord> for api::AppendRecord {
1157    fn from(value: AppendRecord) -> Self {
1158        Self {
1159            timestamp: value.timestamp,
1160            headers: value.headers.into_iter().map(Into::into).collect(),
1161            body: value.body,
1162        }
1163    }
1164}
1165
1166impl From<CommandRecord> for AppendRecord {
1167    fn from(value: CommandRecord) -> Self {
1168        let (header_value, body) = match value.command {
1169            Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1170            Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1171        };
1172        AppendRecordParts {
1173            timestamp: value.timestamp,
1174            headers: vec![Header::from_value(header_value)],
1175            body: body.into(),
1176        }
1177        .try_into()
1178        .expect("command record is a valid append record")
1179    }
1180}
1181
// Plain, publicly accessible parts of an `AppendRecord`. Converting back into
// an `AppendRecord` is fallible (see `AppendRecord::try_from_parts`).
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    pub timestamp: Option<u64>,
    pub headers: Vec<Header>,
    pub body: Bytes,
}
1189
1190impl From<AppendRecord> for AppendRecordParts {
1191    fn from(value: AppendRecord) -> Self {
1192        value.into_parts()
1193    }
1194}
1195
1196impl TryFrom<AppendRecordParts> for AppendRecord {
1197    type Error = ConvertError;
1198
1199    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1200        Self::try_from_parts(value)
1201    }
1202}
1203
/// A collection of append records that can be sent together in a batch.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    // Records accepted into the batch so far.
    records: Vec<AppendRecord>,
    // Running total of the metered bytes of `records`.
    metered_bytes: u64,
    // Maximum number of records this batch accepts.
    max_capacity: usize,
    // Test-only override of the metered bytes limit for the batch.
    #[cfg(test)]
    max_bytes: u64,
}
1213
1214impl PartialEq for AppendRecordBatch {
1215    fn eq(&self, other: &Self) -> bool {
1216        if self.records.eq(&other.records) {
1217            assert_eq!(self.metered_bytes, other.metered_bytes);
1218            true
1219        } else {
1220            false
1221        }
1222    }
1223}
1224
1225impl Eq for AppendRecordBatch {}
1226
1227impl Default for AppendRecordBatch {
1228    fn default() -> Self {
1229        Self::new()
1230    }
1231}
1232
1233impl AppendRecordBatch {
1234    /// Maximum number of records that a batch can hold.
1235    ///
1236    /// A record batch cannot be created with a bigger capacity.
1237    pub const MAX_CAPACITY: usize = 1000;
1238
1239    /// Maximum metered bytes of the batch.
1240    pub const MAX_BYTES: u64 = MIB_BYTES;
1241
1242    /// Create an empty record batch.
1243    pub fn new() -> Self {
1244        Self::with_max_capacity(Self::MAX_CAPACITY)
1245    }
1246
1247    /// Create an empty record batch with custom max capacity.
1248    ///
1249    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1250    pub fn with_max_capacity(max_capacity: usize) -> Self {
1251        assert!(
1252            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1253            "Batch capacity must be between 1 and 1000"
1254        );
1255
1256        Self {
1257            records: Vec::with_capacity(max_capacity),
1258            metered_bytes: 0,
1259            max_capacity,
1260            #[cfg(test)]
1261            max_bytes: Self::MAX_BYTES,
1262        }
1263    }
1264
1265    #[cfg(test)]
1266    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1267        #[cfg(test)]
1268        assert!(
1269            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1270            "Batch size must be between 1 byte and 1 MiB"
1271        );
1272
1273        Self {
1274            max_bytes,
1275            ..Self::with_max_capacity(max_capacity)
1276        }
1277    }
1278
1279    /// Try creating a record batch from an iterator.
1280    ///
1281    /// If all the items of the iterator cannot be drained into the batch, the
1282    /// error returned contains a batch containing all records it could fit
1283    /// along-with the left over items from the iterator.
1284    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1285    where
1286        R: Into<AppendRecord>,
1287        T: IntoIterator<Item = R>,
1288    {
1289        let mut records = Self::new();
1290        let mut pending = Vec::new();
1291
1292        let mut iter = iter.into_iter();
1293
1294        for record in iter.by_ref() {
1295            if let Err(record) = records.push(record) {
1296                pending.push(record);
1297                break;
1298            }
1299        }
1300
1301        if pending.is_empty() {
1302            Ok(records)
1303        } else {
1304            pending.extend(iter.map(Into::into));
1305            Err((records, pending))
1306        }
1307    }
1308
1309    /// Returns true if the batch contains no records.
1310    pub fn is_empty(&self) -> bool {
1311        if self.records.is_empty() {
1312            assert_eq!(self.metered_bytes, 0);
1313            true
1314        } else {
1315            false
1316        }
1317    }
1318
1319    /// Returns the number of records contained in the batch.
1320    pub fn len(&self) -> usize {
1321        self.records.len()
1322    }
1323
1324    #[cfg(test)]
1325    fn max_bytes(&self) -> u64 {
1326        self.max_bytes
1327    }
1328
1329    #[cfg(not(test))]
1330    fn max_bytes(&self) -> u64 {
1331        Self::MAX_BYTES
1332    }
1333
1334    /// Returns true if the batch cannot fit any more records.
1335    pub fn is_full(&self) -> bool {
1336        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1337    }
1338
1339    /// Try to append a new record into the batch.
1340    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1341        assert!(self.records.len() <= self.max_capacity);
1342        assert!(self.metered_bytes <= self.max_bytes());
1343
1344        let record = record.into();
1345        let record_size = record.metered_bytes();
1346        if self.records.len() >= self.max_capacity
1347            || self.metered_bytes + record_size > self.max_bytes()
1348        {
1349            Err(record)
1350        } else {
1351            self.records.push(record);
1352            self.metered_bytes += record_size;
1353            Ok(())
1354        }
1355    }
1356}
1357
impl MeteredBytes for AppendRecordBatch {
    /// Return the running total maintained by the batch as records are
    /// pushed, rather than re-summing the records.
    fn metered_bytes(&self) -> u64 {
        self.metered_bytes
    }
}
1363
1364impl IntoIterator for AppendRecordBatch {
1365    type Item = AppendRecord;
1366    type IntoIter = std::vec::IntoIter<Self::Item>;
1367
1368    fn into_iter(self) -> Self::IntoIter {
1369        self.records.into_iter()
1370    }
1371}
1372
impl<'a> IntoIterator for &'a AppendRecordBatch {
    type Item = &'a AppendRecord;
    type IntoIter = std::slice::Iter<'a, AppendRecord>;

    /// Iterate over the records of the batch by reference.
    fn into_iter(self) -> Self::IntoIter {
        self.records.iter()
    }
}
1381
1382impl AsRef<[AppendRecord]> for AppendRecordBatch {
1383    fn as_ref(&self) -> &[AppendRecord] {
1384        &self.records
1385    }
1386}
1387
// Input for an append: the batch of records plus the optional
// `match_seq_num` and `fencing_token` values sent along with it.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    pub records: AppendRecordBatch,
    pub match_seq_num: Option<u64>,
    pub fencing_token: Option<FencingToken>,
}
1395
1396impl MeteredBytes for AppendInput {
1397    fn metered_bytes(&self) -> u64 {
1398        self.records.metered_bytes()
1399    }
1400}
1401
1402impl AppendInput {
1403    /// Create a new append input from record batch.
1404    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1405        Self {
1406            records: records.into(),
1407            match_seq_num: None,
1408            fencing_token: None,
1409        }
1410    }
1411
1412    /// Overwrite match sequence number.
1413    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1414        Self {
1415            match_seq_num: Some(match_seq_num.into()),
1416            ..self
1417        }
1418    }
1419
1420    /// Overwrite fencing token.
1421    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1422        Self {
1423            fencing_token: Some(fencing_token),
1424            ..self
1425        }
1426    }
1427
1428    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1429        let Self {
1430            records,
1431            match_seq_num,
1432            fencing_token,
1433        } = self;
1434
1435        api::AppendInput {
1436            stream: stream.into(),
1437            records: records.into_iter().map(Into::into).collect(),
1438            match_seq_num,
1439            fencing_token: fencing_token.map(|f| f.0),
1440        }
1441    }
1442}
1443
// Sequence numbers reported for an append; the values are copied unchanged
// from `api::AppendOutput`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AppendOutput {
    pub start_seq_num: u64,
    pub end_seq_num: u64,
    pub next_seq_num: u64,
}
1451
1452impl From<api::AppendOutput> for AppendOutput {
1453    fn from(value: api::AppendOutput) -> Self {
1454        let api::AppendOutput {
1455            start_seq_num,
1456            end_seq_num,
1457            next_seq_num,
1458        } = value;
1459        Self {
1460            start_seq_num,
1461            end_seq_num,
1462            next_seq_num,
1463        }
1464    }
1465}
1466
1467impl TryFrom<api::AppendResponse> for AppendOutput {
1468    type Error = ConvertError;
1469    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1470        let api::AppendResponse { output } = value;
1471        let output = output.ok_or("missing append output")?;
1472        Ok(output.into())
1473    }
1474}
1475
1476impl TryFrom<api::AppendSessionResponse> for AppendOutput {
1477    type Error = ConvertError;
1478    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1479        let api::AppendSessionResponse { output } = value;
1480        let output = output.ok_or("missing append output")?;
1481        Ok(output.into())
1482    }
1483}
1484
// Optional limits for a read. `None` in either field leaves that limit unset;
// both default to `None`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    pub count: Option<u64>,
    pub bytes: Option<u64>,
}
1491
// Request for a unary read. The limit is checked against the unary bounds
// when the request is converted in `ReadRequest::try_into_api_type`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    pub start_seq_num: u64,
    pub limit: ReadLimit,
}
1498
1499impl ReadRequest {
1500    /// Create a new request with start sequence number.
1501    pub fn new(start_seq_num: u64) -> Self {
1502        Self {
1503            start_seq_num,
1504            ..Default::default()
1505        }
1506    }
1507
1508    /// Overwrite limit.
1509    pub fn with_limit(self, limit: ReadLimit) -> Self {
1510        Self { limit, ..self }
1511    }
1512}
1513
1514impl ReadRequest {
1515    pub(crate) fn try_into_api_type(
1516        self,
1517        stream: impl Into<String>,
1518    ) -> Result<api::ReadRequest, ConvertError> {
1519        let Self {
1520            start_seq_num,
1521            limit,
1522        } = self;
1523
1524        let limit = if limit.count > Some(1000) {
1525            Err("read limit: count must not exceed 1000 for unary request")
1526        } else if limit.bytes > Some(MIB_BYTES) {
1527            Err("read limit: bytes must not exceed 1MiB for unary request")
1528        } else {
1529            Ok(api::ReadLimit {
1530                count: limit.count,
1531                bytes: limit.bytes,
1532            })
1533        }?;
1534
1535        Ok(api::ReadRequest {
1536            stream: stream.into(),
1537            start_seq_num,
1538            limit: Some(limit),
1539        })
1540    }
1541}
1542
// A record as returned by reads: unlike `AppendRecord`, it always carries a
// sequence number and a timestamp, and its fields are public.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    pub seq_num: u64,
    pub timestamp: u64,
    pub headers: Vec<Header>,
    pub body: Bytes,
}

// Generates the `MeteredBytes` impl, computed from the record's headers and body.
metered_impl!(SequencedRecord);
1553
1554impl From<api::SequencedRecord> for SequencedRecord {
1555    fn from(value: api::SequencedRecord) -> Self {
1556        let api::SequencedRecord {
1557            seq_num,
1558            timestamp,
1559            headers,
1560            body,
1561        } = value;
1562        Self {
1563            seq_num,
1564            timestamp,
1565            headers: headers.into_iter().map(Into::into).collect(),
1566            body,
1567        }
1568    }
1569}
1570
1571impl SequencedRecord {
1572    /// Try representing the sequenced record as a command record.
1573    pub fn as_command_record(&self) -> Option<CommandRecord> {
1574        if self.headers.len() != 1 {
1575            return None;
1576        }
1577
1578        let header = self.headers.first().expect("pre-validated length");
1579
1580        if !header.name.is_empty() {
1581            return None;
1582        }
1583
1584        match header.value.as_ref() {
1585            CommandRecord::FENCE => {
1586                let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1587                Some(CommandRecord {
1588                    command: Command::Fence { fencing_token },
1589                    timestamp: Some(self.timestamp),
1590                })
1591            }
1592            CommandRecord::TRIM => {
1593                let body: &[u8] = &self.body;
1594                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1595                Some(CommandRecord {
1596                    command: Command::Trim { seq_num },
1597                    timestamp: Some(self.timestamp),
1598                })
1599            }
1600            _ => None,
1601        }
1602    }
1603}
1604
// A batch of sequenced records, as carried by `ReadOutput::Batch`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    pub records: Vec<SequencedRecord>,
}
1610
1611impl MeteredBytes for SequencedRecordBatch {
1612    fn metered_bytes(&self) -> u64 {
1613        self.records.metered_bytes()
1614    }
1615}
1616
1617impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1618    fn from(value: api::SequencedRecordBatch) -> Self {
1619        let api::SequencedRecordBatch { records } = value;
1620        Self {
1621            records: records.into_iter().map(Into::into).collect(),
1622        }
1623    }
1624}
1625
// Output of a read; the variants mirror `api::read_output::Output` one to one.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    Batch(SequencedRecordBatch),
    FirstSeqNum(u64),
    NextSeqNum(u64),
}
1633
1634impl From<api::read_output::Output> for ReadOutput {
1635    fn from(value: api::read_output::Output) -> Self {
1636        match value {
1637            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1638            api::read_output::Output::FirstSeqNum(first_seq_num) => {
1639                Self::FirstSeqNum(first_seq_num)
1640            }
1641            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1642        }
1643    }
1644}
1645
1646impl TryFrom<api::ReadOutput> for ReadOutput {
1647    type Error = ConvertError;
1648    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1649        let api::ReadOutput { output } = value;
1650        let output = output.ok_or("missing read output")?;
1651        Ok(output.into())
1652    }
1653}
1654
1655impl TryFrom<api::ReadResponse> for ReadOutput {
1656    type Error = ConvertError;
1657    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1658        let api::ReadResponse { output } = value;
1659        let output = output.ok_or("missing output in read response")?;
1660        output.try_into()
1661    }
1662}
1663
// Request for a read session. Unlike `ReadRequest`, the limit is passed
// through without bounds checks when converted to the API type.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    pub start_seq_num: u64,
    pub limit: ReadLimit,
}
1670
1671impl ReadSessionRequest {
1672    /// Create a new request with start sequene number.
1673    pub fn new(start_seq_num: u64) -> Self {
1674        Self {
1675            start_seq_num,
1676            ..Default::default()
1677        }
1678    }
1679
1680    /// Overwrite limit.
1681    pub fn with_limit(self, limit: ReadLimit) -> Self {
1682        Self { limit, ..self }
1683    }
1684
1685    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1686        let Self {
1687            start_seq_num,
1688            limit,
1689        } = self;
1690        api::ReadSessionRequest {
1691            stream: stream.into(),
1692            start_seq_num,
1693            limit: Some(api::ReadLimit {
1694                count: limit.count,
1695                bytes: limit.bytes,
1696            }),
1697            heartbeats: false,
1698        }
1699    }
1700}
1701
1702impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1703    type Error = ConvertError;
1704    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1705        let api::ReadSessionResponse { output } = value;
1706        let output = output.ok_or("missing output in read session response")?;
1707        output.try_into()
1708    }
1709}
1710
/// Name of a basin.
///
/// Must be between 8 and 48 characters in length. Must comprise lowercase
/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
///
/// The inner string is private; values are created through the
/// `TryFrom<String>` and `FromStr` implementations, which enforce these rules.
#[derive(Debug, Clone)]
pub struct BasinName(String);
1717
1718impl AsRef<str> for BasinName {
1719    fn as_ref(&self) -> &str {
1720        &self.0
1721    }
1722}
1723
1724impl Deref for BasinName {
1725    type Target = str;
1726    fn deref(&self) -> &Self::Target {
1727        &self.0
1728    }
1729}
1730
1731impl TryFrom<String> for BasinName {
1732    type Error = ConvertError;
1733
1734    fn try_from(name: String) -> Result<Self, Self::Error> {
1735        if name.len() < 8 || name.len() > 48 {
1736            return Err("Basin name must be between 8 and 48 characters in length".into());
1737        }
1738
1739        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1740        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1741            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1742                .expect("Failed to compile basin name regex")
1743        });
1744
1745        if !regex.is_match(&name) {
1746            return Err(
1747                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1748                It cannot begin or end with a hyphen."
1749                    .into(),
1750            );
1751        }
1752
1753        Ok(Self(name))
1754    }
1755}
1756
1757impl FromStr for BasinName {
1758    type Err = ConvertError;
1759
1760    fn from_str(s: &str) -> Result<Self, Self::Err> {
1761        s.to_string().try_into()
1762    }
1763}
1764
impl std::fmt::Display for BasinName {
    /// Write the name verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
1770
/// Access token ID.
///
/// Must be between 1 and 50 characters. The length is enforced with
/// `String::len` in the `TryFrom<String>` implementation.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);
1775
1776impl AsRef<str> for AccessTokenId {
1777    fn as_ref(&self) -> &str {
1778        &self.0
1779    }
1780}
1781
1782impl Deref for AccessTokenId {
1783    type Target = str;
1784
1785    fn deref(&self) -> &Self::Target {
1786        &self.0
1787    }
1788}
1789
1790impl TryFrom<String> for AccessTokenId {
1791    type Error = ConvertError;
1792
1793    fn try_from(name: String) -> Result<Self, Self::Error> {
1794        if name.is_empty() {
1795            return Err("Access token ID must not be empty".into());
1796        }
1797
1798        if name.len() > 50 {
1799            return Err("Access token ID must not exceed 50 characters".into());
1800        }
1801
1802        Ok(Self(name))
1803    }
1804}
1805
1806impl From<AccessTokenId> for String {
1807    fn from(value: AccessTokenId) -> Self {
1808        value.0
1809    }
1810}
1811
1812impl FromStr for AccessTokenId {
1813    type Err = ConvertError;
1814
1815    fn from_str(s: &str) -> Result<Self, Self::Err> {
1816        s.to_string().try_into()
1817    }
1818}
1819
1820impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1821    fn from(value: AccessTokenInfo) -> Self {
1822        Self {
1823            info: Some(value.into()),
1824        }
1825    }
1826}
1827
// Description of an access token. `AccessTokenInfo::new` sets only the ID and
// leaves the expiry and scope unset and `auto_prefix_streams` off.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AccessTokenInfo {
    pub id: AccessTokenId,
    pub expires_at: Option<u32>,
    pub auto_prefix_streams: bool,
    pub scope: Option<AccessTokenScope>,
}
1836
1837impl AccessTokenInfo {
1838    /// Create a new access token info.
1839    pub fn new(id: AccessTokenId) -> Self {
1840        Self {
1841            id,
1842            expires_at: None,
1843            auto_prefix_streams: false,
1844            scope: None,
1845        }
1846    }
1847
1848    /// Overwrite expiration time.
1849    pub fn with_expires_at(self, expires_at: u32) -> Self {
1850        Self {
1851            expires_at: Some(expires_at),
1852            ..self
1853        }
1854    }
1855
1856    /// Overwrite auto prefix streams.
1857    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1858        Self {
1859            auto_prefix_streams,
1860            ..self
1861        }
1862    }
1863
1864    /// Overwrite scope.
1865    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
1866        Self {
1867            scope: Some(scope),
1868            ..self
1869        }
1870    }
1871}
1872
1873impl From<AccessTokenInfo> for api::AccessTokenInfo {
1874    fn from(value: AccessTokenInfo) -> Self {
1875        let AccessTokenInfo {
1876            id,
1877            expires_at,
1878            auto_prefix_streams,
1879            scope,
1880        } = value;
1881        Self {
1882            id: id.into(),
1883            expires_at,
1884            auto_prefix_streams,
1885            scope: scope.map(Into::into),
1886        }
1887    }
1888}
1889
1890impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
1891    type Error = ConvertError;
1892    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
1893        let api::AccessTokenInfo {
1894            id,
1895            expires_at,
1896            auto_prefix_streams,
1897            scope,
1898        } = value;
1899        Ok(Self {
1900            id: id.try_into()?,
1901            expires_at,
1902            auto_prefix_streams,
1903            scope: scope.map(TryInto::try_into).transpose()?,
1904        })
1905    }
1906}
1907
// Operations that can be listed in an access token scope. The variants map
// one to one onto `api::Operation`; `FromStr` accepts their names
// case-insensitively, with or without hyphens between the words.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    Unspecified,
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
1931
1932impl FromStr for Operation {
1933    type Err = ConvertError;
1934
1935    fn from_str(s: &str) -> Result<Self, Self::Err> {
1936        match s.to_lowercase().as_str() {
1937            "unspecified" => Ok(Self::Unspecified),
1938            "list-basins" | "listbasins" => Ok(Self::ListBasins),
1939            "create-basin" | "createbasin" => Ok(Self::CreateBasin),
1940            "delete-basin" | "deletebasin" => Ok(Self::DeleteBasin),
1941            "reconfigure-basin" | "reconfigurebasin" => Ok(Self::ReconfigureBasin),
1942            "get-basin-config" | "getbasinconfig" => Ok(Self::GetBasinConfig),
1943            "issue-access-token" | "issueaccesstoken" => Ok(Self::IssueAccessToken),
1944            "revoke-access-token" | "revokeaccesstoken" => Ok(Self::RevokeAccessToken),
1945            "list-access-tokens" | "listaccesstokens" => Ok(Self::ListAccessTokens),
1946            "list-streams" | "liststreams" => Ok(Self::ListStreams),
1947            "create-stream" | "createstream" => Ok(Self::CreateStream),
1948            "delete-stream" | "deletestream" => Ok(Self::DeleteStream),
1949            "get-stream-config" | "getstreamconfig" => Ok(Self::GetStreamConfig),
1950            "reconfigure-stream" | "reconfigurestream" => Ok(Self::ReconfigureStream),
1951            "check-tail" | "checktail" => Ok(Self::CheckTail),
1952            "append" => Ok(Self::Append),
1953            "read" => Ok(Self::Read),
1954            "trim" => Ok(Self::Trim),
1955            "fence" => Ok(Self::Fence),
1956            _ => Err("invalid operation".into()),
1957        }
1958    }
1959}
1960
1961impl From<Operation> for i32 {
1962    fn from(value: Operation) -> Self {
1963        api::Operation::from(value).into()
1964    }
1965}
1966
1967impl TryFrom<i32> for Operation {
1968    type Error = ConvertError;
1969    fn try_from(value: i32) -> Result<Self, Self::Error> {
1970        api::Operation::try_from(value)
1971            .map_err(|_| "invalid operation".into())
1972            .map(Into::into)
1973    }
1974}
1975
1976impl From<Operation> for api::Operation {
1977    fn from(value: Operation) -> Self {
1978        match value {
1979            Operation::Unspecified => Self::Unspecified,
1980            Operation::ListBasins => Self::ListBasins,
1981            Operation::CreateBasin => Self::CreateBasin,
1982            Operation::DeleteBasin => Self::DeleteBasin,
1983            Operation::ReconfigureBasin => Self::ReconfigureBasin,
1984            Operation::GetBasinConfig => Self::GetBasinConfig,
1985            Operation::IssueAccessToken => Self::IssueAccessToken,
1986            Operation::RevokeAccessToken => Self::RevokeAccessToken,
1987            Operation::ListAccessTokens => Self::ListAccessTokens,
1988            Operation::ListStreams => Self::ListStreams,
1989            Operation::CreateStream => Self::CreateStream,
1990            Operation::DeleteStream => Self::DeleteStream,
1991            Operation::GetStreamConfig => Self::GetStreamConfig,
1992            Operation::ReconfigureStream => Self::ReconfigureStream,
1993            Operation::CheckTail => Self::CheckTail,
1994            Operation::Append => Self::Append,
1995            Operation::Read => Self::Read,
1996            Operation::Trim => Self::Trim,
1997            Operation::Fence => Self::Fence,
1998        }
1999    }
2000}
2001
2002impl From<api::Operation> for Operation {
2003    fn from(value: api::Operation) -> Self {
2004        match value {
2005            api::Operation::Unspecified => Self::Unspecified,
2006            api::Operation::ListBasins => Self::ListBasins,
2007            api::Operation::CreateBasin => Self::CreateBasin,
2008            api::Operation::DeleteBasin => Self::DeleteBasin,
2009            api::Operation::ReconfigureBasin => Self::ReconfigureBasin,
2010            api::Operation::GetBasinConfig => Self::GetBasinConfig,
2011            api::Operation::IssueAccessToken => Self::IssueAccessToken,
2012            api::Operation::RevokeAccessToken => Self::RevokeAccessToken,
2013            api::Operation::ListAccessTokens => Self::ListAccessTokens,
2014            api::Operation::ListStreams => Self::ListStreams,
2015            api::Operation::CreateStream => Self::CreateStream,
2016            api::Operation::DeleteStream => Self::DeleteStream,
2017            api::Operation::GetStreamConfig => Self::GetStreamConfig,
2018            api::Operation::ReconfigureStream => Self::ReconfigureStream,
2019            api::Operation::CheckTail => Self::CheckTail,
2020            api::Operation::Append => Self::Append,
2021            api::Operation::Read => Self::Read,
2022            api::Operation::Trim => Self::Trim,
2023            api::Operation::Fence => Self::Fence,
2024        }
2025    }
2026}
2027
// Scope of an access token: optional resource sets for basins, streams and
// tokens, optional operation groups, and a set of individual operations.
// The default value has every option unset and an empty operation set.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct AccessTokenScope {
    pub basins: Option<ResourceSet>,
    pub streams: Option<ResourceSet>,
    pub tokens: Option<ResourceSet>,
    pub op_groups: Option<PermittedOperationGroups>,
    pub ops: HashSet<Operation>,
}
2037
2038impl AccessTokenScope {
2039    /// Create a new access token scope.
2040    pub fn new() -> Self {
2041        Self::default()
2042    }
2043
2044    /// Overwrite resource set for tokens.
2045    pub fn with_basins(self, basins: ResourceSet) -> Self {
2046        Self {
2047            basins: Some(basins),
2048            ..self
2049        }
2050    }
2051
2052    /// Overwrite resource set for streams.
2053    pub fn with_streams(self, streams: ResourceSet) -> Self {
2054        Self {
2055            streams: Some(streams),
2056            ..self
2057        }
2058    }
2059
2060    /// Overwrite resource set for tokens.
2061    pub fn with_tokens(self, tokens: ResourceSet) -> Self {
2062        Self {
2063            tokens: Some(tokens),
2064            ..self
2065        }
2066    }
2067
2068    /// Overwrite operation groups.
2069    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2070        Self {
2071            op_groups: Some(op_groups),
2072            ..self
2073        }
2074    }
2075
2076    /// Overwrite operations.
2077    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2078        Self {
2079            ops: ops.into_iter().collect(),
2080            ..self
2081        }
2082    }
2083
2084    /// Add an operation to operations.
2085    pub fn with_op(self, op: Operation) -> Self {
2086        let mut ops = self.ops;
2087        ops.insert(op);
2088        Self { ops, ..self }
2089    }
2090}
2091
2092impl From<AccessTokenScope> for api::AccessTokenScope {
2093    fn from(value: AccessTokenScope) -> Self {
2094        let AccessTokenScope {
2095            basins,
2096            streams,
2097            tokens,
2098            op_groups,
2099            ops,
2100        } = value;
2101        Self {
2102            basins: basins.map(Into::into),
2103            streams: streams.map(Into::into),
2104            tokens: tokens.map(Into::into),
2105            op_groups: op_groups.map(Into::into),
2106            ops: ops.into_iter().map(Into::into).collect(),
2107        }
2108    }
2109}
2110
2111impl TryFrom<api::AccessTokenScope> for AccessTokenScope {
2112    type Error = ConvertError;
2113    fn try_from(value: api::AccessTokenScope) -> Result<Self, Self::Error> {
2114        let api::AccessTokenScope {
2115            basins,
2116            streams,
2117            tokens,
2118            op_groups,
2119            ops,
2120        } = value;
2121        Ok(Self {
2122            basins: basins.and_then(|set| set.matching.map(Into::into)),
2123            streams: streams.and_then(|set| set.matching.map(Into::into)),
2124            tokens: tokens.and_then(|set| set.matching.map(Into::into)),
2125            op_groups: op_groups.map(Into::into),
2126            ops: ops
2127                .into_iter()
2128                .map(Operation::try_from)
2129                .collect::<Result<HashSet<_>, _>>()?,
2130        })
2131    }
2132}
2133
2134impl From<ResourceSet> for api::ResourceSet {
2135    fn from(value: ResourceSet) -> Self {
2136        Self {
2137            matching: Some(value.into()),
2138        }
2139    }
2140}
2141
2142#[sync_docs(ResourceSet = "Matching")]
2143#[derive(Debug, Clone)]
2144pub enum ResourceSet {
2145    Exact(String),
2146    Prefix(String),
2147}
2148
2149impl From<ResourceSet> for api::resource_set::Matching {
2150    fn from(value: ResourceSet) -> Self {
2151        match value {
2152            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2153            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2154        }
2155    }
2156}
2157
2158impl From<api::resource_set::Matching> for ResourceSet {
2159    fn from(value: api::resource_set::Matching) -> Self {
2160        match value {
2161            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2162            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2163        }
2164    }
2165}
2166
2167#[sync_docs]
2168#[derive(Debug, Clone, Default)]
2169pub struct PermittedOperationGroups {
2170    pub account: Option<ReadWritePermissions>,
2171    pub basin: Option<ReadWritePermissions>,
2172    pub stream: Option<ReadWritePermissions>,
2173}
2174
2175impl PermittedOperationGroups {
2176    /// Create a new permitted operation groups.
2177    pub fn new() -> Self {
2178        Self::default()
2179    }
2180
2181    /// Overwrite account read-write permissions.
2182    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2183        Self {
2184            account: Some(account),
2185            ..self
2186        }
2187    }
2188
2189    /// Overwrite basin read-write permissions.
2190    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2191        Self {
2192            basin: Some(basin),
2193            ..self
2194        }
2195    }
2196
2197    /// Overwrite stream read-write permissions.
2198    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2199        Self {
2200            stream: Some(stream),
2201            ..self
2202        }
2203    }
2204}
2205
2206impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2207    fn from(value: PermittedOperationGroups) -> Self {
2208        let PermittedOperationGroups {
2209            account,
2210            basin,
2211            stream,
2212        } = value;
2213        Self {
2214            account: account.map(Into::into),
2215            basin: basin.map(Into::into),
2216            stream: stream.map(Into::into),
2217        }
2218    }
2219}
2220
2221impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2222    fn from(value: api::PermittedOperationGroups) -> Self {
2223        let api::PermittedOperationGroups {
2224            account,
2225            basin,
2226            stream,
2227        } = value;
2228        Self {
2229            account: account.map(Into::into),
2230            basin: basin.map(Into::into),
2231            stream: stream.map(Into::into),
2232        }
2233    }
2234}
2235
2236#[sync_docs]
2237#[derive(Debug, Clone, Default)]
2238pub struct ReadWritePermissions {
2239    pub read: bool,
2240    pub write: bool,
2241}
2242
2243impl ReadWritePermissions {
2244    /// Create a new read-write permission.
2245    pub fn new() -> Self {
2246        Self::default()
2247    }
2248
2249    /// Overwrite read permission.
2250    pub fn with_read(self, read: bool) -> Self {
2251        Self { read, ..self }
2252    }
2253
2254    /// Overwrite write permission.
2255    pub fn with_write(self, write: bool) -> Self {
2256        Self { write, ..self }
2257    }
2258}
2259
2260impl From<ReadWritePermissions> for api::ReadWritePermissions {
2261    fn from(value: ReadWritePermissions) -> Self {
2262        let ReadWritePermissions { read, write } = value;
2263        Self { read, write }
2264    }
2265}
2266
2267impl From<api::ReadWritePermissions> for ReadWritePermissions {
2268    fn from(value: api::ReadWritePermissions) -> Self {
2269        let api::ReadWritePermissions { read, write } = value;
2270        Self { read, write }
2271    }
2272}
2273
2274impl From<api::IssueAccessTokenResponse> for String {
2275    fn from(value: api::IssueAccessTokenResponse) -> Self {
2276        value.token
2277    }
2278}
2279
2280impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2281    fn from(value: AccessTokenId) -> Self {
2282        Self { id: value.into() }
2283    }
2284}
2285
2286impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2287    type Error = ConvertError;
2288    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2289        let token_info = value.info.ok_or("access token info is missing")?;
2290        token_info.try_into()
2291    }
2292}
2293
2294#[sync_docs]
2295#[derive(Debug, Clone, Default)]
2296pub struct ListAccessTokensRequest {
2297    pub prefix: String,
2298    pub start_after: String,
2299    pub limit: Option<usize>,
2300}
2301
2302impl ListAccessTokensRequest {
2303    /// Create a new request with prefix.
2304    pub fn new() -> Self {
2305        Self::default()
2306    }
2307
2308    /// Overwrite prefix.
2309    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2310        Self {
2311            prefix: prefix.into(),
2312            ..self
2313        }
2314    }
2315
2316    /// Overwrite start after.
2317    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2318        Self {
2319            start_after: start_after.into(),
2320            ..self
2321        }
2322    }
2323
2324    /// Overwrite limit.
2325    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2326        Self {
2327            limit: limit.into(),
2328            ..self
2329        }
2330    }
2331}
2332
2333impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2334    type Error = ConvertError;
2335    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2336        let ListAccessTokensRequest {
2337            prefix,
2338            start_after,
2339            limit,
2340        } = value;
2341        Ok(Self {
2342            prefix,
2343            start_after,
2344            limit: limit
2345                .map(TryInto::try_into)
2346                .transpose()
2347                .map_err(|_| "request limit does not fit into u64 bounds")?,
2348        })
2349    }
2350}
2351
2352#[sync_docs]
2353#[derive(Debug, Clone)]
2354pub struct ListAccessTokensResponse {
2355    pub tokens: Vec<AccessTokenInfo>,
2356    pub has_more: bool,
2357}
2358
2359impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2360    type Error = ConvertError;
2361    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2362        let api::ListAccessTokensResponse { tokens, has_more } = value;
2363        let tokens = tokens
2364            .into_iter()
2365            .map(TryInto::try_into)
2366            .collect::<Result<Vec<_>, _>>()?;
2367        Ok(Self { tokens, has_more })
2368    }
2369}