s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
12pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13
/// Error related to conversion from one type to another.
///
/// Wraps a human-readable message; `Display` prints that message verbatim
/// (see the `{0}` format in the `#[error]` attribute).
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20    fn from(value: T) -> Self {
21        Self(value.into())
22    }
23}
24
/// Metered size of the object in bytes.
///
/// Bytes are calculated using the "metered bytes" formula:
///
/// ```python
/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.name) + len(h.value)) for h in record.headers) + len(record.body)
/// ```
///
/// For a `Vec` of metered items, the metered size is the sum over its items.
pub trait MeteredBytes {
    /// Return the metered bytes of the object.
    fn metered_bytes(&self) -> u64;
}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38    fn metered_bytes(&self) -> u64 {
39        self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40    }
41}
42
// Implements `MeteredBytes` for a record-like type that exposes `headers`
// (items with `name` and `value`) and `body`, each providing `len()`.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                // Fixed 8-byte overhead per record plus 2 bytes per header.
                let overhead = 8 + (2 * self.headers.len());
                // Combined length of every header's name and value.
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                (overhead + header_bytes + self.body.len()) as u64
            }
        }
    };
}
60
// SDK-side basin scope; it has a one-to-one variant mapping with
// `api::BasinScope` (see the `From` impls in this file).
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BasinScope {
    // `Default` variant; parsed from the string "unspecified".
    #[default]
    Unspecified,
    // Parsed from the string "aws:us-east-1".
    AwsUsEast1,
}
68
69impl From<BasinScope> for api::BasinScope {
70    fn from(value: BasinScope) -> Self {
71        match value {
72            BasinScope::Unspecified => Self::Unspecified,
73            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
74        }
75    }
76}
77
78impl From<api::BasinScope> for BasinScope {
79    fn from(value: api::BasinScope) -> Self {
80        match value {
81            api::BasinScope::Unspecified => Self::Unspecified,
82            api::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
83        }
84    }
85}
86
87impl FromStr for BasinScope {
88    type Err = ConvertError;
89    fn from_str(value: &str) -> Result<Self, Self::Err> {
90        match value {
91            "unspecified" => Ok(Self::Unspecified),
92            "aws:us-east-1" => Ok(Self::AwsUsEast1),
93            _ => Err("invalid basin scope value".into()),
94        }
95    }
96}
97
98impl From<BasinScope> for i32 {
99    fn from(value: BasinScope) -> Self {
100        api::BasinScope::from(value).into()
101    }
102}
103
104impl TryFrom<i32> for BasinScope {
105    type Error = ConvertError;
106    fn try_from(value: i32) -> Result<Self, Self::Error> {
107        api::BasinScope::try_from(value)
108            .map(Into::into)
109            .map_err(|_| "invalid basin scope value".into())
110    }
111}
112
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateBasinRequest {
    // Basin name the request is created with.
    pub basin: BasinName,
    // `None` after `new`; set through `with_config`.
    pub config: Option<BasinConfig>,
    // `BasinScope::Unspecified` after `new`; set through `with_scope`.
    pub scope: BasinScope,
}
120
121impl CreateBasinRequest {
122    /// Create a new request with basin name.
123    pub fn new(basin: BasinName) -> Self {
124        Self {
125            basin,
126            config: None,
127            scope: BasinScope::Unspecified,
128        }
129    }
130
131    /// Overwrite basin configuration.
132    pub fn with_config(self, config: BasinConfig) -> Self {
133        Self {
134            config: Some(config),
135            ..self
136        }
137    }
138
139    /// Overwrite basin scope.
140    pub fn with_scope(self, scope: BasinScope) -> Self {
141        Self { scope, ..self }
142    }
143}
144
145impl From<CreateBasinRequest> for api::CreateBasinRequest {
146    fn from(value: CreateBasinRequest) -> Self {
147        let CreateBasinRequest {
148            basin,
149            config,
150            scope,
151        } = value;
152        Self {
153            basin: basin.0,
154            config: config.map(Into::into),
155            scope: scope.into(),
156        }
157    }
158}
159
// `Default` (and therefore `BasinConfig::new`) yields no default stream
// config and both `create_stream_on_*` flags set to `false`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    pub default_stream_config: Option<StreamConfig>,
    pub create_stream_on_append: bool,
    pub create_stream_on_read: bool,
}
167
168impl BasinConfig {
169    /// Create a new basin config.
170    pub fn new() -> Self {
171        Self::default()
172    }
173
174    /// Overwrite the default stream config.
175    pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
176        Self {
177            default_stream_config: Some(default_stream_config),
178            ..self
179        }
180    }
181
182    /// Overwrite `create_stream_on_append`.
183    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
184        Self {
185            create_stream_on_append,
186            ..self
187        }
188    }
189
190    /// Overwrite `create_stream_on_read`.
191    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
192        Self {
193            create_stream_on_read,
194            ..self
195        }
196    }
197}
198
199impl From<BasinConfig> for api::BasinConfig {
200    fn from(value: BasinConfig) -> Self {
201        let BasinConfig {
202            default_stream_config,
203            create_stream_on_append,
204            create_stream_on_read,
205        } = value;
206        Self {
207            default_stream_config: default_stream_config.map(Into::into),
208            create_stream_on_append,
209            create_stream_on_read,
210        }
211    }
212}
213
214impl TryFrom<api::BasinConfig> for BasinConfig {
215    type Error = ConvertError;
216    fn try_from(value: api::BasinConfig) -> Result<Self, Self::Error> {
217        let api::BasinConfig {
218            default_stream_config,
219            create_stream_on_append,
220            create_stream_on_read,
221        } = value;
222        Ok(Self {
223            default_stream_config: default_stream_config.map(TryInto::try_into).transpose()?,
224            create_stream_on_append,
225            create_stream_on_read,
226        })
227    }
228}
229
// `Default` (and therefore `StreamConfig::new`) yields the default storage
// class (`StorageClass::Unspecified`) and leaves both optional fields `None`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    pub storage_class: StorageClass,
    pub retention_policy: Option<RetentionPolicy>,
    pub require_client_timestamps: Option<bool>,
}
237
238impl StreamConfig {
239    /// Create a new stream config.
240    pub fn new() -> Self {
241        Self::default()
242    }
243
244    /// Overwrite storage class.
245    pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self {
246        Self {
247            storage_class: storage_class.into(),
248            ..self
249        }
250    }
251
252    /// Overwrite retention policy.
253    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
254        Self {
255            retention_policy: Some(retention_policy),
256            ..self
257        }
258    }
259
260    /// Overwrite `require_client_timestamps`.
261    pub fn with_require_client_timestamps(self, require_client_timestamps: bool) -> Self {
262        Self {
263            require_client_timestamps: Some(require_client_timestamps),
264            ..self
265        }
266    }
267}
268
269impl From<StreamConfig> for api::StreamConfig {
270    fn from(value: StreamConfig) -> Self {
271        let StreamConfig {
272            storage_class,
273            retention_policy,
274            require_client_timestamps,
275        } = value;
276        Self {
277            storage_class: storage_class.into(),
278            retention_policy: retention_policy.map(Into::into),
279            require_client_timestamps,
280        }
281    }
282}
283
284impl TryFrom<api::StreamConfig> for StreamConfig {
285    type Error = ConvertError;
286    fn try_from(value: api::StreamConfig) -> Result<Self, Self::Error> {
287        let api::StreamConfig {
288            storage_class,
289            retention_policy,
290            require_client_timestamps,
291        } = value;
292        Ok(Self {
293            storage_class: storage_class.try_into()?,
294            retention_policy: retention_policy.map(Into::into),
295            require_client_timestamps,
296        })
297    }
298}
299
// SDK-side storage class; it has a one-to-one variant mapping with
// `api::StorageClass` (see the `From` impls in this file).
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StorageClass {
    // `Default` variant; parsed from the string "unspecified".
    #[default]
    Unspecified,
    // Parsed from the string "standard".
    Standard,
    // Parsed from the string "express".
    Express,
}
308
309impl From<StorageClass> for api::StorageClass {
310    fn from(value: StorageClass) -> Self {
311        match value {
312            StorageClass::Unspecified => Self::Unspecified,
313            StorageClass::Standard => Self::Standard,
314            StorageClass::Express => Self::Express,
315        }
316    }
317}
318
319impl From<api::StorageClass> for StorageClass {
320    fn from(value: api::StorageClass) -> Self {
321        match value {
322            api::StorageClass::Unspecified => Self::Unspecified,
323            api::StorageClass::Standard => Self::Standard,
324            api::StorageClass::Express => Self::Express,
325        }
326    }
327}
328
329impl FromStr for StorageClass {
330    type Err = ConvertError;
331    fn from_str(value: &str) -> Result<Self, Self::Err> {
332        match value {
333            "unspecified" => Ok(Self::Unspecified),
334            "standard" => Ok(Self::Standard),
335            "express" => Ok(Self::Express),
336            _ => Err("invalid storage class value".into()),
337        }
338    }
339}
340
341impl From<StorageClass> for i32 {
342    fn from(value: StorageClass) -> Self {
343        api::StorageClass::from(value).into()
344    }
345}
346
347impl TryFrom<i32> for StorageClass {
348    type Error = ConvertError;
349    fn try_from(value: i32) -> Result<Self, Self::Error> {
350        api::StorageClass::try_from(value)
351            .map(Into::into)
352            .map_err(|_| "invalid storage class value".into())
353    }
354}
355
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    // Age-based retention; exchanged with the API as a whole number of
    // seconds (see the `From` impls in this file).
    Age(Duration),
}
361
362impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
363    fn from(value: RetentionPolicy) -> Self {
364        match value {
365            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
366        }
367    }
368}
369
370impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
371    fn from(value: api::stream_config::RetentionPolicy) -> Self {
372        match value {
373            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
374        }
375    }
376}
377
// SDK-side basin state; it has a one-to-one variant mapping with
// `api::BasinState`. The `Display` impl in this file prints the variant
// names in lowercase ("unspecified", "active", "creating", "deleting").
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    Unspecified,
    Active,
    Creating,
    Deleting,
}
386
387impl From<BasinState> for api::BasinState {
388    fn from(value: BasinState) -> Self {
389        match value {
390            BasinState::Unspecified => Self::Unspecified,
391            BasinState::Active => Self::Active,
392            BasinState::Creating => Self::Creating,
393            BasinState::Deleting => Self::Deleting,
394        }
395    }
396}
397
398impl From<api::BasinState> for BasinState {
399    fn from(value: api::BasinState) -> Self {
400        match value {
401            api::BasinState::Unspecified => Self::Unspecified,
402            api::BasinState::Active => Self::Active,
403            api::BasinState::Creating => Self::Creating,
404            api::BasinState::Deleting => Self::Deleting,
405        }
406    }
407}
408
409impl From<BasinState> for i32 {
410    fn from(value: BasinState) -> Self {
411        api::BasinState::from(value).into()
412    }
413}
414
415impl TryFrom<i32> for BasinState {
416    type Error = ConvertError;
417    fn try_from(value: i32) -> Result<Self, Self::Error> {
418        api::BasinState::try_from(value)
419            .map(Into::into)
420            .map_err(|_| "invalid basin status value".into())
421    }
422}
423
424impl std::fmt::Display for BasinState {
425    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
426        match self {
427            BasinState::Unspecified => write!(f, "unspecified"),
428            BasinState::Active => write!(f, "active"),
429            BasinState::Creating => write!(f, "creating"),
430            BasinState::Deleting => write!(f, "deleting"),
431        }
432    }
433}
434
// SDK view of `api::BasinInfo`: the name is carried over unchanged, while
// scope and state are converted to the SDK enums.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    pub name: String,
    pub scope: BasinScope,
    pub state: BasinState,
}
442
443impl From<BasinInfo> for api::BasinInfo {
444    fn from(value: BasinInfo) -> Self {
445        let BasinInfo { name, scope, state } = value;
446        Self {
447            name,
448            scope: scope.into(),
449            state: state.into(),
450        }
451    }
452}
453
454impl TryFrom<api::BasinInfo> for BasinInfo {
455    type Error = ConvertError;
456    fn try_from(value: api::BasinInfo) -> Result<Self, Self::Error> {
457        let api::BasinInfo { name, scope, state } = value;
458        Ok(Self {
459            name,
460            scope: scope.try_into()?,
461            state: state.try_into()?,
462        })
463    }
464}
465
466impl TryFrom<api::CreateBasinResponse> for BasinInfo {
467    type Error = ConvertError;
468    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
469        let api::CreateBasinResponse { info } = value;
470        let info = info.ok_or("missing basin info")?;
471        info.try_into()
472    }
473}
474
// `Default` (and therefore `ListStreamsRequest::new`) yields an empty
// prefix, an empty `start_after` and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListStreamsRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
482
483impl ListStreamsRequest {
484    /// Create a new request.
485    pub fn new() -> Self {
486        Self::default()
487    }
488
489    /// Overwrite prefix.
490    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
491        Self {
492            prefix: prefix.into(),
493            ..self
494        }
495    }
496
497    /// Overwrite start after.
498    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
499        Self {
500            start_after: start_after.into(),
501            ..self
502        }
503    }
504
505    /// Overwrite limit.
506    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
507        Self {
508            limit: limit.into(),
509            ..self
510        }
511    }
512}
513
514impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
515    type Error = ConvertError;
516    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
517        let ListStreamsRequest {
518            prefix,
519            start_after,
520            limit,
521        } = value;
522        Ok(Self {
523            prefix,
524            start_after,
525            limit: limit.map(|n| n as u64),
526        })
527    }
528}
529
// SDK view of `api::StreamInfo`; all three fields are copied over from the
// API value unchanged.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    pub name: String,
    pub created_at: u32,
    pub deleted_at: Option<u32>,
}
537
538impl From<api::StreamInfo> for StreamInfo {
539    fn from(value: api::StreamInfo) -> Self {
540        Self {
541            name: value.name,
542            created_at: value.created_at,
543            deleted_at: value.deleted_at,
544        }
545    }
546}
547
548impl TryFrom<api::CreateStreamResponse> for StreamInfo {
549    type Error = ConvertError;
550
551    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
552        let api::CreateStreamResponse { info } = value;
553        let info = info.ok_or("missing stream info")?;
554        Ok(info.into())
555    }
556}
557
// SDK view of `api::ListStreamsResponse`: the streams are converted one by
// one and `has_more` is copied over unchanged.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    pub streams: Vec<StreamInfo>,
    pub has_more: bool,
}
564
565impl From<api::ListStreamsResponse> for ListStreamsResponse {
566    fn from(value: api::ListStreamsResponse) -> Self {
567        let api::ListStreamsResponse { streams, has_more } = value;
568        let streams = streams.into_iter().map(Into::into).collect();
569        Self { streams, has_more }
570    }
571}
572
573impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
574    type Error = ConvertError;
575    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
576        let api::GetBasinConfigResponse { config } = value;
577        let config = config.ok_or("missing basin config")?;
578        config.try_into()
579    }
580}
581
582impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
583    type Error = ConvertError;
584    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
585        let api::GetStreamConfigResponse { config } = value;
586        let config = config.ok_or("missing stream config")?;
587        config.try_into()
588    }
589}
590
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateStreamRequest {
    // Stream name the request is created with.
    pub stream: String,
    // `None` after `new`; set through `with_config`.
    pub config: Option<StreamConfig>,
}
597
598impl CreateStreamRequest {
599    /// Create a new request with stream name.
600    pub fn new(stream: impl Into<String>) -> Self {
601        Self {
602            stream: stream.into(),
603            config: None,
604        }
605    }
606
607    /// Overwrite stream config.
608    pub fn with_config(self, config: StreamConfig) -> Self {
609        Self {
610            config: Some(config),
611            ..self
612        }
613    }
614}
615
616impl From<CreateStreamRequest> for api::CreateStreamRequest {
617    fn from(value: CreateStreamRequest) -> Self {
618        let CreateStreamRequest { stream, config } = value;
619        Self {
620            stream,
621            config: config.map(Into::into),
622        }
623    }
624}
625
// `Default` (and therefore `ListBasinsRequest::new`) yields an empty
// prefix, an empty `start_after` and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListBasinsRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
633
634impl ListBasinsRequest {
635    /// Create a new request.
636    pub fn new() -> Self {
637        Self::default()
638    }
639
640    /// Overwrite prefix.
641    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
642        Self {
643            prefix: prefix.into(),
644            ..self
645        }
646    }
647
648    /// Overwrite start after.
649    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
650        Self {
651            start_after: start_after.into(),
652            ..self
653        }
654    }
655
656    /// Overwrite limit.
657    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
658        Self {
659            limit: limit.into(),
660            ..self
661        }
662    }
663}
664
665impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
666    type Error = ConvertError;
667    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
668        let ListBasinsRequest {
669            prefix,
670            start_after,
671            limit,
672        } = value;
673        Ok(Self {
674            prefix,
675            start_after,
676            limit: limit
677                .map(TryInto::try_into)
678                .transpose()
679                .map_err(|_| "request limit does not fit into u64 bounds")?,
680        })
681    }
682}
683
// SDK view of `api::ListBasinsResponse`: the basins are converted one by
// one (fallibly) and `has_more` is copied over unchanged.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    pub basins: Vec<BasinInfo>,
    pub has_more: bool,
}
690
691impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
692    type Error = ConvertError;
693    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
694        let api::ListBasinsResponse { basins, has_more } = value;
695        Ok(Self {
696            basins: basins
697                .into_iter()
698                .map(TryInto::try_into)
699                .collect::<Result<Vec<BasinInfo>, ConvertError>>()?,
700            has_more,
701        })
702    }
703}
704
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteBasinRequest {
    // Name of the basin the request is created with.
    pub basin: BasinName,
    /// Delete basin if it exists else do nothing.
    // `false` after `new`. Not part of the `api::DeleteBasinRequest`
    // produced by the `From` impl in this file, which only forwards `basin`.
    pub if_exists: bool,
}
712
713impl DeleteBasinRequest {
714    /// Create a new request.
715    pub fn new(basin: BasinName) -> Self {
716        Self {
717            basin,
718            if_exists: false,
719        }
720    }
721
722    /// Overwrite the if exists parameter.
723    pub fn with_if_exists(self, if_exists: bool) -> Self {
724        Self { if_exists, ..self }
725    }
726}
727
728impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
729    fn from(value: DeleteBasinRequest) -> Self {
730        let DeleteBasinRequest { basin, .. } = value;
731        Self { basin: basin.0 }
732    }
733}
734
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteStreamRequest {
    // Name of the stream the request is created with.
    pub stream: String,
    /// Delete stream if it exists else do nothing.
    // `false` after `new`. Not part of the `api::DeleteStreamRequest`
    // produced by the `From` impl in this file, which only forwards `stream`.
    pub if_exists: bool,
}
742
743impl DeleteStreamRequest {
744    /// Create a new request.
745    pub fn new(stream: impl Into<String>) -> Self {
746        Self {
747            stream: stream.into(),
748            if_exists: false,
749        }
750    }
751
752    /// Overwrite the if exists parameter.
753    pub fn with_if_exists(self, if_exists: bool) -> Self {
754        Self { if_exists, ..self }
755    }
756}
757
758impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
759    fn from(value: DeleteStreamRequest) -> Self {
760        let DeleteStreamRequest { stream, .. } = value;
761        Self { stream }
762    }
763}
764
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureBasinRequest {
    // Name of the basin the request is created with.
    pub basin: BasinName,
    // `None` after `new`; set through `with_config`.
    pub config: Option<BasinConfig>,
    // `None` after `new`; when set, the strings become the `paths` of a
    // `prost_types::FieldMask` in the API request.
    pub mask: Option<Vec<String>>,
}
772
773impl ReconfigureBasinRequest {
774    /// Create a new request with basin name.
775    pub fn new(basin: BasinName) -> Self {
776        Self {
777            basin,
778            config: None,
779            mask: None,
780        }
781    }
782
783    /// Overwrite basin config.
784    pub fn with_config(self, config: BasinConfig) -> Self {
785        Self {
786            config: Some(config),
787            ..self
788        }
789    }
790
791    /// Overwrite field mask.
792    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
793        Self {
794            mask: Some(mask.into()),
795            ..self
796        }
797    }
798}
799
800impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
801    fn from(value: ReconfigureBasinRequest) -> Self {
802        let ReconfigureBasinRequest {
803            basin,
804            config,
805            mask,
806        } = value;
807        Self {
808            basin: basin.0,
809            config: config.map(Into::into),
810            mask: mask.map(|paths| prost_types::FieldMask { paths }),
811        }
812    }
813}
814
815impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
816    type Error = ConvertError;
817    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
818        let api::ReconfigureBasinResponse { config } = value;
819        let config = config.ok_or("missing basin config")?;
820        config.try_into()
821    }
822}
823
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureStreamRequest {
    // Name of the stream the request is created with.
    pub stream: String,
    // `None` after `new`; set through `with_config`.
    pub config: Option<StreamConfig>,
    // `None` after `new`; when set, the strings become the `paths` of a
    // `prost_types::FieldMask` in the API request.
    pub mask: Option<Vec<String>>,
}
831
832impl ReconfigureStreamRequest {
833    /// Create a new request with stream name.
834    pub fn new(stream: impl Into<String>) -> Self {
835        Self {
836            stream: stream.into(),
837            config: None,
838            mask: None,
839        }
840    }
841
842    /// Overwrite stream config.
843    pub fn with_config(self, config: StreamConfig) -> Self {
844        Self {
845            config: Some(config),
846            ..self
847        }
848    }
849
850    /// Overwrite field mask.
851    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
852        Self {
853            mask: Some(mask.into()),
854            ..self
855        }
856    }
857}
858
859impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
860    fn from(value: ReconfigureStreamRequest) -> Self {
861        let ReconfigureStreamRequest {
862            stream,
863            config,
864            mask,
865        } = value;
866        Self {
867            stream,
868            config: config.map(Into::into),
869            mask: mask.map(|paths| prost_types::FieldMask { paths }),
870        }
871    }
872}
873
874impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
875    type Error = ConvertError;
876    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
877        let api::ReconfigureStreamResponse { config } = value;
878        let config = config.ok_or("missing stream config")?;
879        config.try_into()
880    }
881}
882
883impl From<api::CheckTailResponse> for u64 {
884    fn from(value: api::CheckTailResponse) -> Self {
885        let api::CheckTailResponse { next_seq_num } = value;
886        next_seq_num
887    }
888}
889
// Name/value pair of raw bytes. `Header::from_value` builds a header whose
// name is empty.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    pub name: Bytes,
    pub value: Bytes,
}
896
897impl Header {
898    /// Create a new header from name and value.
899    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
900        Self {
901            name: name.into(),
902            value: value.into(),
903        }
904    }
905
906    /// Create a new header from value.
907    pub fn from_value(value: impl Into<Bytes>) -> Self {
908        Self {
909            name: Bytes::new(),
910            value: value.into(),
911        }
912    }
913}
914
915impl From<Header> for api::Header {
916    fn from(value: Header) -> Self {
917        let Header { name, value } = value;
918        Self { name, value }
919    }
920}
921
922impl From<api::Header> for Header {
923    fn from(value: api::Header) -> Self {
924        let api::Header { name, value } = value;
925        Self { name, value }
926    }
927}
928
/// A fencing token can be enforced on append requests.
///
/// Must not be more than 16 bytes. The length is checked by
/// [`FencingToken::new`] and the `TryFrom` conversions; the `Default` value
/// is the empty token.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FencingToken(Bytes);
934
935impl FencingToken {
936    const MAX_BYTES: usize = 16;
937
938    /// Try creating a new fencing token from bytes.
939    pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
940        let bytes = bytes.into();
941        if bytes.len() > Self::MAX_BYTES {
942            Err(format!(
943                "Size of a fencing token cannot exceed {} bytes",
944                Self::MAX_BYTES
945            )
946            .into())
947        } else {
948            Ok(Self(bytes))
949        }
950    }
951
952    /// Generate a random fencing token with `n` bytes.
953    pub fn generate(n: usize) -> Result<Self, ConvertError> {
954        Self::new(
955            rand::rng()
956                .sample_iter(rand::distr::StandardUniform)
957                .take(n)
958                .collect::<Bytes>(),
959        )
960    }
961}
962
963impl AsRef<Bytes> for FencingToken {
964    fn as_ref(&self) -> &Bytes {
965        &self.0
966    }
967}
968
969impl AsRef<[u8]> for FencingToken {
970    fn as_ref(&self) -> &[u8] {
971        &self.0
972    }
973}
974
975impl Deref for FencingToken {
976    type Target = [u8];
977
978    fn deref(&self) -> &Self::Target {
979        &self.0
980    }
981}
982
983impl From<FencingToken> for Bytes {
984    fn from(value: FencingToken) -> Self {
985        value.0
986    }
987}
988
989impl From<FencingToken> for Vec<u8> {
990    fn from(value: FencingToken) -> Self {
991        value.0.into()
992    }
993}
994
995impl TryFrom<Bytes> for FencingToken {
996    type Error = ConvertError;
997
998    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
999        Self::new(value)
1000    }
1001}
1002
1003impl TryFrom<Vec<u8>> for FencingToken {
1004    type Error = ConvertError;
1005
1006    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
1007        Self::new(value)
1008    }
1009}
1010
/// Command to send through a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Enforce a fencing token.
    ///
    /// Fencing is strongly consistent, and subsequent appends that specify a
    /// fencing token will be rejected if it does not match.
    Fence {
        /// Fencing token to enforce.
        ///
        /// Set empty to clear the token.
        fencing_token: FencingToken,
    },
    /// Request a trim till the sequence number.
    ///
    /// Trimming is eventually consistent, and trimmed records may be visible
    /// for a brief period.
    Trim {
        /// Trim point.
        ///
        /// This sequence number is only allowed to advance, and any regression
        /// will be ignored.
        seq_num: u64,
    },
}
1036
/// A command record is a special kind of [`AppendRecord`] that can be used to
/// send command messages.
///
/// Such a record is signalled by a sole header with empty name. The header
/// value represents the operation and record body acts as the payload.
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// Command kind.
    pub command: Command,
    /// Timestamp for the record.
    ///
    /// `None` unless set, e.g. via [`CommandRecord::with_timestamp`].
    pub timestamp: Option<u64>,
}
1049
1050impl CommandRecord {
1051    const FENCE: &[u8] = b"fence";
1052    const TRIM: &[u8] = b"trim";
1053
1054    /// Create a new fence command record.
1055    pub fn fence(fencing_token: FencingToken) -> Self {
1056        Self {
1057            command: Command::Fence { fencing_token },
1058            timestamp: None,
1059        }
1060    }
1061
1062    /// Create a new trim command record.
1063    pub fn trim(seq_num: impl Into<u64>) -> Self {
1064        Self {
1065            command: Command::Trim {
1066                seq_num: seq_num.into(),
1067            },
1068            timestamp: None,
1069        }
1070    }
1071
1072    /// Overwrite timestamp.
1073    pub fn with_timestamp(self, timestamp: u64) -> Self {
1074        Self {
1075            timestamp: Some(timestamp),
1076            ..self
1077        }
1078    }
1079}
1080
// Fields are private: records are built through the constructors and
// `with_*` methods of `AppendRecord`, which run the metered-size check.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    timestamp: Option<u64>,
    headers: Vec<Header>,
    body: Bytes,
    // Test-only size limit used by `validated` in place of the constant
    // `AppendRecord::MAX_BYTES`.
    #[cfg(test)]
    max_bytes: u64,
}

