s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
12pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13
14/// Error related to conversion from one type to another.
15#[derive(Debug, Clone, thiserror::Error)]
16#[error("{0}")]
17pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20    fn from(value: T) -> Self {
21        Self(value.into())
22    }
23}
24
25/// Metered size of the object in bytes.
26///
27/// Bytes are calculated using the "metered bytes" formula:
28///
29/// ```python
30/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.key) + len(h.value)) for h in record.headers) + len(record.body)
31/// ```
32pub trait MeteredBytes {
33    /// Return the metered bytes of the object.
34    fn metered_bytes(&self) -> u64;
35}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38    fn metered_bytes(&self) -> u64 {
39        self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40    }
41}
42
43macro_rules! metered_impl {
44    ($ty:ty) => {
45        impl MeteredBytes for $ty {
46            fn metered_bytes(&self) -> u64 {
47                let bytes = 8
48                    + (2 * self.headers.len())
49                    + self
50                        .headers
51                        .iter()
52                        .map(|h| h.name.len() + h.value.len())
53                        .sum::<usize>()
54                    + self.body.len();
55                bytes as u64
56            }
57        }
58    };
59}
60
61#[sync_docs]
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
63pub enum BasinScope {
64    #[default]
65    Unspecified,
66    AwsUsEast1,
67}
68
69impl From<BasinScope> for api::BasinScope {
70    fn from(value: BasinScope) -> Self {
71        match value {
72            BasinScope::Unspecified => Self::Unspecified,
73            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
74        }
75    }
76}
77
78impl From<api::BasinScope> for BasinScope {
79    fn from(value: api::BasinScope) -> Self {
80        match value {
81            api::BasinScope::Unspecified => Self::Unspecified,
82            api::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
83        }
84    }
85}
86
87impl FromStr for BasinScope {
88    type Err = ConvertError;
89    fn from_str(value: &str) -> Result<Self, Self::Err> {
90        match value {
91            "unspecified" => Ok(Self::Unspecified),
92            "aws:us-east-1" => Ok(Self::AwsUsEast1),
93            _ => Err("invalid basin scope value".into()),
94        }
95    }
96}
97
98impl From<BasinScope> for i32 {
99    fn from(value: BasinScope) -> Self {
100        api::BasinScope::from(value).into()
101    }
102}
103
104impl TryFrom<i32> for BasinScope {
105    type Error = ConvertError;
106    fn try_from(value: i32) -> Result<Self, Self::Error> {
107        api::BasinScope::try_from(value)
108            .map(Into::into)
109            .map_err(|_| "invalid basin scope value".into())
110    }
111}
112
113#[sync_docs]
114#[derive(Debug, Clone)]
115pub struct CreateBasinRequest {
116    pub basin: BasinName,
117    pub config: Option<BasinConfig>,
118    pub scope: BasinScope,
119}
120
121impl CreateBasinRequest {
122    /// Create a new request with basin name.
123    pub fn new(basin: BasinName) -> Self {
124        Self {
125            basin,
126            config: None,
127            scope: BasinScope::Unspecified,
128        }
129    }
130
131    /// Overwrite basin configuration.
132    pub fn with_config(self, config: BasinConfig) -> Self {
133        Self {
134            config: Some(config),
135            ..self
136        }
137    }
138
139    /// Overwrite basin scope.
140    pub fn with_scope(self, scope: BasinScope) -> Self {
141        Self { scope, ..self }
142    }
143}
144
145impl From<CreateBasinRequest> for api::CreateBasinRequest {
146    fn from(value: CreateBasinRequest) -> Self {
147        let CreateBasinRequest {
148            basin,
149            config,
150            scope,
151        } = value;
152        Self {
153            basin: basin.0,
154            config: config.map(Into::into),
155            scope: scope.into(),
156        }
157    }
158}
159
160#[sync_docs]
161#[derive(Debug, Clone, Default)]
162pub struct BasinConfig {
163    pub default_stream_config: Option<StreamConfig>,
164    pub create_stream_on_append: bool,
165    pub create_stream_on_read: bool,
166}
167
168impl BasinConfig {
169    /// Create a new basin config.
170    pub fn new() -> Self {
171        Self::default()
172    }
173
174    /// Overwrite the default stream config.
175    pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
176        Self {
177            default_stream_config: Some(default_stream_config),
178            ..self
179        }
180    }
181
182    /// Overwrite `create_stream_on_append`.
183    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
184        Self {
185            create_stream_on_append,
186            ..self
187        }
188    }
189
190    /// Overwrite `create_stream_on_read`.
191    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
192        Self {
193            create_stream_on_read,
194            ..self
195        }
196    }
197}
198
199impl From<BasinConfig> for api::BasinConfig {
200    fn from(value: BasinConfig) -> Self {
201        let BasinConfig {
202            default_stream_config,
203            create_stream_on_append,
204            create_stream_on_read,
205        } = value;
206        Self {
207            default_stream_config: default_stream_config.map(Into::into),
208            create_stream_on_append,
209            create_stream_on_read,
210        }
211    }
212}
213
214impl TryFrom<api::BasinConfig> for BasinConfig {
215    type Error = ConvertError;
216    fn try_from(value: api::BasinConfig) -> Result<Self, Self::Error> {
217        let api::BasinConfig {
218            default_stream_config,
219            create_stream_on_append,
220            create_stream_on_read,
221        } = value;
222        Ok(Self {
223            default_stream_config: default_stream_config.map(TryInto::try_into).transpose()?,
224            create_stream_on_append,
225            create_stream_on_read,
226        })
227    }
228}
229
230#[sync_docs]
231#[derive(Debug, Clone, Default)]
232pub struct StreamConfig {
233    pub storage_class: StorageClass,
234    pub retention_policy: Option<RetentionPolicy>,
235    pub require_client_timestamps: Option<bool>,
236    pub uncapped_client_timestamps: Option<bool>,
237}
238
239impl StreamConfig {
240    /// Create a new stream config.
241    pub fn new() -> Self {
242        Self::default()
243    }
244
245    /// Overwrite storage class.
246    pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self {
247        Self {
248            storage_class: storage_class.into(),
249            ..self
250        }
251    }
252
253    /// Overwrite retention policy.
254    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
255        Self {
256            retention_policy: Some(retention_policy),
257            ..self
258        }
259    }
260
261    /// Overwrite `require_client_timestamps`.
262    pub fn with_require_client_timestamps(self, require_client_timestamps: bool) -> Self {
263        Self {
264            require_client_timestamps: Some(require_client_timestamps),
265            ..self
266        }
267    }
268
269    /// Overwrite `uncapped_client_timestamps`.
270    pub fn with_uncapped_client_timestamps(self, uncapped_client_timestamps: bool) -> Self {
271        Self {
272            uncapped_client_timestamps: Some(uncapped_client_timestamps),
273            ..self
274        }
275    }
276}
277
278impl From<StreamConfig> for api::StreamConfig {
279    fn from(value: StreamConfig) -> Self {
280        let StreamConfig {
281            storage_class,
282            retention_policy,
283            require_client_timestamps,
284            uncapped_client_timestamps,
285        } = value;
286        Self {
287            storage_class: storage_class.into(),
288            retention_policy: retention_policy.map(Into::into),
289            require_client_timestamps,
290            uncapped_client_timestamps,
291        }
292    }
293}
294
295impl TryFrom<api::StreamConfig> for StreamConfig {
296    type Error = ConvertError;
297    fn try_from(value: api::StreamConfig) -> Result<Self, Self::Error> {
298        let api::StreamConfig {
299            storage_class,
300            retention_policy,
301            require_client_timestamps,
302            uncapped_client_timestamps,
303        } = value;
304        Ok(Self {
305            storage_class: storage_class.try_into()?,
306            retention_policy: retention_policy.map(Into::into),
307            require_client_timestamps,
308            uncapped_client_timestamps,
309        })
310    }
311}
312
313#[sync_docs]
314#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
315pub enum StorageClass {
316    #[default]
317    Unspecified,
318    Standard,
319    Express,
320}
321
322impl From<StorageClass> for api::StorageClass {
323    fn from(value: StorageClass) -> Self {
324        match value {
325            StorageClass::Unspecified => Self::Unspecified,
326            StorageClass::Standard => Self::Standard,
327            StorageClass::Express => Self::Express,
328        }
329    }
330}
331
332impl From<api::StorageClass> for StorageClass {
333    fn from(value: api::StorageClass) -> Self {
334        match value {
335            api::StorageClass::Unspecified => Self::Unspecified,
336            api::StorageClass::Standard => Self::Standard,
337            api::StorageClass::Express => Self::Express,
338        }
339    }
340}
341
342impl FromStr for StorageClass {
343    type Err = ConvertError;
344    fn from_str(value: &str) -> Result<Self, Self::Err> {
345        match value {
346            "unspecified" => Ok(Self::Unspecified),
347            "standard" => Ok(Self::Standard),
348            "express" => Ok(Self::Express),
349            _ => Err("invalid storage class value".into()),
350        }
351    }
352}
353
354impl From<StorageClass> for i32 {
355    fn from(value: StorageClass) -> Self {
356        api::StorageClass::from(value).into()
357    }
358}
359
360impl TryFrom<i32> for StorageClass {
361    type Error = ConvertError;
362    fn try_from(value: i32) -> Result<Self, Self::Error> {
363        api::StorageClass::try_from(value)
364            .map(Into::into)
365            .map_err(|_| "invalid storage class value".into())
366    }
367}
368
369#[sync_docs(Age = "Age")]
370#[derive(Debug, Clone)]
371pub enum RetentionPolicy {
372    Age(Duration),
373}
374
375impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
376    fn from(value: RetentionPolicy) -> Self {
377        match value {
378            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
379        }
380    }
381}
382
383impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
384    fn from(value: api::stream_config::RetentionPolicy) -> Self {
385        match value {
386            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
387        }
388    }
389}
390
391#[sync_docs]
392#[derive(Debug, Clone, Copy, PartialEq, Eq)]
393pub enum BasinState {
394    Unspecified,
395    Active,
396    Creating,
397    Deleting,
398}
399
400impl From<BasinState> for api::BasinState {
401    fn from(value: BasinState) -> Self {
402        match value {
403            BasinState::Unspecified => Self::Unspecified,
404            BasinState::Active => Self::Active,
405            BasinState::Creating => Self::Creating,
406            BasinState::Deleting => Self::Deleting,
407        }
408    }
409}
410
411impl From<api::BasinState> for BasinState {
412    fn from(value: api::BasinState) -> Self {
413        match value {
414            api::BasinState::Unspecified => Self::Unspecified,
415            api::BasinState::Active => Self::Active,
416            api::BasinState::Creating => Self::Creating,
417            api::BasinState::Deleting => Self::Deleting,
418        }
419    }
420}
421
422impl From<BasinState> for i32 {
423    fn from(value: BasinState) -> Self {
424        api::BasinState::from(value).into()
425    }
426}
427
428impl TryFrom<i32> for BasinState {
429    type Error = ConvertError;
430    fn try_from(value: i32) -> Result<Self, Self::Error> {
431        api::BasinState::try_from(value)
432            .map(Into::into)
433            .map_err(|_| "invalid basin status value".into())
434    }
435}
436
437impl std::fmt::Display for BasinState {
438    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439        match self {
440            BasinState::Unspecified => write!(f, "unspecified"),
441            BasinState::Active => write!(f, "active"),
442            BasinState::Creating => write!(f, "creating"),
443            BasinState::Deleting => write!(f, "deleting"),
444        }
445    }
446}
447
448#[sync_docs]
449#[derive(Debug, Clone)]
450pub struct BasinInfo {
451    pub name: String,
452    pub scope: BasinScope,
453    pub state: BasinState,
454}
455
456impl From<BasinInfo> for api::BasinInfo {
457    fn from(value: BasinInfo) -> Self {
458        let BasinInfo { name, scope, state } = value;
459        Self {
460            name,
461            scope: scope.into(),
462            state: state.into(),
463        }
464    }
465}
466
467impl TryFrom<api::BasinInfo> for BasinInfo {
468    type Error = ConvertError;
469    fn try_from(value: api::BasinInfo) -> Result<Self, Self::Error> {
470        let api::BasinInfo { name, scope, state } = value;
471        Ok(Self {
472            name,
473            scope: scope.try_into()?,
474            state: state.try_into()?,
475        })
476    }
477}
478
479impl TryFrom<api::CreateBasinResponse> for BasinInfo {
480    type Error = ConvertError;
481    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
482        let api::CreateBasinResponse { info } = value;
483        let info = info.ok_or("missing basin info")?;
484        info.try_into()
485    }
486}
487
488#[sync_docs]
489#[derive(Debug, Clone, Default)]
490pub struct ListStreamsRequest {
491    pub prefix: String,
492    pub start_after: String,
493    pub limit: Option<usize>,
494}
495
496impl ListStreamsRequest {
497    /// Create a new request.
498    pub fn new() -> Self {
499        Self::default()
500    }
501
502    /// Overwrite prefix.
503    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
504        Self {
505            prefix: prefix.into(),
506            ..self
507        }
508    }
509
510    /// Overwrite start after.
511    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
512        Self {
513            start_after: start_after.into(),
514            ..self
515        }
516    }
517
518    /// Overwrite limit.
519    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
520        Self {
521            limit: limit.into(),
522            ..self
523        }
524    }
525}
526
527impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
528    type Error = ConvertError;
529    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
530        let ListStreamsRequest {
531            prefix,
532            start_after,
533            limit,
534        } = value;
535        Ok(Self {
536            prefix,
537            start_after,
538            limit: limit.map(|n| n as u64),
539        })
540    }
541}
542
543#[sync_docs]
544#[derive(Debug, Clone)]
545pub struct StreamInfo {
546    pub name: String,
547    pub created_at: u32,
548    pub deleted_at: Option<u32>,
549}
550
551impl From<api::StreamInfo> for StreamInfo {
552    fn from(value: api::StreamInfo) -> Self {
553        Self {
554            name: value.name,
555            created_at: value.created_at,
556            deleted_at: value.deleted_at,
557        }
558    }
559}
560
561impl TryFrom<api::CreateStreamResponse> for StreamInfo {
562    type Error = ConvertError;
563
564    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
565        let api::CreateStreamResponse { info } = value;
566        let info = info.ok_or("missing stream info")?;
567        Ok(info.into())
568    }
569}
570
571#[sync_docs]
572#[derive(Debug, Clone)]
573pub struct ListStreamsResponse {
574    pub streams: Vec<StreamInfo>,
575    pub has_more: bool,
576}
577
578impl From<api::ListStreamsResponse> for ListStreamsResponse {
579    fn from(value: api::ListStreamsResponse) -> Self {
580        let api::ListStreamsResponse { streams, has_more } = value;
581        let streams = streams.into_iter().map(Into::into).collect();
582        Self { streams, has_more }
583    }
584}
585
586impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
587    type Error = ConvertError;
588    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
589        let api::GetBasinConfigResponse { config } = value;
590        let config = config.ok_or("missing basin config")?;
591        config.try_into()
592    }
593}
594
595impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
596    type Error = ConvertError;
597    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
598        let api::GetStreamConfigResponse { config } = value;
599        let config = config.ok_or("missing stream config")?;
600        config.try_into()
601    }
602}
603
604#[sync_docs]
605#[derive(Debug, Clone)]
606pub struct CreateStreamRequest {
607    pub stream: String,
608    pub config: Option<StreamConfig>,
609}
610
611impl CreateStreamRequest {
612    /// Create a new request with stream name.
613    pub fn new(stream: impl Into<String>) -> Self {
614        Self {
615            stream: stream.into(),
616            config: None,
617        }
618    }
619
620    /// Overwrite stream config.
621    pub fn with_config(self, config: StreamConfig) -> Self {
622        Self {
623            config: Some(config),
624            ..self
625        }
626    }
627}
628
629impl From<CreateStreamRequest> for api::CreateStreamRequest {
630    fn from(value: CreateStreamRequest) -> Self {
631        let CreateStreamRequest { stream, config } = value;
632        Self {
633            stream,
634            config: config.map(Into::into),
635        }
636    }
637}
638
639#[sync_docs]
640#[derive(Debug, Clone, Default)]
641pub struct ListBasinsRequest {
642    pub prefix: String,
643    pub start_after: String,
644    pub limit: Option<usize>,
645}
646
647impl ListBasinsRequest {
648    /// Create a new request.
649    pub fn new() -> Self {
650        Self::default()
651    }
652
653    /// Overwrite prefix.
654    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
655        Self {
656            prefix: prefix.into(),
657            ..self
658        }
659    }
660
661    /// Overwrite start after.
662    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
663        Self {
664            start_after: start_after.into(),
665            ..self
666        }
667    }
668
669    /// Overwrite limit.
670    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
671        Self {
672            limit: limit.into(),
673            ..self
674        }
675    }
676}
677
678impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
679    type Error = ConvertError;
680    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
681        let ListBasinsRequest {
682            prefix,
683            start_after,
684            limit,
685        } = value;
686        Ok(Self {
687            prefix,
688            start_after,
689            limit: limit
690                .map(TryInto::try_into)
691                .transpose()
692                .map_err(|_| "request limit does not fit into u64 bounds")?,
693        })
694    }
695}
696
697#[sync_docs]
698#[derive(Debug, Clone)]
699pub struct ListBasinsResponse {
700    pub basins: Vec<BasinInfo>,
701    pub has_more: bool,
702}
703
704impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
705    type Error = ConvertError;
706    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
707        let api::ListBasinsResponse { basins, has_more } = value;
708        Ok(Self {
709            basins: basins
710                .into_iter()
711                .map(TryInto::try_into)
712                .collect::<Result<Vec<BasinInfo>, ConvertError>>()?,
713            has_more,
714        })
715    }
716}
717
718#[sync_docs]
719#[derive(Debug, Clone)]
720pub struct DeleteBasinRequest {
721    pub basin: BasinName,
722    /// Delete basin if it exists else do nothing.
723    pub if_exists: bool,
724}
725
726impl DeleteBasinRequest {
727    /// Create a new request.
728    pub fn new(basin: BasinName) -> Self {
729        Self {
730            basin,
731            if_exists: false,
732        }
733    }
734
735    /// Overwrite the if exists parameter.
736    pub fn with_if_exists(self, if_exists: bool) -> Self {
737        Self { if_exists, ..self }
738    }
739}
740
741impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
742    fn from(value: DeleteBasinRequest) -> Self {
743        let DeleteBasinRequest { basin, .. } = value;
744        Self { basin: basin.0 }
745    }
746}
747
748#[sync_docs]
749#[derive(Debug, Clone)]
750pub struct DeleteStreamRequest {
751    pub stream: String,
752    /// Delete stream if it exists else do nothing.
753    pub if_exists: bool,
754}
755
756impl DeleteStreamRequest {
757    /// Create a new request.
758    pub fn new(stream: impl Into<String>) -> Self {
759        Self {
760            stream: stream.into(),
761            if_exists: false,
762        }
763    }
764
765    /// Overwrite the if exists parameter.
766    pub fn with_if_exists(self, if_exists: bool) -> Self {
767        Self { if_exists, ..self }
768    }
769}
770
771impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
772    fn from(value: DeleteStreamRequest) -> Self {
773        let DeleteStreamRequest { stream, .. } = value;
774        Self { stream }
775    }
776}
777
778#[sync_docs]
779#[derive(Debug, Clone)]
780pub struct ReconfigureBasinRequest {
781    pub basin: BasinName,
782    pub config: Option<BasinConfig>,
783    pub mask: Option<Vec<String>>,
784}
785
786impl ReconfigureBasinRequest {
787    /// Create a new request with basin name.
788    pub fn new(basin: BasinName) -> Self {
789        Self {
790            basin,
791            config: None,
792            mask: None,
793        }
794    }
795
796    /// Overwrite basin config.
797    pub fn with_config(self, config: BasinConfig) -> Self {
798        Self {
799            config: Some(config),
800            ..self
801        }
802    }
803
804    /// Overwrite field mask.
805    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
806        Self {
807            mask: Some(mask.into()),
808            ..self
809        }
810    }
811}
812
813impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
814    fn from(value: ReconfigureBasinRequest) -> Self {
815        let ReconfigureBasinRequest {
816            basin,
817            config,
818            mask,
819        } = value;
820        Self {
821            basin: basin.0,
822            config: config.map(Into::into),
823            mask: mask.map(|paths| prost_types::FieldMask { paths }),
824        }
825    }
826}
827
828impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
829    type Error = ConvertError;
830    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
831        let api::ReconfigureBasinResponse { config } = value;
832        let config = config.ok_or("missing basin config")?;
833        config.try_into()
834    }
835}
836
837#[sync_docs]
838#[derive(Debug, Clone)]
839pub struct ReconfigureStreamRequest {
840    pub stream: String,
841    pub config: Option<StreamConfig>,
842    pub mask: Option<Vec<String>>,
843}
844
845impl ReconfigureStreamRequest {
846    /// Create a new request with stream name.
847    pub fn new(stream: impl Into<String>) -> Self {
848        Self {
849            stream: stream.into(),
850            config: None,
851            mask: None,
852        }
853    }
854
855    /// Overwrite stream config.
856    pub fn with_config(self, config: StreamConfig) -> Self {
857        Self {
858            config: Some(config),
859            ..self
860        }
861    }
862
863    /// Overwrite field mask.
864    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
865        Self {
866            mask: Some(mask.into()),
867            ..self
868        }
869    }
870}
871
872impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
873    fn from(value: ReconfigureStreamRequest) -> Self {
874        let ReconfigureStreamRequest {
875            stream,
876            config,
877            mask,
878        } = value;
879        Self {
880            stream,
881            config: config.map(Into::into),
882            mask: mask.map(|paths| prost_types::FieldMask { paths }),
883        }
884    }
885}
886
887impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
888    type Error = ConvertError;
889    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
890        let api::ReconfigureStreamResponse { config } = value;
891        let config = config.ok_or("missing stream config")?;
892        config.try_into()
893    }
894}
895
896impl From<api::CheckTailResponse> for StreamPosition {
897    fn from(value: api::CheckTailResponse) -> Self {
898        let api::CheckTailResponse {
899            next_seq_num,
900            last_timestamp,
901        } = value;
902        StreamPosition {
903            seq_num: next_seq_num,
904            timestamp: last_timestamp,
905        }
906    }
907}
908
909/// Position of a record in a stream.
910#[derive(Debug, Clone, PartialEq, Eq)]
911pub struct StreamPosition {
912    /// Sequence number assigned by the service.
913    pub seq_num: u64,
914    /// Timestamp, which may be user-specified or assigned by the service.
915    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
916    pub timestamp: u64,
917}
918
919#[sync_docs]
920#[derive(Debug, Clone, PartialEq, Eq)]
921pub struct Header {
922    pub name: Bytes,
923    pub value: Bytes,
924}
925
926impl Header {
927    /// Create a new header from name and value.
928    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
929        Self {
930            name: name.into(),
931            value: value.into(),
932        }
933    }
934
935    /// Create a new header from value.
936    pub fn from_value(value: impl Into<Bytes>) -> Self {
937        Self {
938            name: Bytes::new(),
939            value: value.into(),
940        }
941    }
942}
943
944impl From<Header> for api::Header {
945    fn from(value: Header) -> Self {
946        let Header { name, value } = value;
947        Self { name, value }
948    }
949}
950
951impl From<api::Header> for Header {
952    fn from(value: api::Header) -> Self {
953        let api::Header { name, value } = value;
954        Self { name, value }
955    }
956}
957
958/// A fencing token can be enforced on append requests.
959///
960/// Must not be more than 16 bytes.
961#[derive(Debug, Clone, Default, PartialEq, Eq)]
962pub struct FencingToken(Bytes);
963
964impl FencingToken {
965    const MAX_BYTES: usize = 16;
966
967    /// Try creating a new fencing token from bytes.
968    pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
969        let bytes = bytes.into();
970        if bytes.len() > Self::MAX_BYTES {
971            Err(format!(
972                "Size of a fencing token cannot exceed {} bytes",
973                Self::MAX_BYTES
974            )
975            .into())
976        } else {
977            Ok(Self(bytes))
978        }
979    }
980
981    /// Generate a random fencing token with `n` bytes.
982    pub fn generate(n: usize) -> Result<Self, ConvertError> {
983        Self::new(
984            rand::rng()
985                .sample_iter(rand::distr::StandardUniform)
986                .take(n)
987                .collect::<Bytes>(),
988        )
989    }
990}
991
992impl AsRef<Bytes> for FencingToken {
993    fn as_ref(&self) -> &Bytes {
994        &self.0
995    }
996}
997
998impl AsRef<[u8]> for FencingToken {
999    fn as_ref(&self) -> &[u8] {
1000        &self.0
1001    }
1002}
1003
1004impl Deref for FencingToken {
1005    type Target = [u8];
1006
1007    fn deref(&self) -> &Self::Target {
1008        &self.0
1009    }
1010}
1011
1012impl From<FencingToken> for Bytes {
1013    fn from(value: FencingToken) -> Self {
1014        value.0
1015    }
1016}
1017
1018impl From<FencingToken> for Vec<u8> {
1019    fn from(value: FencingToken) -> Self {
1020        value.0.into()
1021    }
1022}
1023
1024impl TryFrom<Bytes> for FencingToken {
1025    type Error = ConvertError;
1026
1027    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
1028        Self::new(value)
1029    }
1030}
1031
1032impl TryFrom<Vec<u8>> for FencingToken {
1033    type Error = ConvertError;
1034
1035    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
1036        Self::new(value)
1037    }
1038}
1039
1040/// Command to send through a `CommandRecord`.
1041#[derive(Debug, Clone)]
1042pub enum Command {
1043    /// Enforce a fencing token.
1044    ///
1045    /// Fencing is strongly consistent, and subsequent appends that specify a
1046    /// fencing token will be rejected if it does not match.
1047    Fence {
1048        /// Fencing token to enforce.
1049        ///
1050        /// Set empty to clear the token.
1051        fencing_token: FencingToken,
1052    },
1053    /// Request a trim till the sequence number.
1054    ///
1055    /// Trimming is eventually consistent, and trimmed records may be visible
1056    /// for a brief period
1057    Trim {
1058        /// Trim point.
1059        ///
1060        /// This sequence number is only allowed to advance, and any regression
1061        /// will be ignored.
1062        seq_num: u64,
1063    },
1064}
1065
1066/// A command record is a special kind of [`AppendRecord`] that can be used to
1067/// send command messages.
1068///
1069/// Such a record is signalled by a sole header with empty name. The header
1070/// value represents the operation and record body acts as the payload.
1071#[derive(Debug, Clone)]
1072pub struct CommandRecord {
1073    /// Command kind.
1074    pub command: Command,
1075    /// Timestamp for the record.
1076    pub timestamp: Option<u64>,
1077}
1078
1079impl CommandRecord {
1080    const FENCE: &[u8] = b"fence";
1081    const TRIM: &[u8] = b"trim";
1082
1083    /// Create a new fence command record.
1084    pub fn fence(fencing_token: FencingToken) -> Self {
1085        Self {
1086            command: Command::Fence { fencing_token },
1087            timestamp: None,
1088        }
1089    }
1090
1091    /// Create a new trim command record.
1092    pub fn trim(seq_num: impl Into<u64>) -> Self {
1093        Self {
1094            command: Command::Trim {
1095                seq_num: seq_num.into(),
1096            },
1097            timestamp: None,
1098        }
1099    }
1100
1101    /// Overwrite timestamp.
1102    pub fn with_timestamp(self, timestamp: u64) -> Self {
1103        Self {
1104            timestamp: Some(timestamp),
1105            ..self
1106        }
1107    }
1108}
1109
1110#[sync_docs]
1111#[derive(Debug, Clone, PartialEq, Eq)]
1112pub struct AppendRecord {
1113    timestamp: Option<u64>,
1114    headers: Vec<Header>,
1115    body: Bytes,
1116    #[cfg(test)]
1117    max_bytes: u64,
1118}
1119
1120metered_impl!(AppendRecord);
1121
1122impl AppendRecord {
1123    const MAX_BYTES: u64 = MIB_BYTES;
1124
1125    fn validated(self) -> Result<Self, ConvertError> {
1126        #[cfg(test)]
1127        let max_bytes = self.max_bytes;
1128        #[cfg(not(test))]
1129        let max_bytes = Self::MAX_BYTES;
1130
1131        if self.metered_bytes() > max_bytes {
1132            Err("AppendRecord should have metered size less than 1 MiB".into())
1133        } else {
1134            Ok(self)
1135        }
1136    }
1137
1138    /// Try creating a new append record with body.
1139    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1140        Self {
1141            timestamp: None,
1142            headers: Vec::new(),
1143            body: body.into(),
1144            #[cfg(test)]
1145            max_bytes: Self::MAX_BYTES,
1146        }
1147        .validated()
1148    }
1149
1150    #[cfg(test)]
1151    pub(crate) fn with_max_bytes(
1152        max_bytes: u64,
1153        body: impl Into<Bytes>,
1154    ) -> Result<Self, ConvertError> {
1155        Self {
1156            timestamp: None,
1157            headers: Vec::new(),
1158            body: body.into(),
1159            max_bytes,
1160        }
1161        .validated()
1162    }
1163
1164    /// Overwrite headers.
1165    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1166        Self {
1167            headers: headers.into(),
1168            ..self
1169        }
1170        .validated()
1171    }
1172
1173    /// Overwrite timestamp.
1174    pub fn with_timestamp(self, timestamp: u64) -> Self {
1175        Self {
1176            timestamp: Some(timestamp),
1177            ..self
1178        }
1179    }
1180
1181    /// Body of the record.
1182    pub fn body(&self) -> &[u8] {
1183        &self.body
1184    }
1185
1186    /// Headers of the record.
1187    pub fn headers(&self) -> &[Header] {
1188        &self.headers
1189    }
1190
1191    /// Timestamp for the record.
1192    pub fn timestamp(&self) -> Option<u64> {
1193        self.timestamp
1194    }
1195
1196    /// Consume the record and return parts.
1197    pub fn into_parts(self) -> AppendRecordParts {
1198        AppendRecordParts {
1199            timestamp: self.timestamp,
1200            headers: self.headers,
1201            body: self.body,
1202        }
1203    }
1204
1205    /// Try creating the record from parts.
1206    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1207        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1208        if let Some(timestamp) = parts.timestamp {
1209            Ok(record.with_timestamp(timestamp))
1210        } else {
1211            Ok(record)
1212        }
1213    }
1214}
1215
1216impl From<AppendRecord> for api::AppendRecord {
1217    fn from(value: AppendRecord) -> Self {
1218        Self {
1219            timestamp: value.timestamp,
1220            headers: value.headers.into_iter().map(Into::into).collect(),
1221            body: value.body,
1222        }
1223    }
1224}
1225
1226impl From<CommandRecord> for AppendRecord {
1227    fn from(value: CommandRecord) -> Self {
1228        let (header_value, body) = match value.command {
1229            Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1230            Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1231        };
1232        AppendRecordParts {
1233            timestamp: value.timestamp,
1234            headers: vec![Header::from_value(header_value)],
1235            body: body.into(),
1236        }
1237        .try_into()
1238        .expect("command record is a valid append record")
1239    }
1240}
1241
1242#[sync_docs(AppendRecordParts = "AppendRecord")]
1243#[derive(Debug, Clone)]
1244pub struct AppendRecordParts {
1245    pub timestamp: Option<u64>,
1246    pub headers: Vec<Header>,
1247    pub body: Bytes,
1248}
1249
1250impl From<AppendRecord> for AppendRecordParts {
1251    fn from(value: AppendRecord) -> Self {
1252        value.into_parts()
1253    }
1254}
1255
1256impl TryFrom<AppendRecordParts> for AppendRecord {
1257    type Error = ConvertError;
1258
1259    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1260        Self::try_from_parts(value)
1261    }
1262}
1263
1264/// A collection of append records that can be sent together in a batch.
1265#[derive(Debug, Clone)]
1266pub struct AppendRecordBatch {
1267    records: Vec<AppendRecord>,
1268    metered_bytes: u64,
1269    max_capacity: usize,
1270    #[cfg(test)]
1271    max_bytes: u64,
1272}
1273
1274impl PartialEq for AppendRecordBatch {
1275    fn eq(&self, other: &Self) -> bool {
1276        if self.records.eq(&other.records) {
1277            assert_eq!(self.metered_bytes, other.metered_bytes);
1278            true
1279        } else {
1280            false
1281        }
1282    }
1283}
1284
1285impl Eq for AppendRecordBatch {}
1286
1287impl Default for AppendRecordBatch {
1288    fn default() -> Self {
1289        Self::new()
1290    }
1291}
1292
1293impl AppendRecordBatch {
1294    /// Maximum number of records that a batch can hold.
1295    ///
1296    /// A record batch cannot be created with a bigger capacity.
1297    pub const MAX_CAPACITY: usize = 1000;
1298
1299    /// Maximum metered bytes of the batch.
1300    pub const MAX_BYTES: u64 = MIB_BYTES;
1301
1302    /// Create an empty record batch.
1303    pub fn new() -> Self {
1304        Self::with_max_capacity(Self::MAX_CAPACITY)
1305    }
1306
1307    /// Create an empty record batch with custom max capacity.
1308    ///
1309    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1310    pub fn with_max_capacity(max_capacity: usize) -> Self {
1311        assert!(
1312            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1313            "Batch capacity must be between 1 and 1000"
1314        );
1315
1316        Self {
1317            records: Vec::with_capacity(max_capacity),
1318            metered_bytes: 0,
1319            max_capacity,
1320            #[cfg(test)]
1321            max_bytes: Self::MAX_BYTES,
1322        }
1323    }
1324
1325    #[cfg(test)]
1326    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1327        #[cfg(test)]
1328        assert!(
1329            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1330            "Batch size must be between 1 byte and 1 MiB"
1331        );
1332
1333        Self {
1334            max_bytes,
1335            ..Self::with_max_capacity(max_capacity)
1336        }
1337    }
1338
1339    /// Try creating a record batch from an iterator.
1340    ///
1341    /// If all the items of the iterator cannot be drained into the batch, the
1342    /// error returned contains a batch containing all records it could fit
1343    /// along-with the left over items from the iterator.
1344    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1345    where
1346        R: Into<AppendRecord>,
1347        T: IntoIterator<Item = R>,
1348    {
1349        let mut records = Self::new();
1350        let mut pending = Vec::new();
1351
1352        let mut iter = iter.into_iter();
1353
1354        for record in iter.by_ref() {
1355            if let Err(record) = records.push(record) {
1356                pending.push(record);
1357                break;
1358            }
1359        }
1360
1361        if pending.is_empty() {
1362            Ok(records)
1363        } else {
1364            pending.extend(iter.map(Into::into));
1365            Err((records, pending))
1366        }
1367    }
1368
1369    /// Returns true if the batch contains no records.
1370    pub fn is_empty(&self) -> bool {
1371        if self.records.is_empty() {
1372            assert_eq!(self.metered_bytes, 0);
1373            true
1374        } else {
1375            false
1376        }
1377    }
1378
1379    /// Returns the number of records contained in the batch.
1380    pub fn len(&self) -> usize {
1381        self.records.len()
1382    }
1383
1384    #[cfg(test)]
1385    fn max_bytes(&self) -> u64 {
1386        self.max_bytes
1387    }
1388
1389    #[cfg(not(test))]
1390    fn max_bytes(&self) -> u64 {
1391        Self::MAX_BYTES
1392    }
1393
1394    /// Returns true if the batch cannot fit any more records.
1395    pub fn is_full(&self) -> bool {
1396        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1397    }
1398
1399    /// Try to append a new record into the batch.
1400    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1401        assert!(self.records.len() <= self.max_capacity);
1402        assert!(self.metered_bytes <= self.max_bytes());
1403
1404        let record = record.into();
1405        let record_size = record.metered_bytes();
1406        if self.records.len() >= self.max_capacity
1407            || self.metered_bytes + record_size > self.max_bytes()
1408        {
1409            Err(record)
1410        } else {
1411            self.records.push(record);
1412            self.metered_bytes += record_size;
1413            Ok(())
1414        }
1415    }
1416}
1417
1418impl MeteredBytes for AppendRecordBatch {
1419    fn metered_bytes(&self) -> u64 {
1420        self.metered_bytes
1421    }
1422}
1423
1424impl IntoIterator for AppendRecordBatch {
1425    type Item = AppendRecord;
1426    type IntoIter = std::vec::IntoIter<Self::Item>;
1427
1428    fn into_iter(self) -> Self::IntoIter {
1429        self.records.into_iter()
1430    }
1431}
1432
1433impl<'a> IntoIterator for &'a AppendRecordBatch {
1434    type Item = &'a AppendRecord;
1435    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1436
1437    fn into_iter(self) -> Self::IntoIter {
1438        self.records.iter()
1439    }
1440}
1441
1442impl AsRef<[AppendRecord]> for AppendRecordBatch {
1443    fn as_ref(&self) -> &[AppendRecord] {
1444        &self.records
1445    }
1446}
1447
1448#[sync_docs]
1449#[derive(Debug, Default, Clone)]
1450pub struct AppendInput {
1451    pub records: AppendRecordBatch,
1452    pub match_seq_num: Option<u64>,
1453    pub fencing_token: Option<FencingToken>,
1454}
1455
1456impl MeteredBytes for AppendInput {
1457    fn metered_bytes(&self) -> u64 {
1458        self.records.metered_bytes()
1459    }
1460}
1461
1462impl AppendInput {
1463    /// Create a new append input from record batch.
1464    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1465        Self {
1466            records: records.into(),
1467            match_seq_num: None,
1468            fencing_token: None,
1469        }
1470    }
1471
1472    /// Overwrite match sequence number.
1473    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1474        Self {
1475            match_seq_num: Some(match_seq_num.into()),
1476            ..self
1477        }
1478    }
1479
1480    /// Overwrite fencing token.
1481    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1482        Self {
1483            fencing_token: Some(fencing_token),
1484            ..self
1485        }
1486    }
1487
1488    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1489        let Self {
1490            records,
1491            match_seq_num,
1492            fencing_token,
1493        } = self;
1494
1495        api::AppendInput {
1496            stream: stream.into(),
1497            records: records.into_iter().map(Into::into).collect(),
1498            match_seq_num,
1499            fencing_token: fencing_token.map(|f| f.0),
1500        }
1501    }
1502}
1503
1504/// Acknowledgment to an append request.
1505#[derive(Debug, Clone)]
1506pub struct AppendAck {
1507    /// Sequence number and timestamp of the first record that was appended.
1508    pub start: StreamPosition,
1509    /// Sequence number of the last record that was appended + 1,
1510    /// and timestamp of the last record that was appended.
1511    /// The difference between `end.seq_num` and `start.seq_num`
1512    /// will be the number of records appended.
1513    pub end: StreamPosition,
1514    /// Sequence number that will be assigned to the next record on the stream,
1515    /// and timestamp of the last record on the stream.
1516    /// This can be greater than the `end` position in case of concurrent appends.
1517    pub tail: StreamPosition,
1518}
1519
1520impl From<api::AppendOutput> for AppendAck {
1521    fn from(value: api::AppendOutput) -> Self {
1522        let api::AppendOutput {
1523            start_seq_num,
1524            start_timestamp,
1525            end_seq_num,
1526            end_timestamp,
1527            next_seq_num,
1528            last_timestamp,
1529        } = value;
1530        let start = StreamPosition {
1531            seq_num: start_seq_num,
1532            timestamp: start_timestamp,
1533        };
1534        let end = StreamPosition {
1535            seq_num: end_seq_num,
1536            timestamp: end_timestamp,
1537        };
1538        let tail = StreamPosition {
1539            seq_num: next_seq_num,
1540            timestamp: last_timestamp,
1541        };
1542        Self { start, end, tail }
1543    }
1544}
1545
1546impl TryFrom<api::AppendResponse> for AppendAck {
1547    type Error = ConvertError;
1548    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1549        let api::AppendResponse { output } = value;
1550        let output = output.ok_or("missing append output")?;
1551        Ok(output.into())
1552    }
1553}
1554
1555impl TryFrom<api::AppendSessionResponse> for AppendAck {
1556    type Error = ConvertError;
1557    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1558        let api::AppendSessionResponse { output } = value;
1559        let output = output.ok_or("missing append output")?;
1560        Ok(output.into())
1561    }
1562}
1563
1564#[sync_docs]
1565#[derive(Debug, Clone, Default)]
1566pub struct ReadLimit {
1567    pub count: Option<u64>,
1568    pub bytes: Option<u64>,
1569}
1570
1571impl ReadLimit {
1572    /// Create a new read limit.
1573    pub fn new() -> Self {
1574        Self::default()
1575    }
1576
1577    /// Overwrite count limit.
1578    pub fn with_count(self, count: u64) -> Self {
1579        Self {
1580            count: Some(count),
1581            ..self
1582        }
1583    }
1584
1585    /// Overwrite bytes limit.
1586    pub fn with_bytes(self, bytes: u64) -> Self {
1587        Self {
1588            bytes: Some(bytes),
1589            ..self
1590        }
1591    }
1592}
1593
1594/// Starting point for read requests.
1595#[derive(Debug, Clone)]
1596pub enum ReadStart {
1597    /// Sequence number.
1598    SeqNum(u64),
1599    /// Timestamp.
1600    Timestamp(u64),
1601    /// Number of records before the tail, i.e. the next sequence number.
1602    TailOffset(u64),
1603}
1604
1605impl Default for ReadStart {
1606    fn default() -> Self {
1607        Self::SeqNum(0)
1608    }
1609}
1610
1611impl From<ReadStart> for api::read_request::Start {
1612    fn from(start: ReadStart) -> Self {
1613        match start {
1614            ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1615            ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1616            ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1617        }
1618    }
1619}
1620
1621impl From<ReadStart> for api::read_session_request::Start {
1622    fn from(start: ReadStart) -> Self {
1623        match start {
1624            ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1625            ReadStart::Timestamp(timestamp) => {
1626                api::read_session_request::Start::Timestamp(timestamp)
1627            }
1628            ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1629        }
1630    }
1631}
1632
1633#[sync_docs]
1634#[derive(Debug, Clone, Default)]
1635pub struct ReadRequest {
1636    pub start: ReadStart,
1637    pub limit: ReadLimit,
1638}
1639
1640impl ReadRequest {
1641    /// Create a new request with the specified starting point.
1642    pub fn new(start: ReadStart) -> Self {
1643        Self {
1644            start,
1645            ..Default::default()
1646        }
1647    }
1648
1649    /// Overwrite limit.
1650    pub fn with_limit(self, limit: ReadLimit) -> Self {
1651        Self { limit, ..self }
1652    }
1653}
1654
1655impl ReadRequest {
1656    pub(crate) fn try_into_api_type(
1657        self,
1658        stream: impl Into<String>,
1659    ) -> Result<api::ReadRequest, ConvertError> {
1660        let Self { start, limit } = self;
1661
1662        let limit = if limit.count > Some(1000) {
1663            Err("read limit: count must not exceed 1000 for unary request")
1664        } else if limit.bytes > Some(MIB_BYTES) {
1665            Err("read limit: bytes must not exceed 1MiB for unary request")
1666        } else {
1667            Ok(api::ReadLimit {
1668                count: limit.count,
1669                bytes: limit.bytes,
1670            })
1671        }?;
1672
1673        Ok(api::ReadRequest {
1674            stream: stream.into(),
1675            start: Some(start.into()),
1676            limit: Some(limit),
1677        })
1678    }
1679}
1680
1681#[sync_docs]
1682#[derive(Debug, Clone)]
1683pub struct SequencedRecord {
1684    pub seq_num: u64,
1685    pub timestamp: u64,
1686    pub headers: Vec<Header>,
1687    pub body: Bytes,
1688}
1689
1690metered_impl!(SequencedRecord);
1691
1692impl From<api::SequencedRecord> for SequencedRecord {
1693    fn from(value: api::SequencedRecord) -> Self {
1694        let api::SequencedRecord {
1695            seq_num,
1696            timestamp,
1697            headers,
1698            body,
1699        } = value;
1700        Self {
1701            seq_num,
1702            timestamp,
1703            headers: headers.into_iter().map(Into::into).collect(),
1704            body,
1705        }
1706    }
1707}
1708
1709impl SequencedRecord {
1710    /// Try representing the sequenced record as a command record.
1711    pub fn as_command_record(&self) -> Option<CommandRecord> {
1712        if self.headers.len() != 1 {
1713            return None;
1714        }
1715
1716        let header = self.headers.first().expect("pre-validated length");
1717
1718        if !header.name.is_empty() {
1719            return None;
1720        }
1721
1722        match header.value.as_ref() {
1723            CommandRecord::FENCE => {
1724                let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1725                Some(CommandRecord {
1726                    command: Command::Fence { fencing_token },
1727                    timestamp: Some(self.timestamp),
1728                })
1729            }
1730            CommandRecord::TRIM => {
1731                let body: &[u8] = &self.body;
1732                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1733                Some(CommandRecord {
1734                    command: Command::Trim { seq_num },
1735                    timestamp: Some(self.timestamp),
1736                })
1737            }
1738            _ => None,
1739        }
1740    }
1741}
1742
1743#[sync_docs]
1744#[derive(Debug, Clone)]
1745pub struct SequencedRecordBatch {
1746    pub records: Vec<SequencedRecord>,
1747}
1748
1749impl MeteredBytes for SequencedRecordBatch {
1750    fn metered_bytes(&self) -> u64 {
1751        self.records.metered_bytes()
1752    }
1753}
1754
1755impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1756    fn from(value: api::SequencedRecordBatch) -> Self {
1757        let api::SequencedRecordBatch { records } = value;
1758        Self {
1759            records: records.into_iter().map(Into::into).collect(),
1760        }
1761    }
1762}
1763
1764#[sync_docs(ReadOutput = "Output")]
1765#[derive(Debug, Clone)]
1766pub enum ReadOutput {
1767    Batch(SequencedRecordBatch),
1768    NextSeqNum(u64),
1769}
1770
1771impl From<api::read_output::Output> for ReadOutput {
1772    fn from(value: api::read_output::Output) -> Self {
1773        match value {
1774            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1775            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1776        }
1777    }
1778}
1779
1780impl TryFrom<api::ReadOutput> for ReadOutput {
1781    type Error = ConvertError;
1782    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1783        let api::ReadOutput { output } = value;
1784        let output = output.ok_or("missing read output")?;
1785        Ok(output.into())
1786    }
1787}
1788
1789impl TryFrom<api::ReadResponse> for ReadOutput {
1790    type Error = ConvertError;
1791    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1792        let api::ReadResponse { output } = value;
1793        let output = output.ok_or("missing output in read response")?;
1794        output.try_into()
1795    }
1796}
1797
1798#[sync_docs]
1799#[derive(Debug, Clone, Default)]
1800pub struct ReadSessionRequest {
1801    pub start: ReadStart,
1802    pub limit: ReadLimit,
1803}
1804
1805impl ReadSessionRequest {
1806    /// Create a new request with the specified starting point.
1807    pub fn new(start: ReadStart) -> Self {
1808        Self {
1809            start,
1810            ..Default::default()
1811        }
1812    }
1813
1814    /// Overwrite limit.
1815    pub fn with_limit(self, limit: ReadLimit) -> Self {
1816        Self { limit, ..self }
1817    }
1818
1819    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1820        let Self { start, limit } = self;
1821        api::ReadSessionRequest {
1822            stream: stream.into(),
1823            start: Some(start.into()),
1824            limit: Some(api::ReadLimit {
1825                count: limit.count,
1826                bytes: limit.bytes,
1827            }),
1828            heartbeats: false,
1829        }
1830    }
1831}
1832
1833impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1834    type Error = ConvertError;
1835    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1836        let api::ReadSessionResponse { output } = value;
1837        let output = output.ok_or("missing output in read session response")?;
1838        output.try_into()
1839    }
1840}
1841
1842/// Name of a basin.
1843///
1844/// Must be between 8 and 48 characters in length. Must comprise lowercase
1845/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
1846#[derive(Debug, Clone)]
1847pub struct BasinName(String);
1848
1849impl AsRef<str> for BasinName {
1850    fn as_ref(&self) -> &str {
1851        &self.0
1852    }
1853}
1854
1855impl Deref for BasinName {
1856    type Target = str;
1857    fn deref(&self) -> &Self::Target {
1858        &self.0
1859    }
1860}
1861
1862impl TryFrom<String> for BasinName {
1863    type Error = ConvertError;
1864
1865    fn try_from(name: String) -> Result<Self, Self::Error> {
1866        if name.len() < 8 || name.len() > 48 {
1867            return Err("Basin name must be between 8 and 48 characters in length".into());
1868        }
1869
1870        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1871        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1872            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1873                .expect("Failed to compile basin name regex")
1874        });
1875
1876        if !regex.is_match(&name) {
1877            return Err(
1878                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1879                It cannot begin or end with a hyphen."
1880                    .into(),
1881            );
1882        }
1883
1884        Ok(Self(name))
1885    }
1886}
1887
1888impl FromStr for BasinName {
1889    type Err = ConvertError;
1890
1891    fn from_str(s: &str) -> Result<Self, Self::Err> {
1892        s.to_string().try_into()
1893    }
1894}
1895
1896impl std::fmt::Display for BasinName {
1897    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1898        f.write_str(&self.0)
1899    }
1900}
1901
1902/// Access token ID.
1903/// Must be between 1 and 50 characters.
1904#[derive(Debug, Clone)]
1905pub struct AccessTokenId(String);
1906
1907impl AsRef<str> for AccessTokenId {
1908    fn as_ref(&self) -> &str {
1909        &self.0
1910    }
1911}
1912
1913impl Deref for AccessTokenId {
1914    type Target = str;
1915
1916    fn deref(&self) -> &Self::Target {
1917        &self.0
1918    }
1919}
1920
1921impl TryFrom<String> for AccessTokenId {
1922    type Error = ConvertError;
1923
1924    fn try_from(name: String) -> Result<Self, Self::Error> {
1925        if name.is_empty() {
1926            return Err("Access token ID must not be empty".into());
1927        }
1928
1929        if name.len() > 50 {
1930            return Err("Access token ID must not exceed 50 characters".into());
1931        }
1932
1933        Ok(Self(name))
1934    }
1935}
1936
1937impl From<AccessTokenId> for String {
1938    fn from(value: AccessTokenId) -> Self {
1939        value.0
1940    }
1941}
1942
1943impl FromStr for AccessTokenId {
1944    type Err = ConvertError;
1945
1946    fn from_str(s: &str) -> Result<Self, Self::Err> {
1947        s.to_string().try_into()
1948    }
1949}
1950
1951impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1952    fn from(value: AccessTokenInfo) -> Self {
1953        Self {
1954            info: Some(value.into()),
1955        }
1956    }
1957}
1958
1959#[sync_docs]
1960#[derive(Debug, Clone)]
1961pub struct AccessTokenInfo {
1962    pub id: AccessTokenId,
1963    pub expires_at: Option<u32>,
1964    pub auto_prefix_streams: bool,
1965    pub scope: Option<AccessTokenScope>,
1966}
1967
1968impl AccessTokenInfo {
1969    /// Create a new access token info.
1970    pub fn new(id: AccessTokenId) -> Self {
1971        Self {
1972            id,
1973            expires_at: None,
1974            auto_prefix_streams: false,
1975            scope: None,
1976        }
1977    }
1978
1979    /// Overwrite expiration time.
1980    pub fn with_expires_at(self, expires_at: u32) -> Self {
1981        Self {
1982            expires_at: Some(expires_at),
1983            ..self
1984        }
1985    }
1986
1987    /// Overwrite auto prefix streams.
1988    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1989        Self {
1990            auto_prefix_streams,
1991            ..self
1992        }
1993    }
1994
1995    /// Overwrite scope.
1996    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
1997        Self {
1998            scope: Some(scope),
1999            ..self
2000        }
2001    }
2002}
2003
2004impl From<AccessTokenInfo> for api::AccessTokenInfo {
2005    fn from(value: AccessTokenInfo) -> Self {
2006        let AccessTokenInfo {
2007            id,
2008            expires_at,
2009            auto_prefix_streams,
2010            scope,
2011        } = value;
2012        Self {
2013            id: id.into(),
2014            expires_at,
2015            auto_prefix_streams,
2016            scope: scope.map(Into::into),
2017        }
2018    }
2019}
2020
2021impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2022    type Error = ConvertError;
2023    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2024        let api::AccessTokenInfo {
2025            id,
2026            expires_at,
2027            auto_prefix_streams,
2028            scope,
2029        } = value;
2030        Ok(Self {
2031            id: id.try_into()?,
2032            expires_at,
2033            auto_prefix_streams,
2034            scope: scope.map(TryInto::try_into).transpose()?,
2035        })
2036    }
2037}
2038
2039#[sync_docs]
2040#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2041pub enum Operation {
2042    Unspecified,
2043    ListBasins,
2044    CreateBasin,
2045    DeleteBasin,
2046    ReconfigureBasin,
2047    GetBasinConfig,
2048    IssueAccessToken,
2049    RevokeAccessToken,
2050    ListAccessTokens,
2051    ListStreams,
2052    CreateStream,
2053    DeleteStream,
2054    GetStreamConfig,
2055    ReconfigureStream,
2056    CheckTail,
2057    Append,
2058    Read,
2059    Trim,
2060    Fence,
2061}
2062
2063impl FromStr for Operation {
2064    type Err = ConvertError;
2065
2066    fn from_str(s: &str) -> Result<Self, Self::Err> {
2067        match s.to_lowercase().as_str() {
2068            "unspecified" => Ok(Self::Unspecified),
2069            "list-basins" => Ok(Self::ListBasins),
2070            "create-basin" => Ok(Self::CreateBasin),
2071            "delete-basin" => Ok(Self::DeleteBasin),
2072            "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2073            "get-basin-config" => Ok(Self::GetBasinConfig),
2074            "issue-access-token" => Ok(Self::IssueAccessToken),
2075            "revoke-access-token" => Ok(Self::RevokeAccessToken),
2076            "list-access-tokens" => Ok(Self::ListAccessTokens),
2077            "list-streams" => Ok(Self::ListStreams),
2078            "create-stream" => Ok(Self::CreateStream),
2079            "delete-stream" => Ok(Self::DeleteStream),
2080            "get-stream-config" => Ok(Self::GetStreamConfig),
2081            "reconfigure-stream" => Ok(Self::ReconfigureStream),
2082            "check-tail" => Ok(Self::CheckTail),
2083            "append" => Ok(Self::Append),
2084            "read" => Ok(Self::Read),
2085            "trim" => Ok(Self::Trim),
2086            "fence" => Ok(Self::Fence),
2087            _ => Err("invalid operation".into()),
2088        }
2089    }
2090}
2091
2092impl From<Operation> for i32 {
2093    fn from(value: Operation) -> Self {
2094        api::Operation::from(value).into()
2095    }
2096}
2097
2098impl TryFrom<i32> for Operation {
2099    type Error = ConvertError;
2100    fn try_from(value: i32) -> Result<Self, Self::Error> {
2101        api::Operation::try_from(value)
2102            .map_err(|_| "invalid operation".into())
2103            .map(Into::into)
2104    }
2105}
2106
2107impl From<Operation> for api::Operation {
2108    fn from(value: Operation) -> Self {
2109        match value {
2110            Operation::Unspecified => Self::Unspecified,
2111            Operation::ListBasins => Self::ListBasins,
2112            Operation::CreateBasin => Self::CreateBasin,
2113            Operation::DeleteBasin => Self::DeleteBasin,
2114            Operation::ReconfigureBasin => Self::ReconfigureBasin,
2115            Operation::GetBasinConfig => Self::GetBasinConfig,
2116            Operation::IssueAccessToken => Self::IssueAccessToken,
2117            Operation::RevokeAccessToken => Self::RevokeAccessToken,
2118            Operation::ListAccessTokens => Self::ListAccessTokens,
2119            Operation::ListStreams => Self::ListStreams,
2120            Operation::CreateStream => Self::CreateStream,
2121            Operation::DeleteStream => Self::DeleteStream,
2122            Operation::GetStreamConfig => Self::GetStreamConfig,
2123            Operation::ReconfigureStream => Self::ReconfigureStream,
2124            Operation::CheckTail => Self::CheckTail,
2125            Operation::Append => Self::Append,
2126            Operation::Read => Self::Read,
2127            Operation::Trim => Self::Trim,
2128            Operation::Fence => Self::Fence,
2129        }
2130    }
2131}
2132
2133impl From<api::Operation> for Operation {
2134    fn from(value: api::Operation) -> Self {
2135        match value {
2136            api::Operation::Unspecified => Self::Unspecified,
2137            api::Operation::ListBasins => Self::ListBasins,
2138            api::Operation::CreateBasin => Self::CreateBasin,
2139            api::Operation::DeleteBasin => Self::DeleteBasin,
2140            api::Operation::ReconfigureBasin => Self::ReconfigureBasin,
2141            api::Operation::GetBasinConfig => Self::GetBasinConfig,
2142            api::Operation::IssueAccessToken => Self::IssueAccessToken,
2143            api::Operation::RevokeAccessToken => Self::RevokeAccessToken,
2144            api::Operation::ListAccessTokens => Self::ListAccessTokens,
2145            api::Operation::ListStreams => Self::ListStreams,
2146            api::Operation::CreateStream => Self::CreateStream,
2147            api::Operation::DeleteStream => Self::DeleteStream,
2148            api::Operation::GetStreamConfig => Self::GetStreamConfig,
2149            api::Operation::ReconfigureStream => Self::ReconfigureStream,
2150            api::Operation::CheckTail => Self::CheckTail,
2151            api::Operation::Append => Self::Append,
2152            api::Operation::Read => Self::Read,
2153            api::Operation::Trim => Self::Trim,
2154            api::Operation::Fence => Self::Fence,
2155        }
2156    }
2157}
2158
2159#[sync_docs]
2160#[derive(Debug, Clone, Default)]
2161pub struct AccessTokenScope {
2162    pub basins: Option<ResourceSet>,
2163    pub streams: Option<ResourceSet>,
2164    pub access_tokens: Option<ResourceSet>,
2165    pub op_groups: Option<PermittedOperationGroups>,
2166    pub ops: HashSet<Operation>,
2167}
2168
2169impl AccessTokenScope {
2170    /// Create a new access token scope.
2171    pub fn new() -> Self {
2172        Self::default()
2173    }
2174
2175    /// Overwrite resource set for access tokens.
2176    pub fn with_basins(self, basins: ResourceSet) -> Self {
2177        Self {
2178            basins: Some(basins),
2179            ..self
2180        }
2181    }
2182
2183    /// Overwrite resource set for streams.
2184    pub fn with_streams(self, streams: ResourceSet) -> Self {
2185        Self {
2186            streams: Some(streams),
2187            ..self
2188        }
2189    }
2190
2191    /// Overwrite resource set for access tokens.
2192    pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2193        Self {
2194            access_tokens: Some(access_tokens),
2195            ..self
2196        }
2197    }
2198
2199    /// Overwrite operation groups.
2200    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2201        Self {
2202            op_groups: Some(op_groups),
2203            ..self
2204        }
2205    }
2206
2207    /// Overwrite operations.
2208    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2209        Self {
2210            ops: ops.into_iter().collect(),
2211            ..self
2212        }
2213    }
2214
2215    /// Add an operation to operations.
2216    pub fn with_op(self, op: Operation) -> Self {
2217        let mut ops = self.ops;
2218        ops.insert(op);
2219        Self { ops, ..self }
2220    }
2221}
2222
2223impl From<AccessTokenScope> for api::AccessTokenScope {
2224    fn from(value: AccessTokenScope) -> Self {
2225        let AccessTokenScope {
2226            basins,
2227            streams,
2228            access_tokens,
2229            op_groups,
2230            ops,
2231        } = value;
2232        Self {
2233            basins: basins.map(Into::into),
2234            streams: streams.map(Into::into),
2235            access_tokens: access_tokens.map(Into::into),
2236            op_groups: op_groups.map(Into::into),
2237            ops: ops.into_iter().map(Into::into).collect(),
2238        }
2239    }
2240}
2241
2242impl TryFrom<api::AccessTokenScope> for AccessTokenScope {
2243    type Error = ConvertError;
2244    fn try_from(value: api::AccessTokenScope) -> Result<Self, Self::Error> {
2245        let api::AccessTokenScope {
2246            basins,
2247            streams,
2248            access_tokens,
2249            op_groups,
2250            ops,
2251        } = value;
2252        Ok(Self {
2253            basins: basins.and_then(|set| set.matching.map(Into::into)),
2254            streams: streams.and_then(|set| set.matching.map(Into::into)),
2255            access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2256            op_groups: op_groups.map(Into::into),
2257            ops: ops
2258                .into_iter()
2259                .map(Operation::try_from)
2260                .collect::<Result<HashSet<_>, _>>()?,
2261        })
2262    }
2263}
2264
2265impl From<ResourceSet> for api::ResourceSet {
2266    fn from(value: ResourceSet) -> Self {
2267        Self {
2268            matching: Some(value.into()),
2269        }
2270    }
2271}
2272
2273#[sync_docs(ResourceSet = "Matching")]
2274#[derive(Debug, Clone)]
2275pub enum ResourceSet {
2276    Exact(String),
2277    Prefix(String),
2278}
2279
2280impl From<ResourceSet> for api::resource_set::Matching {
2281    fn from(value: ResourceSet) -> Self {
2282        match value {
2283            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2284            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2285        }
2286    }
2287}
2288
2289impl From<api::resource_set::Matching> for ResourceSet {
2290    fn from(value: api::resource_set::Matching) -> Self {
2291        match value {
2292            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2293            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2294        }
2295    }
2296}
2297
2298#[sync_docs]
2299#[derive(Debug, Clone, Default)]
2300pub struct PermittedOperationGroups {
2301    pub account: Option<ReadWritePermissions>,
2302    pub basin: Option<ReadWritePermissions>,
2303    pub stream: Option<ReadWritePermissions>,
2304}
2305
2306impl PermittedOperationGroups {
2307    /// Create a new permitted operation groups.
2308    pub fn new() -> Self {
2309        Self::default()
2310    }
2311
2312    /// Overwrite account read-write permissions.
2313    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2314        Self {
2315            account: Some(account),
2316            ..self
2317        }
2318    }
2319
2320    /// Overwrite basin read-write permissions.
2321    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2322        Self {
2323            basin: Some(basin),
2324            ..self
2325        }
2326    }
2327
2328    /// Overwrite stream read-write permissions.
2329    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2330        Self {
2331            stream: Some(stream),
2332            ..self
2333        }
2334    }
2335}
2336
2337impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2338    fn from(value: PermittedOperationGroups) -> Self {
2339        let PermittedOperationGroups {
2340            account,
2341            basin,
2342            stream,
2343        } = value;
2344        Self {
2345            account: account.map(Into::into),
2346            basin: basin.map(Into::into),
2347            stream: stream.map(Into::into),
2348        }
2349    }
2350}
2351
2352impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2353    fn from(value: api::PermittedOperationGroups) -> Self {
2354        let api::PermittedOperationGroups {
2355            account,
2356            basin,
2357            stream,
2358        } = value;
2359        Self {
2360            account: account.map(Into::into),
2361            basin: basin.map(Into::into),
2362            stream: stream.map(Into::into),
2363        }
2364    }
2365}
2366
2367#[sync_docs]
2368#[derive(Debug, Clone, Default)]
2369pub struct ReadWritePermissions {
2370    pub read: bool,
2371    pub write: bool,
2372}
2373
2374impl ReadWritePermissions {
2375    /// Create a new read-write permission.
2376    pub fn new() -> Self {
2377        Self::default()
2378    }
2379
2380    /// Overwrite read permission.
2381    pub fn with_read(self, read: bool) -> Self {
2382        Self { read, ..self }
2383    }
2384
2385    /// Overwrite write permission.
2386    pub fn with_write(self, write: bool) -> Self {
2387        Self { write, ..self }
2388    }
2389}
2390
2391impl From<ReadWritePermissions> for api::ReadWritePermissions {
2392    fn from(value: ReadWritePermissions) -> Self {
2393        let ReadWritePermissions { read, write } = value;
2394        Self { read, write }
2395    }
2396}
2397
2398impl From<api::ReadWritePermissions> for ReadWritePermissions {
2399    fn from(value: api::ReadWritePermissions) -> Self {
2400        let api::ReadWritePermissions { read, write } = value;
2401        Self { read, write }
2402    }
2403}
2404
2405impl From<api::IssueAccessTokenResponse> for String {
2406    fn from(value: api::IssueAccessTokenResponse) -> Self {
2407        value.access_token
2408    }
2409}
2410
2411impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2412    fn from(value: AccessTokenId) -> Self {
2413        Self { id: value.into() }
2414    }
2415}
2416
2417impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2418    type Error = ConvertError;
2419    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2420        let token_info = value.info.ok_or("access token info is missing")?;
2421        token_info.try_into()
2422    }
2423}
2424
2425#[sync_docs]
2426#[derive(Debug, Clone, Default)]
2427pub struct ListAccessTokensRequest {
2428    pub prefix: String,
2429    pub start_after: String,
2430    pub limit: Option<usize>,
2431}
2432
2433impl ListAccessTokensRequest {
2434    /// Create a new request with prefix.
2435    pub fn new() -> Self {
2436        Self::default()
2437    }
2438
2439    /// Overwrite prefix.
2440    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2441        Self {
2442            prefix: prefix.into(),
2443            ..self
2444        }
2445    }
2446
2447    /// Overwrite start after.
2448    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2449        Self {
2450            start_after: start_after.into(),
2451            ..self
2452        }
2453    }
2454
2455    /// Overwrite limit.
2456    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2457        Self {
2458            limit: limit.into(),
2459            ..self
2460        }
2461    }
2462}
2463
2464impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2465    type Error = ConvertError;
2466    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2467        let ListAccessTokensRequest {
2468            prefix,
2469            start_after,
2470            limit,
2471        } = value;
2472        Ok(Self {
2473            prefix,
2474            start_after,
2475            limit: limit
2476                .map(TryInto::try_into)
2477                .transpose()
2478                .map_err(|_| "request limit does not fit into u64 bounds")?,
2479        })
2480    }
2481}
2482
2483#[sync_docs]
2484#[derive(Debug, Clone)]
2485pub struct ListAccessTokensResponse {
2486    pub access_tokens: Vec<AccessTokenInfo>,
2487    pub has_more: bool,
2488}
2489
2490impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2491    type Error = ConvertError;
2492    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2493        let api::ListAccessTokensResponse {
2494            access_tokens,
2495            has_more,
2496        } = value;
2497        let access_tokens = access_tokens
2498            .into_iter()
2499            .map(TryInto::try_into)
2500            .collect::<Result<Vec<_>, _>>()?;
2501        Ok(Self {
2502            access_tokens,
2503            has_more,
2504        })
2505    }
2506}