s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
12pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
14
15/// Error related to conversion from one type to another.
16#[derive(Debug, Clone, thiserror::Error)]
17#[error("{0}")]
18pub struct ConvertError(String);
19
20impl<T: Into<String>> From<T> for ConvertError {
21    fn from(value: T) -> Self {
22        Self(value.into())
23    }
24}
25
/// Size of an object in bytes, as counted for metering.
///
/// The "metered bytes" formula for a record is:
///
/// ```python
/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.key) + len(h.value)) for h in record.headers) + len(record.body)
/// ```
pub trait MeteredBytes {
    /// Metered size of `self` in bytes.
    fn metered_bytes(&self) -> u64;
}

impl<T: MeteredBytes> MeteredBytes for Vec<T> {
    /// A collection is metered as the sum of its items (zero when empty).
    fn metered_bytes(&self) -> u64 {
        self.iter().map(T::metered_bytes).sum::<u64>()
    }
}
43
// Implements `MeteredBytes` for a record-like type that has a `headers`
// collection (items exposing `name` and `value`) and a `body`.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                // Combined length of every header name and value.
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                // Fixed 8 bytes per record plus 2 bytes per header.
                let total = 8 + (2 * self.headers.len()) + header_bytes + self.body.len();
                total as u64
            }
        }
    };
}
61
62#[sync_docs]
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum BasinScope {
65    AwsUsEast1,
66}
67
68impl From<BasinScope> for api::BasinScope {
69    fn from(value: BasinScope) -> Self {
70        match value {
71            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
72        }
73    }
74}
75
76impl From<api::BasinScope> for Option<BasinScope> {
77    fn from(value: api::BasinScope) -> Self {
78        match value {
79            api::BasinScope::Unspecified => None,
80            api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
81        }
82    }
83}
84
85impl FromStr for BasinScope {
86    type Err = ConvertError;
87    fn from_str(value: &str) -> Result<Self, Self::Err> {
88        match value {
89            "aws:us-east-1" => Ok(Self::AwsUsEast1),
90            _ => Err("invalid basin scope value".into()),
91        }
92    }
93}
94
95#[sync_docs]
96#[derive(Debug, Clone)]
97pub struct CreateBasinRequest {
98    pub basin: BasinName,
99    pub config: Option<BasinConfig>,
100    pub scope: Option<BasinScope>,
101}
102
103impl CreateBasinRequest {
104    /// Create a new request with basin name.
105    pub fn new(basin: BasinName) -> Self {
106        Self {
107            basin,
108            config: None,
109            scope: None,
110        }
111    }
112
113    /// Overwrite basin configuration.
114    pub fn with_config(self, config: BasinConfig) -> Self {
115        Self {
116            config: Some(config),
117            ..self
118        }
119    }
120
121    /// Overwrite basin scope.
122    pub fn with_scope(self, scope: BasinScope) -> Self {
123        Self {
124            scope: Some(scope),
125            ..self
126        }
127    }
128}
129
130impl From<CreateBasinRequest> for api::CreateBasinRequest {
131    fn from(value: CreateBasinRequest) -> Self {
132        let CreateBasinRequest {
133            basin,
134            config,
135            scope,
136        } = value;
137        Self {
138            basin: basin.0,
139            config: config.map(Into::into),
140            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
141        }
142    }
143}
144
145#[sync_docs]
146#[derive(Debug, Clone, Default)]
147pub struct BasinConfig {
148    pub default_stream_config: Option<StreamConfig>,
149    pub create_stream_on_append: bool,
150    pub create_stream_on_read: bool,
151}
152
153impl BasinConfig {
154    /// Create a new basin config.
155    pub fn new() -> Self {
156        Self::default()
157    }
158
159    /// Overwrite the default stream config.
160    pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
161        Self {
162            default_stream_config: Some(default_stream_config),
163            ..self
164        }
165    }
166
167    /// Overwrite `create_stream_on_append`.
168    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
169        Self {
170            create_stream_on_append,
171            ..self
172        }
173    }
174
175    /// Overwrite `create_stream_on_read`.
176    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
177        Self {
178            create_stream_on_read,
179            ..self
180        }
181    }
182}
183
184impl From<BasinConfig> for api::BasinConfig {
185    fn from(value: BasinConfig) -> Self {
186        let BasinConfig {
187            default_stream_config,
188            create_stream_on_append,
189            create_stream_on_read,
190        } = value;
191        Self {
192            default_stream_config: default_stream_config.map(Into::into),
193            create_stream_on_append,
194            create_stream_on_read,
195        }
196    }
197}
198
199impl From<api::BasinConfig> for BasinConfig {
200    fn from(value: api::BasinConfig) -> Self {
201        let api::BasinConfig {
202            default_stream_config,
203            create_stream_on_append,
204            create_stream_on_read,
205        } = value;
206        Self {
207            default_stream_config: default_stream_config.map(Into::into),
208            create_stream_on_append,
209            create_stream_on_read,
210        }
211    }
212}
213
214#[sync_docs]
215#[derive(Debug, Clone, Copy, PartialEq, Eq)]
216pub enum TimestampingMode {
217    ClientPrefer,
218    ClientRequire,
219    Arrival,
220}
221
222impl From<TimestampingMode> for api::TimestampingMode {
223    fn from(value: TimestampingMode) -> Self {
224        match value {
225            TimestampingMode::ClientPrefer => Self::ClientPrefer,
226            TimestampingMode::ClientRequire => Self::ClientRequire,
227            TimestampingMode::Arrival => Self::Arrival,
228        }
229    }
230}
231
232impl From<api::TimestampingMode> for Option<TimestampingMode> {
233    fn from(value: api::TimestampingMode) -> Self {
234        match value {
235            api::TimestampingMode::Unspecified => None,
236            api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
237            api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
238            api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
239        }
240    }
241}
242
243#[sync_docs(TimestampingConfig = "Timestamping")]
244#[derive(Debug, Clone, Default)]
245/// Timestamping behavior.
246pub struct TimestampingConfig {
247    pub mode: Option<TimestampingMode>,
248    pub uncapped: Option<bool>,
249}
250
251impl TimestampingConfig {
252    /// Create a new timestamping config.
253    pub fn new() -> Self {
254        Self::default()
255    }
256
257    /// Overwrite timestamping mode.
258    pub fn with_mode(self, mode: TimestampingMode) -> Self {
259        Self {
260            mode: Some(mode),
261            ..self
262        }
263    }
264
265    /// Overwrite the uncapped knob.
266    pub fn with_uncapped(self, uncapped: bool) -> Self {
267        Self {
268            uncapped: Some(uncapped),
269            ..self
270        }
271    }
272}
273
274impl From<TimestampingConfig> for api::stream_config::Timestamping {
275    fn from(value: TimestampingConfig) -> Self {
276        Self {
277            mode: value
278                .mode
279                .map(api::TimestampingMode::from)
280                .unwrap_or_default()
281                .into(),
282            uncapped: value.uncapped,
283        }
284    }
285}
286
287impl From<api::stream_config::Timestamping> for TimestampingConfig {
288    fn from(value: api::stream_config::Timestamping) -> Self {
289        let mode = value.mode().into();
290        let uncapped = value.uncapped;
291        Self { mode, uncapped }
292    }
293}
294
295#[sync_docs]
296#[derive(Debug, Clone, Default)]
297pub struct StreamConfig {
298    pub storage_class: Option<StorageClass>,
299    pub retention_policy: Option<RetentionPolicy>,
300    pub timestamping: Option<TimestampingConfig>,
301}
302
303impl StreamConfig {
304    /// Create a new stream config.
305    pub fn new() -> Self {
306        Self::default()
307    }
308
309    /// Overwrite storage class.
310    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
311        Self {
312            storage_class: Some(storage_class),
313            ..self
314        }
315    }
316
317    /// Overwrite retention policy.
318    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
319        Self {
320            retention_policy: Some(retention_policy),
321            ..self
322        }
323    }
324
325    /// Overwrite timestamping config.
326    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
327        Self {
328            timestamping: Some(timestamping),
329            ..self
330        }
331    }
332}
333
334impl From<StreamConfig> for api::StreamConfig {
335    fn from(value: StreamConfig) -> Self {
336        let StreamConfig {
337            storage_class,
338            retention_policy,
339            timestamping,
340        } = value;
341        Self {
342            storage_class: storage_class
343                .map(api::StorageClass::from)
344                .unwrap_or_default()
345                .into(),
346            retention_policy: retention_policy.map(Into::into),
347            timestamping: timestamping.map(Into::into),
348        }
349    }
350}
351
352impl From<api::StreamConfig> for StreamConfig {
353    fn from(value: api::StreamConfig) -> Self {
354        Self {
355            storage_class: value.storage_class().into(),
356            retention_policy: value.retention_policy.map(Into::into),
357            timestamping: value.timestamping.map(Into::into),
358        }
359    }
360}
361
362#[sync_docs]
363#[derive(Debug, Clone, Copy, PartialEq, Eq)]
364pub enum StorageClass {
365    Standard,
366    Express,
367}
368
369impl From<StorageClass> for api::StorageClass {
370    fn from(value: StorageClass) -> Self {
371        match value {
372            StorageClass::Standard => Self::Standard,
373            StorageClass::Express => Self::Express,
374        }
375    }
376}
377
378impl From<api::StorageClass> for Option<StorageClass> {
379    fn from(value: api::StorageClass) -> Self {
380        match value {
381            api::StorageClass::Unspecified => None,
382            api::StorageClass::Standard => Some(StorageClass::Standard),
383            api::StorageClass::Express => Some(StorageClass::Express),
384        }
385    }
386}
387
388impl FromStr for StorageClass {
389    type Err = ConvertError;
390
391    fn from_str(value: &str) -> Result<Self, Self::Err> {
392        match value {
393            "standard" => Ok(Self::Standard),
394            "express" => Ok(Self::Express),
395            v => Err(format!("unknown storage class: {v}").into()),
396        }
397    }
398}
399
400#[sync_docs(Age = "Age")]
401#[derive(Debug, Clone)]
402pub enum RetentionPolicy {
403    Age(Duration),
404}
405
406impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
407    fn from(value: RetentionPolicy) -> Self {
408        match value {
409            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
410        }
411    }
412}
413
414impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
415    fn from(value: api::stream_config::RetentionPolicy) -> Self {
416        match value {
417            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
418        }
419    }
420}
421
422#[sync_docs]
423#[derive(Debug, Clone, Copy, PartialEq, Eq)]
424pub enum BasinState {
425    Active,
426    Creating,
427    Deleting,
428}
429
430impl From<BasinState> for api::BasinState {
431    fn from(value: BasinState) -> Self {
432        match value {
433            BasinState::Active => Self::Active,
434            BasinState::Creating => Self::Creating,
435            BasinState::Deleting => Self::Deleting,
436        }
437    }
438}
439
440impl From<api::BasinState> for Option<BasinState> {
441    fn from(value: api::BasinState) -> Self {
442        match value {
443            api::BasinState::Unspecified => None,
444            api::BasinState::Active => Some(BasinState::Active),
445            api::BasinState::Creating => Some(BasinState::Creating),
446            api::BasinState::Deleting => Some(BasinState::Deleting),
447        }
448    }
449}
450
451impl std::fmt::Display for BasinState {
452    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453        match self {
454            BasinState::Active => write!(f, "active"),
455            BasinState::Creating => write!(f, "creating"),
456            BasinState::Deleting => write!(f, "deleting"),
457        }
458    }
459}
460
461#[sync_docs]
462#[derive(Debug, Clone)]
463pub struct BasinInfo {
464    pub name: String,
465    pub scope: Option<BasinScope>,
466    pub state: Option<BasinState>,
467}
468
469impl From<BasinInfo> for api::BasinInfo {
470    fn from(value: BasinInfo) -> Self {
471        let BasinInfo { name, scope, state } = value;
472        Self {
473            name,
474            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
475            state: state.map(api::BasinState::from).unwrap_or_default().into(),
476        }
477    }
478}
479
480impl From<api::BasinInfo> for BasinInfo {
481    fn from(value: api::BasinInfo) -> Self {
482        let scope = value.scope().into();
483        let state = value.state().into();
484        let name = value.name;
485        Self { name, scope, state }
486    }
487}
488
489impl TryFrom<api::CreateBasinResponse> for BasinInfo {
490    type Error = ConvertError;
491    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
492        let api::CreateBasinResponse { info } = value;
493        let info = info.ok_or("missing basin info")?;
494        Ok(info.into())
495    }
496}
497
498#[sync_docs]
499#[derive(Debug, Clone, Default)]
500pub struct ListStreamsRequest {
501    pub prefix: String,
502    pub start_after: String,
503    pub limit: Option<usize>,
504}
505
506impl ListStreamsRequest {
507    /// Create a new request.
508    pub fn new() -> Self {
509        Self::default()
510    }
511
512    /// Overwrite prefix.
513    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
514        Self {
515            prefix: prefix.into(),
516            ..self
517        }
518    }
519
520    /// Overwrite start after.
521    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
522        Self {
523            start_after: start_after.into(),
524            ..self
525        }
526    }
527
528    /// Overwrite limit.
529    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
530        Self {
531            limit: limit.into(),
532            ..self
533        }
534    }
535}
536
537impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
538    type Error = ConvertError;
539    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
540        let ListStreamsRequest {
541            prefix,
542            start_after,
543            limit,
544        } = value;
545        Ok(Self {
546            prefix,
547            start_after,
548            limit: limit.map(|n| n as u64),
549        })
550    }
551}
552
553#[sync_docs]
554#[derive(Debug, Clone)]
555pub struct StreamInfo {
556    pub name: String,
557    pub created_at: u32,
558    pub deleted_at: Option<u32>,
559}
560
561impl From<api::StreamInfo> for StreamInfo {
562    fn from(value: api::StreamInfo) -> Self {
563        Self {
564            name: value.name,
565            created_at: value.created_at,
566            deleted_at: value.deleted_at,
567        }
568    }
569}
570
571impl TryFrom<api::CreateStreamResponse> for StreamInfo {
572    type Error = ConvertError;
573
574    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
575        let api::CreateStreamResponse { info } = value;
576        let info = info.ok_or("missing stream info")?;
577        Ok(info.into())
578    }
579}
580
581#[sync_docs]
582#[derive(Debug, Clone)]
583pub struct ListStreamsResponse {
584    pub streams: Vec<StreamInfo>,
585    pub has_more: bool,
586}
587
588impl From<api::ListStreamsResponse> for ListStreamsResponse {
589    fn from(value: api::ListStreamsResponse) -> Self {
590        let api::ListStreamsResponse { streams, has_more } = value;
591        let streams = streams.into_iter().map(Into::into).collect();
592        Self { streams, has_more }
593    }
594}
595
596impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
597    type Error = ConvertError;
598
599    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
600        let api::GetBasinConfigResponse { config } = value;
601        let config = config.ok_or("missing basin config")?;
602        Ok(config.into())
603    }
604}
605
606impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
607    type Error = ConvertError;
608
609    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
610        let api::GetStreamConfigResponse { config } = value;
611        let config = config.ok_or("missing stream config")?;
612        Ok(config.into())
613    }
614}
615
616#[sync_docs]
617#[derive(Debug, Clone)]
618pub struct CreateStreamRequest {
619    pub stream: String,
620    pub config: Option<StreamConfig>,
621}
622
623impl CreateStreamRequest {
624    /// Create a new request with stream name.
625    pub fn new(stream: impl Into<String>) -> Self {
626        Self {
627            stream: stream.into(),
628            config: None,
629        }
630    }
631
632    /// Overwrite stream config.
633    pub fn with_config(self, config: StreamConfig) -> Self {
634        Self {
635            config: Some(config),
636            ..self
637        }
638    }
639}
640
641impl From<CreateStreamRequest> for api::CreateStreamRequest {
642    fn from(value: CreateStreamRequest) -> Self {
643        let CreateStreamRequest { stream, config } = value;
644        Self {
645            stream,
646            config: config.map(Into::into),
647        }
648    }
649}
650
651#[sync_docs]
652#[derive(Debug, Clone, Default)]
653pub struct ListBasinsRequest {
654    pub prefix: String,
655    pub start_after: String,
656    pub limit: Option<usize>,
657}
658
659impl ListBasinsRequest {
660    /// Create a new request.
661    pub fn new() -> Self {
662        Self::default()
663    }
664
665    /// Overwrite prefix.
666    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
667        Self {
668            prefix: prefix.into(),
669            ..self
670        }
671    }
672
673    /// Overwrite start after.
674    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
675        Self {
676            start_after: start_after.into(),
677            ..self
678        }
679    }
680
681    /// Overwrite limit.
682    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
683        Self {
684            limit: limit.into(),
685            ..self
686        }
687    }
688}
689
690impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
691    type Error = ConvertError;
692    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
693        let ListBasinsRequest {
694            prefix,
695            start_after,
696            limit,
697        } = value;
698        Ok(Self {
699            prefix,
700            start_after,
701            limit: limit
702                .map(TryInto::try_into)
703                .transpose()
704                .map_err(|_| "request limit does not fit into u64 bounds")?,
705        })
706    }
707}
708
709#[sync_docs]
710#[derive(Debug, Clone)]
711pub struct ListBasinsResponse {
712    pub basins: Vec<BasinInfo>,
713    pub has_more: bool,
714}
715
716impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
717    type Error = ConvertError;
718    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
719        let api::ListBasinsResponse { basins, has_more } = value;
720        Ok(Self {
721            basins: basins.into_iter().map(Into::into).collect(),
722            has_more,
723        })
724    }
725}
726
727#[sync_docs]
728#[derive(Debug, Clone)]
729pub struct DeleteBasinRequest {
730    pub basin: BasinName,
731    /// Delete basin if it exists else do nothing.
732    pub if_exists: bool,
733}
734
735impl DeleteBasinRequest {
736    /// Create a new request.
737    pub fn new(basin: BasinName) -> Self {
738        Self {
739            basin,
740            if_exists: false,
741        }
742    }
743
744    /// Overwrite the if exists parameter.
745    pub fn with_if_exists(self, if_exists: bool) -> Self {
746        Self { if_exists, ..self }
747    }
748}
749
750impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
751    fn from(value: DeleteBasinRequest) -> Self {
752        let DeleteBasinRequest { basin, .. } = value;
753        Self { basin: basin.0 }
754    }
755}
756
757#[sync_docs]
758#[derive(Debug, Clone)]
759pub struct DeleteStreamRequest {
760    pub stream: String,
761    /// Delete stream if it exists else do nothing.
762    pub if_exists: bool,
763}
764
765impl DeleteStreamRequest {
766    /// Create a new request.
767    pub fn new(stream: impl Into<String>) -> Self {
768        Self {
769            stream: stream.into(),
770            if_exists: false,
771        }
772    }
773
774    /// Overwrite the if exists parameter.
775    pub fn with_if_exists(self, if_exists: bool) -> Self {
776        Self { if_exists, ..self }
777    }
778}
779
780impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
781    fn from(value: DeleteStreamRequest) -> Self {
782        let DeleteStreamRequest { stream, .. } = value;
783        Self { stream }
784    }
785}
786
787#[sync_docs]
788#[derive(Debug, Clone)]
789pub struct ReconfigureBasinRequest {
790    pub basin: BasinName,
791    pub config: Option<BasinConfig>,
792    pub mask: Option<Vec<String>>,
793}
794
795impl ReconfigureBasinRequest {
796    /// Create a new request with basin name.
797    pub fn new(basin: BasinName) -> Self {
798        Self {
799            basin,
800            config: None,
801            mask: None,
802        }
803    }
804
805    /// Overwrite basin config.
806    pub fn with_config(self, config: BasinConfig) -> Self {
807        Self {
808            config: Some(config),
809            ..self
810        }
811    }
812
813    /// Overwrite field mask.
814    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
815        Self {
816            mask: Some(mask.into()),
817            ..self
818        }
819    }
820}
821
822impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
823    fn from(value: ReconfigureBasinRequest) -> Self {
824        let ReconfigureBasinRequest {
825            basin,
826            config,
827            mask,
828        } = value;
829        Self {
830            basin: basin.0,
831            config: config.map(Into::into),
832            mask: mask.map(|paths| prost_types::FieldMask { paths }),
833        }
834    }
835}
836
837impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
838    type Error = ConvertError;
839    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
840        let api::ReconfigureBasinResponse { config } = value;
841        let config = config.ok_or("missing basin config")?;
842        Ok(config.into())
843    }
844}
845
846#[sync_docs]
847#[derive(Debug, Clone)]
848pub struct ReconfigureStreamRequest {
849    pub stream: String,
850    pub config: Option<StreamConfig>,
851    pub mask: Option<Vec<String>>,
852}
853
854impl ReconfigureStreamRequest {
855    /// Create a new request with stream name.
856    pub fn new(stream: impl Into<String>) -> Self {
857        Self {
858            stream: stream.into(),
859            config: None,
860            mask: None,
861        }
862    }
863
864    /// Overwrite stream config.
865    pub fn with_config(self, config: StreamConfig) -> Self {
866        Self {
867            config: Some(config),
868            ..self
869        }
870    }
871
872    /// Overwrite field mask.
873    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
874        Self {
875            mask: Some(mask.into()),
876            ..self
877        }
878    }
879}
880
881impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
882    fn from(value: ReconfigureStreamRequest) -> Self {
883        let ReconfigureStreamRequest {
884            stream,
885            config,
886            mask,
887        } = value;
888        Self {
889            stream,
890            config: config.map(Into::into),
891            mask: mask.map(|paths| prost_types::FieldMask { paths }),
892        }
893    }
894}
895
896impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
897    type Error = ConvertError;
898    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
899        let api::ReconfigureStreamResponse { config } = value;
900        let config = config.ok_or("missing stream config")?;
901        Ok(config.into())
902    }
903}
904
905impl From<api::CheckTailResponse> for StreamPosition {
906    fn from(value: api::CheckTailResponse) -> Self {
907        let api::CheckTailResponse {
908            next_seq_num,
909            last_timestamp,
910        } = value;
911        StreamPosition {
912            seq_num: next_seq_num,
913            timestamp: last_timestamp,
914        }
915    }
916}
917
918/// Position of a record in a stream.
919#[derive(Debug, Clone, PartialEq, Eq)]
920pub struct StreamPosition {
921    /// Sequence number assigned by the service.
922    pub seq_num: u64,
923    /// Timestamp, which may be user-specified or assigned by the service.
924    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
925    pub timestamp: u64,
926}
927
928#[sync_docs]
929#[derive(Debug, Clone, PartialEq, Eq)]
930pub struct Header {
931    pub name: Bytes,
932    pub value: Bytes,
933}
934
935impl Header {
936    /// Create a new header from name and value.
937    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
938        Self {
939            name: name.into(),
940            value: value.into(),
941        }
942    }
943
944    /// Create a new header from value.
945    pub fn from_value(value: impl Into<Bytes>) -> Self {
946        Self {
947            name: Bytes::new(),
948            value: value.into(),
949        }
950    }
951}
952
953impl From<Header> for api::Header {
954    fn from(value: Header) -> Self {
955        let Header { name, value } = value;
956        Self { name, value }
957    }
958}
959
960impl From<api::Header> for Header {
961    fn from(value: api::Header) -> Self {
962        let api::Header { name, value } = value;
963        Self { name, value }
964    }
965}
966
967/// A fencing token can be enforced on append requests.
968///
969/// Must not be more than 36 bytes.
970#[derive(Debug, Clone, Default, PartialEq, Eq)]
971pub struct FencingToken(String);
972
973impl FencingToken {
974    const MAX_BYTES: usize = 36;
975
976    /// Generate a random alphanumeric fencing token of `n` bytes.
977    pub fn generate(n: usize) -> Result<Self, ConvertError> {
978        rand::rng()
979            .sample_iter(&rand::distr::Alphanumeric)
980            .take(n)
981            .map(char::from)
982            .collect::<String>()
983            .parse()
984    }
985}
986
987impl Deref for FencingToken {
988    type Target = str;
989
990    fn deref(&self) -> &Self::Target {
991        &self.0
992    }
993}
994
995impl std::fmt::Display for FencingToken {
996    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
997        write!(f, "{}", self.0)
998    }
999}
1000
1001impl FromStr for FencingToken {
1002    type Err = ConvertError;
1003
1004    fn from_str(value: &str) -> Result<Self, Self::Err> {
1005        value.to_string().try_into()
1006    }
1007}
1008
1009impl TryFrom<String> for FencingToken {
1010    type Error = ConvertError;
1011
1012    fn try_from(value: String) -> Result<Self, Self::Error> {
1013        if value.len() > Self::MAX_BYTES {
1014            Err(format!("Fencing token cannot exceed {} bytes", Self::MAX_BYTES).into())
1015        } else {
1016            Ok(Self(value))
1017        }
1018    }
1019}
1020
1021impl From<FencingToken> for String {
1022    fn from(value: FencingToken) -> Self {
1023        value.0
1024    }
1025}
1026
/// Command to send through a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Enforce a fencing token.
    ///
    /// Fencing is strongly consistent, and subsequent appends that specify a
    /// fencing token will be rejected if it does not match.
    Fence {
        /// Fencing token to enforce.
        ///
        /// Set empty to clear the token.
        fencing_token: FencingToken,
    },
    /// Request a trim up to the given sequence number.
    ///
    /// Trimming is eventually consistent, and trimmed records may be visible
    /// for a brief period.
    Trim {
        /// Trim point.
        ///
        /// This sequence number is only allowed to advance, and any regression
        /// will be ignored.
        seq_num: u64,
    },
}
1052
1053/// A command record is a special kind of [`AppendRecord`] that can be used to
1054/// send command messages.
1055///
1056/// Such a record is signalled by a sole header with empty name. The header
1057/// value represents the operation and record body acts as the payload.
1058#[derive(Debug, Clone)]
1059pub struct CommandRecord {
1060    /// Command kind.
1061    pub command: Command,
1062    /// Timestamp for the record.
1063    pub timestamp: Option<u64>,
1064}
1065
1066impl CommandRecord {
1067    const FENCE: &[u8] = b"fence";
1068    const TRIM: &[u8] = b"trim";
1069
1070    /// Create a new fence command record.
1071    pub fn fence(fencing_token: FencingToken) -> Self {
1072        Self {
1073            command: Command::Fence { fencing_token },
1074            timestamp: None,
1075        }
1076    }
1077
1078    /// Create a new trim command record.
1079    pub fn trim(seq_num: impl Into<u64>) -> Self {
1080        Self {
1081            command: Command::Trim {
1082                seq_num: seq_num.into(),
1083            },
1084            timestamp: None,
1085        }
1086    }
1087
1088    /// Overwrite timestamp.
1089    pub fn with_timestamp(self, timestamp: u64) -> Self {
1090        Self {
1091            timestamp: Some(timestamp),
1092            ..self
1093        }
1094    }
1095}
1096
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    // Fields are private; the impl block below provides constructors and
    // read accessors.
    timestamp: Option<u64>,
    headers: Vec<Header>,
    body: Bytes,
    // Test builds only: per-record size limit, so tests can use a limit
    // other than `AppendRecord::MAX_BYTES`.
    #[cfg(test)]
    max_bytes: u64,
}

