s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
12pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13
14/// Error related to conversion from one type to another.
15#[derive(Debug, Clone, thiserror::Error)]
16#[error("{0}")]
17pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20    fn from(value: T) -> Self {
21        Self(value.into())
22    }
23}
24
25/// Metered size of the object in bytes.
26///
27/// Bytes are calculated using the "metered bytes" formula:
28///
29/// ```python
30/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.key) + len(h.value)) for h in record.headers) + len(record.body)
31/// ```
32pub trait MeteredBytes {
33    /// Return the metered bytes of the object.
34    fn metered_bytes(&self) -> u64;
35}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38    fn metered_bytes(&self) -> u64 {
39        self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40    }
41}
42
43macro_rules! metered_impl {
44    ($ty:ty) => {
45        impl MeteredBytes for $ty {
46            fn metered_bytes(&self) -> u64 {
47                let bytes = 8
48                    + (2 * self.headers.len())
49                    + self
50                        .headers
51                        .iter()
52                        .map(|h| h.name.len() + h.value.len())
53                        .sum::<usize>()
54                    + self.body.len();
55                bytes as u64
56            }
57        }
58    };
59}
60
61#[sync_docs]
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63pub enum BasinScope {
64    AwsUsEast1,
65}
66
67impl From<BasinScope> for api::BasinScope {
68    fn from(value: BasinScope) -> Self {
69        match value {
70            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
71        }
72    }
73}
74
75impl From<api::BasinScope> for Option<BasinScope> {
76    fn from(value: api::BasinScope) -> Self {
77        match value {
78            api::BasinScope::Unspecified => None,
79            api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
80        }
81    }
82}
83
84impl FromStr for BasinScope {
85    type Err = ConvertError;
86    fn from_str(value: &str) -> Result<Self, Self::Err> {
87        match value {
88            "aws:us-east-1" => Ok(Self::AwsUsEast1),
89            _ => Err("invalid basin scope value".into()),
90        }
91    }
92}
93
94#[sync_docs]
95#[derive(Debug, Clone)]
96pub struct CreateBasinRequest {
97    pub basin: BasinName,
98    pub config: Option<BasinConfig>,
99    pub scope: Option<BasinScope>,
100}
101
102impl CreateBasinRequest {
103    /// Create a new request with basin name.
104    pub fn new(basin: BasinName) -> Self {
105        Self {
106            basin,
107            config: None,
108            scope: None,
109        }
110    }
111
112    /// Overwrite basin configuration.
113    pub fn with_config(self, config: BasinConfig) -> Self {
114        Self {
115            config: Some(config),
116            ..self
117        }
118    }
119
120    /// Overwrite basin scope.
121    pub fn with_scope(self, scope: BasinScope) -> Self {
122        Self {
123            scope: Some(scope),
124            ..self
125        }
126    }
127}
128
129impl From<CreateBasinRequest> for api::CreateBasinRequest {
130    fn from(value: CreateBasinRequest) -> Self {
131        let CreateBasinRequest {
132            basin,
133            config,
134            scope,
135        } = value;
136        Self {
137            basin: basin.0,
138            config: config.map(Into::into),
139            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
140        }
141    }
142}
143
144#[sync_docs]
145#[derive(Debug, Clone, Default)]
146pub struct BasinConfig {
147    pub default_stream_config: Option<StreamConfig>,
148    pub create_stream_on_append: bool,
149    pub create_stream_on_read: bool,
150}
151
152impl BasinConfig {
153    /// Create a new basin config.
154    pub fn new() -> Self {
155        Self::default()
156    }
157
158    /// Overwrite the default stream config.
159    pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
160        Self {
161            default_stream_config: Some(default_stream_config),
162            ..self
163        }
164    }
165
166    /// Overwrite `create_stream_on_append`.
167    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
168        Self {
169            create_stream_on_append,
170            ..self
171        }
172    }
173
174    /// Overwrite `create_stream_on_read`.
175    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
176        Self {
177            create_stream_on_read,
178            ..self
179        }
180    }
181}
182
183impl From<BasinConfig> for api::BasinConfig {
184    fn from(value: BasinConfig) -> Self {
185        let BasinConfig {
186            default_stream_config,
187            create_stream_on_append,
188            create_stream_on_read,
189        } = value;
190        Self {
191            default_stream_config: default_stream_config.map(Into::into),
192            create_stream_on_append,
193            create_stream_on_read,
194        }
195    }
196}
197
198impl From<api::BasinConfig> for BasinConfig {
199    fn from(value: api::BasinConfig) -> Self {
200        let api::BasinConfig {
201            default_stream_config,
202            create_stream_on_append,
203            create_stream_on_read,
204        } = value;
205        Self {
206            default_stream_config: default_stream_config.map(Into::into),
207            create_stream_on_append,
208            create_stream_on_read,
209        }
210    }
211}
212
213#[sync_docs]
214#[derive(Debug, Clone, Copy, PartialEq, Eq)]
215pub enum TimestampingMode {
216    ClientPrefer,
217    ClientRequire,
218    Arrival,
219}
220
221impl From<TimestampingMode> for api::TimestampingMode {
222    fn from(value: TimestampingMode) -> Self {
223        match value {
224            TimestampingMode::ClientPrefer => Self::ClientPrefer,
225            TimestampingMode::ClientRequire => Self::ClientRequire,
226            TimestampingMode::Arrival => Self::Arrival,
227        }
228    }
229}
230
231impl From<api::TimestampingMode> for Option<TimestampingMode> {
232    fn from(value: api::TimestampingMode) -> Self {
233        match value {
234            api::TimestampingMode::Unspecified => None,
235            api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
236            api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
237            api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
238        }
239    }
240}
241
242#[sync_docs(TimestampingConfig = "Timestamping")]
243#[derive(Debug, Clone, Default)]
244/// Timestamping behavior.
245pub struct TimestampingConfig {
246    pub mode: Option<TimestampingMode>,
247    pub uncapped: Option<bool>,
248}
249
250impl TimestampingConfig {
251    /// Create a new timestamping config.
252    pub fn new() -> Self {
253        Self::default()
254    }
255
256    /// Overwrite timestamping mode.
257    pub fn with_mode(self, mode: TimestampingMode) -> Self {
258        Self {
259            mode: Some(mode),
260            ..self
261        }
262    }
263
264    /// Overwrite the uncapped knob.
265    pub fn with_uncapped(self, uncapped: bool) -> Self {
266        Self {
267            uncapped: Some(uncapped),
268            ..self
269        }
270    }
271}
272
273impl From<TimestampingConfig> for api::stream_config::Timestamping {
274    fn from(value: TimestampingConfig) -> Self {
275        Self {
276            mode: value
277                .mode
278                .map(api::TimestampingMode::from)
279                .unwrap_or_default()
280                .into(),
281            uncapped: value.uncapped,
282        }
283    }
284}
285
286impl From<api::stream_config::Timestamping> for TimestampingConfig {
287    fn from(value: api::stream_config::Timestamping) -> Self {
288        let mode = value.mode().into();
289        let uncapped = value.uncapped;
290        Self { mode, uncapped }
291    }
292}
293
294#[sync_docs]
295#[derive(Debug, Clone, Default)]
296pub struct StreamConfig {
297    pub storage_class: Option<StorageClass>,
298    pub retention_policy: Option<RetentionPolicy>,
299    pub timestamping: Option<TimestampingConfig>,
300}
301
302impl StreamConfig {
303    /// Create a new stream config.
304    pub fn new() -> Self {
305        Self::default()
306    }
307
308    /// Overwrite storage class.
309    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
310        Self {
311            storage_class: Some(storage_class),
312            ..self
313        }
314    }
315
316    /// Overwrite retention policy.
317    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
318        Self {
319            retention_policy: Some(retention_policy),
320            ..self
321        }
322    }
323
324    /// Overwrite timestamping config.
325    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
326        Self {
327            timestamping: Some(timestamping),
328            ..self
329        }
330    }
331}
332
333impl From<StreamConfig> for api::StreamConfig {
334    fn from(value: StreamConfig) -> Self {
335        let StreamConfig {
336            storage_class,
337            retention_policy,
338            timestamping,
339        } = value;
340        Self {
341            storage_class: storage_class
342                .map(api::StorageClass::from)
343                .unwrap_or_default()
344                .into(),
345            retention_policy: retention_policy.map(Into::into),
346            timestamping: timestamping.map(Into::into),
347        }
348    }
349}
350
351impl From<api::StreamConfig> for StreamConfig {
352    fn from(value: api::StreamConfig) -> Self {
353        Self {
354            storage_class: value.storage_class().into(),
355            retention_policy: value.retention_policy.map(Into::into),
356            timestamping: value.timestamping.map(Into::into),
357        }
358    }
359}
360
361#[sync_docs]
362#[derive(Debug, Clone, Copy, PartialEq, Eq)]
363pub enum StorageClass {
364    Standard,
365    Express,
366}
367
368impl From<StorageClass> for api::StorageClass {
369    fn from(value: StorageClass) -> Self {
370        match value {
371            StorageClass::Standard => Self::Standard,
372            StorageClass::Express => Self::Express,
373        }
374    }
375}
376
377impl From<api::StorageClass> for Option<StorageClass> {
378    fn from(value: api::StorageClass) -> Self {
379        match value {
380            api::StorageClass::Unspecified => None,
381            api::StorageClass::Standard => Some(StorageClass::Standard),
382            api::StorageClass::Express => Some(StorageClass::Express),
383        }
384    }
385}
386
387impl FromStr for StorageClass {
388    type Err = ConvertError;
389
390    fn from_str(value: &str) -> Result<Self, Self::Err> {
391        match value {
392            "standard" => Ok(Self::Standard),
393            "express" => Ok(Self::Express),
394            v => Err(format!("unknown storage class: {v}").into()),
395        }
396    }
397}
398
399#[sync_docs(Age = "Age")]
400#[derive(Debug, Clone)]
401pub enum RetentionPolicy {
402    Age(Duration),
403}
404
405impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
406    fn from(value: RetentionPolicy) -> Self {
407        match value {
408            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
409        }
410    }
411}
412
413impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
414    fn from(value: api::stream_config::RetentionPolicy) -> Self {
415        match value {
416            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
417        }
418    }
419}
420
421#[sync_docs]
422#[derive(Debug, Clone, Copy, PartialEq, Eq)]
423pub enum BasinState {
424    Active,
425    Creating,
426    Deleting,
427}
428
429impl From<BasinState> for api::BasinState {
430    fn from(value: BasinState) -> Self {
431        match value {
432            BasinState::Active => Self::Active,
433            BasinState::Creating => Self::Creating,
434            BasinState::Deleting => Self::Deleting,
435        }
436    }
437}
438
439impl From<api::BasinState> for Option<BasinState> {
440    fn from(value: api::BasinState) -> Self {
441        match value {
442            api::BasinState::Unspecified => None,
443            api::BasinState::Active => Some(BasinState::Active),
444            api::BasinState::Creating => Some(BasinState::Creating),
445            api::BasinState::Deleting => Some(BasinState::Deleting),
446        }
447    }
448}
449
450impl std::fmt::Display for BasinState {
451    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452        match self {
453            BasinState::Active => write!(f, "active"),
454            BasinState::Creating => write!(f, "creating"),
455            BasinState::Deleting => write!(f, "deleting"),
456        }
457    }
458}
459
460#[sync_docs]
461#[derive(Debug, Clone)]
462pub struct BasinInfo {
463    pub name: String,
464    pub scope: Option<BasinScope>,
465    pub state: Option<BasinState>,
466}
467
468impl From<BasinInfo> for api::BasinInfo {
469    fn from(value: BasinInfo) -> Self {
470        let BasinInfo { name, scope, state } = value;
471        Self {
472            name,
473            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
474            state: state.map(api::BasinState::from).unwrap_or_default().into(),
475        }
476    }
477}
478
479impl From<api::BasinInfo> for BasinInfo {
480    fn from(value: api::BasinInfo) -> Self {
481        let scope = value.scope().into();
482        let state = value.state().into();
483        let name = value.name;
484        Self { name, scope, state }
485    }
486}
487
488impl TryFrom<api::CreateBasinResponse> for BasinInfo {
489    type Error = ConvertError;
490    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
491        let api::CreateBasinResponse { info } = value;
492        let info = info.ok_or("missing basin info")?;
493        Ok(info.into())
494    }
495}
496
497#[sync_docs]
498#[derive(Debug, Clone, Default)]
499pub struct ListStreamsRequest {
500    pub prefix: String,
501    pub start_after: String,
502    pub limit: Option<usize>,
503}
504
505impl ListStreamsRequest {
506    /// Create a new request.
507    pub fn new() -> Self {
508        Self::default()
509    }
510
511    /// Overwrite prefix.
512    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
513        Self {
514            prefix: prefix.into(),
515            ..self
516        }
517    }
518
519    /// Overwrite start after.
520    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
521        Self {
522            start_after: start_after.into(),
523            ..self
524        }
525    }
526
527    /// Overwrite limit.
528    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
529        Self {
530            limit: limit.into(),
531            ..self
532        }
533    }
534}
535
536impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
537    type Error = ConvertError;
538    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
539        let ListStreamsRequest {
540            prefix,
541            start_after,
542            limit,
543        } = value;
544        Ok(Self {
545            prefix,
546            start_after,
547            limit: limit.map(|n| n as u64),
548        })
549    }
550}
551
552#[sync_docs]
553#[derive(Debug, Clone)]
554pub struct StreamInfo {
555    pub name: String,
556    pub created_at: u32,
557    pub deleted_at: Option<u32>,
558}
559
560impl From<api::StreamInfo> for StreamInfo {
561    fn from(value: api::StreamInfo) -> Self {
562        Self {
563            name: value.name,
564            created_at: value.created_at,
565            deleted_at: value.deleted_at,
566        }
567    }
568}
569
570impl TryFrom<api::CreateStreamResponse> for StreamInfo {
571    type Error = ConvertError;
572
573    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
574        let api::CreateStreamResponse { info } = value;
575        let info = info.ok_or("missing stream info")?;
576        Ok(info.into())
577    }
578}
579
580#[sync_docs]
581#[derive(Debug, Clone)]
582pub struct ListStreamsResponse {
583    pub streams: Vec<StreamInfo>,
584    pub has_more: bool,
585}
586
587impl From<api::ListStreamsResponse> for ListStreamsResponse {
588    fn from(value: api::ListStreamsResponse) -> Self {
589        let api::ListStreamsResponse { streams, has_more } = value;
590        let streams = streams.into_iter().map(Into::into).collect();
591        Self { streams, has_more }
592    }
593}
594
595impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
596    type Error = ConvertError;
597
598    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
599        let api::GetBasinConfigResponse { config } = value;
600        let config = config.ok_or("missing basin config")?;
601        Ok(config.into())
602    }
603}
604
605impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
606    type Error = ConvertError;
607
608    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
609        let api::GetStreamConfigResponse { config } = value;
610        let config = config.ok_or("missing stream config")?;
611        Ok(config.into())
612    }
613}
614
615#[sync_docs]
616#[derive(Debug, Clone)]
617pub struct CreateStreamRequest {
618    pub stream: String,
619    pub config: Option<StreamConfig>,
620}
621
622impl CreateStreamRequest {
623    /// Create a new request with stream name.
624    pub fn new(stream: impl Into<String>) -> Self {
625        Self {
626            stream: stream.into(),
627            config: None,
628        }
629    }
630
631    /// Overwrite stream config.
632    pub fn with_config(self, config: StreamConfig) -> Self {
633        Self {
634            config: Some(config),
635            ..self
636        }
637    }
638}
639
640impl From<CreateStreamRequest> for api::CreateStreamRequest {
641    fn from(value: CreateStreamRequest) -> Self {
642        let CreateStreamRequest { stream, config } = value;
643        Self {
644            stream,
645            config: config.map(Into::into),
646        }
647    }
648}
649
650#[sync_docs]
651#[derive(Debug, Clone, Default)]
652pub struct ListBasinsRequest {
653    pub prefix: String,
654    pub start_after: String,
655    pub limit: Option<usize>,
656}
657
658impl ListBasinsRequest {
659    /// Create a new request.
660    pub fn new() -> Self {
661        Self::default()
662    }
663
664    /// Overwrite prefix.
665    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
666        Self {
667            prefix: prefix.into(),
668            ..self
669        }
670    }
671
672    /// Overwrite start after.
673    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
674        Self {
675            start_after: start_after.into(),
676            ..self
677        }
678    }
679
680    /// Overwrite limit.
681    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
682        Self {
683            limit: limit.into(),
684            ..self
685        }
686    }
687}
688
689impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
690    type Error = ConvertError;
691    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
692        let ListBasinsRequest {
693            prefix,
694            start_after,
695            limit,
696        } = value;
697        Ok(Self {
698            prefix,
699            start_after,
700            limit: limit
701                .map(TryInto::try_into)
702                .transpose()
703                .map_err(|_| "request limit does not fit into u64 bounds")?,
704        })
705    }
706}
707
708#[sync_docs]
709#[derive(Debug, Clone)]
710pub struct ListBasinsResponse {
711    pub basins: Vec<BasinInfo>,
712    pub has_more: bool,
713}
714
715impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
716    type Error = ConvertError;
717    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
718        let api::ListBasinsResponse { basins, has_more } = value;
719        Ok(Self {
720            basins: basins.into_iter().map(Into::into).collect(),
721            has_more,
722        })
723    }
724}
725
726#[sync_docs]
727#[derive(Debug, Clone)]
728pub struct DeleteBasinRequest {
729    pub basin: BasinName,
730    /// Delete basin if it exists else do nothing.
731    pub if_exists: bool,
732}
733
734impl DeleteBasinRequest {
735    /// Create a new request.
736    pub fn new(basin: BasinName) -> Self {
737        Self {
738            basin,
739            if_exists: false,
740        }
741    }
742
743    /// Overwrite the if exists parameter.
744    pub fn with_if_exists(self, if_exists: bool) -> Self {
745        Self { if_exists, ..self }
746    }
747}
748
749impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
750    fn from(value: DeleteBasinRequest) -> Self {
751        let DeleteBasinRequest { basin, .. } = value;
752        Self { basin: basin.0 }
753    }
754}
755
756#[sync_docs]
757#[derive(Debug, Clone)]
758pub struct DeleteStreamRequest {
759    pub stream: String,
760    /// Delete stream if it exists else do nothing.
761    pub if_exists: bool,
762}
763
764impl DeleteStreamRequest {
765    /// Create a new request.
766    pub fn new(stream: impl Into<String>) -> Self {
767        Self {
768            stream: stream.into(),
769            if_exists: false,
770        }
771    }
772
773    /// Overwrite the if exists parameter.
774    pub fn with_if_exists(self, if_exists: bool) -> Self {
775        Self { if_exists, ..self }
776    }
777}
778
779impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
780    fn from(value: DeleteStreamRequest) -> Self {
781        let DeleteStreamRequest { stream, .. } = value;
782        Self { stream }
783    }
784}
785
786#[sync_docs]
787#[derive(Debug, Clone)]
788pub struct ReconfigureBasinRequest {
789    pub basin: BasinName,
790    pub config: Option<BasinConfig>,
791    pub mask: Option<Vec<String>>,
792}
793
794impl ReconfigureBasinRequest {
795    /// Create a new request with basin name.
796    pub fn new(basin: BasinName) -> Self {
797        Self {
798            basin,
799            config: None,
800            mask: None,
801        }
802    }
803
804    /// Overwrite basin config.
805    pub fn with_config(self, config: BasinConfig) -> Self {
806        Self {
807            config: Some(config),
808            ..self
809        }
810    }
811
812    /// Overwrite field mask.
813    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
814        Self {
815            mask: Some(mask.into()),
816            ..self
817        }
818    }
819}
820
821impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
822    fn from(value: ReconfigureBasinRequest) -> Self {
823        let ReconfigureBasinRequest {
824            basin,
825            config,
826            mask,
827        } = value;
828        Self {
829            basin: basin.0,
830            config: config.map(Into::into),
831            mask: mask.map(|paths| prost_types::FieldMask { paths }),
832        }
833    }
834}
835
836impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
837    type Error = ConvertError;
838    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
839        let api::ReconfigureBasinResponse { config } = value;
840        let config = config.ok_or("missing basin config")?;
841        Ok(config.into())
842    }
843}
844
845#[sync_docs]
846#[derive(Debug, Clone)]
847pub struct ReconfigureStreamRequest {
848    pub stream: String,
849    pub config: Option<StreamConfig>,
850    pub mask: Option<Vec<String>>,
851}
852
853impl ReconfigureStreamRequest {
854    /// Create a new request with stream name.
855    pub fn new(stream: impl Into<String>) -> Self {
856        Self {
857            stream: stream.into(),
858            config: None,
859            mask: None,
860        }
861    }
862
863    /// Overwrite stream config.
864    pub fn with_config(self, config: StreamConfig) -> Self {
865        Self {
866            config: Some(config),
867            ..self
868        }
869    }
870
871    /// Overwrite field mask.
872    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
873        Self {
874            mask: Some(mask.into()),
875            ..self
876        }
877    }
878}
879
880impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
881    fn from(value: ReconfigureStreamRequest) -> Self {
882        let ReconfigureStreamRequest {
883            stream,
884            config,
885            mask,
886        } = value;
887        Self {
888            stream,
889            config: config.map(Into::into),
890            mask: mask.map(|paths| prost_types::FieldMask { paths }),
891        }
892    }
893}
894
895impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
896    type Error = ConvertError;
897    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
898        let api::ReconfigureStreamResponse { config } = value;
899        let config = config.ok_or("missing stream config")?;
900        Ok(config.into())
901    }
902}
903
904impl From<api::CheckTailResponse> for StreamPosition {
905    fn from(value: api::CheckTailResponse) -> Self {
906        let api::CheckTailResponse {
907            next_seq_num,
908            last_timestamp,
909        } = value;
910        StreamPosition {
911            seq_num: next_seq_num,
912            timestamp: last_timestamp,
913        }
914    }
915}
916
917/// Position of a record in a stream.
918#[derive(Debug, Clone, PartialEq, Eq)]
919pub struct StreamPosition {
920    /// Sequence number assigned by the service.
921    pub seq_num: u64,
922    /// Timestamp, which may be user-specified or assigned by the service.
923    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
924    pub timestamp: u64,
925}
926
927#[sync_docs]
928#[derive(Debug, Clone, PartialEq, Eq)]
929pub struct Header {
930    pub name: Bytes,
931    pub value: Bytes,
932}
933
934impl Header {
935    /// Create a new header from name and value.
936    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
937        Self {
938            name: name.into(),
939            value: value.into(),
940        }
941    }
942
943    /// Create a new header from value.
944    pub fn from_value(value: impl Into<Bytes>) -> Self {
945        Self {
946            name: Bytes::new(),
947            value: value.into(),
948        }
949    }
950}
951
952impl From<Header> for api::Header {
953    fn from(value: Header) -> Self {
954        let Header { name, value } = value;
955        Self { name, value }
956    }
957}
958
959impl From<api::Header> for Header {
960    fn from(value: api::Header) -> Self {
961        let api::Header { name, value } = value;
962        Self { name, value }
963    }
964}
965
966/// A fencing token can be enforced on append requests.
967///
968/// Must not be more than 16 bytes.
969#[derive(Debug, Clone, Default, PartialEq, Eq)]
970pub struct FencingToken(Bytes);
971
972impl FencingToken {
973    const MAX_BYTES: usize = 16;
974
975    /// Try creating a new fencing token from bytes.
976    pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
977        let bytes = bytes.into();
978        if bytes.len() > Self::MAX_BYTES {
979            Err(format!(
980                "Size of a fencing token cannot exceed {} bytes",
981                Self::MAX_BYTES
982            )
983            .into())
984        } else {
985            Ok(Self(bytes))
986        }
987    }
988
989    /// Generate a random fencing token with `n` bytes.
990    pub fn generate(n: usize) -> Result<Self, ConvertError> {
991        Self::new(
992            rand::rng()
993                .sample_iter(rand::distr::StandardUniform)
994                .take(n)
995                .collect::<Bytes>(),
996        )
997    }
998}
999
1000impl AsRef<Bytes> for FencingToken {
1001    fn as_ref(&self) -> &Bytes {
1002        &self.0
1003    }
1004}
1005
1006impl AsRef<[u8]> for FencingToken {
1007    fn as_ref(&self) -> &[u8] {
1008        &self.0
1009    }
1010}
1011
1012impl Deref for FencingToken {
1013    type Target = [u8];
1014
1015    fn deref(&self) -> &Self::Target {
1016        &self.0
1017    }
1018}
1019
1020impl From<FencingToken> for Bytes {
1021    fn from(value: FencingToken) -> Self {
1022        value.0
1023    }
1024}
1025
1026impl From<FencingToken> for Vec<u8> {
1027    fn from(value: FencingToken) -> Self {
1028        value.0.into()
1029    }
1030}
1031
1032impl TryFrom<Bytes> for FencingToken {
1033    type Error = ConvertError;
1034
1035    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
1036        Self::new(value)
1037    }
1038}
1039
1040impl TryFrom<Vec<u8>> for FencingToken {
1041    type Error = ConvertError;
1042
1043    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
1044        Self::new(value)
1045    }
1046}
1047
1048/// Command to send through a `CommandRecord`.
1049#[derive(Debug, Clone)]
1050pub enum Command {
1051    /// Enforce a fencing token.
1052    ///
1053    /// Fencing is strongly consistent, and subsequent appends that specify a
1054    /// fencing token will be rejected if it does not match.
1055    Fence {
1056        /// Fencing token to enforce.
1057        ///
1058        /// Set empty to clear the token.
1059        fencing_token: FencingToken,
1060    },
1061    /// Request a trim till the sequence number.
1062    ///
1063    /// Trimming is eventually consistent, and trimmed records may be visible
1064    /// for a brief period
1065    Trim {
1066        /// Trim point.
1067        ///
1068        /// This sequence number is only allowed to advance, and any regression
1069        /// will be ignored.
1070        seq_num: u64,
1071    },
1072}
1073
1074/// A command record is a special kind of [`AppendRecord`] that can be used to
1075/// send command messages.
1076///
1077/// Such a record is signalled by a sole header with empty name. The header
1078/// value represents the operation and record body acts as the payload.
1079#[derive(Debug, Clone)]
1080pub struct CommandRecord {
1081    /// Command kind.
1082    pub command: Command,
1083    /// Timestamp for the record.
1084    pub timestamp: Option<u64>,
1085}
1086
1087impl CommandRecord {
1088    const FENCE: &[u8] = b"fence";
1089    const TRIM: &[u8] = b"trim";
1090
1091    /// Create a new fence command record.
1092    pub fn fence(fencing_token: FencingToken) -> Self {
1093        Self {
1094            command: Command::Fence { fencing_token },
1095            timestamp: None,
1096        }
1097    }
1098
1099    /// Create a new trim command record.
1100    pub fn trim(seq_num: impl Into<u64>) -> Self {
1101        Self {
1102            command: Command::Trim {
1103                seq_num: seq_num.into(),
1104            },
1105            timestamp: None,
1106        }
1107    }
1108
1109    /// Overwrite timestamp.
1110    pub fn with_timestamp(self, timestamp: u64) -> Self {
1111        Self {
1112            timestamp: Some(timestamp),
1113            ..self
1114        }
1115    }
1116}
1117
1118#[sync_docs]
1119#[derive(Debug, Clone, PartialEq, Eq)]
1120pub struct AppendRecord {
1121    timestamp: Option<u64>,
1122    headers: Vec<Header>,
1123    body: Bytes,
1124    #[cfg(test)]
1125    max_bytes: u64,
1126}
1127
1128metered_impl!(AppendRecord);
1129
1130impl AppendRecord {
1131    const MAX_BYTES: u64 = MIB_BYTES;
1132
1133    fn validated(self) -> Result<Self, ConvertError> {
1134        #[cfg(test)]
1135        let max_bytes = self.max_bytes;
1136        #[cfg(not(test))]
1137        let max_bytes = Self::MAX_BYTES;
1138
1139        if self.metered_bytes() > max_bytes {
1140            Err("AppendRecord should have metered size less than 1 MiB".into())
1141        } else {
1142            Ok(self)
1143        }
1144    }
1145
1146    /// Try creating a new append record with body.
1147    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1148        Self {
1149            timestamp: None,
1150            headers: Vec::new(),
1151            body: body.into(),
1152            #[cfg(test)]
1153            max_bytes: Self::MAX_BYTES,
1154        }
1155        .validated()
1156    }
1157
1158    #[cfg(test)]
1159    pub(crate) fn with_max_bytes(
1160        max_bytes: u64,
1161        body: impl Into<Bytes>,
1162    ) -> Result<Self, ConvertError> {
1163        Self {
1164            timestamp: None,
1165            headers: Vec::new(),
1166            body: body.into(),
1167            max_bytes,
1168        }
1169        .validated()
1170    }
1171
1172    /// Overwrite headers.
1173    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1174        Self {
1175            headers: headers.into(),
1176            ..self
1177        }
1178        .validated()
1179    }
1180
1181    /// Overwrite timestamp.
1182    pub fn with_timestamp(self, timestamp: u64) -> Self {
1183        Self {
1184            timestamp: Some(timestamp),
1185            ..self
1186        }
1187    }
1188
1189    /// Body of the record.
1190    pub fn body(&self) -> &[u8] {
1191        &self.body
1192    }
1193
1194    /// Headers of the record.
1195    pub fn headers(&self) -> &[Header] {
1196        &self.headers
1197    }
1198
1199    /// Timestamp for the record.
1200    pub fn timestamp(&self) -> Option<u64> {
1201        self.timestamp
1202    }
1203
1204    /// Consume the record and return parts.
1205    pub fn into_parts(self) -> AppendRecordParts {
1206        AppendRecordParts {
1207            timestamp: self.timestamp,
1208            headers: self.headers,
1209            body: self.body,
1210        }
1211    }
1212
1213    /// Try creating the record from parts.
1214    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1215        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1216        if let Some(timestamp) = parts.timestamp {
1217            Ok(record.with_timestamp(timestamp))
1218        } else {
1219            Ok(record)
1220        }
1221    }
1222}
1223
1224impl From<AppendRecord> for api::AppendRecord {
1225    fn from(value: AppendRecord) -> Self {
1226        Self {
1227            timestamp: value.timestamp,
1228            headers: value.headers.into_iter().map(Into::into).collect(),
1229            body: value.body,
1230        }
1231    }
1232}
1233
1234impl From<CommandRecord> for AppendRecord {
1235    fn from(value: CommandRecord) -> Self {
1236        let (header_value, body) = match value.command {
1237            Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1238            Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1239        };
1240        AppendRecordParts {
1241            timestamp: value.timestamp,
1242            headers: vec![Header::from_value(header_value)],
1243            body: body.into(),
1244        }
1245        .try_into()
1246        .expect("command record is a valid append record")
1247    }
1248}
1249
1250#[sync_docs(AppendRecordParts = "AppendRecord")]
1251#[derive(Debug, Clone)]
1252pub struct AppendRecordParts {
1253    pub timestamp: Option<u64>,
1254    pub headers: Vec<Header>,
1255    pub body: Bytes,
1256}
1257
1258impl From<AppendRecord> for AppendRecordParts {
1259    fn from(value: AppendRecord) -> Self {
1260        value.into_parts()
1261    }
1262}
1263
1264impl TryFrom<AppendRecordParts> for AppendRecord {
1265    type Error = ConvertError;
1266
1267    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1268        Self::try_from_parts(value)
1269    }
1270}
1271
1272/// A collection of append records that can be sent together in a batch.
1273#[derive(Debug, Clone)]
1274pub struct AppendRecordBatch {
1275    records: Vec<AppendRecord>,
1276    metered_bytes: u64,
1277    max_capacity: usize,
1278    #[cfg(test)]
1279    max_bytes: u64,
1280}
1281
1282impl PartialEq for AppendRecordBatch {
1283    fn eq(&self, other: &Self) -> bool {
1284        if self.records.eq(&other.records) {
1285            assert_eq!(self.metered_bytes, other.metered_bytes);
1286            true
1287        } else {
1288            false
1289        }
1290    }
1291}
1292
1293impl Eq for AppendRecordBatch {}
1294
1295impl Default for AppendRecordBatch {
1296    fn default() -> Self {
1297        Self::new()
1298    }
1299}
1300
1301impl AppendRecordBatch {
1302    /// Maximum number of records that a batch can hold.
1303    ///
1304    /// A record batch cannot be created with a bigger capacity.
1305    pub const MAX_CAPACITY: usize = 1000;
1306
1307    /// Maximum metered bytes of the batch.
1308    pub const MAX_BYTES: u64 = MIB_BYTES;
1309
1310    /// Create an empty record batch.
1311    pub fn new() -> Self {
1312        Self::with_max_capacity(Self::MAX_CAPACITY)
1313    }
1314
1315    /// Create an empty record batch with custom max capacity.
1316    ///
1317    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1318    pub fn with_max_capacity(max_capacity: usize) -> Self {
1319        assert!(
1320            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1321            "Batch capacity must be between 1 and 1000"
1322        );
1323
1324        Self {
1325            records: Vec::with_capacity(max_capacity),
1326            metered_bytes: 0,
1327            max_capacity,
1328            #[cfg(test)]
1329            max_bytes: Self::MAX_BYTES,
1330        }
1331    }
1332
1333    #[cfg(test)]
1334    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1335        #[cfg(test)]
1336        assert!(
1337            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1338            "Batch size must be between 1 byte and 1 MiB"
1339        );
1340
1341        Self {
1342            max_bytes,
1343            ..Self::with_max_capacity(max_capacity)
1344        }
1345    }
1346
1347    /// Try creating a record batch from an iterator.
1348    ///
1349    /// If all the items of the iterator cannot be drained into the batch, the
1350    /// error returned contains a batch containing all records it could fit
1351    /// along-with the left over items from the iterator.
1352    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1353    where
1354        R: Into<AppendRecord>,
1355        T: IntoIterator<Item = R>,
1356    {
1357        let mut records = Self::new();
1358        let mut pending = Vec::new();
1359
1360        let mut iter = iter.into_iter();
1361
1362        for record in iter.by_ref() {
1363            if let Err(record) = records.push(record) {
1364                pending.push(record);
1365                break;
1366            }
1367        }
1368
1369        if pending.is_empty() {
1370            Ok(records)
1371        } else {
1372            pending.extend(iter.map(Into::into));
1373            Err((records, pending))
1374        }
1375    }
1376
1377    /// Returns true if the batch contains no records.
1378    pub fn is_empty(&self) -> bool {
1379        if self.records.is_empty() {
1380            assert_eq!(self.metered_bytes, 0);
1381            true
1382        } else {
1383            false
1384        }
1385    }
1386
1387    /// Returns the number of records contained in the batch.
1388    pub fn len(&self) -> usize {
1389        self.records.len()
1390    }
1391
1392    #[cfg(test)]
1393    fn max_bytes(&self) -> u64 {
1394        self.max_bytes
1395    }
1396
1397    #[cfg(not(test))]
1398    fn max_bytes(&self) -> u64 {
1399        Self::MAX_BYTES
1400    }
1401
1402    /// Returns true if the batch cannot fit any more records.
1403    pub fn is_full(&self) -> bool {
1404        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1405    }
1406
1407    /// Try to append a new record into the batch.
1408    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1409        assert!(self.records.len() <= self.max_capacity);
1410        assert!(self.metered_bytes <= self.max_bytes());
1411
1412        let record = record.into();
1413        let record_size = record.metered_bytes();
1414        if self.records.len() >= self.max_capacity
1415            || self.metered_bytes + record_size > self.max_bytes()
1416        {
1417            Err(record)
1418        } else {
1419            self.records.push(record);
1420            self.metered_bytes += record_size;
1421            Ok(())
1422        }
1423    }
1424}
1425
1426impl MeteredBytes for AppendRecordBatch {
1427    fn metered_bytes(&self) -> u64 {
1428        self.metered_bytes
1429    }
1430}
1431
1432impl IntoIterator for AppendRecordBatch {
1433    type Item = AppendRecord;
1434    type IntoIter = std::vec::IntoIter<Self::Item>;
1435
1436    fn into_iter(self) -> Self::IntoIter {
1437        self.records.into_iter()
1438    }
1439}
1440
1441impl<'a> IntoIterator for &'a AppendRecordBatch {
1442    type Item = &'a AppendRecord;
1443    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1444
1445    fn into_iter(self) -> Self::IntoIter {
1446        self.records.iter()
1447    }
1448}
1449
1450impl AsRef<[AppendRecord]> for AppendRecordBatch {
1451    fn as_ref(&self) -> &[AppendRecord] {
1452        &self.records
1453    }
1454}
1455
1456#[sync_docs]
1457#[derive(Debug, Default, Clone)]
1458pub struct AppendInput {
1459    pub records: AppendRecordBatch,
1460    pub match_seq_num: Option<u64>,
1461    pub fencing_token: Option<FencingToken>,
1462}
1463
1464impl MeteredBytes for AppendInput {
1465    fn metered_bytes(&self) -> u64 {
1466        self.records.metered_bytes()
1467    }
1468}
1469
1470impl AppendInput {
1471    /// Create a new append input from record batch.
1472    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1473        Self {
1474            records: records.into(),
1475            match_seq_num: None,
1476            fencing_token: None,
1477        }
1478    }
1479
1480    /// Overwrite match sequence number.
1481    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1482        Self {
1483            match_seq_num: Some(match_seq_num.into()),
1484            ..self
1485        }
1486    }
1487
1488    /// Overwrite fencing token.
1489    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1490        Self {
1491            fencing_token: Some(fencing_token),
1492            ..self
1493        }
1494    }
1495
1496    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1497        let Self {
1498            records,
1499            match_seq_num,
1500            fencing_token,
1501        } = self;
1502
1503        api::AppendInput {
1504            stream: stream.into(),
1505            records: records.into_iter().map(Into::into).collect(),
1506            match_seq_num,
1507            fencing_token: fencing_token.map(|f| f.0),
1508        }
1509    }
1510}
1511
1512/// Acknowledgment to an append request.
1513#[derive(Debug, Clone)]
1514pub struct AppendAck {
1515    /// Sequence number and timestamp of the first record that was appended.
1516    pub start: StreamPosition,
1517    /// Sequence number of the last record that was appended + 1,
1518    /// and timestamp of the last record that was appended.
1519    /// The difference between `end.seq_num` and `start.seq_num`
1520    /// will be the number of records appended.
1521    pub end: StreamPosition,
1522    /// Sequence number that will be assigned to the next record on the stream,
1523    /// and timestamp of the last record on the stream.
1524    /// This can be greater than the `end` position in case of concurrent appends.
1525    pub tail: StreamPosition,
1526}
1527
1528impl From<api::AppendOutput> for AppendAck {
1529    fn from(value: api::AppendOutput) -> Self {
1530        let api::AppendOutput {
1531            start_seq_num,
1532            start_timestamp,
1533            end_seq_num,
1534            end_timestamp,
1535            next_seq_num,
1536            last_timestamp,
1537        } = value;
1538        let start = StreamPosition {
1539            seq_num: start_seq_num,
1540            timestamp: start_timestamp,
1541        };
1542        let end = StreamPosition {
1543            seq_num: end_seq_num,
1544            timestamp: end_timestamp,
1545        };
1546        let tail = StreamPosition {
1547            seq_num: next_seq_num,
1548            timestamp: last_timestamp,
1549        };
1550        Self { start, end, tail }
1551    }
1552}
1553
1554impl TryFrom<api::AppendResponse> for AppendAck {
1555    type Error = ConvertError;
1556    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1557        let api::AppendResponse { output } = value;
1558        let output = output.ok_or("missing append output")?;
1559        Ok(output.into())
1560    }
1561}
1562
1563impl TryFrom<api::AppendSessionResponse> for AppendAck {
1564    type Error = ConvertError;
1565    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1566        let api::AppendSessionResponse { output } = value;
1567        let output = output.ok_or("missing append output")?;
1568        Ok(output.into())
1569    }
1570}
1571
1572#[sync_docs]
1573#[derive(Debug, Clone, Default)]
1574pub struct ReadLimit {
1575    pub count: Option<u64>,
1576    pub bytes: Option<u64>,
1577}
1578
1579impl ReadLimit {
1580    /// Create a new read limit.
1581    pub fn new() -> Self {
1582        Self::default()
1583    }
1584
1585    /// Overwrite count limit.
1586    pub fn with_count(self, count: u64) -> Self {
1587        Self {
1588            count: Some(count),
1589            ..self
1590        }
1591    }
1592
1593    /// Overwrite bytes limit.
1594    pub fn with_bytes(self, bytes: u64) -> Self {
1595        Self {
1596            bytes: Some(bytes),
1597            ..self
1598        }
1599    }
1600}
1601
1602/// Starting point for read requests.
1603#[derive(Debug, Clone)]
1604pub enum ReadStart {
1605    /// Sequence number.
1606    SeqNum(u64),
1607    /// Timestamp.
1608    Timestamp(u64),
1609    /// Number of records before the tail, i.e. the next sequence number.
1610    TailOffset(u64),
1611}
1612
1613impl Default for ReadStart {
1614    fn default() -> Self {
1615        Self::SeqNum(0)
1616    }
1617}
1618
1619impl From<ReadStart> for api::read_request::Start {
1620    fn from(start: ReadStart) -> Self {
1621        match start {
1622            ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1623            ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1624            ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1625        }
1626    }
1627}
1628
1629impl From<ReadStart> for api::read_session_request::Start {
1630    fn from(start: ReadStart) -> Self {
1631        match start {
1632            ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1633            ReadStart::Timestamp(timestamp) => {
1634                api::read_session_request::Start::Timestamp(timestamp)
1635            }
1636            ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1637        }
1638    }
1639}
1640
1641#[sync_docs]
1642#[derive(Debug, Clone, Default)]
1643pub struct ReadRequest {
1644    pub start: ReadStart,
1645    pub limit: ReadLimit,
1646}
1647
1648impl ReadRequest {
1649    /// Create a new request with the specified starting point.
1650    pub fn new(start: ReadStart) -> Self {
1651        Self {
1652            start,
1653            ..Default::default()
1654        }
1655    }
1656
1657    /// Overwrite limit.
1658    pub fn with_limit(self, limit: ReadLimit) -> Self {
1659        Self { limit, ..self }
1660    }
1661}
1662
1663impl ReadRequest {
1664    pub(crate) fn try_into_api_type(
1665        self,
1666        stream: impl Into<String>,
1667    ) -> Result<api::ReadRequest, ConvertError> {
1668        let Self { start, limit } = self;
1669
1670        let limit = if limit.count > Some(1000) {
1671            Err("read limit: count must not exceed 1000 for unary request")
1672        } else if limit.bytes > Some(MIB_BYTES) {
1673            Err("read limit: bytes must not exceed 1MiB for unary request")
1674        } else {
1675            Ok(api::ReadLimit {
1676                count: limit.count,
1677                bytes: limit.bytes,
1678            })
1679        }?;
1680
1681        Ok(api::ReadRequest {
1682            stream: stream.into(),
1683            start: Some(start.into()),
1684            limit: Some(limit),
1685        })
1686    }
1687}
1688
1689#[sync_docs]
1690#[derive(Debug, Clone)]
1691pub struct SequencedRecord {
1692    pub seq_num: u64,
1693    pub timestamp: u64,
1694    pub headers: Vec<Header>,
1695    pub body: Bytes,
1696}
1697
1698metered_impl!(SequencedRecord);
1699
1700impl From<api::SequencedRecord> for SequencedRecord {
1701    fn from(value: api::SequencedRecord) -> Self {
1702        let api::SequencedRecord {
1703            seq_num,
1704            timestamp,
1705            headers,
1706            body,
1707        } = value;
1708        Self {
1709            seq_num,
1710            timestamp,
1711            headers: headers.into_iter().map(Into::into).collect(),
1712            body,
1713        }
1714    }
1715}
1716
1717impl SequencedRecord {
1718    /// Try representing the sequenced record as a command record.
1719    pub fn as_command_record(&self) -> Option<CommandRecord> {
1720        if self.headers.len() != 1 {
1721            return None;
1722        }
1723
1724        let header = self.headers.first().expect("pre-validated length");
1725
1726        if !header.name.is_empty() {
1727            return None;
1728        }
1729
1730        match header.value.as_ref() {
1731            CommandRecord::FENCE => {
1732                let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1733                Some(CommandRecord {
1734                    command: Command::Fence { fencing_token },
1735                    timestamp: Some(self.timestamp),
1736                })
1737            }
1738            CommandRecord::TRIM => {
1739                let body: &[u8] = &self.body;
1740                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1741                Some(CommandRecord {
1742                    command: Command::Trim { seq_num },
1743                    timestamp: Some(self.timestamp),
1744                })
1745            }
1746            _ => None,
1747        }
1748    }
1749}
1750
1751#[sync_docs]
1752#[derive(Debug, Clone)]
1753pub struct SequencedRecordBatch {
1754    pub records: Vec<SequencedRecord>,
1755}
1756
1757impl MeteredBytes for SequencedRecordBatch {
1758    fn metered_bytes(&self) -> u64 {
1759        self.records.metered_bytes()
1760    }
1761}
1762
1763impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1764    fn from(value: api::SequencedRecordBatch) -> Self {
1765        let api::SequencedRecordBatch { records } = value;
1766        Self {
1767            records: records.into_iter().map(Into::into).collect(),
1768        }
1769    }
1770}
1771
1772#[sync_docs(ReadOutput = "Output")]
1773#[derive(Debug, Clone)]
1774pub enum ReadOutput {
1775    Batch(SequencedRecordBatch),
1776    NextSeqNum(u64),
1777}
1778
1779impl From<api::read_output::Output> for ReadOutput {
1780    fn from(value: api::read_output::Output) -> Self {
1781        match value {
1782            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1783            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1784        }
1785    }
1786}
1787
1788impl TryFrom<api::ReadOutput> for ReadOutput {
1789    type Error = ConvertError;
1790    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1791        let api::ReadOutput { output } = value;
1792        let output = output.ok_or("missing read output")?;
1793        Ok(output.into())
1794    }
1795}
1796
1797impl TryFrom<api::ReadResponse> for ReadOutput {
1798    type Error = ConvertError;
1799    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1800        let api::ReadResponse { output } = value;
1801        let output = output.ok_or("missing output in read response")?;
1802        output.try_into()
1803    }
1804}
1805
1806#[sync_docs]
1807#[derive(Debug, Clone, Default)]
1808pub struct ReadSessionRequest {
1809    pub start: ReadStart,
1810    pub limit: ReadLimit,
1811}
1812
1813impl ReadSessionRequest {
1814    /// Create a new request with the specified starting point.
1815    pub fn new(start: ReadStart) -> Self {
1816        Self {
1817            start,
1818            ..Default::default()
1819        }
1820    }
1821
1822    /// Overwrite limit.
1823    pub fn with_limit(self, limit: ReadLimit) -> Self {
1824        Self { limit, ..self }
1825    }
1826
1827    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1828        let Self { start, limit } = self;
1829        api::ReadSessionRequest {
1830            stream: stream.into(),
1831            start: Some(start.into()),
1832            limit: Some(api::ReadLimit {
1833                count: limit.count,
1834                bytes: limit.bytes,
1835            }),
1836            heartbeats: false,
1837        }
1838    }
1839}
1840
1841impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1842    type Error = ConvertError;
1843    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1844        let api::ReadSessionResponse { output } = value;
1845        let output = output.ok_or("missing output in read session response")?;
1846        output.try_into()
1847    }
1848}
1849
1850/// Name of a basin.
1851///
1852/// Must be between 8 and 48 characters in length. Must comprise lowercase
1853/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
1854#[derive(Debug, Clone)]
1855pub struct BasinName(String);
1856
1857impl AsRef<str> for BasinName {
1858    fn as_ref(&self) -> &str {
1859        &self.0
1860    }
1861}
1862
1863impl Deref for BasinName {
1864    type Target = str;
1865    fn deref(&self) -> &Self::Target {
1866        &self.0
1867    }
1868}
1869
1870impl TryFrom<String> for BasinName {
1871    type Error = ConvertError;
1872
1873    fn try_from(name: String) -> Result<Self, Self::Error> {
1874        if name.len() < 8 || name.len() > 48 {
1875            return Err("Basin name must be between 8 and 48 characters in length".into());
1876        }
1877
1878        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1879        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1880            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1881                .expect("Failed to compile basin name regex")
1882        });
1883
1884        if !regex.is_match(&name) {
1885            return Err(
1886                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1887                It cannot begin or end with a hyphen."
1888                    .into(),
1889            );
1890        }
1891
1892        Ok(Self(name))
1893    }
1894}
1895
1896impl FromStr for BasinName {
1897    type Err = ConvertError;
1898
1899    fn from_str(s: &str) -> Result<Self, Self::Err> {
1900        s.to_string().try_into()
1901    }
1902}
1903
1904impl std::fmt::Display for BasinName {
1905    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1906        f.write_str(&self.0)
1907    }
1908}
1909
1910/// Access token ID.
1911/// Must be between 1 and 96 characters.
1912#[derive(Debug, Clone)]
1913pub struct AccessTokenId(String);
1914
1915impl AsRef<str> for AccessTokenId {
1916    fn as_ref(&self) -> &str {
1917        &self.0
1918    }
1919}
1920
1921impl Deref for AccessTokenId {
1922    type Target = str;
1923
1924    fn deref(&self) -> &Self::Target {
1925        &self.0
1926    }
1927}
1928
1929impl TryFrom<String> for AccessTokenId {
1930    type Error = ConvertError;
1931
1932    fn try_from(name: String) -> Result<Self, Self::Error> {
1933        if name.is_empty() {
1934            return Err("Access token ID must not be empty".into());
1935        }
1936
1937        if name.len() > 96 {
1938            return Err("Access token ID must not exceed 96 characters".into());
1939        }
1940
1941        Ok(Self(name))
1942    }
1943}
1944
1945impl From<AccessTokenId> for String {
1946    fn from(value: AccessTokenId) -> Self {
1947        value.0
1948    }
1949}
1950
1951impl FromStr for AccessTokenId {
1952    type Err = ConvertError;
1953
1954    fn from_str(s: &str) -> Result<Self, Self::Err> {
1955        s.to_string().try_into()
1956    }
1957}
1958
1959impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1960    fn from(value: AccessTokenInfo) -> Self {
1961        Self {
1962            info: Some(value.into()),
1963        }
1964    }
1965}
1966
1967#[sync_docs]
1968#[derive(Debug, Clone)]
1969pub struct AccessTokenInfo {
1970    pub id: AccessTokenId,
1971    pub expires_at: Option<u32>,
1972    pub auto_prefix_streams: bool,
1973    pub scope: Option<AccessTokenScope>,
1974}
1975
1976impl AccessTokenInfo {
1977    /// Create a new access token info.
1978    pub fn new(id: AccessTokenId) -> Self {
1979        Self {
1980            id,
1981            expires_at: None,
1982            auto_prefix_streams: false,
1983            scope: None,
1984        }
1985    }
1986
1987    /// Overwrite expiration time.
1988    pub fn with_expires_at(self, expires_at: u32) -> Self {
1989        Self {
1990            expires_at: Some(expires_at),
1991            ..self
1992        }
1993    }
1994
1995    /// Overwrite auto prefix streams.
1996    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1997        Self {
1998            auto_prefix_streams,
1999            ..self
2000        }
2001    }
2002
2003    /// Overwrite scope.
2004    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2005        Self {
2006            scope: Some(scope),
2007            ..self
2008        }
2009    }
2010}
2011
2012impl From<AccessTokenInfo> for api::AccessTokenInfo {
2013    fn from(value: AccessTokenInfo) -> Self {
2014        let AccessTokenInfo {
2015            id,
2016            expires_at,
2017            auto_prefix_streams,
2018            scope,
2019        } = value;
2020        Self {
2021            id: id.into(),
2022            expires_at,
2023            auto_prefix_streams,
2024            scope: scope.map(Into::into),
2025        }
2026    }
2027}
2028
2029impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2030    type Error = ConvertError;
2031
2032    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2033        let api::AccessTokenInfo {
2034            id,
2035            expires_at,
2036            auto_prefix_streams,
2037            scope,
2038        } = value;
2039        Ok(Self {
2040            id: id.try_into()?,
2041            expires_at,
2042            auto_prefix_streams,
2043            scope: scope.map(Into::into),
2044        })
2045    }
2046}
2047
2048#[sync_docs]
2049#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2050pub enum Operation {
2051    ListBasins,
2052    CreateBasin,
2053    DeleteBasin,
2054    ReconfigureBasin,
2055    GetBasinConfig,
2056    IssueAccessToken,
2057    RevokeAccessToken,
2058    ListAccessTokens,
2059    ListStreams,
2060    CreateStream,
2061    DeleteStream,
2062    GetStreamConfig,
2063    ReconfigureStream,
2064    CheckTail,
2065    Append,
2066    Read,
2067    Trim,
2068    Fence,
2069}
2070
2071impl FromStr for Operation {
2072    type Err = ConvertError;
2073
2074    fn from_str(s: &str) -> Result<Self, Self::Err> {
2075        match s.to_lowercase().as_str() {
2076            "list-basins" => Ok(Self::ListBasins),
2077            "create-basin" => Ok(Self::CreateBasin),
2078            "delete-basin" => Ok(Self::DeleteBasin),
2079            "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2080            "get-basin-config" => Ok(Self::GetBasinConfig),
2081            "issue-access-token" => Ok(Self::IssueAccessToken),
2082            "revoke-access-token" => Ok(Self::RevokeAccessToken),
2083            "list-access-tokens" => Ok(Self::ListAccessTokens),
2084            "list-streams" => Ok(Self::ListStreams),
2085            "create-stream" => Ok(Self::CreateStream),
2086            "delete-stream" => Ok(Self::DeleteStream),
2087            "get-stream-config" => Ok(Self::GetStreamConfig),
2088            "reconfigure-stream" => Ok(Self::ReconfigureStream),
2089            "check-tail" => Ok(Self::CheckTail),
2090            "append" => Ok(Self::Append),
2091            "read" => Ok(Self::Read),
2092            "trim" => Ok(Self::Trim),
2093            "fence" => Ok(Self::Fence),
2094            _ => Err("invalid operation".into()),
2095        }
2096    }
2097}
2098
2099impl From<Operation> for api::Operation {
2100    fn from(value: Operation) -> Self {
2101        match value {
2102            Operation::ListBasins => Self::ListBasins,
2103            Operation::CreateBasin => Self::CreateBasin,
2104            Operation::DeleteBasin => Self::DeleteBasin,
2105            Operation::ReconfigureBasin => Self::ReconfigureBasin,
2106            Operation::GetBasinConfig => Self::GetBasinConfig,
2107            Operation::IssueAccessToken => Self::IssueAccessToken,
2108            Operation::RevokeAccessToken => Self::RevokeAccessToken,
2109            Operation::ListAccessTokens => Self::ListAccessTokens,
2110            Operation::ListStreams => Self::ListStreams,
2111            Operation::CreateStream => Self::CreateStream,
2112            Operation::DeleteStream => Self::DeleteStream,
2113            Operation::GetStreamConfig => Self::GetStreamConfig,
2114            Operation::ReconfigureStream => Self::ReconfigureStream,
2115            Operation::CheckTail => Self::CheckTail,
2116            Operation::Append => Self::Append,
2117            Operation::Read => Self::Read,
2118            Operation::Trim => Self::Trim,
2119            Operation::Fence => Self::Fence,
2120        }
2121    }
2122}
2123
2124impl From<api::Operation> for Option<Operation> {
2125    fn from(value: api::Operation) -> Self {
2126        match value {
2127            api::Operation::Unspecified => None,
2128            api::Operation::ListBasins => Some(Operation::ListBasins),
2129            api::Operation::CreateBasin => Some(Operation::CreateBasin),
2130            api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2131            api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2132            api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2133            api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2134            api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2135            api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2136            api::Operation::ListStreams => Some(Operation::ListStreams),
2137            api::Operation::CreateStream => Some(Operation::CreateStream),
2138            api::Operation::DeleteStream => Some(Operation::DeleteStream),
2139            api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2140            api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2141            api::Operation::CheckTail => Some(Operation::CheckTail),
2142            api::Operation::Append => Some(Operation::Append),
2143            api::Operation::Read => Some(Operation::Read),
2144            api::Operation::Trim => Some(Operation::Trim),
2145            api::Operation::Fence => Some(Operation::Fence),
2146        }
2147    }
2148}
2149
2150#[sync_docs]
2151#[derive(Debug, Clone, Default)]
2152pub struct AccessTokenScope {
2153    pub basins: Option<ResourceSet>,
2154    pub streams: Option<ResourceSet>,
2155    pub access_tokens: Option<ResourceSet>,
2156    pub op_groups: Option<PermittedOperationGroups>,
2157    pub ops: HashSet<Operation>,
2158}
2159
2160impl AccessTokenScope {
2161    /// Create a new access token scope.
2162    pub fn new() -> Self {
2163        Self::default()
2164    }
2165
2166    /// Overwrite resource set for access tokens.
2167    pub fn with_basins(self, basins: ResourceSet) -> Self {
2168        Self {
2169            basins: Some(basins),
2170            ..self
2171        }
2172    }
2173
2174    /// Overwrite resource set for streams.
2175    pub fn with_streams(self, streams: ResourceSet) -> Self {
2176        Self {
2177            streams: Some(streams),
2178            ..self
2179        }
2180    }
2181
2182    /// Overwrite resource set for access tokens.
2183    pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2184        Self {
2185            access_tokens: Some(access_tokens),
2186            ..self
2187        }
2188    }
2189
2190    /// Overwrite operation groups.
2191    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2192        Self {
2193            op_groups: Some(op_groups),
2194            ..self
2195        }
2196    }
2197
2198    /// Overwrite operations.
2199    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2200        Self {
2201            ops: ops.into_iter().collect(),
2202            ..self
2203        }
2204    }
2205
2206    /// Add an operation to operations.
2207    pub fn with_op(self, op: Operation) -> Self {
2208        let mut ops = self.ops;
2209        ops.insert(op);
2210        Self { ops, ..self }
2211    }
2212}
2213
2214impl From<AccessTokenScope> for api::AccessTokenScope {
2215    fn from(value: AccessTokenScope) -> Self {
2216        let AccessTokenScope {
2217            basins,
2218            streams,
2219            access_tokens,
2220            op_groups,
2221            ops,
2222        } = value;
2223        Self {
2224            basins: basins.map(Into::into),
2225            streams: streams.map(Into::into),
2226            access_tokens: access_tokens.map(Into::into),
2227            op_groups: op_groups.map(Into::into),
2228            ops: ops
2229                .into_iter()
2230                .map(api::Operation::from)
2231                .map(Into::into)
2232                .collect(),
2233        }
2234    }
2235}
2236
2237impl From<api::AccessTokenScope> for AccessTokenScope {
2238    fn from(value: api::AccessTokenScope) -> Self {
2239        let api::AccessTokenScope {
2240            basins,
2241            streams,
2242            access_tokens,
2243            op_groups,
2244            ops,
2245        } = value;
2246        Self {
2247            basins: basins.and_then(|set| set.matching.map(Into::into)),
2248            streams: streams.and_then(|set| set.matching.map(Into::into)),
2249            access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2250            op_groups: op_groups.map(Into::into),
2251            ops: ops
2252                .into_iter()
2253                .map(api::Operation::try_from)
2254                .flat_map(Result::ok)
2255                .flat_map(<Option<Operation>>::from)
2256                .collect(),
2257        }
2258    }
2259}
2260
2261impl From<ResourceSet> for api::ResourceSet {
2262    fn from(value: ResourceSet) -> Self {
2263        Self {
2264            matching: Some(value.into()),
2265        }
2266    }
2267}
2268
2269#[sync_docs(ResourceSet = "Matching")]
2270#[derive(Debug, Clone)]
2271pub enum ResourceSet {
2272    Exact(String),
2273    Prefix(String),
2274}
2275
2276impl From<ResourceSet> for api::resource_set::Matching {
2277    fn from(value: ResourceSet) -> Self {
2278        match value {
2279            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2280            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2281        }
2282    }
2283}
2284
2285impl From<api::resource_set::Matching> for ResourceSet {
2286    fn from(value: api::resource_set::Matching) -> Self {
2287        match value {
2288            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2289            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2290        }
2291    }
2292}
2293
2294#[sync_docs]
2295#[derive(Debug, Clone, Default)]
2296pub struct PermittedOperationGroups {
2297    pub account: Option<ReadWritePermissions>,
2298    pub basin: Option<ReadWritePermissions>,
2299    pub stream: Option<ReadWritePermissions>,
2300}
2301
2302impl PermittedOperationGroups {
2303    /// Create a new permitted operation groups.
2304    pub fn new() -> Self {
2305        Self::default()
2306    }
2307
2308    /// Overwrite account read-write permissions.
2309    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2310        Self {
2311            account: Some(account),
2312            ..self
2313        }
2314    }
2315
2316    /// Overwrite basin read-write permissions.
2317    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2318        Self {
2319            basin: Some(basin),
2320            ..self
2321        }
2322    }
2323
2324    /// Overwrite stream read-write permissions.
2325    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2326        Self {
2327            stream: Some(stream),
2328            ..self
2329        }
2330    }
2331}
2332
2333impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2334    fn from(value: PermittedOperationGroups) -> Self {
2335        let PermittedOperationGroups {
2336            account,
2337            basin,
2338            stream,
2339        } = value;
2340        Self {
2341            account: account.map(Into::into),
2342            basin: basin.map(Into::into),
2343            stream: stream.map(Into::into),
2344        }
2345    }
2346}
2347
2348impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2349    fn from(value: api::PermittedOperationGroups) -> Self {
2350        let api::PermittedOperationGroups {
2351            account,
2352            basin,
2353            stream,
2354        } = value;
2355        Self {
2356            account: account.map(Into::into),
2357            basin: basin.map(Into::into),
2358            stream: stream.map(Into::into),
2359        }
2360    }
2361}
2362
2363#[sync_docs]
2364#[derive(Debug, Clone, Default)]
2365pub struct ReadWritePermissions {
2366    pub read: bool,
2367    pub write: bool,
2368}
2369
2370impl ReadWritePermissions {
2371    /// Create a new read-write permission.
2372    pub fn new() -> Self {
2373        Self::default()
2374    }
2375
2376    /// Overwrite read permission.
2377    pub fn with_read(self, read: bool) -> Self {
2378        Self { read, ..self }
2379    }
2380
2381    /// Overwrite write permission.
2382    pub fn with_write(self, write: bool) -> Self {
2383        Self { write, ..self }
2384    }
2385}
2386
2387impl From<ReadWritePermissions> for api::ReadWritePermissions {
2388    fn from(value: ReadWritePermissions) -> Self {
2389        let ReadWritePermissions { read, write } = value;
2390        Self { read, write }
2391    }
2392}
2393
2394impl From<api::ReadWritePermissions> for ReadWritePermissions {
2395    fn from(value: api::ReadWritePermissions) -> Self {
2396        let api::ReadWritePermissions { read, write } = value;
2397        Self { read, write }
2398    }
2399}
2400
2401impl From<api::IssueAccessTokenResponse> for String {
2402    fn from(value: api::IssueAccessTokenResponse) -> Self {
2403        value.access_token
2404    }
2405}
2406
2407impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2408    fn from(value: AccessTokenId) -> Self {
2409        Self { id: value.into() }
2410    }
2411}
2412
2413impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2414    type Error = ConvertError;
2415    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2416        let token_info = value.info.ok_or("access token info is missing")?;
2417        token_info.try_into()
2418    }
2419}
2420
2421#[sync_docs]
2422#[derive(Debug, Clone, Default)]
2423pub struct ListAccessTokensRequest {
2424    pub prefix: String,
2425    pub start_after: String,
2426    pub limit: Option<usize>,
2427}
2428
2429impl ListAccessTokensRequest {
2430    /// Create a new request with prefix.
2431    pub fn new() -> Self {
2432        Self::default()
2433    }
2434
2435    /// Overwrite prefix.
2436    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2437        Self {
2438            prefix: prefix.into(),
2439            ..self
2440        }
2441    }
2442
2443    /// Overwrite start after.
2444    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2445        Self {
2446            start_after: start_after.into(),
2447            ..self
2448        }
2449    }
2450
2451    /// Overwrite limit.
2452    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2453        Self {
2454            limit: limit.into(),
2455            ..self
2456        }
2457    }
2458}
2459
2460impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2461    type Error = ConvertError;
2462    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2463        let ListAccessTokensRequest {
2464            prefix,
2465            start_after,
2466            limit,
2467        } = value;
2468        Ok(Self {
2469            prefix,
2470            start_after,
2471            limit: limit
2472                .map(TryInto::try_into)
2473                .transpose()
2474                .map_err(|_| "request limit does not fit into u64 bounds")?,
2475        })
2476    }
2477}
2478
2479#[sync_docs]
2480#[derive(Debug, Clone)]
2481pub struct ListAccessTokensResponse {
2482    pub access_tokens: Vec<AccessTokenInfo>,
2483    pub has_more: bool,
2484}
2485
2486impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2487    type Error = ConvertError;
2488    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2489        let api::ListAccessTokensResponse {
2490            access_tokens,
2491            has_more,
2492        } = value;
2493        let access_tokens = access_tokens
2494            .into_iter()
2495            .map(TryInto::try_into)
2496            .collect::<Result<Vec<_>, _>>()?;
2497        Ok(Self {
2498            access_tokens,
2499            has_more,
2500        })
2501    }
2502}