// Metered size of a record: 8 + 2 per header + the lengths of all header
// names and values + the body length.
metered_impl!(AppendRecord);
1092
1093impl AppendRecord {
    /// Maximum metered size of a single record: 1 MiB.
    const MAX_BYTES: u64 = MIB_BYTES;
1095
1096    fn validated(self) -> Result<Self, ConvertError> {
1097        #[cfg(test)]
1098        let max_bytes = self.max_bytes;
1099        #[cfg(not(test))]
1100        let max_bytes = Self::MAX_BYTES;
1101
1102        if self.metered_bytes() > max_bytes {
1103            Err("AppendRecord should have metered size less than 1 MiB".into())
1104        } else {
1105            Ok(self)
1106        }
1107    }
1108
1109    /// Try creating a new append record with body.
1110    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1111        Self {
1112            timestamp: None,
1113            headers: Vec::new(),
1114            body: body.into(),
1115            #[cfg(test)]
1116            max_bytes: Self::MAX_BYTES,
1117        }
1118        .validated()
1119    }
1120
1121    #[cfg(test)]
1122    pub(crate) fn with_max_bytes(
1123        max_bytes: u64,
1124        body: impl Into<Bytes>,
1125    ) -> Result<Self, ConvertError> {
1126        Self {
1127            timestamp: None,
1128            headers: Vec::new(),
1129            body: body.into(),
1130            max_bytes,
1131        }
1132        .validated()
1133    }
1134
1135    /// Overwrite headers.
1136    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1137        Self {
1138            headers: headers.into(),
1139            ..self
1140        }
1141        .validated()
1142    }
1143
1144    /// Overwrite timestamp.
1145    pub fn with_timestamp(self, timestamp: u64) -> Self {
1146        Self {
1147            timestamp: Some(timestamp),
1148            ..self
1149        }
1150    }
1151
1152    /// Body of the record.
1153    pub fn body(&self) -> &[u8] {
1154        &self.body
1155    }
1156
1157    /// Headers of the record.
1158    pub fn headers(&self) -> &[Header] {
1159        &self.headers
1160    }
1161
1162    /// Timestamp for the record.
1163    pub fn timestamp(&self) -> Option<u64> {
1164        self.timestamp
1165    }
1166
1167    /// Consume the record and return parts.
1168    pub fn into_parts(self) -> AppendRecordParts {
1169        AppendRecordParts {
1170            timestamp: self.timestamp,
1171            headers: self.headers,
1172            body: self.body,
1173        }
1174    }
1175
1176    /// Try creating the record from parts.
1177    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1178        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1179        if let Some(timestamp) = parts.timestamp {
1180            Ok(record.with_timestamp(timestamp))
1181        } else {
1182            Ok(record)
1183        }
1184    }
1185}
1186
1187impl From<AppendRecord> for api::AppendRecord {
1188    fn from(value: AppendRecord) -> Self {
1189        Self {
1190            timestamp: value.timestamp,
1191            headers: value.headers.into_iter().map(Into::into).collect(),
1192            body: value.body,
1193        }
1194    }
1195}
1196
1197impl From<CommandRecord> for AppendRecord {
1198    fn from(value: CommandRecord) -> Self {
1199        let (header_value, body) = match value.command {
1200            Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1201            Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1202        };
1203        AppendRecordParts {
1204            timestamp: value.timestamp,
1205            headers: vec![Header::from_value(header_value)],
1206            body: body.into(),
1207        }
1208        .try_into()
1209        .expect("command record is a valid append record")
1210    }
1211}
1212
// Freely constructible counterpart of `AppendRecord` with public fields.
// Nothing is validated here; converting into an `AppendRecord` (via
// `TryFrom` / `AppendRecord::try_from_parts`) performs the size check.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    // Optional timestamp for the record.
    pub timestamp: Option<u64>,
    // Headers of the record.
    pub headers: Vec<Header>,
    // Body of the record.
    pub body: Bytes,
}
1220
1221impl From<AppendRecord> for AppendRecordParts {
1222    fn from(value: AppendRecord) -> Self {
1223        value.into_parts()
1224    }
1225}
1226
1227impl TryFrom<AppendRecordParts> for AppendRecord {
1228    type Error = ConvertError;
1229
1230    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1231        Self::try_from_parts(value)
1232    }
1233}
1234
/// A collection of append records that can be sent together in a batch.
///
/// The batch is bounded both by a record count (`max_capacity`) and by the
/// total metered bytes of the records it holds.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    // Records accepted so far, in insertion order.
    records: Vec<AppendRecord>,
    // Running sum of the metered bytes of `records`; updated by `push`.
    metered_bytes: u64,
    // Maximum number of records this batch accepts.
    max_capacity: usize,
    // Test-only override of the byte limit (non-test builds use `MAX_BYTES`).
    #[cfg(test)]
    max_bytes: u64,
}
1244
1245impl PartialEq for AppendRecordBatch {
1246    fn eq(&self, other: &Self) -> bool {
1247        if self.records.eq(&other.records) {
1248            assert_eq!(self.metered_bytes, other.metered_bytes);
1249            true
1250        } else {
1251            false
1252        }
1253    }
1254}
1255
1256impl Eq for AppendRecordBatch {}
1257
1258impl Default for AppendRecordBatch {
1259    fn default() -> Self {
1260        Self::new()
1261    }
1262}
1263
1264impl AppendRecordBatch {
1265    /// Maximum number of records that a batch can hold.
1266    ///
1267    /// A record batch cannot be created with a bigger capacity.
1268    pub const MAX_CAPACITY: usize = 1000;
1269
1270    /// Maximum metered bytes of the batch.
1271    pub const MAX_BYTES: u64 = MIB_BYTES;
1272
1273    /// Create an empty record batch.
1274    pub fn new() -> Self {
1275        Self::with_max_capacity(Self::MAX_CAPACITY)
1276    }
1277
1278    /// Create an empty record batch with custom max capacity.
1279    ///
1280    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1281    pub fn with_max_capacity(max_capacity: usize) -> Self {
1282        assert!(
1283            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1284            "Batch capacity must be between 1 and 1000"
1285        );
1286
1287        Self {
1288            records: Vec::with_capacity(max_capacity),
1289            metered_bytes: 0,
1290            max_capacity,
1291            #[cfg(test)]
1292            max_bytes: Self::MAX_BYTES,
1293        }
1294    }
1295
1296    #[cfg(test)]
1297    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1298        #[cfg(test)]
1299        assert!(
1300            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1301            "Batch size must be between 1 byte and 1 MiB"
1302        );
1303
1304        Self {
1305            max_bytes,
1306            ..Self::with_max_capacity(max_capacity)
1307        }
1308    }
1309
1310    /// Try creating a record batch from an iterator.
1311    ///
1312    /// If all the items of the iterator cannot be drained into the batch, the
1313    /// error returned contains a batch containing all records it could fit
1314    /// along-with the left over items from the iterator.
1315    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1316    where
1317        R: Into<AppendRecord>,
1318        T: IntoIterator<Item = R>,
1319    {
1320        let mut records = Self::new();
1321        let mut pending = Vec::new();
1322
1323        let mut iter = iter.into_iter();
1324
1325        for record in iter.by_ref() {
1326            if let Err(record) = records.push(record) {
1327                pending.push(record);
1328                break;
1329            }
1330        }
1331
1332        if pending.is_empty() {
1333            Ok(records)
1334        } else {
1335            pending.extend(iter.map(Into::into));
1336            Err((records, pending))
1337        }
1338    }
1339
1340    /// Returns true if the batch contains no records.
1341    pub fn is_empty(&self) -> bool {
1342        if self.records.is_empty() {
1343            assert_eq!(self.metered_bytes, 0);
1344            true
1345        } else {
1346            false
1347        }
1348    }
1349
1350    /// Returns the number of records contained in the batch.
1351    pub fn len(&self) -> usize {
1352        self.records.len()
1353    }
1354
1355    #[cfg(test)]
1356    fn max_bytes(&self) -> u64 {
1357        self.max_bytes
1358    }
1359
1360    #[cfg(not(test))]
1361    fn max_bytes(&self) -> u64 {
1362        Self::MAX_BYTES
1363    }
1364
1365    /// Returns true if the batch cannot fit any more records.
1366    pub fn is_full(&self) -> bool {
1367        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1368    }
1369
1370    /// Try to append a new record into the batch.
1371    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1372        assert!(self.records.len() <= self.max_capacity);
1373        assert!(self.metered_bytes <= self.max_bytes());
1374
1375        let record = record.into();
1376        let record_size = record.metered_bytes();
1377        if self.records.len() >= self.max_capacity
1378            || self.metered_bytes + record_size > self.max_bytes()
1379        {
1380            Err(record)
1381        } else {
1382            self.records.push(record);
1383            self.metered_bytes += record_size;
1384            Ok(())
1385        }
1386    }
1387}
1388
1389impl MeteredBytes for AppendRecordBatch {
1390    fn metered_bytes(&self) -> u64 {
1391        self.metered_bytes
1392    }
1393}
1394
1395impl IntoIterator for AppendRecordBatch {
1396    type Item = AppendRecord;
1397    type IntoIter = std::vec::IntoIter<Self::Item>;
1398
1399    fn into_iter(self) -> Self::IntoIter {
1400        self.records.into_iter()
1401    }
1402}
1403
1404impl<'a> IntoIterator for &'a AppendRecordBatch {
1405    type Item = &'a AppendRecord;
1406    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1407
1408    fn into_iter(self) -> Self::IntoIter {
1409        self.records.iter()
1410    }
1411}
1412
1413impl AsRef<[AppendRecord]> for AppendRecordBatch {
1414    fn as_ref(&self) -> &[AppendRecord] {
1415        &self.records
1416    }
1417}
1418
// Input for an append: a record batch plus two optional conditions that are
// forwarded unchanged to the API type by `into_api_type`.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    // Batch of records to append.
    pub records: AppendRecordBatch,
    // Optional sequence number to match; `None` unless set explicitly.
    pub match_seq_num: Option<u64>,
    // Optional fencing token; `None` unless set explicitly.
    pub fencing_token: Option<FencingToken>,
}
1426
1427impl MeteredBytes for AppendInput {
1428    fn metered_bytes(&self) -> u64 {
1429        self.records.metered_bytes()
1430    }
1431}
1432
1433impl AppendInput {
1434    /// Create a new append input from record batch.
1435    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1436        Self {
1437            records: records.into(),
1438            match_seq_num: None,
1439            fencing_token: None,
1440        }
1441    }
1442
1443    /// Overwrite match sequence number.
1444    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1445        Self {
1446            match_seq_num: Some(match_seq_num.into()),
1447            ..self
1448        }
1449    }
1450
1451    /// Overwrite fencing token.
1452    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1453        Self {
1454            fencing_token: Some(fencing_token),
1455            ..self
1456        }
1457    }
1458
1459    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1460        let Self {
1461            records,
1462            match_seq_num,
1463            fencing_token,
1464        } = self;
1465
1466        api::AppendInput {
1467            stream: stream.into(),
1468            records: records.into_iter().map(Into::into).collect(),
1469            match_seq_num,
1470            fencing_token: fencing_token.map(|f| f.0),
1471        }
1472    }
1473}
1474
// Result of an append. The three sequence numbers are copied one-to-one from
// `api::AppendOutput`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AppendOutput {
    pub start_seq_num: u64,
    pub end_seq_num: u64,
    pub next_seq_num: u64,
}
1482
1483impl From<api::AppendOutput> for AppendOutput {
1484    fn from(value: api::AppendOutput) -> Self {
1485        let api::AppendOutput {
1486            start_seq_num,
1487            end_seq_num,
1488            next_seq_num,
1489        } = value;
1490        Self {
1491            start_seq_num,
1492            end_seq_num,
1493            next_seq_num,
1494        }
1495    }
1496}
1497
1498impl TryFrom<api::AppendResponse> for AppendOutput {
1499    type Error = ConvertError;
1500    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1501        let api::AppendResponse { output } = value;
1502        let output = output.ok_or("missing append output")?;
1503        Ok(output.into())
1504    }
1505}
1506
1507impl TryFrom<api::AppendSessionResponse> for AppendOutput {
1508    type Error = ConvertError;
1509    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1510        let api::AppendSessionResponse { output } = value;
1511        let output = output.ok_or("missing append output")?;
1512        Ok(output.into())
1513    }
1514}
1515
// Optional bounds on a read. Both default to `None`. For unary reads,
// `ReadRequest::try_into_api_type` rejects a count above 1000 or a byte
// limit above 1 MiB.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    // Limit on the number of records.
    pub count: Option<u64>,
    // Limit on the number of bytes.
    pub bytes: Option<u64>,
}
1522
1523impl ReadLimit {
1524    /// Create a new read limit.
1525    pub fn new() -> Self {
1526        Self::default()
1527    }
1528
1529    /// Overwrite count limit.
1530    pub fn with_count(self, count: u64) -> Self {
1531        Self {
1532            count: Some(count),
1533            ..self
1534        }
1535    }
1536
1537    /// Overwrite bytes limit.
1538    pub fn with_bytes(self, bytes: u64) -> Self {
1539        Self {
1540            bytes: Some(bytes),
1541            ..self
1542        }
1543    }
1544}
1545
// Parameters of a unary read: where to start and how much to read.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    // Sequence number to start reading from (defaults to 0).
    pub start_seq_num: u64,
    // Bounds on the read; validated in `try_into_api_type`.
    pub limit: ReadLimit,
}
1552
1553impl ReadRequest {
1554    /// Create a new request with start sequence number.
1555    pub fn new(start_seq_num: u64) -> Self {
1556        Self {
1557            start_seq_num,
1558            ..Default::default()
1559        }
1560    }
1561
1562    /// Overwrite limit.
1563    pub fn with_limit(self, limit: ReadLimit) -> Self {
1564        Self { limit, ..self }
1565    }
1566}
1567
1568impl ReadRequest {
1569    pub(crate) fn try_into_api_type(
1570        self,
1571        stream: impl Into<String>,
1572    ) -> Result<api::ReadRequest, ConvertError> {
1573        let Self {
1574            start_seq_num,
1575            limit,
1576        } = self;
1577
1578        let limit = if limit.count > Some(1000) {
1579            Err("read limit: count must not exceed 1000 for unary request")
1580        } else if limit.bytes > Some(MIB_BYTES) {
1581            Err("read limit: bytes must not exceed 1MiB for unary request")
1582        } else {
1583            Ok(api::ReadLimit {
1584                count: limit.count,
1585                bytes: limit.bytes,
1586            })
1587        }?;
1588
1589        Ok(api::ReadRequest {
1590            stream: stream.into(),
1591            start_seq_num,
1592            limit: Some(limit),
1593        })
1594    }
1595}
1596
// A record as returned by reads. All fields are copied from
// `api::SequencedRecord`, with headers converted to the SDK `Header` type.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    pub seq_num: u64,
    pub timestamp: u64,
    pub headers: Vec<Header>,
    pub body: Bytes,
}