// Generates `impl MeteredBytes for AppendRecord` from `headers` and `body`.
metered_impl!(AppendRecord);
1108
1109impl AppendRecord {
1110    const MAX_BYTES: u64 = MIB_BYTES;
1111
1112    fn validated(self) -> Result<Self, ConvertError> {
1113        #[cfg(test)]
1114        let max_bytes = self.max_bytes;
1115        #[cfg(not(test))]
1116        let max_bytes = Self::MAX_BYTES;
1117
1118        if self.metered_bytes() > max_bytes {
1119            Err("AppendRecord should have metered size less than 1 MiB".into())
1120        } else {
1121            Ok(self)
1122        }
1123    }
1124
1125    /// Try creating a new append record with body.
1126    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1127        Self {
1128            timestamp: None,
1129            headers: Vec::new(),
1130            body: body.into(),
1131            #[cfg(test)]
1132            max_bytes: Self::MAX_BYTES,
1133        }
1134        .validated()
1135    }
1136
1137    #[cfg(test)]
1138    pub(crate) fn with_max_bytes(
1139        max_bytes: u64,
1140        body: impl Into<Bytes>,
1141    ) -> Result<Self, ConvertError> {
1142        Self {
1143            timestamp: None,
1144            headers: Vec::new(),
1145            body: body.into(),
1146            max_bytes,
1147        }
1148        .validated()
1149    }
1150
1151    /// Overwrite headers.
1152    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1153        Self {
1154            headers: headers.into(),
1155            ..self
1156        }
1157        .validated()
1158    }
1159
1160    /// Overwrite timestamp.
1161    pub fn with_timestamp(self, timestamp: u64) -> Self {
1162        Self {
1163            timestamp: Some(timestamp),
1164            ..self
1165        }
1166    }
1167
1168    /// Body of the record.
1169    pub fn body(&self) -> &[u8] {
1170        &self.body
1171    }
1172
1173    /// Headers of the record.
1174    pub fn headers(&self) -> &[Header] {
1175        &self.headers
1176    }
1177
1178    /// Timestamp for the record.
1179    pub fn timestamp(&self) -> Option<u64> {
1180        self.timestamp
1181    }
1182
1183    /// Consume the record and return parts.
1184    pub fn into_parts(self) -> AppendRecordParts {
1185        AppendRecordParts {
1186            timestamp: self.timestamp,
1187            headers: self.headers,
1188            body: self.body,
1189        }
1190    }
1191
1192    /// Try creating the record from parts.
1193    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1194        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1195        if let Some(timestamp) = parts.timestamp {
1196            Ok(record.with_timestamp(timestamp))
1197        } else {
1198            Ok(record)
1199        }
1200    }
1201}
1202
1203impl From<AppendRecord> for api::AppendRecord {
1204    fn from(value: AppendRecord) -> Self {
1205        Self {
1206            timestamp: value.timestamp,
1207            headers: value.headers.into_iter().map(Into::into).collect(),
1208            body: value.body,
1209        }
1210    }
1211}
1212
1213impl From<CommandRecord> for AppendRecord {
1214    fn from(value: CommandRecord) -> Self {
1215        let (header_value, body) = match value.command {
1216            Command::Fence { fencing_token } => (
1217                CommandRecord::FENCE,
1218                Bytes::copy_from_slice(fencing_token.as_bytes()),
1219            ),
1220            Command::Trim { seq_num } => (
1221                CommandRecord::TRIM,
1222                Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1223            ),
1224        };
1225        AppendRecordParts {
1226            timestamp: value.timestamp,
1227            headers: vec![Header::from_value(header_value)],
1228            body,
1229        }
1230        .try_into()
1231        .expect("command record is a valid append record")
1232    }
1233}
1234
// Plain, publicly accessible parts of an `AppendRecord`.
//
// Produced by `AppendRecord::into_parts`; turned back into a record, with
// size validation, by `AppendRecord::try_from_parts`.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    // Optional timestamp for the record.
    pub timestamp: Option<u64>,
    // Headers attached to the record.
    pub headers: Vec<Header>,
    // Record payload.
    pub body: Bytes,
}
1242
1243impl From<AppendRecord> for AppendRecordParts {
1244    fn from(value: AppendRecord) -> Self {
1245        value.into_parts()
1246    }
1247}
1248
1249impl TryFrom<AppendRecordParts> for AppendRecord {
1250    type Error = ConvertError;
1251
1252    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1253        Self::try_from_parts(value)
1254    }
1255}
1256
/// A collection of append records that can be sent together in a batch.
///
/// A batch is bounded both by a record count (`max_capacity`) and by the
/// total metered bytes of its records; [`AppendRecordBatch::push`] rejects
/// records that would exceed either bound.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    /// Records accepted so far, in insertion order.
    records: Vec<AppendRecord>,
    /// Running sum of the metered bytes of `records`, updated by `push`.
    metered_bytes: u64,
    /// Maximum number of records this batch accepts.
    max_capacity: usize,
    /// Test-only override for the byte limit of the batch.
    #[cfg(test)]
    max_bytes: u64,
}
1266
1267impl PartialEq for AppendRecordBatch {
1268    fn eq(&self, other: &Self) -> bool {
1269        if self.records.eq(&other.records) {
1270            assert_eq!(self.metered_bytes, other.metered_bytes);
1271            true
1272        } else {
1273            false
1274        }
1275    }
1276}
1277
1278impl Eq for AppendRecordBatch {}
1279
1280impl Default for AppendRecordBatch {
1281    fn default() -> Self {
1282        Self::new()
1283    }
1284}
1285
1286impl AppendRecordBatch {
1287    /// Maximum number of records that a batch can hold.
1288    ///
1289    /// A record batch cannot be created with a bigger capacity.
1290    pub const MAX_CAPACITY: usize = 1000;
1291
1292    /// Maximum metered bytes of the batch.
1293    pub const MAX_BYTES: u64 = MIB_BYTES;
1294
1295    /// Create an empty record batch.
1296    pub fn new() -> Self {
1297        Self::with_max_capacity(Self::MAX_CAPACITY)
1298    }
1299
1300    /// Create an empty record batch with custom max capacity.
1301    ///
1302    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1303    pub fn with_max_capacity(max_capacity: usize) -> Self {
1304        assert!(
1305            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1306            "Batch capacity must be between 1 and 1000"
1307        );
1308
1309        Self {
1310            records: Vec::with_capacity(max_capacity),
1311            metered_bytes: 0,
1312            max_capacity,
1313            #[cfg(test)]
1314            max_bytes: Self::MAX_BYTES,
1315        }
1316    }
1317
1318    #[cfg(test)]
1319    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1320        #[cfg(test)]
1321        assert!(
1322            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1323            "Batch size must be between 1 byte and 1 MiB"
1324        );
1325
1326        Self {
1327            max_bytes,
1328            ..Self::with_max_capacity(max_capacity)
1329        }
1330    }
1331
1332    /// Try creating a record batch from an iterator.
1333    ///
1334    /// If all the items of the iterator cannot be drained into the batch, the
1335    /// error returned contains a batch containing all records it could fit
1336    /// along-with the left over items from the iterator.
1337    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1338    where
1339        R: Into<AppendRecord>,
1340        T: IntoIterator<Item = R>,
1341    {
1342        let mut records = Self::new();
1343        let mut pending = Vec::new();
1344
1345        let mut iter = iter.into_iter();
1346
1347        for record in iter.by_ref() {
1348            if let Err(record) = records.push(record) {
1349                pending.push(record);
1350                break;
1351            }
1352        }
1353
1354        if pending.is_empty() {
1355            Ok(records)
1356        } else {
1357            pending.extend(iter.map(Into::into));
1358            Err((records, pending))
1359        }
1360    }
1361
1362    /// Returns true if the batch contains no records.
1363    pub fn is_empty(&self) -> bool {
1364        if self.records.is_empty() {
1365            assert_eq!(self.metered_bytes, 0);
1366            true
1367        } else {
1368            false
1369        }
1370    }
1371
1372    /// Returns the number of records contained in the batch.
1373    pub fn len(&self) -> usize {
1374        self.records.len()
1375    }
1376
1377    #[cfg(test)]
1378    fn max_bytes(&self) -> u64 {
1379        self.max_bytes
1380    }
1381
1382    #[cfg(not(test))]
1383    fn max_bytes(&self) -> u64 {
1384        Self::MAX_BYTES
1385    }
1386
1387    /// Returns true if the batch cannot fit any more records.
1388    pub fn is_full(&self) -> bool {
1389        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1390    }
1391
1392    /// Try to append a new record into the batch.
1393    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1394        assert!(self.records.len() <= self.max_capacity);
1395        assert!(self.metered_bytes <= self.max_bytes());
1396
1397        let record = record.into();
1398        let record_size = record.metered_bytes();
1399        if self.records.len() >= self.max_capacity
1400            || self.metered_bytes + record_size > self.max_bytes()
1401        {
1402            Err(record)
1403        } else {
1404            self.records.push(record);
1405            self.metered_bytes += record_size;
1406            Ok(())
1407        }
1408    }
1409}
1410
1411impl MeteredBytes for AppendRecordBatch {
1412    fn metered_bytes(&self) -> u64 {
1413        self.metered_bytes
1414    }
1415}
1416
1417impl IntoIterator for AppendRecordBatch {
1418    type Item = AppendRecord;
1419    type IntoIter = std::vec::IntoIter<Self::Item>;
1420
1421    fn into_iter(self) -> Self::IntoIter {
1422        self.records.into_iter()
1423    }
1424}
1425
1426impl<'a> IntoIterator for &'a AppendRecordBatch {
1427    type Item = &'a AppendRecord;
1428    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1429
1430    fn into_iter(self) -> Self::IntoIter {
1431        self.records.iter()
1432    }
1433}
1434
1435impl AsRef<[AppendRecord]> for AppendRecordBatch {
1436    fn as_ref(&self) -> &[AppendRecord] {
1437        &self.records
1438    }
1439}
1440
// Input for an append request: a batch of records plus optional conditions.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    // Records to append.
    pub records: AppendRecordBatch,
    // Optional sequence number condition, forwarded unchanged to the API type.
    pub match_seq_num: Option<u64>,
    // Optional fencing token, forwarded to the API type as its inner value.
    pub fencing_token: Option<FencingToken>,
}
1448
1449impl MeteredBytes for AppendInput {
1450    fn metered_bytes(&self) -> u64 {
1451        self.records.metered_bytes()
1452    }
1453}
1454
1455impl AppendInput {
1456    /// Create a new append input from record batch.
1457    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1458        Self {
1459            records: records.into(),
1460            match_seq_num: None,
1461            fencing_token: None,
1462        }
1463    }
1464
1465    /// Overwrite match sequence number.
1466    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1467        Self {
1468            match_seq_num: Some(match_seq_num.into()),
1469            ..self
1470        }
1471    }
1472
1473    /// Overwrite fencing token.
1474    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1475        Self {
1476            fencing_token: Some(fencing_token),
1477            ..self
1478        }
1479    }
1480
1481    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1482        let Self {
1483            records,
1484            match_seq_num,
1485            fencing_token,
1486        } = self;
1487
1488        api::AppendInput {
1489            stream: stream.into(),
1490            records: records.into_iter().map(Into::into).collect(),
1491            match_seq_num,
1492            fencing_token: fencing_token.map(|f| f.0),
1493        }
1494    }
1495}
1496
/// Acknowledgment to an append request.
///
/// Built from an `api::AppendOutput`, whose six scalar fields are grouped
/// into three [`StreamPosition`] values.
#[derive(Debug, Clone)]
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended + 1,
    /// and timestamp of the last record that was appended.
    /// The difference between `end.seq_num` and `start.seq_num`
    /// will be the number of records appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream,
    /// and timestamp of the last record on the stream.
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
1512
1513impl From<api::AppendOutput> for AppendAck {
1514    fn from(value: api::AppendOutput) -> Self {
1515        let api::AppendOutput {
1516            start_seq_num,
1517            start_timestamp,
1518            end_seq_num,
1519            end_timestamp,
1520            next_seq_num,
1521            last_timestamp,
1522        } = value;
1523        let start = StreamPosition {
1524            seq_num: start_seq_num,
1525            timestamp: start_timestamp,
1526        };
1527        let end = StreamPosition {
1528            seq_num: end_seq_num,
1529            timestamp: end_timestamp,
1530        };
1531        let tail = StreamPosition {
1532            seq_num: next_seq_num,
1533            timestamp: last_timestamp,
1534        };
1535        Self { start, end, tail }
1536    }
1537}
1538
1539impl TryFrom<api::AppendResponse> for AppendAck {
1540    type Error = ConvertError;
1541    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1542        let api::AppendResponse { output } = value;
1543        let output = output.ok_or("missing append output")?;
1544        Ok(output.into())
1545    }
1546}
1547
1548impl TryFrom<api::AppendSessionResponse> for AppendAck {
1549    type Error = ConvertError;
1550    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1551        let api::AppendSessionResponse { output } = value;
1552        let output = output.ok_or("missing append output")?;
1553        Ok(output.into())
1554    }
1555}
1556
// Optional limits for a read; `None` means the corresponding limit is not
// set on the request.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    // Limit on the number of records.
    pub count: Option<u64>,
    // Limit on the number of bytes.
    pub bytes: Option<u64>,
}
1563
1564impl ReadLimit {
1565    /// Create a new read limit.
1566    pub fn new() -> Self {
1567        Self::default()
1568    }
1569
1570    /// Overwrite count limit.
1571    pub fn with_count(self, count: u64) -> Self {
1572        Self {
1573            count: Some(count),
1574            ..self
1575        }
1576    }
1577
1578    /// Overwrite bytes limit.
1579    pub fn with_bytes(self, bytes: u64) -> Self {
1580        Self {
1581            bytes: Some(bytes),
1582            ..self
1583        }
1584    }
1585}
1586
/// Starting point for read requests.
///
/// Exactly one of the three addressing modes is chosen per request.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Sequence number.
    SeqNum(u64),
    /// Timestamp.
    Timestamp(u64),
    /// Number of records before the tail, i.e. the next sequence number.
    TailOffset(u64),
}

