s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
12pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
14
/// Error related to conversion from one type to another.
///
/// Wraps a human-readable message; `Display` prints that message verbatim.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
19
20impl<T: Into<String>> From<T> for ConvertError {
21    fn from(value: T) -> Self {
22        Self(value.into())
23    }
24}
25
/// Metered size of the object in bytes.
///
/// Bytes are calculated using the "metered bytes" formula: a fixed 8 bytes per
/// record, plus 2 bytes per header, plus the length of every header key and
/// value, plus the length of the body.
///
/// ```python
/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.key) + len(h.value)) for h in record.headers) + len(record.body)
/// ```
pub trait MeteredBytes {
    /// Return the metered bytes of the object.
    fn metered_bytes(&self) -> u64;
}
37
38impl<T: MeteredBytes> MeteredBytes for Vec<T> {
39    fn metered_bytes(&self) -> u64 {
40        self.iter().fold(0, |acc, item| acc + item.metered_bytes())
41    }
42}
43
// Implements `MeteredBytes` for a type exposing `headers` (items with `name`
// and `value`) and `body` fields, following the metered bytes formula.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                // Fixed cost per record and per header.
                const RECORD_OVERHEAD: usize = 8;
                const PER_HEADER_OVERHEAD: usize = 2;

                let header_count = self.headers.len();
                let header_payload: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();

                let total = RECORD_OVERHEAD
                    + (PER_HEADER_OVERHEAD * header_count)
                    + header_payload
                    + self.body.len();
                total as u64
            }
        }
    };
}
61
// Only one scope is defined here; its string form is "aws:us-east-1"
// (see the `FromStr` impl).
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinScope {
    AwsUsEast1,
}
67
68impl From<BasinScope> for api::BasinScope {
69    fn from(value: BasinScope) -> Self {
70        match value {
71            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
72        }
73    }
74}
75
76impl From<api::BasinScope> for Option<BasinScope> {
77    fn from(value: api::BasinScope) -> Self {
78        match value {
79            api::BasinScope::Unspecified => None,
80            api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
81        }
82    }
83}
84
85impl FromStr for BasinScope {
86    type Err = ConvertError;
87    fn from_str(value: &str) -> Result<Self, Self::Err> {
88        match value {
89            "aws:us-east-1" => Ok(Self::AwsUsEast1),
90            _ => Err("invalid basin scope value".into()),
91        }
92    }
93}
94
// `config` and `scope` are optional. When `scope` is unset, the conversion to
// `api::CreateBasinRequest` sends the default `api::BasinScope` value.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateBasinRequest {
    pub basin: BasinName,
    pub config: Option<BasinConfig>,
    pub scope: Option<BasinScope>,
}
102
103impl CreateBasinRequest {
104    /// Create a new request with basin name.
105    pub fn new(basin: BasinName) -> Self {
106        Self {
107            basin,
108            config: None,
109            scope: None,
110        }
111    }
112
113    /// Overwrite basin configuration.
114    pub fn with_config(self, config: BasinConfig) -> Self {
115        Self {
116            config: Some(config),
117            ..self
118        }
119    }
120
121    /// Overwrite basin scope.
122    pub fn with_scope(self, scope: BasinScope) -> Self {
123        Self {
124            scope: Some(scope),
125            ..self
126        }
127    }
128}
129
130impl From<CreateBasinRequest> for api::CreateBasinRequest {
131    fn from(value: CreateBasinRequest) -> Self {
132        let CreateBasinRequest {
133            basin,
134            config,
135            scope,
136        } = value;
137        Self {
138            basin: basin.0,
139            config: config.map(Into::into),
140            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
141        }
142    }
143}
144
// The derived `Default` leaves `default_stream_config` unset and both
// `create_stream_on_*` flags false.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    pub default_stream_config: Option<StreamConfig>,
    pub create_stream_on_append: bool,
    pub create_stream_on_read: bool,
}
152
153impl BasinConfig {
154    /// Create a new basin config.
155    pub fn new() -> Self {
156        Self::default()
157    }
158
159    /// Overwrite the default stream config.
160    pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
161        Self {
162            default_stream_config: Some(default_stream_config),
163            ..self
164        }
165    }
166
167    /// Overwrite `create_stream_on_append`.
168    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
169        Self {
170            create_stream_on_append,
171            ..self
172        }
173    }
174
175    /// Overwrite `create_stream_on_read`.
176    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
177        Self {
178            create_stream_on_read,
179            ..self
180        }
181    }
182}
183
184impl From<BasinConfig> for api::BasinConfig {
185    fn from(value: BasinConfig) -> Self {
186        let BasinConfig {
187            default_stream_config,
188            create_stream_on_append,
189            create_stream_on_read,
190        } = value;
191        Self {
192            default_stream_config: default_stream_config.map(Into::into),
193            create_stream_on_append,
194            create_stream_on_read,
195        }
196    }
197}
198
199impl From<api::BasinConfig> for BasinConfig {
200    fn from(value: api::BasinConfig) -> Self {
201        let api::BasinConfig {
202            default_stream_config,
203            create_stream_on_append,
204            create_stream_on_read,
205        } = value;
206        Self {
207            default_stream_config: default_stream_config.map(Into::into),
208            create_stream_on_append,
209            create_stream_on_read,
210        }
211    }
212}
213
// Mirrors `api::TimestampingMode` without its `Unspecified` variant, which is
// represented as `None` on the SDK side.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    ClientPrefer,
    ClientRequire,
    Arrival,
}
221
222impl From<TimestampingMode> for api::TimestampingMode {
223    fn from(value: TimestampingMode) -> Self {
224        match value {
225            TimestampingMode::ClientPrefer => Self::ClientPrefer,
226            TimestampingMode::ClientRequire => Self::ClientRequire,
227            TimestampingMode::Arrival => Self::Arrival,
228        }
229    }
230}
231
232impl From<api::TimestampingMode> for Option<TimestampingMode> {
233    fn from(value: api::TimestampingMode) -> Self {
234        match value {
235            api::TimestampingMode::Unspecified => None,
236            api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
237            api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
238            api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
239        }
240    }
241}
242
// Both knobs are optional. An unset `mode` is converted to the default
// `api::TimestampingMode`; `uncapped` is passed through unchanged.
#[sync_docs(TimestampingConfig = "Timestamping")]
#[derive(Debug, Clone, Default)]
/// Timestamping behavior.
pub struct TimestampingConfig {
    pub mode: Option<TimestampingMode>,
    pub uncapped: Option<bool>,
}
250
251impl TimestampingConfig {
252    /// Create a new timestamping config.
253    pub fn new() -> Self {
254        Self::default()
255    }
256
257    /// Overwrite timestamping mode.
258    pub fn with_mode(self, mode: TimestampingMode) -> Self {
259        Self {
260            mode: Some(mode),
261            ..self
262        }
263    }
264
265    /// Overwrite the uncapped knob.
266    pub fn with_uncapped(self, uncapped: bool) -> Self {
267        Self {
268            uncapped: Some(uncapped),
269            ..self
270        }
271    }
272}
273
274impl From<TimestampingConfig> for api::stream_config::Timestamping {
275    fn from(value: TimestampingConfig) -> Self {
276        Self {
277            mode: value
278                .mode
279                .map(api::TimestampingMode::from)
280                .unwrap_or_default()
281                .into(),
282            uncapped: value.uncapped,
283        }
284    }
285}
286
287impl From<api::stream_config::Timestamping> for TimestampingConfig {
288    fn from(value: api::stream_config::Timestamping) -> Self {
289        let mode = value.mode().into();
290        let uncapped = value.uncapped;
291        Self { mode, uncapped }
292    }
293}
294
// Every field is optional and `None` by default. An unset `storage_class` is
// converted to the default `api::StorageClass`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    pub storage_class: Option<StorageClass>,
    pub retention_policy: Option<RetentionPolicy>,
    pub timestamping: Option<TimestampingConfig>,
}
302
303impl StreamConfig {
304    /// Create a new stream config.
305    pub fn new() -> Self {
306        Self::default()
307    }
308
309    /// Overwrite storage class.
310    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
311        Self {
312            storage_class: Some(storage_class),
313            ..self
314        }
315    }
316
317    /// Overwrite retention policy.
318    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
319        Self {
320            retention_policy: Some(retention_policy),
321            ..self
322        }
323    }
324
325    /// Overwrite timestamping config.
326    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
327        Self {
328            timestamping: Some(timestamping),
329            ..self
330        }
331    }
332}
333
334impl From<StreamConfig> for api::StreamConfig {
335    fn from(value: StreamConfig) -> Self {
336        let StreamConfig {
337            storage_class,
338            retention_policy,
339            timestamping,
340        } = value;
341        Self {
342            storage_class: storage_class
343                .map(api::StorageClass::from)
344                .unwrap_or_default()
345                .into(),
346            retention_policy: retention_policy.map(Into::into),
347            timestamping: timestamping.map(Into::into),
348        }
349    }
350}
351
352impl From<api::StreamConfig> for StreamConfig {
353    fn from(value: api::StreamConfig) -> Self {
354        Self {
355            storage_class: value.storage_class().into(),
356            retention_policy: value.retention_policy.map(Into::into),
357            timestamping: value.timestamping.map(Into::into),
358        }
359    }
360}
361
// Parsed from the strings "standard" and "express" by the `FromStr` impl.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    Standard,
    Express,
}
368
369impl From<StorageClass> for api::StorageClass {
370    fn from(value: StorageClass) -> Self {
371        match value {
372            StorageClass::Standard => Self::Standard,
373            StorageClass::Express => Self::Express,
374        }
375    }
376}
377
378impl From<api::StorageClass> for Option<StorageClass> {
379    fn from(value: api::StorageClass) -> Self {
380        match value {
381            api::StorageClass::Unspecified => None,
382            api::StorageClass::Standard => Some(StorageClass::Standard),
383            api::StorageClass::Express => Some(StorageClass::Express),
384        }
385    }
386}
387
388impl FromStr for StorageClass {
389    type Err = ConvertError;
390
391    fn from_str(value: &str) -> Result<Self, Self::Err> {
392        match value {
393            "standard" => Ok(Self::Standard),
394            "express" => Ok(Self::Express),
395            v => Err(format!("unknown storage class: {v}").into()),
396        }
397    }
398}
399
// The age is converted to whole seconds (`Duration::as_secs`) when turned
// into the `api` retention policy, so any sub-second part is dropped.
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    Age(Duration),
}
405
406impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
407    fn from(value: RetentionPolicy) -> Self {
408        match value {
409            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
410        }
411    }
412}
413
414impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
415    fn from(value: api::stream_config::RetentionPolicy) -> Self {
416        match value {
417            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
418        }
419    }
420}
421
// Displayed as the lowercase strings "active", "creating" and "deleting".
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    Active,
    Creating,
    Deleting,
}
429
430impl From<BasinState> for api::BasinState {
431    fn from(value: BasinState) -> Self {
432        match value {
433            BasinState::Active => Self::Active,
434            BasinState::Creating => Self::Creating,
435            BasinState::Deleting => Self::Deleting,
436        }
437    }
438}
439
440impl From<api::BasinState> for Option<BasinState> {
441    fn from(value: api::BasinState) -> Self {
442        match value {
443            api::BasinState::Unspecified => None,
444            api::BasinState::Active => Some(BasinState::Active),
445            api::BasinState::Creating => Some(BasinState::Creating),
446            api::BasinState::Deleting => Some(BasinState::Deleting),
447        }
448    }
449}
450
451impl std::fmt::Display for BasinState {
452    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453        match self {
454            BasinState::Active => write!(f, "active"),
455            BasinState::Creating => write!(f, "creating"),
456            BasinState::Deleting => write!(f, "deleting"),
457        }
458    }
459}
460
// `scope` and `state` are `None` when the `api` value is `Unspecified`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    pub name: String,
    pub scope: Option<BasinScope>,
    pub state: Option<BasinState>,
}
468
469impl From<BasinInfo> for api::BasinInfo {
470    fn from(value: BasinInfo) -> Self {
471        let BasinInfo { name, scope, state } = value;
472        Self {
473            name,
474            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
475            state: state.map(api::BasinState::from).unwrap_or_default().into(),
476        }
477    }
478}
479
480impl From<api::BasinInfo> for BasinInfo {
481    fn from(value: api::BasinInfo) -> Self {
482        let scope = value.scope().into();
483        let state = value.state().into();
484        let name = value.name;
485        Self { name, scope, state }
486    }
487}
488
489impl TryFrom<api::CreateBasinResponse> for BasinInfo {
490    type Error = ConvertError;
491    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
492        let api::CreateBasinResponse { info } = value;
493        let info = info.ok_or("missing basin info")?;
494        Ok(info.into())
495    }
496}
497
// The derived `Default` yields empty `prefix` / `start_after` strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListStreamsRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
505
506impl ListStreamsRequest {
507    /// Create a new request.
508    pub fn new() -> Self {
509        Self::default()
510    }
511
512    /// Overwrite prefix.
513    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
514        Self {
515            prefix: prefix.into(),
516            ..self
517        }
518    }
519
520    /// Overwrite start after.
521    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
522        Self {
523            start_after: start_after.into(),
524            ..self
525        }
526    }
527
528    /// Overwrite limit.
529    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
530        Self {
531            limit: limit.into(),
532            ..self
533        }
534    }
535}
536
537impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
538    type Error = ConvertError;
539    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
540        let ListStreamsRequest {
541            prefix,
542            start_after,
543            limit,
544        } = value;
545        Ok(Self {
546            prefix,
547            start_after,
548            limit: limit.map(|n| n as u64),
549        })
550    }
551}
552
// All three fields are copied verbatim from `api::StreamInfo`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    pub name: String,
    pub created_at: u32,
    pub deleted_at: Option<u32>,
}
560
561impl From<api::StreamInfo> for StreamInfo {
562    fn from(value: api::StreamInfo) -> Self {
563        Self {
564            name: value.name,
565            created_at: value.created_at,
566            deleted_at: value.deleted_at,
567        }
568    }
569}
570
571impl TryFrom<api::CreateStreamResponse> for StreamInfo {
572    type Error = ConvertError;
573
574    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
575        let api::CreateStreamResponse { info } = value;
576        let info = info.ok_or("missing stream info")?;
577        Ok(info.into())
578    }
579}
580
// `streams` holds the converted entries of the `api` response; `has_more` is
// passed through unchanged.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    pub streams: Vec<StreamInfo>,
    pub has_more: bool,
}
587
588impl From<api::ListStreamsResponse> for ListStreamsResponse {
589    fn from(value: api::ListStreamsResponse) -> Self {
590        let api::ListStreamsResponse { streams, has_more } = value;
591        let streams = streams.into_iter().map(Into::into).collect();
592        Self { streams, has_more }
593    }
594}
595
596impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
597    type Error = ConvertError;
598
599    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
600        let api::GetBasinConfigResponse { config } = value;
601        let config = config.ok_or("missing basin config")?;
602        Ok(config.into())
603    }
604}
605
606impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
607    type Error = ConvertError;
608
609    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
610        let api::GetStreamConfigResponse { config } = value;
611        let config = config.ok_or("missing stream config")?;
612        Ok(config.into())
613    }
614}
615
// `config` is optional; `CreateStreamRequest::new` leaves it unset.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateStreamRequest {
    pub stream: String,
    pub config: Option<StreamConfig>,
}
622
623impl CreateStreamRequest {
624    /// Create a new request with stream name.
625    pub fn new(stream: impl Into<String>) -> Self {
626        Self {
627            stream: stream.into(),
628            config: None,
629        }
630    }
631
632    /// Overwrite stream config.
633    pub fn with_config(self, config: StreamConfig) -> Self {
634        Self {
635            config: Some(config),
636            ..self
637        }
638    }
639}
640
641impl From<CreateStreamRequest> for api::CreateStreamRequest {
642    fn from(value: CreateStreamRequest) -> Self {
643        let CreateStreamRequest { stream, config } = value;
644        Self {
645            stream,
646            config: config.map(Into::into),
647        }
648    }
649}
650
// The derived `Default` yields empty `prefix` / `start_after` strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListBasinsRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
658
659impl ListBasinsRequest {
660    /// Create a new request.
661    pub fn new() -> Self {
662        Self::default()
663    }
664
665    /// Overwrite prefix.
666    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
667        Self {
668            prefix: prefix.into(),
669            ..self
670        }
671    }
672
673    /// Overwrite start after.
674    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
675        Self {
676            start_after: start_after.into(),
677            ..self
678        }
679    }
680
681    /// Overwrite limit.
682    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
683        Self {
684            limit: limit.into(),
685            ..self
686        }
687    }
688}
689
690impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
691    type Error = ConvertError;
692    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
693        let ListBasinsRequest {
694            prefix,
695            start_after,
696            limit,
697        } = value;
698        Ok(Self {
699            prefix,
700            start_after,
701            limit: limit
702                .map(TryInto::try_into)
703                .transpose()
704                .map_err(|_| "request limit does not fit into u64 bounds")?,
705        })
706    }
707}
708
// `basins` holds the converted entries of the `api` response; `has_more` is
// passed through unchanged.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    pub basins: Vec<BasinInfo>,
    pub has_more: bool,
}
715
716impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
717    type Error = ConvertError;
718    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
719        let api::ListBasinsResponse { basins, has_more } = value;
720        Ok(Self {
721            basins: basins.into_iter().map(Into::into).collect(),
722            has_more,
723        })
724    }
725}
726
// Note: `if_exists` is not part of the converted `api::DeleteBasinRequest`;
// the `From` impl forwards only `basin`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteBasinRequest {
    pub basin: BasinName,
    /// Delete basin if it exists else do nothing.
    pub if_exists: bool,
}
734
735impl DeleteBasinRequest {
736    /// Create a new request.
737    pub fn new(basin: BasinName) -> Self {
738        Self {
739            basin,
740            if_exists: false,
741        }
742    }
743
744    /// Overwrite the if exists parameter.
745    pub fn with_if_exists(self, if_exists: bool) -> Self {
746        Self { if_exists, ..self }
747    }
748}
749
750impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
751    fn from(value: DeleteBasinRequest) -> Self {
752        let DeleteBasinRequest { basin, .. } = value;
753        Self { basin: basin.0 }
754    }
755}
756
// Note: `if_exists` is not part of the converted `api::DeleteStreamRequest`;
// the `From` impl forwards only `stream`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteStreamRequest {
    pub stream: String,
    /// Delete stream if it exists else do nothing.
    pub if_exists: bool,
}
764
765impl DeleteStreamRequest {
766    /// Create a new request.
767    pub fn new(stream: impl Into<String>) -> Self {
768        Self {
769            stream: stream.into(),
770            if_exists: false,
771        }
772    }
773
774    /// Overwrite the if exists parameter.
775    pub fn with_if_exists(self, if_exists: bool) -> Self {
776        Self { if_exists, ..self }
777    }
778}
779
780impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
781    fn from(value: DeleteStreamRequest) -> Self {
782        let DeleteStreamRequest { stream, .. } = value;
783        Self { stream }
784    }
785}
786
// `mask` is a list of field paths; it becomes the `paths` of a
// `prost_types::FieldMask` when converted to the `api` request.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureBasinRequest {
    pub basin: BasinName,
    pub config: Option<BasinConfig>,
    pub mask: Option<Vec<String>>,
}
794
795impl ReconfigureBasinRequest {
796    /// Create a new request with basin name.
797    pub fn new(basin: BasinName) -> Self {
798        Self {
799            basin,
800            config: None,
801            mask: None,
802        }
803    }
804
805    /// Overwrite basin config.
806    pub fn with_config(self, config: BasinConfig) -> Self {
807        Self {
808            config: Some(config),
809            ..self
810        }
811    }
812
813    /// Overwrite field mask.
814    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
815        Self {
816            mask: Some(mask.into()),
817            ..self
818        }
819    }
820}
821
822impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
823    fn from(value: ReconfigureBasinRequest) -> Self {
824        let ReconfigureBasinRequest {
825            basin,
826            config,
827            mask,
828        } = value;
829        Self {
830            basin: basin.0,
831            config: config.map(Into::into),
832            mask: mask.map(|paths| prost_types::FieldMask { paths }),
833        }
834    }
835}
836
837impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
838    type Error = ConvertError;
839    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
840        let api::ReconfigureBasinResponse { config } = value;
841        let config = config.ok_or("missing basin config")?;
842        Ok(config.into())
843    }
844}
845
// `mask` is a list of field paths; it becomes the `paths` of a
// `prost_types::FieldMask` when converted to the `api` request.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureStreamRequest {
    pub stream: String,
    pub config: Option<StreamConfig>,
    pub mask: Option<Vec<String>>,
}
853
854impl ReconfigureStreamRequest {
855    /// Create a new request with stream name.
856    pub fn new(stream: impl Into<String>) -> Self {
857        Self {
858            stream: stream.into(),
859            config: None,
860            mask: None,
861        }
862    }
863
864    /// Overwrite stream config.
865    pub fn with_config(self, config: StreamConfig) -> Self {
866        Self {
867            config: Some(config),
868            ..self
869        }
870    }
871
872    /// Overwrite field mask.
873    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
874        Self {
875            mask: Some(mask.into()),
876            ..self
877        }
878    }
879}
880
881impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
882    fn from(value: ReconfigureStreamRequest) -> Self {
883        let ReconfigureStreamRequest {
884            stream,
885            config,
886            mask,
887        } = value;
888        Self {
889            stream,
890            config: config.map(Into::into),
891            mask: mask.map(|paths| prost_types::FieldMask { paths }),
892        }
893    }
894}
895
896impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
897    type Error = ConvertError;
898    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
899        let api::ReconfigureStreamResponse { config } = value;
900        let config = config.ok_or("missing stream config")?;
901        Ok(config.into())
902    }
903}
904
905impl From<api::CheckTailResponse> for StreamPosition {
906    fn from(value: api::CheckTailResponse) -> Self {
907        let api::CheckTailResponse {
908            next_seq_num,
909            last_timestamp,
910        } = value;
911        StreamPosition {
912            seq_num: next_seq_num,
913            timestamp: last_timestamp,
914        }
915    }
916}
917
/// Position of a record in a stream.
///
/// When converted from `api::CheckTailResponse`, `seq_num` holds the
/// response's `next_seq_num` and `timestamp` holds its `last_timestamp`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp, which may be user-specified or assigned by the service.
    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
    pub timestamp: u64,
}
927
// `name` may be empty: `Header::from_value` builds a header with an empty name.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    pub name: Bytes,
    pub value: Bytes,
}
934
935impl Header {
936    /// Create a new header from name and value.
937    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
938        Self {
939            name: name.into(),
940            value: value.into(),
941        }
942    }
943
944    /// Create a new header from value.
945    pub fn from_value(value: impl Into<Bytes>) -> Self {
946        Self {
947            name: Bytes::new(),
948            value: value.into(),
949        }
950    }
951}
952
953impl From<Header> for api::Header {
954    fn from(value: Header) -> Self {
955        let Header { name, value } = value;
956        Self { name, value }
957    }
958}
959
960impl From<api::Header> for Header {
961    fn from(value: api::Header) -> Self {
962        let api::Header { name, value } = value;
963        Self { name, value }
964    }
965}
966
/// A fencing token can be enforced on append requests.
///
/// Must not be more than 16 bytes; the limit is checked by
/// [`FencingToken::new`]. The default token is empty.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FencingToken(Bytes);
972
973impl FencingToken {
974    const MAX_BYTES: usize = 16;
975
976    /// Try creating a new fencing token from bytes.
977    pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
978        let bytes = bytes.into();
979        if bytes.len() > Self::MAX_BYTES {
980            Err(format!(
981                "Size of a fencing token cannot exceed {} bytes",
982                Self::MAX_BYTES
983            )
984            .into())
985        } else {
986            Ok(Self(bytes))
987        }
988    }
989
990    /// Generate a random fencing token with `n` bytes.
991    pub fn generate(n: usize) -> Result<Self, ConvertError> {
992        Self::new(
993            rand::rng()
994                .sample_iter(rand::distr::StandardUniform)
995                .take(n)
996                .collect::<Bytes>(),
997        )
998    }
999}
1000
1001impl AsRef<Bytes> for FencingToken {
1002    fn as_ref(&self) -> &Bytes {
1003        &self.0
1004    }
1005}
1006
1007impl AsRef<[u8]> for FencingToken {
1008    fn as_ref(&self) -> &[u8] {
1009        &self.0
1010    }
1011}
1012
1013impl Deref for FencingToken {
1014    type Target = [u8];
1015
1016    fn deref(&self) -> &Self::Target {
1017        &self.0
1018    }
1019}
1020
1021impl From<FencingToken> for Bytes {
1022    fn from(value: FencingToken) -> Self {
1023        value.0
1024    }
1025}
1026
1027impl From<FencingToken> for Vec<u8> {
1028    fn from(value: FencingToken) -> Self {
1029        value.0.into()
1030    }
1031}
1032
1033impl TryFrom<Bytes> for FencingToken {
1034    type Error = ConvertError;
1035
1036    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
1037        Self::new(value)
1038    }
1039}
1040
1041impl TryFrom<Vec<u8>> for FencingToken {
1042    type Error = ConvertError;
1043
1044    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
1045        Self::new(value)
1046    }
1047}
1048
/// Command to send through a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Enforce a fencing token.
    ///
    /// Fencing is strongly consistent, and subsequent appends that specify a
    /// fencing token will be rejected if it does not match.
    Fence {
        /// Fencing token to enforce.
        ///
        /// Set empty to clear the token.
        fencing_token: FencingToken,
    },
    /// Request a trim till the sequence number.
    ///
    /// Trimming is eventually consistent, and trimmed records may be visible
    /// for a brief period.
    Trim {
        /// Trim point.
        ///
        /// This sequence number is only allowed to advance, and any regression
        /// will be ignored.
        seq_num: u64,
    },
}
1074
/// A command record is a special kind of [`AppendRecord`] that can be used to
/// send command messages.
///
/// Such a record is signalled by a sole header with empty name. The header
/// value represents the operation and record body acts as the payload.
///
/// Use [`CommandRecord::fence`] or [`CommandRecord::trim`] to build one; both
/// leave `timestamp` unset.
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// Command kind.
    pub command: Command,
    /// Timestamp for the record.
    pub timestamp: Option<u64>,
}
1087
1088impl CommandRecord {
1089    const FENCE: &[u8] = b"fence";
1090    const TRIM: &[u8] = b"trim";
1091
1092    /// Create a new fence command record.
1093    pub fn fence(fencing_token: FencingToken) -> Self {
1094        Self {
1095            command: Command::Fence { fencing_token },
1096            timestamp: None,
1097        }
1098    }
1099
1100    /// Create a new trim command record.
1101    pub fn trim(seq_num: impl Into<u64>) -> Self {
1102        Self {
1103            command: Command::Trim {
1104                seq_num: seq_num.into(),
1105            },
1106            timestamp: None,
1107        }
1108    }
1109
1110    /// Overwrite timestamp.
1111    pub fn with_timestamp(self, timestamp: u64) -> Self {
1112        Self {
1113            timestamp: Some(timestamp),
1114            ..self
1115        }
1116    }
1117}
1118
// Record to be appended: optional timestamp, headers and body.
// Fields are private; construction goes through `AppendRecord::new` and the
// `with_*` / `try_from_parts` methods, which enforce the metered size limit.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    timestamp: Option<u64>,
    headers: Vec<Header>,
    body: Bytes,
    // Test-only override of the metered size limit checked by `validated`.
    #[cfg(test)]
    max_bytes: u64,
}

