s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
12pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
14
15/// Error related to conversion from one type to another.
16#[derive(Debug, Clone, thiserror::Error)]
17#[error("{0}")]
18pub struct ConvertError(String);
19
20impl<T: Into<String>> From<T> for ConvertError {
21    fn from(value: T) -> Self {
22        Self(value.into())
23    }
24}
25
26/// Metered size of the object in bytes.
27///
28/// Bytes are calculated using the "metered bytes" formula:
29///
30/// ```python
31/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.key) + len(h.value)) for h in record.headers) + len(record.body)
32/// ```
33pub trait MeteredBytes {
34    /// Return the metered bytes of the object.
35    fn metered_bytes(&self) -> u64;
36}
37
38impl<T: MeteredBytes> MeteredBytes for Vec<T> {
39    fn metered_bytes(&self) -> u64 {
40        self.iter().fold(0, |acc, item| acc + item.metered_bytes())
41    }
42}
43
44macro_rules! metered_impl {
45    ($ty:ty) => {
46        impl MeteredBytes for $ty {
47            fn metered_bytes(&self) -> u64 {
48                let bytes = 8
49                    + (2 * self.headers.len())
50                    + self
51                        .headers
52                        .iter()
53                        .map(|h| h.name.len() + h.value.len())
54                        .sum::<usize>()
55                    + self.body.len();
56                bytes as u64
57            }
58        }
59    };
60}
61
62#[sync_docs]
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum BasinScope {
65    AwsUsEast1,
66}
67
68impl From<BasinScope> for api::BasinScope {
69    fn from(value: BasinScope) -> Self {
70        match value {
71            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
72        }
73    }
74}
75
76impl From<api::BasinScope> for Option<BasinScope> {
77    fn from(value: api::BasinScope) -> Self {
78        match value {
79            api::BasinScope::Unspecified => None,
80            api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
81        }
82    }
83}
84
85impl FromStr for BasinScope {
86    type Err = ConvertError;
87    fn from_str(value: &str) -> Result<Self, Self::Err> {
88        match value {
89            "aws:us-east-1" => Ok(Self::AwsUsEast1),
90            _ => Err("invalid basin scope value".into()),
91        }
92    }
93}
94
95#[sync_docs]
96#[derive(Debug, Clone)]
97pub struct CreateBasinRequest {
98    pub basin: BasinName,
99    pub config: Option<BasinConfig>,
100    pub scope: Option<BasinScope>,
101}
102
103impl CreateBasinRequest {
104    /// Create a new request with basin name.
105    pub fn new(basin: BasinName) -> Self {
106        Self {
107            basin,
108            config: None,
109            scope: None,
110        }
111    }
112
113    /// Overwrite basin configuration.
114    pub fn with_config(self, config: BasinConfig) -> Self {
115        Self {
116            config: Some(config),
117            ..self
118        }
119    }
120
121    /// Overwrite basin scope.
122    pub fn with_scope(self, scope: BasinScope) -> Self {
123        Self {
124            scope: Some(scope),
125            ..self
126        }
127    }
128}
129
130impl From<CreateBasinRequest> for api::CreateBasinRequest {
131    fn from(value: CreateBasinRequest) -> Self {
132        let CreateBasinRequest {
133            basin,
134            config,
135            scope,
136        } = value;
137        Self {
138            basin: basin.0,
139            config: config.map(Into::into),
140            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
141        }
142    }
143}
144
145#[sync_docs]
146#[derive(Debug, Clone, Default)]
147pub struct BasinConfig {
148    pub default_stream_config: Option<StreamConfig>,
149    pub create_stream_on_append: bool,
150    pub create_stream_on_read: bool,
151}
152
153impl BasinConfig {
154    /// Create a new basin config.
155    pub fn new() -> Self {
156        Self::default()
157    }
158
159    /// Overwrite the default stream config.
160    pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
161        Self {
162            default_stream_config: Some(default_stream_config),
163            ..self
164        }
165    }
166
167    /// Overwrite `create_stream_on_append`.
168    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
169        Self {
170            create_stream_on_append,
171            ..self
172        }
173    }
174
175    /// Overwrite `create_stream_on_read`.
176    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
177        Self {
178            create_stream_on_read,
179            ..self
180        }
181    }
182}
183
184impl From<BasinConfig> for api::BasinConfig {
185    fn from(value: BasinConfig) -> Self {
186        let BasinConfig {
187            default_stream_config,
188            create_stream_on_append,
189            create_stream_on_read,
190        } = value;
191        Self {
192            default_stream_config: default_stream_config.map(Into::into),
193            create_stream_on_append,
194            create_stream_on_read,
195        }
196    }
197}
198
199impl From<api::BasinConfig> for BasinConfig {
200    fn from(value: api::BasinConfig) -> Self {
201        let api::BasinConfig {
202            default_stream_config,
203            create_stream_on_append,
204            create_stream_on_read,
205        } = value;
206        Self {
207            default_stream_config: default_stream_config.map(Into::into),
208            create_stream_on_append,
209            create_stream_on_read,
210        }
211    }
212}
213
214#[sync_docs]
215#[derive(Debug, Clone, Copy, PartialEq, Eq)]
216pub enum TimestampingMode {
217    ClientPrefer,
218    ClientRequire,
219    Arrival,
220}
221
222impl From<TimestampingMode> for api::TimestampingMode {
223    fn from(value: TimestampingMode) -> Self {
224        match value {
225            TimestampingMode::ClientPrefer => Self::ClientPrefer,
226            TimestampingMode::ClientRequire => Self::ClientRequire,
227            TimestampingMode::Arrival => Self::Arrival,
228        }
229    }
230}
231
232impl From<api::TimestampingMode> for Option<TimestampingMode> {
233    fn from(value: api::TimestampingMode) -> Self {
234        match value {
235            api::TimestampingMode::Unspecified => None,
236            api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
237            api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
238            api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
239        }
240    }
241}
242
243#[sync_docs(TimestampingConfig = "Timestamping")]
244#[derive(Debug, Clone, Default)]
245/// Timestamping behavior.
246pub struct TimestampingConfig {
247    pub mode: Option<TimestampingMode>,
248    pub uncapped: Option<bool>,
249}
250
251impl TimestampingConfig {
252    /// Create a new timestamping config.
253    pub fn new() -> Self {
254        Self::default()
255    }
256
257    /// Overwrite timestamping mode.
258    pub fn with_mode(self, mode: TimestampingMode) -> Self {
259        Self {
260            mode: Some(mode),
261            ..self
262        }
263    }
264
265    /// Overwrite the uncapped knob.
266    pub fn with_uncapped(self, uncapped: bool) -> Self {
267        Self {
268            uncapped: Some(uncapped),
269            ..self
270        }
271    }
272}
273
274impl From<TimestampingConfig> for api::stream_config::Timestamping {
275    fn from(value: TimestampingConfig) -> Self {
276        Self {
277            mode: value
278                .mode
279                .map(api::TimestampingMode::from)
280                .unwrap_or_default()
281                .into(),
282            uncapped: value.uncapped,
283        }
284    }
285}
286
287impl From<api::stream_config::Timestamping> for TimestampingConfig {
288    fn from(value: api::stream_config::Timestamping) -> Self {
289        let mode = value.mode().into();
290        let uncapped = value.uncapped;
291        Self { mode, uncapped }
292    }
293}
294
295#[sync_docs]
296#[derive(Debug, Clone, Default)]
297pub struct StreamConfig {
298    pub storage_class: Option<StorageClass>,
299    pub retention_policy: Option<RetentionPolicy>,
300    pub timestamping: Option<TimestampingConfig>,
301}
302
303impl StreamConfig {
304    /// Create a new stream config.
305    pub fn new() -> Self {
306        Self::default()
307    }
308
309    /// Overwrite storage class.
310    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
311        Self {
312            storage_class: Some(storage_class),
313            ..self
314        }
315    }
316
317    /// Overwrite retention policy.
318    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
319        Self {
320            retention_policy: Some(retention_policy),
321            ..self
322        }
323    }
324
325    /// Overwrite timestamping config.
326    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
327        Self {
328            timestamping: Some(timestamping),
329            ..self
330        }
331    }
332}
333
334impl From<StreamConfig> for api::StreamConfig {
335    fn from(value: StreamConfig) -> Self {
336        let StreamConfig {
337            storage_class,
338            retention_policy,
339            timestamping,
340        } = value;
341        Self {
342            storage_class: storage_class
343                .map(api::StorageClass::from)
344                .unwrap_or_default()
345                .into(),
346            retention_policy: retention_policy.map(Into::into),
347            timestamping: timestamping.map(Into::into),
348        }
349    }
350}
351
352impl From<api::StreamConfig> for StreamConfig {
353    fn from(value: api::StreamConfig) -> Self {
354        Self {
355            storage_class: value.storage_class().into(),
356            retention_policy: value.retention_policy.map(Into::into),
357            timestamping: value.timestamping.map(Into::into),
358        }
359    }
360}
361
362#[sync_docs]
363#[derive(Debug, Clone, Copy, PartialEq, Eq)]
364pub enum StorageClass {
365    Standard,
366    Express,
367}
368
369impl From<StorageClass> for api::StorageClass {
370    fn from(value: StorageClass) -> Self {
371        match value {
372            StorageClass::Standard => Self::Standard,
373            StorageClass::Express => Self::Express,
374        }
375    }
376}
377
378impl From<api::StorageClass> for Option<StorageClass> {
379    fn from(value: api::StorageClass) -> Self {
380        match value {
381            api::StorageClass::Unspecified => None,
382            api::StorageClass::Standard => Some(StorageClass::Standard),
383            api::StorageClass::Express => Some(StorageClass::Express),
384        }
385    }
386}
387
388impl FromStr for StorageClass {
389    type Err = ConvertError;
390
391    fn from_str(value: &str) -> Result<Self, Self::Err> {
392        match value {
393            "standard" => Ok(Self::Standard),
394            "express" => Ok(Self::Express),
395            v => Err(format!("unknown storage class: {v}").into()),
396        }
397    }
398}
399
400#[sync_docs(Age = "Age")]
401#[derive(Debug, Clone)]
402pub enum RetentionPolicy {
403    Age(Duration),
404}
405
406impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
407    fn from(value: RetentionPolicy) -> Self {
408        match value {
409            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
410        }
411    }
412}
413
414impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
415    fn from(value: api::stream_config::RetentionPolicy) -> Self {
416        match value {
417            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
418        }
419    }
420}
421
422#[sync_docs]
423#[derive(Debug, Clone, Copy, PartialEq, Eq)]
424pub enum BasinState {
425    Active,
426    Creating,
427    Deleting,
428}
429
430impl From<BasinState> for api::BasinState {
431    fn from(value: BasinState) -> Self {
432        match value {
433            BasinState::Active => Self::Active,
434            BasinState::Creating => Self::Creating,
435            BasinState::Deleting => Self::Deleting,
436        }
437    }
438}
439
440impl From<api::BasinState> for Option<BasinState> {
441    fn from(value: api::BasinState) -> Self {
442        match value {
443            api::BasinState::Unspecified => None,
444            api::BasinState::Active => Some(BasinState::Active),
445            api::BasinState::Creating => Some(BasinState::Creating),
446            api::BasinState::Deleting => Some(BasinState::Deleting),
447        }
448    }
449}
450
451impl std::fmt::Display for BasinState {
452    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453        match self {
454            BasinState::Active => write!(f, "active"),
455            BasinState::Creating => write!(f, "creating"),
456            BasinState::Deleting => write!(f, "deleting"),
457        }
458    }
459}
460
461#[sync_docs]
462#[derive(Debug, Clone)]
463pub struct BasinInfo {
464    pub name: String,
465    pub scope: Option<BasinScope>,
466    pub state: Option<BasinState>,
467}
468
469impl From<BasinInfo> for api::BasinInfo {
470    fn from(value: BasinInfo) -> Self {
471        let BasinInfo { name, scope, state } = value;
472        Self {
473            name,
474            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
475            state: state.map(api::BasinState::from).unwrap_or_default().into(),
476        }
477    }
478}
479
480impl From<api::BasinInfo> for BasinInfo {
481    fn from(value: api::BasinInfo) -> Self {
482        let scope = value.scope().into();
483        let state = value.state().into();
484        let name = value.name;
485        Self { name, scope, state }
486    }
487}
488
489impl TryFrom<api::CreateBasinResponse> for BasinInfo {
490    type Error = ConvertError;
491    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
492        let api::CreateBasinResponse { info } = value;
493        let info = info.ok_or("missing basin info")?;
494        Ok(info.into())
495    }
496}
497
498#[sync_docs]
499#[derive(Debug, Clone, Default)]
500pub struct ListStreamsRequest {
501    pub prefix: String,
502    pub start_after: String,
503    pub limit: Option<usize>,
504}
505
506impl ListStreamsRequest {
507    /// Create a new request.
508    pub fn new() -> Self {
509        Self::default()
510    }
511
512    /// Overwrite prefix.
513    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
514        Self {
515            prefix: prefix.into(),
516            ..self
517        }
518    }
519
520    /// Overwrite start after.
521    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
522        Self {
523            start_after: start_after.into(),
524            ..self
525        }
526    }
527
528    /// Overwrite limit.
529    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
530        Self {
531            limit: limit.into(),
532            ..self
533        }
534    }
535}
536
537impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
538    type Error = ConvertError;
539    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
540        let ListStreamsRequest {
541            prefix,
542            start_after,
543            limit,
544        } = value;
545        Ok(Self {
546            prefix,
547            start_after,
548            limit: limit.map(|n| n as u64),
549        })
550    }
551}
552
553#[sync_docs]
554#[derive(Debug, Clone)]
555pub struct StreamInfo {
556    pub name: String,
557    pub created_at: u32,
558    pub deleted_at: Option<u32>,
559}
560
561impl From<api::StreamInfo> for StreamInfo {
562    fn from(value: api::StreamInfo) -> Self {
563        Self {
564            name: value.name,
565            created_at: value.created_at,
566            deleted_at: value.deleted_at,
567        }
568    }
569}
570
571impl TryFrom<api::CreateStreamResponse> for StreamInfo {
572    type Error = ConvertError;
573
574    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
575        let api::CreateStreamResponse { info } = value;
576        let info = info.ok_or("missing stream info")?;
577        Ok(info.into())
578    }
579}
580
581#[sync_docs]
582#[derive(Debug, Clone)]
583pub struct ListStreamsResponse {
584    pub streams: Vec<StreamInfo>,
585    pub has_more: bool,
586}
587
588impl From<api::ListStreamsResponse> for ListStreamsResponse {
589    fn from(value: api::ListStreamsResponse) -> Self {
590        let api::ListStreamsResponse { streams, has_more } = value;
591        let streams = streams.into_iter().map(Into::into).collect();
592        Self { streams, has_more }
593    }
594}
595
596impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
597    type Error = ConvertError;
598
599    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
600        let api::GetBasinConfigResponse { config } = value;
601        let config = config.ok_or("missing basin config")?;
602        Ok(config.into())
603    }
604}
605
606impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
607    type Error = ConvertError;
608
609    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
610        let api::GetStreamConfigResponse { config } = value;
611        let config = config.ok_or("missing stream config")?;
612        Ok(config.into())
613    }
614}
615
616#[sync_docs]
617#[derive(Debug, Clone)]
618pub struct CreateStreamRequest {
619    pub stream: String,
620    pub config: Option<StreamConfig>,
621}
622
623impl CreateStreamRequest {
624    /// Create a new request with stream name.
625    pub fn new(stream: impl Into<String>) -> Self {
626        Self {
627            stream: stream.into(),
628            config: None,
629        }
630    }
631
632    /// Overwrite stream config.
633    pub fn with_config(self, config: StreamConfig) -> Self {
634        Self {
635            config: Some(config),
636            ..self
637        }
638    }
639}
640
641impl From<CreateStreamRequest> for api::CreateStreamRequest {
642    fn from(value: CreateStreamRequest) -> Self {
643        let CreateStreamRequest { stream, config } = value;
644        Self {
645            stream,
646            config: config.map(Into::into),
647        }
648    }
649}
650
651#[sync_docs]
652#[derive(Debug, Clone, Default)]
653pub struct ListBasinsRequest {
654    pub prefix: String,
655    pub start_after: String,
656    pub limit: Option<usize>,
657}
658
659impl ListBasinsRequest {
660    /// Create a new request.
661    pub fn new() -> Self {
662        Self::default()
663    }
664
665    /// Overwrite prefix.
666    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
667        Self {
668            prefix: prefix.into(),
669            ..self
670        }
671    }
672
673    /// Overwrite start after.
674    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
675        Self {
676            start_after: start_after.into(),
677            ..self
678        }
679    }
680
681    /// Overwrite limit.
682    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
683        Self {
684            limit: limit.into(),
685            ..self
686        }
687    }
688}
689
690impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
691    type Error = ConvertError;
692    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
693        let ListBasinsRequest {
694            prefix,
695            start_after,
696            limit,
697        } = value;
698        Ok(Self {
699            prefix,
700            start_after,
701            limit: limit
702                .map(TryInto::try_into)
703                .transpose()
704                .map_err(|_| "request limit does not fit into u64 bounds")?,
705        })
706    }
707}
708
709#[sync_docs]
710#[derive(Debug, Clone)]
711pub struct ListBasinsResponse {
712    pub basins: Vec<BasinInfo>,
713    pub has_more: bool,
714}
715
716impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
717    type Error = ConvertError;
718    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
719        let api::ListBasinsResponse { basins, has_more } = value;
720        Ok(Self {
721            basins: basins.into_iter().map(Into::into).collect(),
722            has_more,
723        })
724    }
725}
726
727#[sync_docs]
728#[derive(Debug, Clone)]
729pub struct DeleteBasinRequest {
730    pub basin: BasinName,
731    /// Delete basin if it exists else do nothing.
732    pub if_exists: bool,
733}
734
735impl DeleteBasinRequest {
736    /// Create a new request.
737    pub fn new(basin: BasinName) -> Self {
738        Self {
739            basin,
740            if_exists: false,
741        }
742    }
743
744    /// Overwrite the if exists parameter.
745    pub fn with_if_exists(self, if_exists: bool) -> Self {
746        Self { if_exists, ..self }
747    }
748}
749
750impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
751    fn from(value: DeleteBasinRequest) -> Self {
752        let DeleteBasinRequest { basin, .. } = value;
753        Self { basin: basin.0 }
754    }
755}
756
757#[sync_docs]
758#[derive(Debug, Clone)]
759pub struct DeleteStreamRequest {
760    pub stream: String,
761    /// Delete stream if it exists else do nothing.
762    pub if_exists: bool,
763}
764
765impl DeleteStreamRequest {
766    /// Create a new request.
767    pub fn new(stream: impl Into<String>) -> Self {
768        Self {
769            stream: stream.into(),
770            if_exists: false,
771        }
772    }
773
774    /// Overwrite the if exists parameter.
775    pub fn with_if_exists(self, if_exists: bool) -> Self {
776        Self { if_exists, ..self }
777    }
778}
779
780impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
781    fn from(value: DeleteStreamRequest) -> Self {
782        let DeleteStreamRequest { stream, .. } = value;
783        Self { stream }
784    }
785}
786
787#[sync_docs]
788#[derive(Debug, Clone)]
789pub struct ReconfigureBasinRequest {
790    pub basin: BasinName,
791    pub config: Option<BasinConfig>,
792    pub mask: Option<Vec<String>>,
793}
794
795impl ReconfigureBasinRequest {
796    /// Create a new request with basin name.
797    pub fn new(basin: BasinName) -> Self {
798        Self {
799            basin,
800            config: None,
801            mask: None,
802        }
803    }
804
805    /// Overwrite basin config.
806    pub fn with_config(self, config: BasinConfig) -> Self {
807        Self {
808            config: Some(config),
809            ..self
810        }
811    }
812
813    /// Overwrite field mask.
814    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
815        Self {
816            mask: Some(mask.into()),
817            ..self
818        }
819    }
820}
821
822impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
823    fn from(value: ReconfigureBasinRequest) -> Self {
824        let ReconfigureBasinRequest {
825            basin,
826            config,
827            mask,
828        } = value;
829        Self {
830            basin: basin.0,
831            config: config.map(Into::into),
832            mask: mask.map(|paths| prost_types::FieldMask { paths }),
833        }
834    }
835}
836
837impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
838    type Error = ConvertError;
839    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
840        let api::ReconfigureBasinResponse { config } = value;
841        let config = config.ok_or("missing basin config")?;
842        Ok(config.into())
843    }
844}
845
846#[sync_docs]
847#[derive(Debug, Clone)]
848pub struct ReconfigureStreamRequest {
849    pub stream: String,
850    pub config: Option<StreamConfig>,
851    pub mask: Option<Vec<String>>,
852}
853
854impl ReconfigureStreamRequest {
855    /// Create a new request with stream name.
856    pub fn new(stream: impl Into<String>) -> Self {
857        Self {
858            stream: stream.into(),
859            config: None,
860            mask: None,
861        }
862    }
863
864    /// Overwrite stream config.
865    pub fn with_config(self, config: StreamConfig) -> Self {
866        Self {
867            config: Some(config),
868            ..self
869        }
870    }
871
872    /// Overwrite field mask.
873    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
874        Self {
875            mask: Some(mask.into()),
876            ..self
877        }
878    }
879}
880
881impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
882    fn from(value: ReconfigureStreamRequest) -> Self {
883        let ReconfigureStreamRequest {
884            stream,
885            config,
886            mask,
887        } = value;
888        Self {
889            stream,
890            config: config.map(Into::into),
891            mask: mask.map(|paths| prost_types::FieldMask { paths }),
892        }
893    }
894}
895
896impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
897    type Error = ConvertError;
898    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
899        let api::ReconfigureStreamResponse { config } = value;
900        let config = config.ok_or("missing stream config")?;
901        Ok(config.into())
902    }
903}
904
905impl From<api::CheckTailResponse> for StreamPosition {
906    fn from(value: api::CheckTailResponse) -> Self {
907        let api::CheckTailResponse {
908            next_seq_num,
909            last_timestamp,
910        } = value;
911        StreamPosition {
912            seq_num: next_seq_num,
913            timestamp: last_timestamp,
914        }
915    }
916}
917
918/// Position of a record in a stream.
919#[derive(Debug, Clone, PartialEq, Eq)]
920pub struct StreamPosition {
921    /// Sequence number assigned by the service.
922    pub seq_num: u64,
923    /// Timestamp, which may be user-specified or assigned by the service.
924    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
925    pub timestamp: u64,
926}
927
928#[sync_docs]
929#[derive(Debug, Clone, PartialEq, Eq)]
930pub struct Header {
931    pub name: Bytes,
932    pub value: Bytes,
933}
934
935impl Header {
936    /// Create a new header from name and value.
937    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
938        Self {
939            name: name.into(),
940            value: value.into(),
941        }
942    }
943
944    /// Create a new header from value.
945    pub fn from_value(value: impl Into<Bytes>) -> Self {
946        Self {
947            name: Bytes::new(),
948            value: value.into(),
949        }
950    }
951}
952
953impl From<Header> for api::Header {
954    fn from(value: Header) -> Self {
955        let Header { name, value } = value;
956        Self { name, value }
957    }
958}
959
960impl From<api::Header> for Header {
961    fn from(value: api::Header) -> Self {
962        let api::Header { name, value } = value;
963        Self { name, value }
964    }
965}
966
967/// A fencing token can be enforced on append requests.
968///
969/// Must not be more than 36 bytes.
970#[derive(Debug, Clone, Default, PartialEq, Eq)]
971pub struct FencingToken(String);
972
973impl FencingToken {
974    const MAX_BYTES: usize = 36;
975
976    /// Try creating a new fencing token from bytes.
977    pub fn new(s: impl Into<String>) -> Result<Self, ConvertError> {
978        let s = s.into();
979        if s.len() > Self::MAX_BYTES {
980            Err(format!(
981                "Size of a fencing token cannot exceed {} bytes",
982                Self::MAX_BYTES
983            )
984            .into())
985        } else {
986            Ok(Self(s))
987        }
988    }
989
990    /// Generate a random alphanumeric fencing token of `n` bytes.
991    pub fn generate(n: usize) -> Result<Self, ConvertError> {
992        Self::new(
993            rand::rng()
994                .sample_iter(&rand::distr::Alphanumeric)
995                .take(n)
996                .map(char::from)
997                .collect::<String>(),
998        )
999    }
1000}
1001
1002impl Deref for FencingToken {
1003    type Target = str;
1004
1005    fn deref(&self) -> &Self::Target {
1006        &self.0
1007    }
1008}
1009
1010impl TryFrom<&str> for FencingToken {
1011    type Error = ConvertError;
1012
1013    fn try_from(value: &str) -> Result<Self, Self::Error> {
1014        Self::new(value)
1015    }
1016}
1017
1018/// Command to send through a `CommandRecord`.
1019#[derive(Debug, Clone)]
1020pub enum Command {
1021    /// Enforce a fencing token.
1022    ///
1023    /// Fencing is strongly consistent, and subsequent appends that specify a
1024    /// fencing token will be rejected if it does not match.
1025    Fence {
1026        /// Fencing token to enforce.
1027        ///
1028        /// Set empty to clear the token.
1029        fencing_token: FencingToken,
1030    },
1031    /// Request a trim till the sequence number.
1032    ///
1033    /// Trimming is eventually consistent, and trimmed records may be visible
1034    /// for a brief period
1035    Trim {
1036        /// Trim point.
1037        ///
1038        /// This sequence number is only allowed to advance, and any regression
1039        /// will be ignored.
1040        seq_num: u64,
1041    },
1042}
1043
1044/// A command record is a special kind of [`AppendRecord`] that can be used to
1045/// send command messages.
1046///
1047/// Such a record is signalled by a sole header with empty name. The header
1048/// value represents the operation and record body acts as the payload.
1049#[derive(Debug, Clone)]
1050pub struct CommandRecord {
1051    /// Command kind.
1052    pub command: Command,
1053    /// Timestamp for the record.
1054    pub timestamp: Option<u64>,
1055}
1056
1057impl CommandRecord {
1058    const FENCE: &[u8] = b"fence";
1059    const TRIM: &[u8] = b"trim";
1060
1061    /// Create a new fence command record.
1062    pub fn fence(fencing_token: FencingToken) -> Self {
1063        Self {
1064            command: Command::Fence { fencing_token },
1065            timestamp: None,
1066        }
1067    }
1068
1069    /// Create a new trim command record.
1070    pub fn trim(seq_num: impl Into<u64>) -> Self {
1071        Self {
1072            command: Command::Trim {
1073                seq_num: seq_num.into(),
1074            },
1075            timestamp: None,
1076        }
1077    }
1078
1079    /// Overwrite timestamp.
1080    pub fn with_timestamp(self, timestamp: u64) -> Self {
1081        Self {
1082            timestamp: Some(timestamp),
1083            ..self
1084        }
1085    }
1086}
1087
1088#[sync_docs]
1089#[derive(Debug, Clone, PartialEq, Eq)]
1090pub struct AppendRecord {
1091    timestamp: Option<u64>,
1092    headers: Vec<Header>,
1093    body: Bytes,
1094    #[cfg(test)]
1095    max_bytes: u64,
1096}
1097
1098metered_impl!(AppendRecord);
1099
1100impl AppendRecord {
1101    const MAX_BYTES: u64 = MIB_BYTES;
1102
1103    fn validated(self) -> Result<Self, ConvertError> {
1104        #[cfg(test)]
1105        let max_bytes = self.max_bytes;
1106        #[cfg(not(test))]
1107        let max_bytes = Self::MAX_BYTES;
1108
1109        if self.metered_bytes() > max_bytes {
1110            Err("AppendRecord should have metered size less than 1 MiB".into())
1111        } else {
1112            Ok(self)
1113        }
1114    }
1115
1116    /// Try creating a new append record with body.
1117    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1118        Self {
1119            timestamp: None,
1120            headers: Vec::new(),
1121            body: body.into(),
1122            #[cfg(test)]
1123            max_bytes: Self::MAX_BYTES,
1124        }
1125        .validated()
1126    }
1127
1128    #[cfg(test)]
1129    pub(crate) fn with_max_bytes(
1130        max_bytes: u64,
1131        body: impl Into<Bytes>,
1132    ) -> Result<Self, ConvertError> {
1133        Self {
1134            timestamp: None,
1135            headers: Vec::new(),
1136            body: body.into(),
1137            max_bytes,
1138        }
1139        .validated()
1140    }
1141
1142    /// Overwrite headers.
1143    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1144        Self {
1145            headers: headers.into(),
1146            ..self
1147        }
1148        .validated()
1149    }
1150
1151    /// Overwrite timestamp.
1152    pub fn with_timestamp(self, timestamp: u64) -> Self {
1153        Self {
1154            timestamp: Some(timestamp),
1155            ..self
1156        }
1157    }
1158
1159    /// Body of the record.
1160    pub fn body(&self) -> &[u8] {
1161        &self.body
1162    }
1163
1164    /// Headers of the record.
1165    pub fn headers(&self) -> &[Header] {
1166        &self.headers
1167    }
1168
1169    /// Timestamp for the record.
1170    pub fn timestamp(&self) -> Option<u64> {
1171        self.timestamp
1172    }
1173
1174    /// Consume the record and return parts.
1175    pub fn into_parts(self) -> AppendRecordParts {
1176        AppendRecordParts {
1177            timestamp: self.timestamp,
1178            headers: self.headers,
1179            body: self.body,
1180        }
1181    }
1182
1183    /// Try creating the record from parts.
1184    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1185        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1186        if let Some(timestamp) = parts.timestamp {
1187            Ok(record.with_timestamp(timestamp))
1188        } else {
1189            Ok(record)
1190        }
1191    }
1192}
1193
1194impl From<AppendRecord> for api::AppendRecord {
1195    fn from(value: AppendRecord) -> Self {
1196        Self {
1197            timestamp: value.timestamp,
1198            headers: value.headers.into_iter().map(Into::into).collect(),
1199            body: value.body,
1200        }
1201    }
1202}
1203
1204impl From<CommandRecord> for AppendRecord {
1205    fn from(value: CommandRecord) -> Self {
1206        let (header_value, body) = match value.command {
1207            Command::Fence { fencing_token } => (
1208                CommandRecord::FENCE,
1209                Bytes::copy_from_slice(fencing_token.as_bytes()),
1210            ),
1211            Command::Trim { seq_num } => (
1212                CommandRecord::TRIM,
1213                Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1214            ),
1215        };
1216        AppendRecordParts {
1217            timestamp: value.timestamp,
1218            headers: vec![Header::from_value(header_value)],
1219            body,
1220        }
1221        .try_into()
1222        .expect("command record is a valid append record")
1223    }
1224}
1225
1226#[sync_docs(AppendRecordParts = "AppendRecord")]
1227#[derive(Debug, Clone)]
1228pub struct AppendRecordParts {
1229    pub timestamp: Option<u64>,
1230    pub headers: Vec<Header>,
1231    pub body: Bytes,
1232}
1233
1234impl From<AppendRecord> for AppendRecordParts {
1235    fn from(value: AppendRecord) -> Self {
1236        value.into_parts()
1237    }
1238}
1239
1240impl TryFrom<AppendRecordParts> for AppendRecord {
1241    type Error = ConvertError;
1242
1243    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1244        Self::try_from_parts(value)
1245    }
1246}
1247
1248/// A collection of append records that can be sent together in a batch.
1249#[derive(Debug, Clone)]
1250pub struct AppendRecordBatch {
1251    records: Vec<AppendRecord>,
1252    metered_bytes: u64,
1253    max_capacity: usize,
1254    #[cfg(test)]
1255    max_bytes: u64,
1256}
1257
1258impl PartialEq for AppendRecordBatch {
1259    fn eq(&self, other: &Self) -> bool {
1260        if self.records.eq(&other.records) {
1261            assert_eq!(self.metered_bytes, other.metered_bytes);
1262            true
1263        } else {
1264            false
1265        }
1266    }
1267}
1268
1269impl Eq for AppendRecordBatch {}
1270
1271impl Default for AppendRecordBatch {
1272    fn default() -> Self {
1273        Self::new()
1274    }
1275}
1276
1277impl AppendRecordBatch {
1278    /// Maximum number of records that a batch can hold.
1279    ///
1280    /// A record batch cannot be created with a bigger capacity.
1281    pub const MAX_CAPACITY: usize = 1000;
1282
1283    /// Maximum metered bytes of the batch.
1284    pub const MAX_BYTES: u64 = MIB_BYTES;
1285
1286    /// Create an empty record batch.
1287    pub fn new() -> Self {
1288        Self::with_max_capacity(Self::MAX_CAPACITY)
1289    }
1290
1291    /// Create an empty record batch with custom max capacity.
1292    ///
1293    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1294    pub fn with_max_capacity(max_capacity: usize) -> Self {
1295        assert!(
1296            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1297            "Batch capacity must be between 1 and 1000"
1298        );
1299
1300        Self {
1301            records: Vec::with_capacity(max_capacity),
1302            metered_bytes: 0,
1303            max_capacity,
1304            #[cfg(test)]
1305            max_bytes: Self::MAX_BYTES,
1306        }
1307    }
1308
1309    #[cfg(test)]
1310    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1311        #[cfg(test)]
1312        assert!(
1313            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1314            "Batch size must be between 1 byte and 1 MiB"
1315        );
1316
1317        Self {
1318            max_bytes,
1319            ..Self::with_max_capacity(max_capacity)
1320        }
1321    }
1322
1323    /// Try creating a record batch from an iterator.
1324    ///
1325    /// If all the items of the iterator cannot be drained into the batch, the
1326    /// error returned contains a batch containing all records it could fit
1327    /// along-with the left over items from the iterator.
1328    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1329    where
1330        R: Into<AppendRecord>,
1331        T: IntoIterator<Item = R>,
1332    {
1333        let mut records = Self::new();
1334        let mut pending = Vec::new();
1335
1336        let mut iter = iter.into_iter();
1337
1338        for record in iter.by_ref() {
1339            if let Err(record) = records.push(record) {
1340                pending.push(record);
1341                break;
1342            }
1343        }
1344
1345        if pending.is_empty() {
1346            Ok(records)
1347        } else {
1348            pending.extend(iter.map(Into::into));
1349            Err((records, pending))
1350        }
1351    }
1352
1353    /// Returns true if the batch contains no records.
1354    pub fn is_empty(&self) -> bool {
1355        if self.records.is_empty() {
1356            assert_eq!(self.metered_bytes, 0);
1357            true
1358        } else {
1359            false
1360        }
1361    }
1362
1363    /// Returns the number of records contained in the batch.
1364    pub fn len(&self) -> usize {
1365        self.records.len()
1366    }
1367
1368    #[cfg(test)]
1369    fn max_bytes(&self) -> u64 {
1370        self.max_bytes
1371    }
1372
1373    #[cfg(not(test))]
1374    fn max_bytes(&self) -> u64 {
1375        Self::MAX_BYTES
1376    }
1377
1378    /// Returns true if the batch cannot fit any more records.
1379    pub fn is_full(&self) -> bool {
1380        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1381    }
1382
1383    /// Try to append a new record into the batch.
1384    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1385        assert!(self.records.len() <= self.max_capacity);
1386        assert!(self.metered_bytes <= self.max_bytes());
1387
1388        let record = record.into();
1389        let record_size = record.metered_bytes();
1390        if self.records.len() >= self.max_capacity
1391            || self.metered_bytes + record_size > self.max_bytes()
1392        {
1393            Err(record)
1394        } else {
1395            self.records.push(record);
1396            self.metered_bytes += record_size;
1397            Ok(())
1398        }
1399    }
1400}
1401
1402impl MeteredBytes for AppendRecordBatch {
1403    fn metered_bytes(&self) -> u64 {
1404        self.metered_bytes
1405    }
1406}
1407
1408impl IntoIterator for AppendRecordBatch {
1409    type Item = AppendRecord;
1410    type IntoIter = std::vec::IntoIter<Self::Item>;
1411
1412    fn into_iter(self) -> Self::IntoIter {
1413        self.records.into_iter()
1414    }
1415}
1416
1417impl<'a> IntoIterator for &'a AppendRecordBatch {
1418    type Item = &'a AppendRecord;
1419    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1420
1421    fn into_iter(self) -> Self::IntoIter {
1422        self.records.iter()
1423    }
1424}
1425
1426impl AsRef<[AppendRecord]> for AppendRecordBatch {
1427    fn as_ref(&self) -> &[AppendRecord] {
1428        &self.records
1429    }
1430}
1431
1432#[sync_docs]
1433#[derive(Debug, Default, Clone)]
1434pub struct AppendInput {
1435    pub records: AppendRecordBatch,
1436    pub match_seq_num: Option<u64>,
1437    pub fencing_token: Option<FencingToken>,
1438}
1439
1440impl MeteredBytes for AppendInput {
1441    fn metered_bytes(&self) -> u64 {
1442        self.records.metered_bytes()
1443    }
1444}
1445
1446impl AppendInput {
1447    /// Create a new append input from record batch.
1448    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1449        Self {
1450            records: records.into(),
1451            match_seq_num: None,
1452            fencing_token: None,
1453        }
1454    }
1455
1456    /// Overwrite match sequence number.
1457    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1458        Self {
1459            match_seq_num: Some(match_seq_num.into()),
1460            ..self
1461        }
1462    }
1463
1464    /// Overwrite fencing token.
1465    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1466        Self {
1467            fencing_token: Some(fencing_token),
1468            ..self
1469        }
1470    }
1471
1472    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1473        let Self {
1474            records,
1475            match_seq_num,
1476            fencing_token,
1477        } = self;
1478
1479        api::AppendInput {
1480            stream: stream.into(),
1481            records: records.into_iter().map(Into::into).collect(),
1482            match_seq_num,
1483            fencing_token: fencing_token.map(|f| f.0),
1484        }
1485    }
1486}
1487
1488/// Acknowledgment to an append request.
1489#[derive(Debug, Clone)]
1490pub struct AppendAck {
1491    /// Sequence number and timestamp of the first record that was appended.
1492    pub start: StreamPosition,
1493    /// Sequence number of the last record that was appended + 1,
1494    /// and timestamp of the last record that was appended.
1495    /// The difference between `end.seq_num` and `start.seq_num`
1496    /// will be the number of records appended.
1497    pub end: StreamPosition,
1498    /// Sequence number that will be assigned to the next record on the stream,
1499    /// and timestamp of the last record on the stream.
1500    /// This can be greater than the `end` position in case of concurrent appends.
1501    pub tail: StreamPosition,
1502}
1503
1504impl From<api::AppendOutput> for AppendAck {
1505    fn from(value: api::AppendOutput) -> Self {
1506        let api::AppendOutput {
1507            start_seq_num,
1508            start_timestamp,
1509            end_seq_num,
1510            end_timestamp,
1511            next_seq_num,
1512            last_timestamp,
1513        } = value;
1514        let start = StreamPosition {
1515            seq_num: start_seq_num,
1516            timestamp: start_timestamp,
1517        };
1518        let end = StreamPosition {
1519            seq_num: end_seq_num,
1520            timestamp: end_timestamp,
1521        };
1522        let tail = StreamPosition {
1523            seq_num: next_seq_num,
1524            timestamp: last_timestamp,
1525        };
1526        Self { start, end, tail }
1527    }
1528}
1529
1530impl TryFrom<api::AppendResponse> for AppendAck {
1531    type Error = ConvertError;
1532    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1533        let api::AppendResponse { output } = value;
1534        let output = output.ok_or("missing append output")?;
1535        Ok(output.into())
1536    }
1537}
1538
1539impl TryFrom<api::AppendSessionResponse> for AppendAck {
1540    type Error = ConvertError;
1541    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1542        let api::AppendSessionResponse { output } = value;
1543        let output = output.ok_or("missing append output")?;
1544        Ok(output.into())
1545    }
1546}
1547
1548#[sync_docs]
1549#[derive(Debug, Clone, Default)]
1550pub struct ReadLimit {
1551    pub count: Option<u64>,
1552    pub bytes: Option<u64>,
1553}
1554
1555impl ReadLimit {
1556    /// Create a new read limit.
1557    pub fn new() -> Self {
1558        Self::default()
1559    }
1560
1561    /// Overwrite count limit.
1562    pub fn with_count(self, count: u64) -> Self {
1563        Self {
1564            count: Some(count),
1565            ..self
1566        }
1567    }
1568
1569    /// Overwrite bytes limit.
1570    pub fn with_bytes(self, bytes: u64) -> Self {
1571        Self {
1572            bytes: Some(bytes),
1573            ..self
1574        }
1575    }
1576}
1577
1578/// Starting point for read requests.
1579#[derive(Debug, Clone)]
1580pub enum ReadStart {
1581    /// Sequence number.
1582    SeqNum(u64),
1583    /// Timestamp.
1584    Timestamp(u64),
1585    /// Number of records before the tail, i.e. the next sequence number.
1586    TailOffset(u64),
1587}
1588
1589impl Default for ReadStart {
1590    fn default() -> Self {
1591        Self::SeqNum(0)
1592    }
1593}
1594
1595impl From<ReadStart> for api::read_request::Start {
1596    fn from(start: ReadStart) -> Self {
1597        match start {
1598            ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1599            ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1600            ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1601        }
1602    }
1603}
1604
1605impl From<ReadStart> for api::read_session_request::Start {
1606    fn from(start: ReadStart) -> Self {
1607        match start {
1608            ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1609            ReadStart::Timestamp(timestamp) => {
1610                api::read_session_request::Start::Timestamp(timestamp)
1611            }
1612            ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1613        }
1614    }
1615}
1616
1617#[sync_docs]
1618#[derive(Debug, Clone, Default)]
1619pub struct ReadRequest {
1620    pub start: ReadStart,
1621    pub limit: ReadLimit,
1622}
1623
1624impl ReadRequest {
1625    /// Create a new request with the specified starting point.
1626    pub fn new(start: ReadStart) -> Self {
1627        Self {
1628            start,
1629            ..Default::default()
1630        }
1631    }
1632
1633    /// Overwrite limit.
1634    pub fn with_limit(self, limit: ReadLimit) -> Self {
1635        Self { limit, ..self }
1636    }
1637}
1638
1639impl ReadRequest {
1640    pub(crate) fn try_into_api_type(
1641        self,
1642        stream: impl Into<String>,
1643    ) -> Result<api::ReadRequest, ConvertError> {
1644        let Self { start, limit } = self;
1645
1646        let limit = if limit.count > Some(1000) {
1647            Err("read limit: count must not exceed 1000 for unary request")
1648        } else if limit.bytes > Some(MIB_BYTES) {
1649            Err("read limit: bytes must not exceed 1MiB for unary request")
1650        } else {
1651            Ok(api::ReadLimit {
1652                count: limit.count,
1653                bytes: limit.bytes,
1654            })
1655        }?;
1656
1657        Ok(api::ReadRequest {
1658            stream: stream.into(),
1659            start: Some(start.into()),
1660            limit: Some(limit),
1661        })
1662    }
1663}
1664
1665#[sync_docs]
1666#[derive(Debug, Clone)]
1667pub struct SequencedRecord {
1668    pub seq_num: u64,
1669    pub timestamp: u64,
1670    pub headers: Vec<Header>,
1671    pub body: Bytes,
1672}
1673
1674metered_impl!(SequencedRecord);
1675
1676impl From<api::SequencedRecord> for SequencedRecord {
1677    fn from(value: api::SequencedRecord) -> Self {
1678        let api::SequencedRecord {
1679            seq_num,
1680            timestamp,
1681            headers,
1682            body,
1683        } = value;
1684        Self {
1685            seq_num,
1686            timestamp,
1687            headers: headers.into_iter().map(Into::into).collect(),
1688            body,
1689        }
1690    }
1691}
1692
1693impl SequencedRecord {
1694    /// Try representing the sequenced record as a command record.
1695    pub fn as_command_record(&self) -> Option<CommandRecord> {
1696        if self.headers.len() != 1 {
1697            return None;
1698        }
1699
1700        let header = self.headers.first().expect("pre-validated length");
1701
1702        if !header.name.is_empty() {
1703            return None;
1704        }
1705
1706        match header.value.as_ref() {
1707            CommandRecord::FENCE => {
1708                let fencing_token =
1709                    FencingToken::new(std::str::from_utf8(&self.body).ok()?).ok()?;
1710                Some(CommandRecord {
1711                    command: Command::Fence { fencing_token },
1712                    timestamp: Some(self.timestamp),
1713                })
1714            }
1715            CommandRecord::TRIM => {
1716                let body: &[u8] = &self.body;
1717                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1718                Some(CommandRecord {
1719                    command: Command::Trim { seq_num },
1720                    timestamp: Some(self.timestamp),
1721                })
1722            }
1723            _ => None,
1724        }
1725    }
1726}
1727
1728#[sync_docs]
1729#[derive(Debug, Clone)]
1730pub struct SequencedRecordBatch {
1731    pub records: Vec<SequencedRecord>,
1732}
1733
1734impl MeteredBytes for SequencedRecordBatch {
1735    fn metered_bytes(&self) -> u64 {
1736        self.records.metered_bytes()
1737    }
1738}
1739
1740impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1741    fn from(value: api::SequencedRecordBatch) -> Self {
1742        let api::SequencedRecordBatch { records } = value;
1743        Self {
1744            records: records.into_iter().map(Into::into).collect(),
1745        }
1746    }
1747}
1748
1749#[sync_docs(ReadOutput = "Output")]
1750#[derive(Debug, Clone)]
1751pub enum ReadOutput {
1752    Batch(SequencedRecordBatch),
1753    NextSeqNum(u64),
1754}
1755
1756impl From<api::read_output::Output> for ReadOutput {
1757    fn from(value: api::read_output::Output) -> Self {
1758        match value {
1759            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1760            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1761        }
1762    }
1763}
1764
1765impl TryFrom<api::ReadOutput> for ReadOutput {
1766    type Error = ConvertError;
1767    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1768        let api::ReadOutput { output } = value;
1769        let output = output.ok_or("missing read output")?;
1770        Ok(output.into())
1771    }
1772}
1773
1774impl TryFrom<api::ReadResponse> for ReadOutput {
1775    type Error = ConvertError;
1776    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1777        let api::ReadResponse { output } = value;
1778        let output = output.ok_or("missing output in read response")?;
1779        output.try_into()
1780    }
1781}
1782
1783#[sync_docs]
1784#[derive(Debug, Clone, Default)]
1785pub struct ReadSessionRequest {
1786    pub start: ReadStart,
1787    pub limit: ReadLimit,
1788}
1789
1790impl ReadSessionRequest {
1791    /// Create a new request with the specified starting point.
1792    pub fn new(start: ReadStart) -> Self {
1793        Self {
1794            start,
1795            ..Default::default()
1796        }
1797    }
1798
1799    /// Overwrite limit.
1800    pub fn with_limit(self, limit: ReadLimit) -> Self {
1801        Self { limit, ..self }
1802    }
1803
1804    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1805        let Self { start, limit } = self;
1806        api::ReadSessionRequest {
1807            stream: stream.into(),
1808            start: Some(start.into()),
1809            limit: Some(api::ReadLimit {
1810                count: limit.count,
1811                bytes: limit.bytes,
1812            }),
1813            heartbeats: false,
1814        }
1815    }
1816}
1817
1818impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1819    type Error = ConvertError;
1820    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1821        let api::ReadSessionResponse { output } = value;
1822        let output = output.ok_or("missing output in read session response")?;
1823        output.try_into()
1824    }
1825}
1826
1827/// Name of a basin.
1828///
1829/// Must be between 8 and 48 characters in length. Must comprise lowercase
1830/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
1831#[derive(Debug, Clone)]
1832pub struct BasinName(String);
1833
1834impl AsRef<str> for BasinName {
1835    fn as_ref(&self) -> &str {
1836        &self.0
1837    }
1838}
1839
1840impl Deref for BasinName {
1841    type Target = str;
1842    fn deref(&self) -> &Self::Target {
1843        &self.0
1844    }
1845}
1846
1847impl TryFrom<String> for BasinName {
1848    type Error = ConvertError;
1849
1850    fn try_from(name: String) -> Result<Self, Self::Error> {
1851        if name.len() < 8 || name.len() > 48 {
1852            return Err("Basin name must be between 8 and 48 characters in length".into());
1853        }
1854
1855        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1856        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1857            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1858                .expect("Failed to compile basin name regex")
1859        });
1860
1861        if !regex.is_match(&name) {
1862            return Err(
1863                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1864                It cannot begin or end with a hyphen."
1865                    .into(),
1866            );
1867        }
1868
1869        Ok(Self(name))
1870    }
1871}
1872
1873impl FromStr for BasinName {
1874    type Err = ConvertError;
1875
1876    fn from_str(s: &str) -> Result<Self, Self::Err> {
1877        s.to_string().try_into()
1878    }
1879}
1880
1881impl std::fmt::Display for BasinName {
1882    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1883        f.write_str(&self.0)
1884    }
1885}
1886
1887/// Access token ID.
1888/// Must be between 1 and 96 characters.
1889#[derive(Debug, Clone)]
1890pub struct AccessTokenId(String);
1891
1892impl AsRef<str> for AccessTokenId {
1893    fn as_ref(&self) -> &str {
1894        &self.0
1895    }
1896}
1897
1898impl Deref for AccessTokenId {
1899    type Target = str;
1900
1901    fn deref(&self) -> &Self::Target {
1902        &self.0
1903    }
1904}
1905
1906impl TryFrom<String> for AccessTokenId {
1907    type Error = ConvertError;
1908
1909    fn try_from(name: String) -> Result<Self, Self::Error> {
1910        if name.is_empty() {
1911            return Err("Access token ID must not be empty".into());
1912        }
1913
1914        if name.len() > 96 {
1915            return Err("Access token ID must not exceed 96 characters".into());
1916        }
1917
1918        Ok(Self(name))
1919    }
1920}
1921
1922impl From<AccessTokenId> for String {
1923    fn from(value: AccessTokenId) -> Self {
1924        value.0
1925    }
1926}
1927
1928impl FromStr for AccessTokenId {
1929    type Err = ConvertError;
1930
1931    fn from_str(s: &str) -> Result<Self, Self::Err> {
1932        s.to_string().try_into()
1933    }
1934}
1935
1936impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1937    fn from(value: AccessTokenInfo) -> Self {
1938        Self {
1939            info: Some(value.into()),
1940        }
1941    }
1942}
1943
1944#[sync_docs]
1945#[derive(Debug, Clone)]
1946pub struct AccessTokenInfo {
1947    pub id: AccessTokenId,
1948    pub expires_at: Option<u32>,
1949    pub auto_prefix_streams: bool,
1950    pub scope: Option<AccessTokenScope>,
1951}
1952
1953impl AccessTokenInfo {
1954    /// Create a new access token info.
1955    pub fn new(id: AccessTokenId) -> Self {
1956        Self {
1957            id,
1958            expires_at: None,
1959            auto_prefix_streams: false,
1960            scope: None,
1961        }
1962    }
1963
1964    /// Overwrite expiration time.
1965    pub fn with_expires_at(self, expires_at: u32) -> Self {
1966        Self {
1967            expires_at: Some(expires_at),
1968            ..self
1969        }
1970    }
1971
1972    /// Overwrite auto prefix streams.
1973    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1974        Self {
1975            auto_prefix_streams,
1976            ..self
1977        }
1978    }
1979
1980    /// Overwrite scope.
1981    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
1982        Self {
1983            scope: Some(scope),
1984            ..self
1985        }
1986    }
1987}
1988
1989impl From<AccessTokenInfo> for api::AccessTokenInfo {
1990    fn from(value: AccessTokenInfo) -> Self {
1991        let AccessTokenInfo {
1992            id,
1993            expires_at,
1994            auto_prefix_streams,
1995            scope,
1996        } = value;
1997        Self {
1998            id: id.into(),
1999            expires_at,
2000            auto_prefix_streams,
2001            scope: scope.map(Into::into),
2002        }
2003    }
2004}
2005
2006impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2007    type Error = ConvertError;
2008
2009    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2010        let api::AccessTokenInfo {
2011            id,
2012            expires_at,
2013            auto_prefix_streams,
2014            scope,
2015        } = value;
2016        Ok(Self {
2017            id: id.try_into()?,
2018            expires_at,
2019            auto_prefix_streams,
2020            scope: scope.map(Into::into),
2021        })
2022    }
2023}
2024
2025#[sync_docs]
2026#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2027pub enum Operation {
2028    ListBasins,
2029    CreateBasin,
2030    DeleteBasin,
2031    ReconfigureBasin,
2032    GetBasinConfig,
2033    IssueAccessToken,
2034    RevokeAccessToken,
2035    ListAccessTokens,
2036    ListStreams,
2037    CreateStream,
2038    DeleteStream,
2039    GetStreamConfig,
2040    ReconfigureStream,
2041    CheckTail,
2042    Append,
2043    Read,
2044    Trim,
2045    Fence,
2046}
2047
2048impl FromStr for Operation {
2049    type Err = ConvertError;
2050
2051    fn from_str(s: &str) -> Result<Self, Self::Err> {
2052        match s.to_lowercase().as_str() {
2053            "list-basins" => Ok(Self::ListBasins),
2054            "create-basin" => Ok(Self::CreateBasin),
2055            "delete-basin" => Ok(Self::DeleteBasin),
2056            "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2057            "get-basin-config" => Ok(Self::GetBasinConfig),
2058            "issue-access-token" => Ok(Self::IssueAccessToken),
2059            "revoke-access-token" => Ok(Self::RevokeAccessToken),
2060            "list-access-tokens" => Ok(Self::ListAccessTokens),
2061            "list-streams" => Ok(Self::ListStreams),
2062            "create-stream" => Ok(Self::CreateStream),
2063            "delete-stream" => Ok(Self::DeleteStream),
2064            "get-stream-config" => Ok(Self::GetStreamConfig),
2065            "reconfigure-stream" => Ok(Self::ReconfigureStream),
2066            "check-tail" => Ok(Self::CheckTail),
2067            "append" => Ok(Self::Append),
2068            "read" => Ok(Self::Read),
2069            "trim" => Ok(Self::Trim),
2070            "fence" => Ok(Self::Fence),
2071            _ => Err("invalid operation".into()),
2072        }
2073    }
2074}
2075
2076impl From<Operation> for api::Operation {
2077    fn from(value: Operation) -> Self {
2078        match value {
2079            Operation::ListBasins => Self::ListBasins,
2080            Operation::CreateBasin => Self::CreateBasin,
2081            Operation::DeleteBasin => Self::DeleteBasin,
2082            Operation::ReconfigureBasin => Self::ReconfigureBasin,
2083            Operation::GetBasinConfig => Self::GetBasinConfig,
2084            Operation::IssueAccessToken => Self::IssueAccessToken,
2085            Operation::RevokeAccessToken => Self::RevokeAccessToken,
2086            Operation::ListAccessTokens => Self::ListAccessTokens,
2087            Operation::ListStreams => Self::ListStreams,
2088            Operation::CreateStream => Self::CreateStream,
2089            Operation::DeleteStream => Self::DeleteStream,
2090            Operation::GetStreamConfig => Self::GetStreamConfig,
2091            Operation::ReconfigureStream => Self::ReconfigureStream,
2092            Operation::CheckTail => Self::CheckTail,
2093            Operation::Append => Self::Append,
2094            Operation::Read => Self::Read,
2095            Operation::Trim => Self::Trim,
2096            Operation::Fence => Self::Fence,
2097        }
2098    }
2099}
2100
2101impl From<api::Operation> for Option<Operation> {
2102    fn from(value: api::Operation) -> Self {
2103        match value {
2104            api::Operation::Unspecified => None,
2105            api::Operation::ListBasins => Some(Operation::ListBasins),
2106            api::Operation::CreateBasin => Some(Operation::CreateBasin),
2107            api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2108            api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2109            api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2110            api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2111            api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2112            api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2113            api::Operation::ListStreams => Some(Operation::ListStreams),
2114            api::Operation::CreateStream => Some(Operation::CreateStream),
2115            api::Operation::DeleteStream => Some(Operation::DeleteStream),
2116            api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2117            api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2118            api::Operation::CheckTail => Some(Operation::CheckTail),
2119            api::Operation::Append => Some(Operation::Append),
2120            api::Operation::Read => Some(Operation::Read),
2121            api::Operation::Trim => Some(Operation::Trim),
2122            api::Operation::Fence => Some(Operation::Fence),
2123        }
2124    }
2125}
2126
2127#[sync_docs]
2128#[derive(Debug, Clone, Default)]
2129pub struct AccessTokenScope {
2130    pub basins: Option<ResourceSet>,
2131    pub streams: Option<ResourceSet>,
2132    pub access_tokens: Option<ResourceSet>,
2133    pub op_groups: Option<PermittedOperationGroups>,
2134    pub ops: HashSet<Operation>,
2135}
2136
2137impl AccessTokenScope {
2138    /// Create a new access token scope.
2139    pub fn new() -> Self {
2140        Self::default()
2141    }
2142
2143    /// Overwrite resource set for access tokens.
2144    pub fn with_basins(self, basins: ResourceSet) -> Self {
2145        Self {
2146            basins: Some(basins),
2147            ..self
2148        }
2149    }
2150
2151    /// Overwrite resource set for streams.
2152    pub fn with_streams(self, streams: ResourceSet) -> Self {
2153        Self {
2154            streams: Some(streams),
2155            ..self
2156        }
2157    }
2158
2159    /// Overwrite resource set for access tokens.
2160    pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2161        Self {
2162            access_tokens: Some(access_tokens),
2163            ..self
2164        }
2165    }
2166
2167    /// Overwrite operation groups.
2168    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2169        Self {
2170            op_groups: Some(op_groups),
2171            ..self
2172        }
2173    }
2174
2175    /// Overwrite operations.
2176    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2177        Self {
2178            ops: ops.into_iter().collect(),
2179            ..self
2180        }
2181    }
2182
2183    /// Add an operation to operations.
2184    pub fn with_op(self, op: Operation) -> Self {
2185        let mut ops = self.ops;
2186        ops.insert(op);
2187        Self { ops, ..self }
2188    }
2189}
2190
2191impl From<AccessTokenScope> for api::AccessTokenScope {
2192    fn from(value: AccessTokenScope) -> Self {
2193        let AccessTokenScope {
2194            basins,
2195            streams,
2196            access_tokens,
2197            op_groups,
2198            ops,
2199        } = value;
2200        Self {
2201            basins: basins.map(Into::into),
2202            streams: streams.map(Into::into),
2203            access_tokens: access_tokens.map(Into::into),
2204            op_groups: op_groups.map(Into::into),
2205            ops: ops
2206                .into_iter()
2207                .map(api::Operation::from)
2208                .map(Into::into)
2209                .collect(),
2210        }
2211    }
2212}
2213
2214impl From<api::AccessTokenScope> for AccessTokenScope {
2215    fn from(value: api::AccessTokenScope) -> Self {
2216        let api::AccessTokenScope {
2217            basins,
2218            streams,
2219            access_tokens,
2220            op_groups,
2221            ops,
2222        } = value;
2223        Self {
2224            basins: basins.and_then(|set| set.matching.map(Into::into)),
2225            streams: streams.and_then(|set| set.matching.map(Into::into)),
2226            access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2227            op_groups: op_groups.map(Into::into),
2228            ops: ops
2229                .into_iter()
2230                .map(api::Operation::try_from)
2231                .flat_map(Result::ok)
2232                .flat_map(<Option<Operation>>::from)
2233                .collect(),
2234        }
2235    }
2236}
2237
2238impl From<ResourceSet> for api::ResourceSet {
2239    fn from(value: ResourceSet) -> Self {
2240        Self {
2241            matching: Some(value.into()),
2242        }
2243    }
2244}
2245
2246#[sync_docs(ResourceSet = "Matching")]
2247#[derive(Debug, Clone)]
2248pub enum ResourceSet {
2249    Exact(String),
2250    Prefix(String),
2251}
2252
2253impl From<ResourceSet> for api::resource_set::Matching {
2254    fn from(value: ResourceSet) -> Self {
2255        match value {
2256            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2257            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2258        }
2259    }
2260}
2261
2262impl From<api::resource_set::Matching> for ResourceSet {
2263    fn from(value: api::resource_set::Matching) -> Self {
2264        match value {
2265            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2266            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2267        }
2268    }
2269}
2270
2271#[sync_docs]
2272#[derive(Debug, Clone, Default)]
2273pub struct PermittedOperationGroups {
2274    pub account: Option<ReadWritePermissions>,
2275    pub basin: Option<ReadWritePermissions>,
2276    pub stream: Option<ReadWritePermissions>,
2277}
2278
2279impl PermittedOperationGroups {
2280    /// Create a new permitted operation groups.
2281    pub fn new() -> Self {
2282        Self::default()
2283    }
2284
2285    /// Overwrite account read-write permissions.
2286    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2287        Self {
2288            account: Some(account),
2289            ..self
2290        }
2291    }
2292
2293    /// Overwrite basin read-write permissions.
2294    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2295        Self {
2296            basin: Some(basin),
2297            ..self
2298        }
2299    }
2300
2301    /// Overwrite stream read-write permissions.
2302    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2303        Self {
2304            stream: Some(stream),
2305            ..self
2306        }
2307    }
2308}
2309
2310impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2311    fn from(value: PermittedOperationGroups) -> Self {
2312        let PermittedOperationGroups {
2313            account,
2314            basin,
2315            stream,
2316        } = value;
2317        Self {
2318            account: account.map(Into::into),
2319            basin: basin.map(Into::into),
2320            stream: stream.map(Into::into),
2321        }
2322    }
2323}
2324
2325impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2326    fn from(value: api::PermittedOperationGroups) -> Self {
2327        let api::PermittedOperationGroups {
2328            account,
2329            basin,
2330            stream,
2331        } = value;
2332        Self {
2333            account: account.map(Into::into),
2334            basin: basin.map(Into::into),
2335            stream: stream.map(Into::into),
2336        }
2337    }
2338}
2339
2340#[sync_docs]
2341#[derive(Debug, Clone, Default)]
2342pub struct ReadWritePermissions {
2343    pub read: bool,
2344    pub write: bool,
2345}
2346
2347impl ReadWritePermissions {
2348    /// Create a new read-write permission.
2349    pub fn new() -> Self {
2350        Self::default()
2351    }
2352
2353    /// Overwrite read permission.
2354    pub fn with_read(self, read: bool) -> Self {
2355        Self { read, ..self }
2356    }
2357
2358    /// Overwrite write permission.
2359    pub fn with_write(self, write: bool) -> Self {
2360        Self { write, ..self }
2361    }
2362}
2363
2364impl From<ReadWritePermissions> for api::ReadWritePermissions {
2365    fn from(value: ReadWritePermissions) -> Self {
2366        let ReadWritePermissions { read, write } = value;
2367        Self { read, write }
2368    }
2369}
2370
2371impl From<api::ReadWritePermissions> for ReadWritePermissions {
2372    fn from(value: api::ReadWritePermissions) -> Self {
2373        let api::ReadWritePermissions { read, write } = value;
2374        Self { read, write }
2375    }
2376}
2377
2378impl From<api::IssueAccessTokenResponse> for String {
2379    fn from(value: api::IssueAccessTokenResponse) -> Self {
2380        value.access_token
2381    }
2382}
2383
2384impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2385    fn from(value: AccessTokenId) -> Self {
2386        Self { id: value.into() }
2387    }
2388}
2389
2390impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2391    type Error = ConvertError;
2392    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2393        let token_info = value.info.ok_or("access token info is missing")?;
2394        token_info.try_into()
2395    }
2396}
2397
2398#[sync_docs]
2399#[derive(Debug, Clone, Default)]
2400pub struct ListAccessTokensRequest {
2401    pub prefix: String,
2402    pub start_after: String,
2403    pub limit: Option<usize>,
2404}
2405
2406impl ListAccessTokensRequest {
2407    /// Create a new request with prefix.
2408    pub fn new() -> Self {
2409        Self::default()
2410    }
2411
2412    /// Overwrite prefix.
2413    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2414        Self {
2415            prefix: prefix.into(),
2416            ..self
2417        }
2418    }
2419
2420    /// Overwrite start after.
2421    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2422        Self {
2423            start_after: start_after.into(),
2424            ..self
2425        }
2426    }
2427
2428    /// Overwrite limit.
2429    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2430        Self {
2431            limit: limit.into(),
2432            ..self
2433        }
2434    }
2435}
2436
2437impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2438    type Error = ConvertError;
2439    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2440        let ListAccessTokensRequest {
2441            prefix,
2442            start_after,
2443            limit,
2444        } = value;
2445        Ok(Self {
2446            prefix,
2447            start_after,
2448            limit: limit
2449                .map(TryInto::try_into)
2450                .transpose()
2451                .map_err(|_| "request limit does not fit into u64 bounds")?,
2452        })
2453    }
2454}
2455
2456#[sync_docs]
2457#[derive(Debug, Clone)]
2458pub struct ListAccessTokensResponse {
2459    pub access_tokens: Vec<AccessTokenInfo>,
2460    pub has_more: bool,
2461}
2462
2463impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2464    type Error = ConvertError;
2465    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2466        let api::ListAccessTokensResponse {
2467            access_tokens,
2468            has_more,
2469        } = value;
2470        let access_tokens = access_tokens
2471            .into_iter()
2472            .map(TryInto::try_into)
2473            .collect::<Result<Vec<_>, _>>()?;
2474        Ok(Self {
2475            access_tokens,
2476            has_more,
2477        })
2478    }
2479}