/// Reads start at sequence number 0 unless told otherwise.
impl Default for ReadStart {
    fn default() -> Self {
        ReadStart::SeqNum(0)
    }
}
1603
1604impl From<ReadStart> for api::read_request::Start {
1605    fn from(start: ReadStart) -> Self {
1606        match start {
1607            ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1608            ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1609            ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1610        }
1611    }
1612}
1613
1614impl From<ReadStart> for api::read_session_request::Start {
1615    fn from(start: ReadStart) -> Self {
1616        match start {
1617            ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1618            ReadStart::Timestamp(timestamp) => {
1619                api::read_session_request::Start::Timestamp(timestamp)
1620            }
1621            ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1622        }
1623    }
1624}
1625
// Parameters of a unary read: where to start and how much to read.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    // Starting point of the read.
    pub start: ReadStart,
    // Limits of the read; validated when converting to the API type.
    pub limit: ReadLimit,
}
1632
1633impl ReadRequest {
1634    /// Create a new request with the specified starting point.
1635    pub fn new(start: ReadStart) -> Self {
1636        Self {
1637            start,
1638            ..Default::default()
1639        }
1640    }
1641
1642    /// Overwrite limit.
1643    pub fn with_limit(self, limit: ReadLimit) -> Self {
1644        Self { limit, ..self }
1645    }
1646}
1647
1648impl ReadRequest {
1649    pub(crate) fn try_into_api_type(
1650        self,
1651        stream: impl Into<String>,
1652    ) -> Result<api::ReadRequest, ConvertError> {
1653        let Self { start, limit } = self;
1654
1655        let limit = if limit.count > Some(1000) {
1656            Err("read limit: count must not exceed 1000 for unary request")
1657        } else if limit.bytes > Some(MIB_BYTES) {
1658            Err("read limit: bytes must not exceed 1MiB for unary request")
1659        } else {
1660            Ok(api::ReadLimit {
1661                count: limit.count,
1662                bytes: limit.bytes,
1663            })
1664        }?;
1665
1666        Ok(api::ReadRequest {
1667            stream: stream.into(),
1668            start: Some(start.into()),
1669            limit: Some(limit),
1670        })
1671    }
1672}
1673
// A record as read back from a stream, i.e. with an assigned sequence number
// and timestamp.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    // Sequence number of the record.
    pub seq_num: u64,
    // Timestamp of the record.
    pub timestamp: u64,
    // Headers of the record.
    pub headers: Vec<Header>,
    // Record payload.
    pub body: Bytes,
}