// Implements `MeteredBytes` for `AppendRecord` from its headers and body.
metered_impl!(AppendRecord);
1130
1131impl AppendRecord {
1132    const MAX_BYTES: u64 = MIB_BYTES;
1133
1134    fn validated(self) -> Result<Self, ConvertError> {
1135        #[cfg(test)]
1136        let max_bytes = self.max_bytes;
1137        #[cfg(not(test))]
1138        let max_bytes = Self::MAX_BYTES;
1139
1140        if self.metered_bytes() > max_bytes {
1141            Err("AppendRecord should have metered size less than 1 MiB".into())
1142        } else {
1143            Ok(self)
1144        }
1145    }
1146
1147    /// Try creating a new append record with body.
1148    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1149        Self {
1150            timestamp: None,
1151            headers: Vec::new(),
1152            body: body.into(),
1153            #[cfg(test)]
1154            max_bytes: Self::MAX_BYTES,
1155        }
1156        .validated()
1157    }
1158
1159    #[cfg(test)]
1160    pub(crate) fn with_max_bytes(
1161        max_bytes: u64,
1162        body: impl Into<Bytes>,
1163    ) -> Result<Self, ConvertError> {
1164        Self {
1165            timestamp: None,
1166            headers: Vec::new(),
1167            body: body.into(),
1168            max_bytes,
1169        }
1170        .validated()
1171    }
1172
1173    /// Overwrite headers.
1174    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1175        Self {
1176            headers: headers.into(),
1177            ..self
1178        }
1179        .validated()
1180    }
1181
1182    /// Overwrite timestamp.
1183    pub fn with_timestamp(self, timestamp: u64) -> Self {
1184        Self {
1185            timestamp: Some(timestamp),
1186            ..self
1187        }
1188    }
1189
1190    /// Body of the record.
1191    pub fn body(&self) -> &[u8] {
1192        &self.body
1193    }
1194
1195    /// Headers of the record.
1196    pub fn headers(&self) -> &[Header] {
1197        &self.headers
1198    }
1199
1200    /// Timestamp for the record.
1201    pub fn timestamp(&self) -> Option<u64> {
1202        self.timestamp
1203    }
1204
1205    /// Consume the record and return parts.
1206    pub fn into_parts(self) -> AppendRecordParts {
1207        AppendRecordParts {
1208            timestamp: self.timestamp,
1209            headers: self.headers,
1210            body: self.body,
1211        }
1212    }
1213
1214    /// Try creating the record from parts.
1215    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1216        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1217        if let Some(timestamp) = parts.timestamp {
1218            Ok(record.with_timestamp(timestamp))
1219        } else {
1220            Ok(record)
1221        }
1222    }
1223}
1224
1225impl From<AppendRecord> for api::AppendRecord {
1226    fn from(value: AppendRecord) -> Self {
1227        Self {
1228            timestamp: value.timestamp,
1229            headers: value.headers.into_iter().map(Into::into).collect(),
1230            body: value.body,
1231        }
1232    }
1233}
1234
1235impl From<CommandRecord> for AppendRecord {
1236    fn from(value: CommandRecord) -> Self {
1237        let (header_value, body) = match value.command {
1238            Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1239            Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1240        };
1241        AppendRecordParts {
1242            timestamp: value.timestamp,
1243            headers: vec![Header::from_value(header_value)],
1244            body: body.into(),
1245        }
1246        .try_into()
1247        .expect("command record is a valid append record")
1248    }
1249}
1250
// Plain-data form of an `AppendRecord` with all fields public.
// Produced by `AppendRecord::into_parts`; turned back into a validated record
// by `AppendRecord::try_from_parts`.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    pub timestamp: Option<u64>,
    pub headers: Vec<Header>,
    pub body: Bytes,
}
1258
1259impl From<AppendRecord> for AppendRecordParts {
1260    fn from(value: AppendRecord) -> Self {
1261        value.into_parts()
1262    }
1263}
1264
1265impl TryFrom<AppendRecordParts> for AppendRecord {
1266    type Error = ConvertError;
1267
1268    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1269        Self::try_from_parts(value)
1270    }
1271}
1272
/// A collection of append records that can be sent together in a batch.
///
/// The batch is bounded both by record count and by total metered bytes;
/// [`AppendRecordBatch::push`] hands back any record that would exceed either
/// bound.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    /// Records accepted so far, in insertion order.
    records: Vec<AppendRecord>,
    /// Running sum of the metered bytes of `records`.
    metered_bytes: u64,
    /// Maximum number of records this batch accepts.
    max_capacity: usize,
    /// Test-only override of the byte limit.
    #[cfg(test)]
    max_bytes: u64,
}
1282
1283impl PartialEq for AppendRecordBatch {
1284    fn eq(&self, other: &Self) -> bool {
1285        if self.records.eq(&other.records) {
1286            assert_eq!(self.metered_bytes, other.metered_bytes);
1287            true
1288        } else {
1289            false
1290        }
1291    }
1292}
1293
// Marker impl: equality is decided by comparing `records`, and `AppendRecord` derives `Eq`.
impl Eq for AppendRecordBatch {}
1295
1296impl Default for AppendRecordBatch {
1297    fn default() -> Self {
1298        Self::new()
1299    }
1300}
1301
1302impl AppendRecordBatch {
1303    /// Maximum number of records that a batch can hold.
1304    ///
1305    /// A record batch cannot be created with a bigger capacity.
1306    pub const MAX_CAPACITY: usize = 1000;
1307
1308    /// Maximum metered bytes of the batch.
1309    pub const MAX_BYTES: u64 = MIB_BYTES;
1310
1311    /// Create an empty record batch.
1312    pub fn new() -> Self {
1313        Self::with_max_capacity(Self::MAX_CAPACITY)
1314    }
1315
1316    /// Create an empty record batch with custom max capacity.
1317    ///
1318    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1319    pub fn with_max_capacity(max_capacity: usize) -> Self {
1320        assert!(
1321            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1322            "Batch capacity must be between 1 and 1000"
1323        );
1324
1325        Self {
1326            records: Vec::with_capacity(max_capacity),
1327            metered_bytes: 0,
1328            max_capacity,
1329            #[cfg(test)]
1330            max_bytes: Self::MAX_BYTES,
1331        }
1332    }
1333
1334    #[cfg(test)]
1335    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1336        #[cfg(test)]
1337        assert!(
1338            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1339            "Batch size must be between 1 byte and 1 MiB"
1340        );
1341
1342        Self {
1343            max_bytes,
1344            ..Self::with_max_capacity(max_capacity)
1345        }
1346    }
1347
1348    /// Try creating a record batch from an iterator.
1349    ///
1350    /// If all the items of the iterator cannot be drained into the batch, the
1351    /// error returned contains a batch containing all records it could fit
1352    /// along-with the left over items from the iterator.
1353    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1354    where
1355        R: Into<AppendRecord>,
1356        T: IntoIterator<Item = R>,
1357    {
1358        let mut records = Self::new();
1359        let mut pending = Vec::new();
1360
1361        let mut iter = iter.into_iter();
1362
1363        for record in iter.by_ref() {
1364            if let Err(record) = records.push(record) {
1365                pending.push(record);
1366                break;
1367            }
1368        }
1369
1370        if pending.is_empty() {
1371            Ok(records)
1372        } else {
1373            pending.extend(iter.map(Into::into));
1374            Err((records, pending))
1375        }
1376    }
1377
1378    /// Returns true if the batch contains no records.
1379    pub fn is_empty(&self) -> bool {
1380        if self.records.is_empty() {
1381            assert_eq!(self.metered_bytes, 0);
1382            true
1383        } else {
1384            false
1385        }
1386    }
1387
1388    /// Returns the number of records contained in the batch.
1389    pub fn len(&self) -> usize {
1390        self.records.len()
1391    }
1392
1393    #[cfg(test)]
1394    fn max_bytes(&self) -> u64 {
1395        self.max_bytes
1396    }
1397
1398    #[cfg(not(test))]
1399    fn max_bytes(&self) -> u64 {
1400        Self::MAX_BYTES
1401    }
1402
1403    /// Returns true if the batch cannot fit any more records.
1404    pub fn is_full(&self) -> bool {
1405        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1406    }
1407
1408    /// Try to append a new record into the batch.
1409    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1410        assert!(self.records.len() <= self.max_capacity);
1411        assert!(self.metered_bytes <= self.max_bytes());
1412
1413        let record = record.into();
1414        let record_size = record.metered_bytes();
1415        if self.records.len() >= self.max_capacity
1416            || self.metered_bytes + record_size > self.max_bytes()
1417        {
1418            Err(record)
1419        } else {
1420            self.records.push(record);
1421            self.metered_bytes += record_size;
1422            Ok(())
1423        }
1424    }
1425}
1426
1427impl MeteredBytes for AppendRecordBatch {
1428    fn metered_bytes(&self) -> u64 {
1429        self.metered_bytes
1430    }
1431}
1432
1433impl IntoIterator for AppendRecordBatch {
1434    type Item = AppendRecord;
1435    type IntoIter = std::vec::IntoIter<Self::Item>;
1436
1437    fn into_iter(self) -> Self::IntoIter {
1438        self.records.into_iter()
1439    }
1440}
1441
1442impl<'a> IntoIterator for &'a AppendRecordBatch {
1443    type Item = &'a AppendRecord;
1444    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1445
1446    fn into_iter(self) -> Self::IntoIter {
1447        self.records.iter()
1448    }
1449}
1450
1451impl AsRef<[AppendRecord]> for AppendRecordBatch {
1452    fn as_ref(&self) -> &[AppendRecord] {
1453        &self.records
1454    }
1455}
1456
// Input for an append: a record batch plus two optional settings.
// `into_api_type` forwards `match_seq_num` unchanged and unwraps the fencing
// token into the API field of the same name.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    pub records: AppendRecordBatch,
    pub match_seq_num: Option<u64>,
    pub fencing_token: Option<FencingToken>,
}
1464
1465impl MeteredBytes for AppendInput {
1466    fn metered_bytes(&self) -> u64 {
1467        self.records.metered_bytes()
1468    }
1469}
1470
1471impl AppendInput {
1472    /// Create a new append input from record batch.
1473    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1474        Self {
1475            records: records.into(),
1476            match_seq_num: None,
1477            fencing_token: None,
1478        }
1479    }
1480
1481    /// Overwrite match sequence number.
1482    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1483        Self {
1484            match_seq_num: Some(match_seq_num.into()),
1485            ..self
1486        }
1487    }
1488
1489    /// Overwrite fencing token.
1490    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1491        Self {
1492            fencing_token: Some(fencing_token),
1493            ..self
1494        }
1495    }
1496
1497    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1498        let Self {
1499            records,
1500            match_seq_num,
1501            fencing_token,
1502        } = self;
1503
1504        api::AppendInput {
1505            stream: stream.into(),
1506            records: records.into_iter().map(Into::into).collect(),
1507            match_seq_num,
1508            fencing_token: fencing_token.map(|f| f.0),
1509        }
1510    }
1511}
1512
/// Acknowledgment to an append request.
///
/// Built from the API's append output, which supplies a sequence number and
/// timestamp for each of the three positions.
#[derive(Debug, Clone)]
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended + 1,
    /// and timestamp of the last record that was appended.
    /// The difference between `end.seq_num` and `start.seq_num`
    /// will be the number of records appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream,
    /// and timestamp of the last record on the stream.
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
1528
1529impl From<api::AppendOutput> for AppendAck {
1530    fn from(value: api::AppendOutput) -> Self {
1531        let api::AppendOutput {
1532            start_seq_num,
1533            start_timestamp,
1534            end_seq_num,
1535            end_timestamp,
1536            next_seq_num,
1537            last_timestamp,
1538        } = value;
1539        let start = StreamPosition {
1540            seq_num: start_seq_num,
1541            timestamp: start_timestamp,
1542        };
1543        let end = StreamPosition {
1544            seq_num: end_seq_num,
1545            timestamp: end_timestamp,
1546        };
1547        let tail = StreamPosition {
1548            seq_num: next_seq_num,
1549            timestamp: last_timestamp,
1550        };
1551        Self { start, end, tail }
1552    }
1553}
1554
1555impl TryFrom<api::AppendResponse> for AppendAck {
1556    type Error = ConvertError;
1557    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1558        let api::AppendResponse { output } = value;
1559        let output = output.ok_or("missing append output")?;
1560        Ok(output.into())
1561    }
1562}
1563
1564impl TryFrom<api::AppendSessionResponse> for AppendAck {
1565    type Error = ConvertError;
1566    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1567        let api::AppendSessionResponse { output } = value;
1568        let output = output.ok_or("missing append output")?;
1569        Ok(output.into())
1570    }
1571}
1572
// Optional limits for a read; both default to `None`.
// Unary `ReadRequest`s reject a count above 1000 or bytes above 1 MiB when
// they are converted to the API type.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    pub count: Option<u64>,
    pub bytes: Option<u64>,
}
1579
1580impl ReadLimit {
1581    /// Create a new read limit.
1582    pub fn new() -> Self {
1583        Self::default()
1584    }
1585
1586    /// Overwrite count limit.
1587    pub fn with_count(self, count: u64) -> Self {
1588        Self {
1589            count: Some(count),
1590            ..self
1591        }
1592    }
1593
1594    /// Overwrite bytes limit.
1595    pub fn with_bytes(self, bytes: u64) -> Self {
1596        Self {
1597            bytes: Some(bytes),
1598            ..self
1599        }
1600    }
1601}
1602
/// Starting point for read requests.
///
/// The default is `SeqNum(0)`.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Sequence number.
    SeqNum(u64),
    /// Timestamp.
    Timestamp(u64),
    /// Number of records before the tail, i.e. the next sequence number.
    TailOffset(u64),
}
1613
1614impl Default for ReadStart {
1615    fn default() -> Self {
1616        Self::SeqNum(0)
1617    }
1618}
1619
1620impl From<ReadStart> for api::read_request::Start {
1621    fn from(start: ReadStart) -> Self {
1622        match start {
1623            ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1624            ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1625            ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1626        }
1627    }
1628}
1629
1630impl From<ReadStart> for api::read_session_request::Start {
1631    fn from(start: ReadStart) -> Self {
1632        match start {
1633            ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1634            ReadStart::Timestamp(timestamp) => {
1635                api::read_session_request::Start::Timestamp(timestamp)
1636            }
1637            ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1638        }
1639    }
1640}
1641
// Unary read request: where to start and how much to read.
// Defaults to `ReadStart::default()` with no limits.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    pub start: ReadStart,
    pub limit: ReadLimit,
}
1648
1649impl ReadRequest {
1650    /// Create a new request with the specified starting point.
1651    pub fn new(start: ReadStart) -> Self {
1652        Self {
1653            start,
1654            ..Default::default()
1655        }
1656    }
1657
1658    /// Overwrite limit.
1659    pub fn with_limit(self, limit: ReadLimit) -> Self {
1660        Self { limit, ..self }
1661    }
1662}
1663
1664impl ReadRequest {
1665    pub(crate) fn try_into_api_type(
1666        self,
1667        stream: impl Into<String>,
1668    ) -> Result<api::ReadRequest, ConvertError> {
1669        let Self { start, limit } = self;
1670
1671        let limit = if limit.count > Some(1000) {
1672            Err("read limit: count must not exceed 1000 for unary request")
1673        } else if limit.bytes > Some(MIB_BYTES) {
1674            Err("read limit: bytes must not exceed 1MiB for unary request")
1675        } else {
1676            Ok(api::ReadLimit {
1677                count: limit.count,
1678                bytes: limit.bytes,
1679            })
1680        }?;
1681
1682        Ok(api::ReadRequest {
1683            stream: stream.into(),
1684            start: Some(start.into()),
1685            limit: Some(limit),
1686        })
1687    }
1688}
1689
// Record as read back from the API: sequence number, timestamp, headers and
// body, all public.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    pub seq_num: u64,
    pub timestamp: u64,
    pub headers: Vec<Header>,
    pub body: Bytes,
}