// Generates the `MeteredBytes` impl from the record's headers and body.
metered_impl!(SequencedRecord);
1607
1608impl From<api::SequencedRecord> for SequencedRecord {
1609    fn from(value: api::SequencedRecord) -> Self {
1610        let api::SequencedRecord {
1611            seq_num,
1612            timestamp,
1613            headers,
1614            body,
1615        } = value;
1616        Self {
1617            seq_num,
1618            timestamp,
1619            headers: headers.into_iter().map(Into::into).collect(),
1620            body,
1621        }
1622    }
1623}
1624
1625impl SequencedRecord {
1626    /// Try representing the sequenced record as a command record.
1627    pub fn as_command_record(&self) -> Option<CommandRecord> {
1628        if self.headers.len() != 1 {
1629            return None;
1630        }
1631
1632        let header = self.headers.first().expect("pre-validated length");
1633
1634        if !header.name.is_empty() {
1635            return None;
1636        }
1637
1638        match header.value.as_ref() {
1639            CommandRecord::FENCE => {
1640                let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1641                Some(CommandRecord {
1642                    command: Command::Fence { fencing_token },
1643                    timestamp: Some(self.timestamp),
1644                })
1645            }
1646            CommandRecord::TRIM => {
1647                let body: &[u8] = &self.body;
1648                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1649                Some(CommandRecord {
1650                    command: Command::Trim { seq_num },
1651                    timestamp: Some(self.timestamp),
1652                })
1653            }
1654            _ => None,
1655        }
1656    }
1657}
1658
// A batch of sequenced records, converted from `api::SequencedRecordBatch`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    // Records of the batch, in the order received from the API type.
    pub records: Vec<SequencedRecord>,
}
1664
1665impl MeteredBytes for SequencedRecordBatch {
1666    fn metered_bytes(&self) -> u64 {
1667        self.records.metered_bytes()
1668    }
1669}
1670
1671impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1672    fn from(value: api::SequencedRecordBatch) -> Self {
1673        let api::SequencedRecordBatch { records } = value;
1674        Self {
1675            records: records.into_iter().map(Into::into).collect(),
1676        }
1677    }
1678}
1679
// Output of a read; the variants mirror `api::read_output::Output`.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    // A batch of sequenced records.
    Batch(SequencedRecordBatch),
    // A sequence number instead of records.
    NextSeqNum(u64),
}
1686
1687impl From<api::read_output::Output> for ReadOutput {
1688    fn from(value: api::read_output::Output) -> Self {
1689        match value {
1690            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1691            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1692        }
1693    }
1694}
1695
1696impl TryFrom<api::ReadOutput> for ReadOutput {
1697    type Error = ConvertError;
1698    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1699        let api::ReadOutput { output } = value;
1700        let output = output.ok_or("missing read output")?;
1701        Ok(output.into())
1702    }
1703}
1704
1705impl TryFrom<api::ReadResponse> for ReadOutput {
1706    type Error = ConvertError;
1707    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1708        let api::ReadResponse { output } = value;
1709        let output = output.ok_or("missing output in read response")?;
1710        output.try_into()
1711    }
1712}
1713
// Parameters of a read session: where to start and how much to read.
// Unlike `ReadRequest`, the limit is passed through without validation
// (see `into_api_type`).
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    // Sequence number to start reading from (defaults to 0).
    pub start_seq_num: u64,
    // Bounds on the read session.
    pub limit: ReadLimit,
}
1720
1721impl ReadSessionRequest {
1722    /// Create a new request with start sequene number.
1723    pub fn new(start_seq_num: u64) -> Self {
1724        Self {
1725            start_seq_num,
1726            ..Default::default()
1727        }
1728    }
1729
1730    /// Overwrite limit.
1731    pub fn with_limit(self, limit: ReadLimit) -> Self {
1732        Self { limit, ..self }
1733    }
1734
1735    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1736        let Self {
1737            start_seq_num,
1738            limit,
1739        } = self;
1740        api::ReadSessionRequest {
1741            stream: stream.into(),
1742            start_seq_num,
1743            limit: Some(api::ReadLimit {
1744                count: limit.count,
1745                bytes: limit.bytes,
1746            }),
1747            heartbeats: false,
1748        }
1749    }
1750}
1751
1752impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1753    type Error = ConvertError;
1754    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1755        let api::ReadSessionResponse { output } = value;
1756        let output = output.ok_or("missing output in read session response")?;
1757        output.try_into()
1758    }
1759}
1760
/// Name of a basin.
///
/// Must be between 8 and 48 characters in length. Must comprise lowercase
/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
///
/// These rules are checked by the `TryFrom<String>` and `FromStr`
/// conversions.
#[derive(Debug, Clone)]
pub struct BasinName(String);
1767
1768impl AsRef<str> for BasinName {
1769    fn as_ref(&self) -> &str {
1770        &self.0
1771    }
1772}
1773
1774impl Deref for BasinName {
1775    type Target = str;
1776    fn deref(&self) -> &Self::Target {
1777        &self.0
1778    }
1779}
1780
1781impl TryFrom<String> for BasinName {
1782    type Error = ConvertError;
1783
1784    fn try_from(name: String) -> Result<Self, Self::Error> {
1785        if name.len() < 8 || name.len() > 48 {
1786            return Err("Basin name must be between 8 and 48 characters in length".into());
1787        }
1788
1789        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1790        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1791            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1792                .expect("Failed to compile basin name regex")
1793        });
1794
1795        if !regex.is_match(&name) {
1796            return Err(
1797                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1798                It cannot begin or end with a hyphen."
1799                    .into(),
1800            );
1801        }
1802
1803        Ok(Self(name))
1804    }
1805}
1806
1807impl FromStr for BasinName {
1808    type Err = ConvertError;
1809
1810    fn from_str(s: &str) -> Result<Self, Self::Err> {
1811        s.to_string().try_into()
1812    }
1813}
1814
1815impl std::fmt::Display for BasinName {
1816    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1817        f.write_str(&self.0)
1818    }
1819}
1820
/// Access token ID.
///
/// Must be between 1 and 50 characters.
///
/// The bounds are checked by the `TryFrom<String>` and `FromStr`
/// conversions.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);
1825
1826impl AsRef<str> for AccessTokenId {
1827    fn as_ref(&self) -> &str {
1828        &self.0
1829    }
1830}
1831
1832impl Deref for AccessTokenId {
1833    type Target = str;
1834
1835    fn deref(&self) -> &Self::Target {
1836        &self.0
1837    }
1838}
1839
1840impl TryFrom<String> for AccessTokenId {
1841    type Error = ConvertError;
1842
1843    fn try_from(name: String) -> Result<Self, Self::Error> {
1844        if name.is_empty() {
1845            return Err("Access token ID must not be empty".into());
1846        }
1847
1848        if name.len() > 50 {
1849            return Err("Access token ID must not exceed 50 characters".into());
1850        }
1851
1852        Ok(Self(name))
1853    }
1854}
1855
1856impl From<AccessTokenId> for String {
1857    fn from(value: AccessTokenId) -> Self {
1858        value.0
1859    }
1860}
1861
1862impl FromStr for AccessTokenId {
1863    type Err = ConvertError;
1864
1865    fn from_str(s: &str) -> Result<Self, Self::Err> {
1866        s.to_string().try_into()
1867    }
1868}
1869
1870impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1871    fn from(value: AccessTokenInfo) -> Self {
1872        Self {
1873            info: Some(value.into()),
1874        }
1875    }
1876}
1877
// Description of an access token. `AccessTokenInfo::new` sets only the ID;
// the remaining fields default to `None` / `false`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AccessTokenInfo {
    // Validated token ID.
    pub id: AccessTokenId,
    // Optional expiration; forwarded unchanged to the API type.
    pub expires_at: Option<u32>,
    // Auto-prefix-streams flag; forwarded unchanged to the API type.
    pub auto_prefix_streams: bool,
    // Optional scope of the token.
    pub scope: Option<AccessTokenScope>,
}
1886
1887impl AccessTokenInfo {
1888    /// Create a new access token info.
1889    pub fn new(id: AccessTokenId) -> Self {
1890        Self {
1891            id,
1892            expires_at: None,
1893            auto_prefix_streams: false,
1894            scope: None,
1895        }
1896    }
1897
1898    /// Overwrite expiration time.
1899    pub fn with_expires_at(self, expires_at: u32) -> Self {
1900        Self {
1901            expires_at: Some(expires_at),
1902            ..self
1903        }
1904    }
1905
1906    /// Overwrite auto prefix streams.
1907    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1908        Self {
1909            auto_prefix_streams,
1910            ..self
1911        }
1912    }
1913
1914    /// Overwrite scope.
1915    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
1916        Self {
1917            scope: Some(scope),
1918            ..self
1919        }
1920    }
1921}
1922
1923impl From<AccessTokenInfo> for api::AccessTokenInfo {
1924    fn from(value: AccessTokenInfo) -> Self {
1925        let AccessTokenInfo {
1926            id,
1927            expires_at,
1928            auto_prefix_streams,
1929            scope,
1930        } = value;
1931        Self {
1932            id: id.into(),
1933            expires_at,
1934            auto_prefix_streams,
1935            scope: scope.map(Into::into),
1936        }
1937    }
1938}
1939
1940impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
1941    type Error = ConvertError;
1942    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
1943        let api::AccessTokenInfo {
1944            id,
1945            expires_at,
1946            auto_prefix_streams,
1947            scope,
1948        } = value;
1949        Ok(Self {
1950            id: id.try_into()?,
1951            expires_at,
1952            auto_prefix_streams,
1953            scope: scope.map(TryInto::try_into).transpose()?,
1954        })
1955    }
1956}
1957
// Operations that can be referenced in an access token scope. The variants
// map one-to-one onto `api::Operation`; `FromStr` accepts their kebab-case
// names (e.g. "list-basins"), case-insensitively.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    Unspecified,
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
1981
1982impl FromStr for Operation {
1983    type Err = ConvertError;
1984
1985    fn from_str(s: &str) -> Result<Self, Self::Err> {
1986        match s.to_lowercase().as_str() {
1987            "unspecified" => Ok(Self::Unspecified),
1988            "list-basins" => Ok(Self::ListBasins),
1989            "create-basin" => Ok(Self::CreateBasin),
1990            "delete-basin" => Ok(Self::DeleteBasin),
1991            "reconfigure-basin" => Ok(Self::ReconfigureBasin),
1992            "get-basin-config" => Ok(Self::GetBasinConfig),
1993            "issue-access-token" => Ok(Self::IssueAccessToken),
1994            "revoke-access-token" => Ok(Self::RevokeAccessToken),
1995            "list-access-tokens" => Ok(Self::ListAccessTokens),
1996            "list-streams" => Ok(Self::ListStreams),
1997            "create-stream" => Ok(Self::CreateStream),
1998            "delete-stream" => Ok(Self::DeleteStream),
1999            "get-stream-config" => Ok(Self::GetStreamConfig),
2000            "reconfigure-stream" => Ok(Self::ReconfigureStream),
2001            "check-tail" => Ok(Self::CheckTail),
2002            "append" => Ok(Self::Append),
2003            "read" => Ok(Self::Read),
2004            "trim" => Ok(Self::Trim),
2005            "fence" => Ok(Self::Fence),
2006            _ => Err("invalid operation".into()),
2007        }
2008    }
2009}
2010
2011impl From<Operation> for i32 {
2012    fn from(value: Operation) -> Self {
2013        api::Operation::from(value).into()
2014    }
2015}
2016
2017impl TryFrom<i32> for Operation {
2018    type Error = ConvertError;
2019    fn try_from(value: i32) -> Result<Self, Self::Error> {
2020        api::Operation::try_from(value)
2021            .map_err(|_| "invalid operation".into())
2022            .map(Into::into)
2023    }
2024}
2025
2026impl From<Operation> for api::Operation {
2027    fn from(value: Operation) -> Self {
2028        match value {
2029            Operation::Unspecified => Self::Unspecified,
2030            Operation::ListBasins => Self::ListBasins,
2031            Operation::CreateBasin => Self::CreateBasin,
2032            Operation::DeleteBasin => Self::DeleteBasin,
2033            Operation::ReconfigureBasin => Self::ReconfigureBasin,
2034            Operation::GetBasinConfig => Self::GetBasinConfig,
2035            Operation::IssueAccessToken => Self::IssueAccessToken,
2036            Operation::RevokeAccessToken => Self::RevokeAccessToken,
2037            Operation::ListAccessTokens => Self::ListAccessTokens,
2038            Operation::ListStreams => Self::ListStreams,
2039            Operation::CreateStream => Self::CreateStream,
2040            Operation::DeleteStream => Self::DeleteStream,
2041            Operation::GetStreamConfig => Self::GetStreamConfig,
2042            Operation::ReconfigureStream => Self::ReconfigureStream,
2043            Operation::CheckTail => Self::CheckTail,
2044            Operation::Append => Self::Append,
2045            Operation::Read => Self::Read,
2046            Operation::Trim => Self::Trim,
2047            Operation::Fence => Self::Fence,
2048        }
2049    }
2050}
2051
2052impl From<api::Operation> for Operation {
2053    fn from(value: api::Operation) -> Self {
2054        match value {
2055            api::Operation::Unspecified => Self::Unspecified,
2056            api::Operation::ListBasins => Self::ListBasins,
2057            api::Operation::CreateBasin => Self::CreateBasin,
2058            api::Operation::DeleteBasin => Self::DeleteBasin,
2059            api::Operation::ReconfigureBasin => Self::ReconfigureBasin,
2060            api::Operation::GetBasinConfig => Self::GetBasinConfig,
2061            api::Operation::IssueAccessToken => Self::IssueAccessToken,
2062            api::Operation::RevokeAccessToken => Self::RevokeAccessToken,
2063            api::Operation::ListAccessTokens => Self::ListAccessTokens,
2064            api::Operation::ListStreams => Self::ListStreams,
2065            api::Operation::CreateStream => Self::CreateStream,
2066            api::Operation::DeleteStream => Self::DeleteStream,
2067            api::Operation::GetStreamConfig => Self::GetStreamConfig,
2068            api::Operation::ReconfigureStream => Self::ReconfigureStream,
2069            api::Operation::CheckTail => Self::CheckTail,
2070            api::Operation::Append => Self::Append,
2071            api::Operation::Read => Self::Read,
2072            api::Operation::Trim => Self::Trim,
2073            api::Operation::Fence => Self::Fence,
2074        }
2075    }
2076}
2077
// Scope of an access token. Every field defaults to unset / empty and is
// filled through the `with_*` builder methods.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct AccessTokenScope {
    // Resource set for basins.
    pub basins: Option<ResourceSet>,
    // Resource set for streams.
    pub streams: Option<ResourceSet>,
    // Resource set for tokens.
    pub tokens: Option<ResourceSet>,
    // Permitted operation groups.
    pub op_groups: Option<PermittedOperationGroups>,
    // Individual operations; a set, so duplicates collapse.
    pub ops: HashSet<Operation>,
}
2087
2088impl AccessTokenScope {
2089    /// Create a new access token scope.
2090    pub fn new() -> Self {
2091        Self::default()
2092    }
2093
2094    /// Overwrite resource set for tokens.
2095    pub fn with_basins(self, basins: ResourceSet) -> Self {
2096        Self {
2097            basins: Some(basins),
2098            ..self
2099        }
2100    }
2101
2102    /// Overwrite resource set for streams.
2103    pub fn with_streams(self, streams: ResourceSet) -> Self {
2104        Self {
2105            streams: Some(streams),
2106            ..self
2107        }
2108    }
2109
2110    /// Overwrite resource set for tokens.
2111    pub fn with_tokens(self, tokens: ResourceSet) -> Self {
2112        Self {
2113            tokens: Some(tokens),
2114            ..self
2115        }
2116    }
2117
2118    /// Overwrite operation groups.
2119    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2120        Self {
2121            op_groups: Some(op_groups),
2122            ..self
2123        }
2124    }
2125
2126    /// Overwrite operations.
2127    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2128        Self {
2129            ops: ops.into_iter().collect(),
2130            ..self
2131        }
2132    }
2133
2134    /// Add an operation to operations.
2135    pub fn with_op(self, op: Operation) -> Self {
2136        let mut ops = self.ops;
2137        ops.insert(op);
2138        Self { ops, ..self }
2139    }
2140}
2141
2142impl From<AccessTokenScope> for api::AccessTokenScope {
2143    fn from(value: AccessTokenScope) -> Self {
2144        let AccessTokenScope {
2145            basins,
2146            streams,
2147            tokens,
2148            op_groups,
2149            ops,
2150        } = value;
2151        Self {
2152            basins: basins.map(Into::into),
2153            streams: streams.map(Into::into),
2154            tokens: tokens.map(Into::into),
2155            op_groups: op_groups.map(Into::into),
2156            ops: ops.into_iter().map(Into::into).collect(),
2157        }
2158    }
2159}
2160
2161impl TryFrom<api::AccessTokenScope> for AccessTokenScope {
2162    type Error = ConvertError;
2163    fn try_from(value: api::AccessTokenScope) -> Result<Self, Self::Error> {
2164        let api::AccessTokenScope {
2165            basins,
2166            streams,
2167            tokens,
2168            op_groups,
2169            ops,
2170        } = value;
2171        Ok(Self {
2172            basins: basins.and_then(|set| set.matching.map(Into::into)),
2173            streams: streams.and_then(|set| set.matching.map(Into::into)),
2174            tokens: tokens.and_then(|set| set.matching.map(Into::into)),
2175            op_groups: op_groups.map(Into::into),
2176            ops: ops
2177                .into_iter()
2178                .map(Operation::try_from)
2179                .collect::<Result<HashSet<_>, _>>()?,
2180        })
2181    }
2182}
2183
2184impl From<ResourceSet> for api::ResourceSet {
2185    fn from(value: ResourceSet) -> Self {
2186        Self {
2187            matching: Some(value.into()),
2188        }
2189    }
2190}
2191
// Selector for a set of named resources; both variants carry a `String`.
// NOTE(review): presumably `Exact` matches a single name and `Prefix` matches
// every name starting with the given string — confirm against the API docs.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]`, with
// `ResourceSet = "Matching"` mapping this type to the API's `Matching` type —
// confirm before adding `///` docs here.
#[sync_docs(ResourceSet = "Matching")]
#[derive(Debug, Clone)]
pub enum ResourceSet {
    Exact(String),
    Prefix(String),
}
2198
2199impl From<ResourceSet> for api::resource_set::Matching {
2200    fn from(value: ResourceSet) -> Self {
2201        match value {
2202            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2203            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2204        }
2205    }
2206}
2207
2208impl From<api::resource_set::Matching> for ResourceSet {
2209    fn from(value: api::resource_set::Matching) -> Self {
2210        match value {
2211            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2212            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2213        }
2214    }
2215}
2216
2217#[sync_docs]
2218#[derive(Debug, Clone, Default)]
2219pub struct PermittedOperationGroups {
2220    pub account: Option<ReadWritePermissions>,
2221    pub basin: Option<ReadWritePermissions>,
2222    pub stream: Option<ReadWritePermissions>,
2223}
2224
2225impl PermittedOperationGroups {
2226    /// Create a new permitted operation groups.
2227    pub fn new() -> Self {
2228        Self::default()
2229    }
2230
2231    /// Overwrite account read-write permissions.
2232    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2233        Self {
2234            account: Some(account),
2235            ..self
2236        }
2237    }
2238
2239    /// Overwrite basin read-write permissions.
2240    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2241        Self {
2242            basin: Some(basin),
2243            ..self
2244        }
2245    }
2246
2247    /// Overwrite stream read-write permissions.
2248    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2249        Self {
2250            stream: Some(stream),
2251            ..self
2252        }
2253    }
2254}
2255
2256impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2257    fn from(value: PermittedOperationGroups) -> Self {
2258        let PermittedOperationGroups {
2259            account,
2260            basin,
2261            stream,
2262        } = value;
2263        Self {
2264            account: account.map(Into::into),
2265            basin: basin.map(Into::into),
2266            stream: stream.map(Into::into),
2267        }
2268    }
2269}
2270
2271impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2272    fn from(value: api::PermittedOperationGroups) -> Self {
2273        let api::PermittedOperationGroups {
2274            account,
2275            basin,
2276            stream,
2277        } = value;
2278        Self {
2279            account: account.map(Into::into),
2280            basin: basin.map(Into::into),
2281            stream: stream.map(Into::into),
2282        }
2283    }
2284}
2285
2286#[sync_docs]
2287#[derive(Debug, Clone, Default)]
2288pub struct ReadWritePermissions {
2289    pub read: bool,
2290    pub write: bool,
2291}
2292
2293impl ReadWritePermissions {
2294    /// Create a new read-write permission.
2295    pub fn new() -> Self {
2296        Self::default()
2297    }
2298
2299    /// Overwrite read permission.
2300    pub fn with_read(self, read: bool) -> Self {
2301        Self { read, ..self }
2302    }
2303
2304    /// Overwrite write permission.
2305    pub fn with_write(self, write: bool) -> Self {
2306        Self { write, ..self }
2307    }
2308}
2309
2310impl From<ReadWritePermissions> for api::ReadWritePermissions {
2311    fn from(value: ReadWritePermissions) -> Self {
2312        let ReadWritePermissions { read, write } = value;
2313        Self { read, write }
2314    }
2315}
2316
2317impl From<api::ReadWritePermissions> for ReadWritePermissions {
2318    fn from(value: api::ReadWritePermissions) -> Self {
2319        let api::ReadWritePermissions { read, write } = value;
2320        Self { read, write }
2321    }
2322}
2323
2324impl From<api::IssueAccessTokenResponse> for String {
2325    fn from(value: api::IssueAccessTokenResponse) -> Self {
2326        value.token
2327    }
2328}
2329
2330impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2331    fn from(value: AccessTokenId) -> Self {
2332        Self { id: value.into() }
2333    }
2334}
2335
2336impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2337    type Error = ConvertError;
2338    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2339        let token_info = value.info.ok_or("access token info is missing")?;
2340        token_info.try_into()
2341    }
2342}
2343
2344#[sync_docs]
2345#[derive(Debug, Clone, Default)]
2346pub struct ListAccessTokensRequest {
2347    pub prefix: String,
2348    pub start_after: String,
2349    pub limit: Option<usize>,
2350}
2351
2352impl ListAccessTokensRequest {
2353    /// Create a new request with prefix.
2354    pub fn new() -> Self {
2355        Self::default()
2356    }
2357
2358    /// Overwrite prefix.
2359    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2360        Self {
2361            prefix: prefix.into(),
2362            ..self
2363        }
2364    }
2365
2366    /// Overwrite start after.
2367    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2368        Self {
2369            start_after: start_after.into(),
2370            ..self
2371        }
2372    }
2373
2374    /// Overwrite limit.
2375    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2376        Self {
2377            limit: limit.into(),
2378            ..self
2379        }
2380    }
2381}
2382
2383impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2384    type Error = ConvertError;
2385    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2386        let ListAccessTokensRequest {
2387            prefix,
2388            start_after,
2389            limit,
2390        } = value;
2391        Ok(Self {
2392            prefix,
2393            start_after,
2394            limit: limit
2395                .map(TryInto::try_into)
2396                .transpose()
2397                .map_err(|_| "request limit does not fit into u64 bounds")?,
2398        })
2399    }
2400}
2401
// Result of listing access tokens: the returned token infos plus a `has_more`
// flag.
// NOTE(review): `has_more` presumably signals that further tokens can be
// fetched with another request — confirm against the API docs.
// NOTE(review): rustdoc for this type and its fields is presumably injected by
// `#[sync_docs]` — confirm before adding `///` docs here.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListAccessTokensResponse {
    pub tokens: Vec<AccessTokenInfo>,
    pub has_more: bool,
}
2408
2409impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2410    type Error = ConvertError;
2411    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2412        let api::ListAccessTokensResponse { tokens, has_more } = value;
2413        let tokens = tokens
2414            .into_iter()
2415            .map(TryInto::try_into)
2416            .collect::<Result<Vec<_>, _>>()?;
2417        Ok(Self { tokens, has_more })
2418    }
2419}