// Implements `MeteredBytes` for `SequencedRecord` using the shared macro,
// which reads the `headers` and `body` fields.
metered_impl!(SequencedRecord);
1684
1685impl From<api::SequencedRecord> for SequencedRecord {
1686    fn from(value: api::SequencedRecord) -> Self {
1687        let api::SequencedRecord {
1688            seq_num,
1689            timestamp,
1690            headers,
1691            body,
1692        } = value;
1693        Self {
1694            seq_num,
1695            timestamp,
1696            headers: headers.into_iter().map(Into::into).collect(),
1697            body,
1698        }
1699    }
1700}
1701
1702impl SequencedRecord {
1703    /// Try representing the sequenced record as a command record.
1704    pub fn as_command_record(&self) -> Option<CommandRecord> {
1705        if self.headers.len() != 1 {
1706            return None;
1707        }
1708
1709        let header = self.headers.first().expect("pre-validated length");
1710
1711        if !header.name.is_empty() {
1712            return None;
1713        }
1714
1715        match header.value.as_ref() {
1716            CommandRecord::FENCE => {
1717                let fencing_token = std::str::from_utf8(&self.body).ok()?.parse().ok()?;
1718                Some(CommandRecord {
1719                    command: Command::Fence { fencing_token },
1720                    timestamp: Some(self.timestamp),
1721                })
1722            }
1723            CommandRecord::TRIM => {
1724                let body: &[u8] = &self.body;
1725                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1726                Some(CommandRecord {
1727                    command: Command::Trim { seq_num },
1728                    timestamp: Some(self.timestamp),
1729                })
1730            }
1731            _ => None,
1732        }
1733    }
1734}
1735
// A batch of records returned by a read.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    // Records of the batch, in the order they were received from the API.
    pub records: Vec<SequencedRecord>,
}
1741
1742impl MeteredBytes for SequencedRecordBatch {
1743    fn metered_bytes(&self) -> u64 {
1744        self.records.metered_bytes()
1745    }
1746}
1747
1748impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1749    fn from(value: api::SequencedRecordBatch) -> Self {
1750        let api::SequencedRecordBatch { records } = value;
1751        Self {
1752            records: records.into_iter().map(Into::into).collect(),
1753        }
1754    }
1755}
1756
// Result of a read: either a batch of records or a bare sequence number.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    // Batch of records that were read.
    Batch(SequencedRecordBatch),
    // Sequence number reported by the service instead of a batch.
    NextSeqNum(u64),
}
1763
1764impl From<api::read_output::Output> for ReadOutput {
1765    fn from(value: api::read_output::Output) -> Self {
1766        match value {
1767            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1768            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1769        }
1770    }
1771}
1772
1773impl TryFrom<api::ReadOutput> for ReadOutput {
1774    type Error = ConvertError;
1775    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1776        let api::ReadOutput { output } = value;
1777        let output = output.ok_or("missing read output")?;
1778        Ok(output.into())
1779    }
1780}
1781
1782impl TryFrom<api::ReadResponse> for ReadOutput {
1783    type Error = ConvertError;
1784    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1785        let api::ReadResponse { output } = value;
1786        let output = output.ok_or("missing output in read response")?;
1787        output.try_into()
1788    }
1789}
1790
// Parameters of a read session: where to start and how much to read.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    // Starting point of the read session.
    pub start: ReadStart,
    // Limits of the read session; forwarded to the API type without checks.
    pub limit: ReadLimit,
}
1797
1798impl ReadSessionRequest {
1799    /// Create a new request with the specified starting point.
1800    pub fn new(start: ReadStart) -> Self {
1801        Self {
1802            start,
1803            ..Default::default()
1804        }
1805    }
1806
1807    /// Overwrite limit.
1808    pub fn with_limit(self, limit: ReadLimit) -> Self {
1809        Self { limit, ..self }
1810    }
1811
1812    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1813        let Self { start, limit } = self;
1814        api::ReadSessionRequest {
1815            stream: stream.into(),
1816            start: Some(start.into()),
1817            limit: Some(api::ReadLimit {
1818                count: limit.count,
1819                bytes: limit.bytes,
1820            }),
1821            heartbeats: false,
1822        }
1823    }
1824}
1825
1826impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1827    type Error = ConvertError;
1828    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1829        let api::ReadSessionResponse { output } = value;
1830        let output = output.ok_or("missing output in read session response")?;
1831        output.try_into()
1832    }
1833}
1834
/// Name of a basin.
///
/// Must be between 8 and 48 characters in length. Must comprise lowercase
/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
///
/// The inner string is private, so values are obtained through the
/// validating `TryFrom<String>` / `FromStr` conversions.
#[derive(Debug, Clone)]
pub struct BasinName(String);