// Implements `MeteredBytes` for `SequencedRecord` from its headers and body.
metered_impl!(SequencedRecord);
1700
1701impl From<api::SequencedRecord> for SequencedRecord {
1702    fn from(value: api::SequencedRecord) -> Self {
1703        let api::SequencedRecord {
1704            seq_num,
1705            timestamp,
1706            headers,
1707            body,
1708        } = value;
1709        Self {
1710            seq_num,
1711            timestamp,
1712            headers: headers.into_iter().map(Into::into).collect(),
1713            body,
1714        }
1715    }
1716}
1717
1718impl SequencedRecord {
1719    /// Try representing the sequenced record as a command record.
1720    pub fn as_command_record(&self) -> Option<CommandRecord> {
1721        if self.headers.len() != 1 {
1722            return None;
1723        }
1724
1725        let header = self.headers.first().expect("pre-validated length");
1726
1727        if !header.name.is_empty() {
1728            return None;
1729        }
1730
1731        match header.value.as_ref() {
1732            CommandRecord::FENCE => {
1733                let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1734                Some(CommandRecord {
1735                    command: Command::Fence { fencing_token },
1736                    timestamp: Some(self.timestamp),
1737                })
1738            }
1739            CommandRecord::TRIM => {
1740                let body: &[u8] = &self.body;
1741                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1742                Some(CommandRecord {
1743                    command: Command::Trim { seq_num },
1744                    timestamp: Some(self.timestamp),
1745                })
1746            }
1747            _ => None,
1748        }
1749    }
1750}
1751
// Batch of sequenced records, converted from the API batch of the same name.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    pub records: Vec<SequencedRecord>,
}
1757
1758impl MeteredBytes for SequencedRecordBatch {
1759    fn metered_bytes(&self) -> u64 {
1760        self.records.metered_bytes()
1761    }
1762}
1763
1764impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1765    fn from(value: api::SequencedRecordBatch) -> Self {
1766        let api::SequencedRecordBatch { records } = value;
1767        Self {
1768            records: records.into_iter().map(Into::into).collect(),
1769        }
1770    }
1771}
1772
// Result of a read: either a batch of records or a next sequence number.
// Mirrors the two variants of `api::read_output::Output`.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    Batch(SequencedRecordBatch),
    NextSeqNum(u64),
}
1779
1780impl From<api::read_output::Output> for ReadOutput {
1781    fn from(value: api::read_output::Output) -> Self {
1782        match value {
1783            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1784            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1785        }
1786    }
1787}
1788
1789impl TryFrom<api::ReadOutput> for ReadOutput {
1790    type Error = ConvertError;
1791    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1792        let api::ReadOutput { output } = value;
1793        let output = output.ok_or("missing read output")?;
1794        Ok(output.into())
1795    }
1796}
1797
1798impl TryFrom<api::ReadResponse> for ReadOutput {
1799    type Error = ConvertError;
1800    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1801        let api::ReadResponse { output } = value;
1802        let output = output.ok_or("missing output in read response")?;
1803        output.try_into()
1804    }
1805}
1806
// Read session request: where to start and how much to read.
// Unlike `ReadRequest`, the limits are passed to the API without a bounds check.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    pub start: ReadStart,
    pub limit: ReadLimit,
}
1813
1814impl ReadSessionRequest {
1815    /// Create a new request with the specified starting point.
1816    pub fn new(start: ReadStart) -> Self {
1817        Self {
1818            start,
1819            ..Default::default()
1820        }
1821    }
1822
1823    /// Overwrite limit.
1824    pub fn with_limit(self, limit: ReadLimit) -> Self {
1825        Self { limit, ..self }
1826    }
1827
1828    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1829        let Self { start, limit } = self;
1830        api::ReadSessionRequest {
1831            stream: stream.into(),
1832            start: Some(start.into()),
1833            limit: Some(api::ReadLimit {
1834                count: limit.count,
1835                bytes: limit.bytes,
1836            }),
1837            heartbeats: false,
1838        }
1839    }
1840}
1841
1842impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1843    type Error = ConvertError;
1844    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1845        let api::ReadSessionResponse { output } = value;
1846        let output = output.ok_or("missing output in read session response")?;
1847        output.try_into()
1848    }
1849}
1850
/// Name of a basin.
///
/// Must be between 8 and 48 characters in length. Must comprise lowercase
/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
///
/// These rules are enforced when converting from a `String` (`TryFrom`) or
/// parsing from a `&str` (`FromStr`).
#[derive(Debug, Clone)]
pub struct BasinName(String);
1857
1858impl AsRef<str> for BasinName {
1859    fn as_ref(&self) -> &str {
1860        &self.0
1861    }
1862}
1863
1864impl Deref for BasinName {
1865    type Target = str;
1866    fn deref(&self) -> &Self::Target {
1867        &self.0
1868    }
1869}
1870
1871impl TryFrom<String> for BasinName {
1872    type Error = ConvertError;
1873
1874    fn try_from(name: String) -> Result<Self, Self::Error> {
1875        if name.len() < 8 || name.len() > 48 {
1876            return Err("Basin name must be between 8 and 48 characters in length".into());
1877        }
1878
1879        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1880        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1881            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1882                .expect("Failed to compile basin name regex")
1883        });
1884
1885        if !regex.is_match(&name) {
1886            return Err(
1887                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1888                It cannot begin or end with a hyphen."
1889                    .into(),
1890            );
1891        }
1892
1893        Ok(Self(name))
1894    }
1895}
1896
1897impl FromStr for BasinName {
1898    type Err = ConvertError;
1899
1900    fn from_str(s: &str) -> Result<Self, Self::Err> {
1901        s.to_string().try_into()
1902    }
1903}
1904
1905impl std::fmt::Display for BasinName {
1906    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1907        f.write_str(&self.0)
1908    }
1909}
1910
/// Access token ID.
///
/// Must be between 1 and 96 characters. The length is checked with
/// `String::len`, i.e. in bytes, when converting from a `String` or parsing
/// from a `&str`.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);
1915
1916impl AsRef<str> for AccessTokenId {
1917    fn as_ref(&self) -> &str {
1918        &self.0
1919    }
1920}
1921
1922impl Deref for AccessTokenId {
1923    type Target = str;
1924
1925    fn deref(&self) -> &Self::Target {
1926        &self.0
1927    }
1928}
1929
1930impl TryFrom<String> for AccessTokenId {
1931    type Error = ConvertError;
1932
1933    fn try_from(name: String) -> Result<Self, Self::Error> {
1934        if name.is_empty() {
1935            return Err("Access token ID must not be empty".into());
1936        }
1937
1938        if name.len() > 96 {
1939            return Err("Access token ID must not exceed 96 characters".into());
1940        }
1941
1942        Ok(Self(name))
1943    }
1944}
1945
1946impl From<AccessTokenId> for String {
1947    fn from(value: AccessTokenId) -> Self {
1948        value.0
1949    }
1950}
1951
1952impl FromStr for AccessTokenId {
1953    type Err = ConvertError;
1954
1955    fn from_str(s: &str) -> Result<Self, Self::Err> {
1956        s.to_string().try_into()
1957    }
1958}
1959
1960impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1961    fn from(value: AccessTokenInfo) -> Self {
1962        Self {
1963            info: Some(value.into()),
1964        }
1965    }
1966}
1967
// Description of an access token: its ID plus optional expiry and scope.
// `AccessTokenInfo::new` leaves `expires_at` and `scope` unset and
// `auto_prefix_streams` off.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AccessTokenInfo {
    pub id: AccessTokenId,
    pub expires_at: Option<u32>,
    pub auto_prefix_streams: bool,
    pub scope: Option<AccessTokenScope>,
}
1976
1977impl AccessTokenInfo {
1978    /// Create a new access token info.
1979    pub fn new(id: AccessTokenId) -> Self {
1980        Self {
1981            id,
1982            expires_at: None,
1983            auto_prefix_streams: false,
1984            scope: None,
1985        }
1986    }
1987
1988    /// Overwrite expiration time.
1989    pub fn with_expires_at(self, expires_at: u32) -> Self {
1990        Self {
1991            expires_at: Some(expires_at),
1992            ..self
1993        }
1994    }
1995
1996    /// Overwrite auto prefix streams.
1997    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1998        Self {
1999            auto_prefix_streams,
2000            ..self
2001        }
2002    }
2003
2004    /// Overwrite scope.
2005    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2006        Self {
2007            scope: Some(scope),
2008            ..self
2009        }
2010    }
2011}
2012
2013impl From<AccessTokenInfo> for api::AccessTokenInfo {
2014    fn from(value: AccessTokenInfo) -> Self {
2015        let AccessTokenInfo {
2016            id,
2017            expires_at,
2018            auto_prefix_streams,
2019            scope,
2020        } = value;
2021        Self {
2022            id: id.into(),
2023            expires_at,
2024            auto_prefix_streams,
2025            scope: scope.map(Into::into),
2026        }
2027    }
2028}
2029
2030impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2031    type Error = ConvertError;
2032
2033    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2034        let api::AccessTokenInfo {
2035            id,
2036            expires_at,
2037            auto_prefix_streams,
2038            scope,
2039        } = value;
2040        Ok(Self {
2041            id: id.try_into()?,
2042            expires_at,
2043            auto_prefix_streams,
2044            scope: scope.map(Into::into),
2045        })
2046    }
2047}
2048
// Operations that an access token scope can list.
// Each variant parses (case-insensitively) from its kebab-case name, e.g.
// `ListBasins` from "list-basins"; see the `FromStr` impl.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
2071
2072impl FromStr for Operation {
2073    type Err = ConvertError;
2074
2075    fn from_str(s: &str) -> Result<Self, Self::Err> {
2076        match s.to_lowercase().as_str() {
2077            "list-basins" => Ok(Self::ListBasins),
2078            "create-basin" => Ok(Self::CreateBasin),
2079            "delete-basin" => Ok(Self::DeleteBasin),
2080            "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2081            "get-basin-config" => Ok(Self::GetBasinConfig),
2082            "issue-access-token" => Ok(Self::IssueAccessToken),
2083            "revoke-access-token" => Ok(Self::RevokeAccessToken),
2084            "list-access-tokens" => Ok(Self::ListAccessTokens),
2085            "list-streams" => Ok(Self::ListStreams),
2086            "create-stream" => Ok(Self::CreateStream),
2087            "delete-stream" => Ok(Self::DeleteStream),
2088            "get-stream-config" => Ok(Self::GetStreamConfig),
2089            "reconfigure-stream" => Ok(Self::ReconfigureStream),
2090            "check-tail" => Ok(Self::CheckTail),
2091            "append" => Ok(Self::Append),
2092            "read" => Ok(Self::Read),
2093            "trim" => Ok(Self::Trim),
2094            "fence" => Ok(Self::Fence),
2095            _ => Err("invalid operation".into()),
2096        }
2097    }
2098}
2099
2100impl From<Operation> for api::Operation {
2101    fn from(value: Operation) -> Self {
2102        match value {
2103            Operation::ListBasins => Self::ListBasins,
2104            Operation::CreateBasin => Self::CreateBasin,
2105            Operation::DeleteBasin => Self::DeleteBasin,
2106            Operation::ReconfigureBasin => Self::ReconfigureBasin,
2107            Operation::GetBasinConfig => Self::GetBasinConfig,
2108            Operation::IssueAccessToken => Self::IssueAccessToken,
2109            Operation::RevokeAccessToken => Self::RevokeAccessToken,
2110            Operation::ListAccessTokens => Self::ListAccessTokens,
2111            Operation::ListStreams => Self::ListStreams,
2112            Operation::CreateStream => Self::CreateStream,
2113            Operation::DeleteStream => Self::DeleteStream,
2114            Operation::GetStreamConfig => Self::GetStreamConfig,
2115            Operation::ReconfigureStream => Self::ReconfigureStream,
2116            Operation::CheckTail => Self::CheckTail,
2117            Operation::Append => Self::Append,
2118            Operation::Read => Self::Read,
2119            Operation::Trim => Self::Trim,
2120            Operation::Fence => Self::Fence,
2121        }
2122    }
2123}
2124
2125impl From<api::Operation> for Option<Operation> {
2126    fn from(value: api::Operation) -> Self {
2127        match value {
2128            api::Operation::Unspecified => None,
2129            api::Operation::ListBasins => Some(Operation::ListBasins),
2130            api::Operation::CreateBasin => Some(Operation::CreateBasin),
2131            api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2132            api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2133            api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2134            api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2135            api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2136            api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2137            api::Operation::ListStreams => Some(Operation::ListStreams),
2138            api::Operation::CreateStream => Some(Operation::CreateStream),
2139            api::Operation::DeleteStream => Some(Operation::DeleteStream),
2140            api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2141            api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2142            api::Operation::CheckTail => Some(Operation::CheckTail),
2143            api::Operation::Append => Some(Operation::Append),
2144            api::Operation::Read => Some(Operation::Read),
2145            api::Operation::Trim => Some(Operation::Trim),
2146            api::Operation::Fence => Some(Operation::Fence),
2147        }
2148    }
2149}
2150
// Scope of an access token: optional resource sets for basins, streams and
// access tokens, optional permitted operation groups, and a set of
// individually listed operations. The derived `Default` leaves every `Option`
// as `None` and `ops` empty.
// NOTE(review): rustdoc for this item presumably comes from `#[sync_docs]` —
// confirm before adding `///` docs here.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct AccessTokenScope {
    // Resource set for basins, if any.
    pub basins: Option<ResourceSet>,
    // Resource set for streams, if any.
    pub streams: Option<ResourceSet>,
    // Resource set for access tokens, if any.
    pub access_tokens: Option<ResourceSet>,
    // Permitted operation groups, if any.
    pub op_groups: Option<PermittedOperationGroups>,
    // Individually listed operations.
    pub ops: HashSet<Operation>,
}
2160
2161impl AccessTokenScope {
2162    /// Create a new access token scope.
2163    pub fn new() -> Self {
2164        Self::default()
2165    }
2166
2167    /// Overwrite resource set for access tokens.
2168    pub fn with_basins(self, basins: ResourceSet) -> Self {
2169        Self {
2170            basins: Some(basins),
2171            ..self
2172        }
2173    }
2174
2175    /// Overwrite resource set for streams.
2176    pub fn with_streams(self, streams: ResourceSet) -> Self {
2177        Self {
2178            streams: Some(streams),
2179            ..self
2180        }
2181    }
2182
2183    /// Overwrite resource set for access tokens.
2184    pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2185        Self {
2186            access_tokens: Some(access_tokens),
2187            ..self
2188        }
2189    }
2190
2191    /// Overwrite operation groups.
2192    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2193        Self {
2194            op_groups: Some(op_groups),
2195            ..self
2196        }
2197    }
2198
2199    /// Overwrite operations.
2200    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2201        Self {
2202            ops: ops.into_iter().collect(),
2203            ..self
2204        }
2205    }
2206
2207    /// Add an operation to operations.
2208    pub fn with_op(self, op: Operation) -> Self {
2209        let mut ops = self.ops;
2210        ops.insert(op);
2211        Self { ops, ..self }
2212    }
2213}
2214
2215impl From<AccessTokenScope> for api::AccessTokenScope {
2216    fn from(value: AccessTokenScope) -> Self {
2217        let AccessTokenScope {
2218            basins,
2219            streams,
2220            access_tokens,
2221            op_groups,
2222            ops,
2223        } = value;
2224        Self {
2225            basins: basins.map(Into::into),
2226            streams: streams.map(Into::into),
2227            access_tokens: access_tokens.map(Into::into),
2228            op_groups: op_groups.map(Into::into),
2229            ops: ops
2230                .into_iter()
2231                .map(api::Operation::from)
2232                .map(Into::into)
2233                .collect(),
2234        }
2235    }
2236}
2237
2238impl From<api::AccessTokenScope> for AccessTokenScope {
2239    fn from(value: api::AccessTokenScope) -> Self {
2240        let api::AccessTokenScope {
2241            basins,
2242            streams,
2243            access_tokens,
2244            op_groups,
2245            ops,
2246        } = value;
2247        Self {
2248            basins: basins.and_then(|set| set.matching.map(Into::into)),
2249            streams: streams.and_then(|set| set.matching.map(Into::into)),
2250            access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2251            op_groups: op_groups.map(Into::into),
2252            ops: ops
2253                .into_iter()
2254                .map(api::Operation::try_from)
2255                .flat_map(Result::ok)
2256                .flat_map(<Option<Operation>>::from)
2257                .collect(),
2258        }
2259    }
2260}
2261
2262impl From<ResourceSet> for api::ResourceSet {
2263    fn from(value: ResourceSet) -> Self {
2264        Self {
2265            matching: Some(value.into()),
2266        }
2267    }
2268}
2269
// A set of resources selected by name: either one exact string or a string
// prefix. Converts to and from `api::resource_set::Matching`.
// NOTE(review): rustdoc for this item presumably comes from
// `#[sync_docs(ResourceSet = "Matching")]` — confirm before adding `///` docs.
#[sync_docs(ResourceSet = "Matching")]
#[derive(Debug, Clone)]
pub enum ResourceSet {
    // Carries the string passed to `Matching::Exact`.
    Exact(String),
    // Carries the string passed to `Matching::Prefix`.
    Prefix(String),
}
2276
2277impl From<ResourceSet> for api::resource_set::Matching {
2278    fn from(value: ResourceSet) -> Self {
2279        match value {
2280            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2281            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2282        }
2283    }
2284}
2285
2286impl From<api::resource_set::Matching> for ResourceSet {
2287    fn from(value: api::resource_set::Matching) -> Self {
2288        match value {
2289            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2290            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2291        }
2292    }
2293}
2294
// Optional read/write permissions for three groups: account, basin and
// stream. The derived `Default` leaves all three as `None`.
// NOTE(review): rustdoc for this item presumably comes from `#[sync_docs]` —
// confirm before adding `///` docs here.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct PermittedOperationGroups {
    // Read/write permissions for the account group, if set.
    pub account: Option<ReadWritePermissions>,
    // Read/write permissions for the basin group, if set.
    pub basin: Option<ReadWritePermissions>,
    // Read/write permissions for the stream group, if set.
    pub stream: Option<ReadWritePermissions>,
}
2302
2303impl PermittedOperationGroups {
2304    /// Create a new permitted operation groups.
2305    pub fn new() -> Self {
2306        Self::default()
2307    }
2308
2309    /// Overwrite account read-write permissions.
2310    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2311        Self {
2312            account: Some(account),
2313            ..self
2314        }
2315    }
2316
2317    /// Overwrite basin read-write permissions.
2318    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2319        Self {
2320            basin: Some(basin),
2321            ..self
2322        }
2323    }
2324
2325    /// Overwrite stream read-write permissions.
2326    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2327        Self {
2328            stream: Some(stream),
2329            ..self
2330        }
2331    }
2332}
2333
2334impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2335    fn from(value: PermittedOperationGroups) -> Self {
2336        let PermittedOperationGroups {
2337            account,
2338            basin,
2339            stream,
2340        } = value;
2341        Self {
2342            account: account.map(Into::into),
2343            basin: basin.map(Into::into),
2344            stream: stream.map(Into::into),
2345        }
2346    }
2347}
2348
2349impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2350    fn from(value: api::PermittedOperationGroups) -> Self {
2351        let api::PermittedOperationGroups {
2352            account,
2353            basin,
2354            stream,
2355        } = value;
2356        Self {
2357            account: account.map(Into::into),
2358            basin: basin.map(Into::into),
2359            stream: stream.map(Into::into),
2360        }
2361    }
2362}
2363
// A pair of boolean flags, `read` and `write`. The derived `Default` sets
// both to `false`.
// NOTE(review): rustdoc for this item presumably comes from `#[sync_docs]` —
// confirm before adding `///` docs here.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadWritePermissions {
    // Read flag.
    pub read: bool,
    // Write flag.
    pub write: bool,
}
2370
2371impl ReadWritePermissions {
2372    /// Create a new read-write permission.
2373    pub fn new() -> Self {
2374        Self::default()
2375    }
2376
2377    /// Overwrite read permission.
2378    pub fn with_read(self, read: bool) -> Self {
2379        Self { read, ..self }
2380    }
2381
2382    /// Overwrite write permission.
2383    pub fn with_write(self, write: bool) -> Self {
2384        Self { write, ..self }
2385    }
2386}
2387
2388impl From<ReadWritePermissions> for api::ReadWritePermissions {
2389    fn from(value: ReadWritePermissions) -> Self {
2390        let ReadWritePermissions { read, write } = value;
2391        Self { read, write }
2392    }
2393}
2394
2395impl From<api::ReadWritePermissions> for ReadWritePermissions {
2396    fn from(value: api::ReadWritePermissions) -> Self {
2397        let api::ReadWritePermissions { read, write } = value;
2398        Self { read, write }
2399    }
2400}
2401
2402impl From<api::IssueAccessTokenResponse> for String {
2403    fn from(value: api::IssueAccessTokenResponse) -> Self {
2404        value.access_token
2405    }
2406}
2407
2408impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2409    fn from(value: AccessTokenId) -> Self {
2410        Self { id: value.into() }
2411    }
2412}
2413
2414impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2415    type Error = ConvertError;
2416    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2417        let token_info = value.info.ok_or("access token info is missing")?;
2418        token_info.try_into()
2419    }
2420}
2421
// Request parameters for listing access tokens: a prefix, a start-after
// value and an optional limit. The derived `Default` uses empty strings and
// no limit.
// NOTE(review): rustdoc for this item presumably comes from `#[sync_docs]` —
// confirm before adding `///` docs here.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListAccessTokensRequest {
    // Prefix string; passed through unchanged to the API request.
    pub prefix: String,
    // Start-after string; passed through unchanged to the API request.
    pub start_after: String,
    // Optional limit; converted to the API's integer type when the request
    // is converted.
    pub limit: Option<usize>,
}
2429
2430impl ListAccessTokensRequest {
2431    /// Create a new request with prefix.
2432    pub fn new() -> Self {
2433        Self::default()
2434    }
2435
2436    /// Overwrite prefix.
2437    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2438        Self {
2439            prefix: prefix.into(),
2440            ..self
2441        }
2442    }
2443
2444    /// Overwrite start after.
2445    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2446        Self {
2447            start_after: start_after.into(),
2448            ..self
2449        }
2450    }
2451
2452    /// Overwrite limit.
2453    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2454        Self {
2455            limit: limit.into(),
2456            ..self
2457        }
2458    }
2459}
2460
2461impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2462    type Error = ConvertError;
2463    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2464        let ListAccessTokensRequest {
2465            prefix,
2466            start_after,
2467            limit,
2468        } = value;
2469        Ok(Self {
2470            prefix,
2471            start_after,
2472            limit: limit
2473                .map(TryInto::try_into)
2474                .transpose()
2475                .map_err(|_| "request limit does not fit into u64 bounds")?,
2476        })
2477    }
2478}
2479
// Response to a list-access-tokens request: the converted token infos plus
// the `has_more` flag copied from the API response.
// NOTE(review): rustdoc for this item presumably comes from `#[sync_docs]` —
// confirm before adding `///` docs here.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListAccessTokensResponse {
    // Token infos, in the order the API returned them.
    pub access_tokens: Vec<AccessTokenInfo>,
    // Copied verbatim from the API response.
    pub has_more: bool,
}
2486
2487impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2488    type Error = ConvertError;
2489    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2490        let api::ListAccessTokensResponse {
2491            access_tokens,
2492            has_more,
2493        } = value;
2494        let access_tokens = access_tokens
2495            .into_iter()
2496            .map(TryInto::try_into)
2497            .collect::<Result<Vec<_>, _>>()?;
2498        Ok(Self {
2499            access_tokens,
2500            has_more,
2501        })
2502    }
2503}