/// Lets a basin name be used wherever a `&str` is expected.
impl Deref for BasinName {
    type Target = str;
    fn deref(&self) -> &Self::Target {
        self.0.as_str()
    }
}
1848
1849impl TryFrom<String> for BasinName {
1850    type Error = ConvertError;
1851
1852    fn try_from(name: String) -> Result<Self, Self::Error> {
1853        if name.len() < 8 || name.len() > 48 {
1854            return Err("Basin name must be between 8 and 48 characters in length".into());
1855        }
1856
1857        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1858        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1859            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1860                .expect("Failed to compile basin name regex")
1861        });
1862
1863        if !regex.is_match(&name) {
1864            return Err(
1865                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1866                It cannot begin or end with a hyphen."
1867                    .into(),
1868            );
1869        }
1870
1871        Ok(Self(name))
1872    }
1873}
1874
1875impl FromStr for BasinName {
1876    type Err = ConvertError;
1877
1878    fn from_str(s: &str) -> Result<Self, Self::Err> {
1879        s.to_string().try_into()
1880    }
1881}
1882
1883impl std::fmt::Display for BasinName {
1884    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1885        f.write_str(&self.0)
1886    }
1887}
1888
1889impl From<BasinName> for String {
1890    fn from(value: BasinName) -> Self {
1891        value.0
1892    }
1893}
1894
/// Access token ID.
/// Must be between 1 and 96 characters.
///
/// The inner string is private, so values are obtained through the
/// validating `TryFrom<String>` / `FromStr` conversions.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);

/// Lets an access token ID be used wherever a `&str` is expected.
impl Deref for AccessTokenId {
    type Target = str;

    fn deref(&self) -> &Self::Target {
        self.0.as_str()
    }
}
1907
1908impl TryFrom<String> for AccessTokenId {
1909    type Error = ConvertError;
1910
1911    fn try_from(name: String) -> Result<Self, Self::Error> {
1912        if name.is_empty() {
1913            return Err("Access token ID must not be empty".into());
1914        }
1915
1916        if name.len() > 96 {
1917            return Err("Access token ID must not exceed 96 characters".into());
1918        }
1919
1920        Ok(Self(name))
1921    }
1922}
1923
1924impl From<AccessTokenId> for String {
1925    fn from(value: AccessTokenId) -> Self {
1926        value.0
1927    }
1928}
1929
1930impl FromStr for AccessTokenId {
1931    type Err = ConvertError;
1932
1933    fn from_str(s: &str) -> Result<Self, Self::Err> {
1934        s.to_string().try_into()
1935    }
1936}
1937
1938impl std::fmt::Display for AccessTokenId {
1939    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1940        f.write_str(&self.0)
1941    }
1942}
1943
1944impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1945    fn from(value: AccessTokenInfo) -> Self {
1946        Self {
1947            info: Some(value.into()),
1948        }
1949    }
1950}
1951
// Description of an access token: its ID plus optional settings.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AccessTokenInfo {
    // Validated ID of the token.
    pub id: AccessTokenId,
    // Optional expiration, forwarded unchanged to the API type.
    // NOTE(review): unit/epoch of this `u32` is not visible here — confirm.
    pub expires_at: Option<u32>,
    // Auto-prefix-streams flag; `AccessTokenInfo::new` sets it to `false`.
    pub auto_prefix_streams: bool,
    // Optional scope restricting what the token may access.
    pub scope: Option<AccessTokenScope>,
}
1960
1961impl AccessTokenInfo {
1962    /// Create a new access token info.
1963    pub fn new(id: AccessTokenId) -> Self {
1964        Self {
1965            id,
1966            expires_at: None,
1967            auto_prefix_streams: false,
1968            scope: None,
1969        }
1970    }
1971
1972    /// Overwrite expiration time.
1973    pub fn with_expires_at(self, expires_at: u32) -> Self {
1974        Self {
1975            expires_at: Some(expires_at),
1976            ..self
1977        }
1978    }
1979
1980    /// Overwrite auto prefix streams.
1981    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1982        Self {
1983            auto_prefix_streams,
1984            ..self
1985        }
1986    }
1987
1988    /// Overwrite scope.
1989    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
1990        Self {
1991            scope: Some(scope),
1992            ..self
1993        }
1994    }
1995}
1996
1997impl From<AccessTokenInfo> for api::AccessTokenInfo {
1998    fn from(value: AccessTokenInfo) -> Self {
1999        let AccessTokenInfo {
2000            id,
2001            expires_at,
2002            auto_prefix_streams,
2003            scope,
2004        } = value;
2005        Self {
2006            id: id.into(),
2007            expires_at,
2008            auto_prefix_streams,
2009            scope: scope.map(Into::into),
2010        }
2011    }
2012}
2013
2014impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2015    type Error = ConvertError;
2016
2017    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2018        let api::AccessTokenInfo {
2019            id,
2020            expires_at,
2021            auto_prefix_streams,
2022            scope,
2023        } = value;
2024        Ok(Self {
2025            id: id.try_into()?,
2026            expires_at,
2027            auto_prefix_streams,
2028            scope: scope.map(Into::into),
2029        })
2030    }
2031}
2032
// Individual operations that an access token scope can refer to.
//
// Each variant has a kebab-case textual form accepted by the `FromStr`
// implementation (e.g. `ListBasins` <-> "list-basins") and a same-named
// counterpart in `api::Operation`.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    // Basin-level operations.
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    // Access token operations.
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    // Stream management operations.
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    // Operations on the records of a stream.
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
2055
2056impl FromStr for Operation {
2057    type Err = ConvertError;
2058
2059    fn from_str(s: &str) -> Result<Self, Self::Err> {
2060        match s.to_lowercase().as_str() {
2061            "list-basins" => Ok(Self::ListBasins),
2062            "create-basin" => Ok(Self::CreateBasin),
2063            "delete-basin" => Ok(Self::DeleteBasin),
2064            "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2065            "get-basin-config" => Ok(Self::GetBasinConfig),
2066            "issue-access-token" => Ok(Self::IssueAccessToken),
2067            "revoke-access-token" => Ok(Self::RevokeAccessToken),
2068            "list-access-tokens" => Ok(Self::ListAccessTokens),
2069            "list-streams" => Ok(Self::ListStreams),
2070            "create-stream" => Ok(Self::CreateStream),
2071            "delete-stream" => Ok(Self::DeleteStream),
2072            "get-stream-config" => Ok(Self::GetStreamConfig),
2073            "reconfigure-stream" => Ok(Self::ReconfigureStream),
2074            "check-tail" => Ok(Self::CheckTail),
2075            "append" => Ok(Self::Append),
2076            "read" => Ok(Self::Read),
2077            "trim" => Ok(Self::Trim),
2078            "fence" => Ok(Self::Fence),
2079            _ => Err("invalid operation".into()),
2080        }
2081    }
2082}
2083
2084impl From<Operation> for api::Operation {
2085    fn from(value: Operation) -> Self {
2086        match value {
2087            Operation::ListBasins => Self::ListBasins,
2088            Operation::CreateBasin => Self::CreateBasin,
2089            Operation::DeleteBasin => Self::DeleteBasin,
2090            Operation::ReconfigureBasin => Self::ReconfigureBasin,
2091            Operation::GetBasinConfig => Self::GetBasinConfig,
2092            Operation::IssueAccessToken => Self::IssueAccessToken,
2093            Operation::RevokeAccessToken => Self::RevokeAccessToken,
2094            Operation::ListAccessTokens => Self::ListAccessTokens,
2095            Operation::ListStreams => Self::ListStreams,
2096            Operation::CreateStream => Self::CreateStream,
2097            Operation::DeleteStream => Self::DeleteStream,
2098            Operation::GetStreamConfig => Self::GetStreamConfig,
2099            Operation::ReconfigureStream => Self::ReconfigureStream,
2100            Operation::CheckTail => Self::CheckTail,
2101            Operation::Append => Self::Append,
2102            Operation::Read => Self::Read,
2103            Operation::Trim => Self::Trim,
2104            Operation::Fence => Self::Fence,
2105        }
2106    }
2107}
2108
2109impl From<api::Operation> for Option<Operation> {
2110    fn from(value: api::Operation) -> Self {
2111        match value {
2112            api::Operation::Unspecified => None,
2113            api::Operation::ListBasins => Some(Operation::ListBasins),
2114            api::Operation::CreateBasin => Some(Operation::CreateBasin),
2115            api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2116            api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2117            api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2118            api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2119            api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2120            api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2121            api::Operation::ListStreams => Some(Operation::ListStreams),
2122            api::Operation::CreateStream => Some(Operation::CreateStream),
2123            api::Operation::DeleteStream => Some(Operation::DeleteStream),
2124            api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2125            api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2126            api::Operation::CheckTail => Some(Operation::CheckTail),
2127            api::Operation::Append => Some(Operation::Append),
2128            api::Operation::Read => Some(Operation::Read),
2129            api::Operation::Trim => Some(Operation::Trim),
2130            api::Operation::Fence => Some(Operation::Fence),
2131        }
2132    }
2133}
2134
2135#[sync_docs]
2136#[derive(Debug, Clone, Default)]
2137pub struct AccessTokenScope {
2138    pub basins: Option<ResourceSet>,
2139    pub streams: Option<ResourceSet>,
2140    pub access_tokens: Option<ResourceSet>,
2141    pub op_groups: Option<PermittedOperationGroups>,
2142    pub ops: HashSet<Operation>,
2143}
2144
2145impl AccessTokenScope {
2146    /// Create a new access token scope.
2147    pub fn new() -> Self {
2148        Self::default()
2149    }
2150
2151    /// Overwrite resource set for access tokens.
2152    pub fn with_basins(self, basins: ResourceSet) -> Self {
2153        Self {
2154            basins: Some(basins),
2155            ..self
2156        }
2157    }
2158
2159    /// Overwrite resource set for streams.
2160    pub fn with_streams(self, streams: ResourceSet) -> Self {
2161        Self {
2162            streams: Some(streams),
2163            ..self
2164        }
2165    }
2166
2167    /// Overwrite resource set for access tokens.
2168    pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2169        Self {
2170            access_tokens: Some(access_tokens),
2171            ..self
2172        }
2173    }
2174
2175    /// Overwrite operation groups.
2176    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2177        Self {
2178            op_groups: Some(op_groups),
2179            ..self
2180        }
2181    }
2182
2183    /// Overwrite operations.
2184    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2185        Self {
2186            ops: ops.into_iter().collect(),
2187            ..self
2188        }
2189    }
2190
2191    /// Add an operation to operations.
2192    pub fn with_op(self, op: Operation) -> Self {
2193        let mut ops = self.ops;
2194        ops.insert(op);
2195        Self { ops, ..self }
2196    }
2197}
2198
2199impl From<AccessTokenScope> for api::AccessTokenScope {
2200    fn from(value: AccessTokenScope) -> Self {
2201        let AccessTokenScope {
2202            basins,
2203            streams,
2204            access_tokens,
2205            op_groups,
2206            ops,
2207        } = value;
2208        Self {
2209            basins: basins.map(Into::into),
2210            streams: streams.map(Into::into),
2211            access_tokens: access_tokens.map(Into::into),
2212            op_groups: op_groups.map(Into::into),
2213            ops: ops
2214                .into_iter()
2215                .map(api::Operation::from)
2216                .map(Into::into)
2217                .collect(),
2218        }
2219    }
2220}
2221
2222impl From<api::AccessTokenScope> for AccessTokenScope {
2223    fn from(value: api::AccessTokenScope) -> Self {
2224        let api::AccessTokenScope {
2225            basins,
2226            streams,
2227            access_tokens,
2228            op_groups,
2229            ops,
2230        } = value;
2231        Self {
2232            basins: basins.and_then(|set| set.matching.map(Into::into)),
2233            streams: streams.and_then(|set| set.matching.map(Into::into)),
2234            access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2235            op_groups: op_groups.map(Into::into),
2236            ops: ops
2237                .into_iter()
2238                .map(api::Operation::try_from)
2239                .flat_map(Result::ok)
2240                .flat_map(<Option<Operation>>::from)
2241                .collect(),
2242        }
2243    }
2244}
2245
2246impl From<ResourceSet> for api::ResourceSet {
2247    fn from(value: ResourceSet) -> Self {
2248        Self {
2249            matching: Some(value.into()),
2250        }
2251    }
2252}
2253
2254#[sync_docs(ResourceSet = "Matching")]
2255#[derive(Debug, Clone)]
2256pub enum ResourceSet {
2257    Exact(String),
2258    Prefix(String),
2259}
2260
2261impl From<ResourceSet> for api::resource_set::Matching {
2262    fn from(value: ResourceSet) -> Self {
2263        match value {
2264            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2265            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2266        }
2267    }
2268}
2269
2270impl From<api::resource_set::Matching> for ResourceSet {
2271    fn from(value: api::resource_set::Matching) -> Self {
2272        match value {
2273            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2274            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2275        }
2276    }
2277}
2278
2279#[sync_docs]
2280#[derive(Debug, Clone, Default)]
2281pub struct PermittedOperationGroups {
2282    pub account: Option<ReadWritePermissions>,
2283    pub basin: Option<ReadWritePermissions>,
2284    pub stream: Option<ReadWritePermissions>,
2285}
2286
2287impl PermittedOperationGroups {
2288    /// Create a new permitted operation groups.
2289    pub fn new() -> Self {
2290        Self::default()
2291    }
2292
2293    /// Overwrite account read-write permissions.
2294    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2295        Self {
2296            account: Some(account),
2297            ..self
2298        }
2299    }
2300
2301    /// Overwrite basin read-write permissions.
2302    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2303        Self {
2304            basin: Some(basin),
2305            ..self
2306        }
2307    }
2308
2309    /// Overwrite stream read-write permissions.
2310    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2311        Self {
2312            stream: Some(stream),
2313            ..self
2314        }
2315    }
2316}
2317
2318impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2319    fn from(value: PermittedOperationGroups) -> Self {
2320        let PermittedOperationGroups {
2321            account,
2322            basin,
2323            stream,
2324        } = value;
2325        Self {
2326            account: account.map(Into::into),
2327            basin: basin.map(Into::into),
2328            stream: stream.map(Into::into),
2329        }
2330    }
2331}
2332
2333impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2334    fn from(value: api::PermittedOperationGroups) -> Self {
2335        let api::PermittedOperationGroups {
2336            account,
2337            basin,
2338            stream,
2339        } = value;
2340        Self {
2341            account: account.map(Into::into),
2342            basin: basin.map(Into::into),
2343            stream: stream.map(Into::into),
2344        }
2345    }
2346}
2347
2348#[sync_docs]
2349#[derive(Debug, Clone, Default)]
2350pub struct ReadWritePermissions {
2351    pub read: bool,
2352    pub write: bool,
2353}
2354
2355impl ReadWritePermissions {
2356    /// Create a new read-write permission.
2357    pub fn new() -> Self {
2358        Self::default()
2359    }
2360
2361    /// Overwrite read permission.
2362    pub fn with_read(self, read: bool) -> Self {
2363        Self { read, ..self }
2364    }
2365
2366    /// Overwrite write permission.
2367    pub fn with_write(self, write: bool) -> Self {
2368        Self { write, ..self }
2369    }
2370}
2371
2372impl From<ReadWritePermissions> for api::ReadWritePermissions {
2373    fn from(value: ReadWritePermissions) -> Self {
2374        let ReadWritePermissions { read, write } = value;
2375        Self { read, write }
2376    }
2377}
2378
2379impl From<api::ReadWritePermissions> for ReadWritePermissions {
2380    fn from(value: api::ReadWritePermissions) -> Self {
2381        let api::ReadWritePermissions { read, write } = value;
2382        Self { read, write }
2383    }
2384}
2385
2386impl From<api::IssueAccessTokenResponse> for String {
2387    fn from(value: api::IssueAccessTokenResponse) -> Self {
2388        value.access_token
2389    }
2390}
2391
2392impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2393    fn from(value: AccessTokenId) -> Self {
2394        Self { id: value.into() }
2395    }
2396}
2397
2398impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2399    type Error = ConvertError;
2400    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2401        let token_info = value.info.ok_or("access token info is missing")?;
2402        token_info.try_into()
2403    }
2404}
2405
2406#[sync_docs]
2407#[derive(Debug, Clone, Default)]
2408pub struct ListAccessTokensRequest {
2409    pub prefix: String,
2410    pub start_after: String,
2411    pub limit: Option<usize>,
2412}
2413
2414impl ListAccessTokensRequest {
2415    /// Create a new request with prefix.
2416    pub fn new() -> Self {
2417        Self::default()
2418    }
2419
2420    /// Overwrite prefix.
2421    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2422        Self {
2423            prefix: prefix.into(),
2424            ..self
2425        }
2426    }
2427
2428    /// Overwrite start after.
2429    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2430        Self {
2431            start_after: start_after.into(),
2432            ..self
2433        }
2434    }
2435
2436    /// Overwrite limit.
2437    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2438        Self {
2439            limit: limit.into(),
2440            ..self
2441        }
2442    }
2443}
2444
2445impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2446    type Error = ConvertError;
2447    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2448        let ListAccessTokensRequest {
2449            prefix,
2450            start_after,
2451            limit,
2452        } = value;
2453        Ok(Self {
2454            prefix,
2455            start_after,
2456            limit: limit
2457                .map(TryInto::try_into)
2458                .transpose()
2459                .map_err(|_| "request limit does not fit into u64 bounds")?,
2460        })
2461    }
2462}
2463
2464#[sync_docs]
2465#[derive(Debug, Clone)]
2466pub struct ListAccessTokensResponse {
2467    pub access_tokens: Vec<AccessTokenInfo>,
2468    pub has_more: bool,
2469}
2470
2471impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2472    type Error = ConvertError;
2473    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2474        let api::ListAccessTokensResponse {
2475            access_tokens,
2476            has_more,
2477        } = value;
2478        let access_tokens = access_tokens
2479            .into_iter()
2480            .map(TryInto::try_into)
2481            .collect::<Result<Vec<_>, _>>()?;
2482        Ok(Self {
2483            access_tokens,
2484            has_more,
2485        })
2486    }
2487}