s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{
4    collections::HashSet,
5    ops::{Deref, RangeTo},
6    str::FromStr,
7    sync::OnceLock,
8    time::Duration,
9};
10
11use bytes::Bytes;
12use rand::Rng;
13use regex::Regex;
14use sync_docs::sync_docs;
15
16use crate::api;
17
18pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
19pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
20
21/// Error related to conversion from one type to another.
22#[derive(Debug, Clone, thiserror::Error)]
23#[error("{0}")]
24pub struct ConvertError(String);
25
26impl<T: Into<String>> From<T> for ConvertError {
27    fn from(value: T) -> Self {
28        Self(value.into())
29    }
30}
31
32/// Metered size of the object in bytes.
33///
34/// Bytes are calculated using the "metered bytes" formula:
35///
36/// ```python
37/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.key) + len(h.value)) for h in record.headers) + len(record.body)
38/// ```
39pub trait MeteredBytes {
40    /// Return the metered bytes of the object.
41    fn metered_bytes(&self) -> u64;
42}
43
44impl<T: MeteredBytes> MeteredBytes for Vec<T> {
45    fn metered_bytes(&self) -> u64 {
46        self.iter().fold(0, |acc, item| acc + item.metered_bytes())
47    }
48}
49
50macro_rules! metered_impl {
51    ($ty:ty) => {
52        impl MeteredBytes for $ty {
53            fn metered_bytes(&self) -> u64 {
54                let bytes = 8
55                    + (2 * self.headers.len())
56                    + self
57                        .headers
58                        .iter()
59                        .map(|h| h.name.len() + h.value.len())
60                        .sum::<usize>()
61                    + self.body.len();
62                bytes as u64
63            }
64        }
65    };
66}
67
68#[sync_docs]
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum BasinScope {
71    AwsUsEast1,
72}
73
74impl From<BasinScope> for api::BasinScope {
75    fn from(value: BasinScope) -> Self {
76        match value {
77            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
78        }
79    }
80}
81
82impl From<api::BasinScope> for Option<BasinScope> {
83    fn from(value: api::BasinScope) -> Self {
84        match value {
85            api::BasinScope::Unspecified => None,
86            api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
87        }
88    }
89}
90
91impl FromStr for BasinScope {
92    type Err = ConvertError;
93    fn from_str(value: &str) -> Result<Self, Self::Err> {
94        match value {
95            "aws:us-east-1" => Ok(Self::AwsUsEast1),
96            _ => Err("invalid basin scope value".into()),
97        }
98    }
99}
100
101#[sync_docs]
102#[derive(Debug, Clone)]
103pub struct CreateBasinRequest {
104    pub basin: BasinName,
105    pub config: Option<BasinConfig>,
106    pub scope: Option<BasinScope>,
107}
108
109impl CreateBasinRequest {
110    /// Create a new request with basin name.
111    pub fn new(basin: BasinName) -> Self {
112        Self {
113            basin,
114            config: None,
115            scope: None,
116        }
117    }
118
119    /// Overwrite basin configuration.
120    pub fn with_config(self, config: BasinConfig) -> Self {
121        Self {
122            config: Some(config),
123            ..self
124        }
125    }
126
127    /// Overwrite basin scope.
128    pub fn with_scope(self, scope: BasinScope) -> Self {
129        Self {
130            scope: Some(scope),
131            ..self
132        }
133    }
134}
135
136impl From<CreateBasinRequest> for api::CreateBasinRequest {
137    fn from(value: CreateBasinRequest) -> Self {
138        let CreateBasinRequest {
139            basin,
140            config,
141            scope,
142        } = value;
143        Self {
144            basin: basin.0,
145            config: config.map(Into::into),
146            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
147        }
148    }
149}
150
151#[sync_docs]
152#[derive(Debug, Clone, Default)]
153pub struct BasinConfig {
154    pub default_stream_config: Option<StreamConfig>,
155    pub create_stream_on_append: bool,
156    pub create_stream_on_read: bool,
157}
158
159impl BasinConfig {
160    /// Create a new basin config.
161    pub fn new() -> Self {
162        Self::default()
163    }
164
165    /// Overwrite the default stream config.
166    pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
167        Self {
168            default_stream_config: Some(default_stream_config),
169            ..self
170        }
171    }
172
173    /// Overwrite `create_stream_on_append`.
174    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
175        Self {
176            create_stream_on_append,
177            ..self
178        }
179    }
180
181    /// Overwrite `create_stream_on_read`.
182    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
183        Self {
184            create_stream_on_read,
185            ..self
186        }
187    }
188}
189
190impl From<BasinConfig> for api::BasinConfig {
191    fn from(value: BasinConfig) -> Self {
192        let BasinConfig {
193            default_stream_config,
194            create_stream_on_append,
195            create_stream_on_read,
196        } = value;
197        Self {
198            default_stream_config: default_stream_config.map(Into::into),
199            create_stream_on_append,
200            create_stream_on_read,
201        }
202    }
203}
204
205impl From<api::BasinConfig> for BasinConfig {
206    fn from(value: api::BasinConfig) -> Self {
207        let api::BasinConfig {
208            default_stream_config,
209            create_stream_on_append,
210            create_stream_on_read,
211        } = value;
212        Self {
213            default_stream_config: default_stream_config.map(Into::into),
214            create_stream_on_append,
215            create_stream_on_read,
216        }
217    }
218}
219
220#[sync_docs]
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub enum TimestampingMode {
223    ClientPrefer,
224    ClientRequire,
225    Arrival,
226}
227
228impl From<TimestampingMode> for api::TimestampingMode {
229    fn from(value: TimestampingMode) -> Self {
230        match value {
231            TimestampingMode::ClientPrefer => Self::ClientPrefer,
232            TimestampingMode::ClientRequire => Self::ClientRequire,
233            TimestampingMode::Arrival => Self::Arrival,
234        }
235    }
236}
237
238impl From<api::TimestampingMode> for Option<TimestampingMode> {
239    fn from(value: api::TimestampingMode) -> Self {
240        match value {
241            api::TimestampingMode::Unspecified => None,
242            api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
243            api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
244            api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
245        }
246    }
247}
248
249#[sync_docs(TimestampingConfig = "Timestamping")]
250#[derive(Debug, Clone, Default)]
251/// Timestamping behavior.
252pub struct TimestampingConfig {
253    pub mode: Option<TimestampingMode>,
254    pub uncapped: Option<bool>,
255}
256
257impl TimestampingConfig {
258    /// Create a new timestamping config.
259    pub fn new() -> Self {
260        Self::default()
261    }
262
263    /// Overwrite timestamping mode.
264    pub fn with_mode(self, mode: TimestampingMode) -> Self {
265        Self {
266            mode: Some(mode),
267            ..self
268        }
269    }
270
271    /// Overwrite the uncapped knob.
272    pub fn with_uncapped(self, uncapped: bool) -> Self {
273        Self {
274            uncapped: Some(uncapped),
275            ..self
276        }
277    }
278}
279
280impl From<TimestampingConfig> for api::stream_config::Timestamping {
281    fn from(value: TimestampingConfig) -> Self {
282        Self {
283            mode: value
284                .mode
285                .map(api::TimestampingMode::from)
286                .unwrap_or_default()
287                .into(),
288            uncapped: value.uncapped,
289        }
290    }
291}
292
293impl From<api::stream_config::Timestamping> for TimestampingConfig {
294    fn from(value: api::stream_config::Timestamping) -> Self {
295        let mode = value.mode().into();
296        let uncapped = value.uncapped;
297        Self { mode, uncapped }
298    }
299}
300
301#[sync_docs]
302#[derive(Debug, Clone, Default)]
303pub struct StreamConfig {
304    pub storage_class: Option<StorageClass>,
305    pub retention_policy: Option<RetentionPolicy>,
306    pub timestamping: Option<TimestampingConfig>,
307}
308
309impl StreamConfig {
310    /// Create a new stream config.
311    pub fn new() -> Self {
312        Self::default()
313    }
314
315    /// Overwrite storage class.
316    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
317        Self {
318            storage_class: Some(storage_class),
319            ..self
320        }
321    }
322
323    /// Overwrite retention policy.
324    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
325        Self {
326            retention_policy: Some(retention_policy),
327            ..self
328        }
329    }
330
331    /// Overwrite timestamping config.
332    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
333        Self {
334            timestamping: Some(timestamping),
335            ..self
336        }
337    }
338}
339
340impl From<StreamConfig> for api::StreamConfig {
341    fn from(value: StreamConfig) -> Self {
342        let StreamConfig {
343            storage_class,
344            retention_policy,
345            timestamping,
346        } = value;
347        Self {
348            storage_class: storage_class
349                .map(api::StorageClass::from)
350                .unwrap_or_default()
351                .into(),
352            retention_policy: retention_policy.map(Into::into),
353            timestamping: timestamping.map(Into::into),
354        }
355    }
356}
357
358impl From<api::StreamConfig> for StreamConfig {
359    fn from(value: api::StreamConfig) -> Self {
360        Self {
361            storage_class: value.storage_class().into(),
362            retention_policy: value.retention_policy.map(Into::into),
363            timestamping: value.timestamping.map(Into::into),
364        }
365    }
366}
367
368#[sync_docs]
369#[derive(Debug, Clone, Copy, PartialEq, Eq)]
370pub enum StorageClass {
371    Standard,
372    Express,
373}
374
375impl From<StorageClass> for api::StorageClass {
376    fn from(value: StorageClass) -> Self {
377        match value {
378            StorageClass::Standard => Self::Standard,
379            StorageClass::Express => Self::Express,
380        }
381    }
382}
383
384impl From<api::StorageClass> for Option<StorageClass> {
385    fn from(value: api::StorageClass) -> Self {
386        match value {
387            api::StorageClass::Unspecified => None,
388            api::StorageClass::Standard => Some(StorageClass::Standard),
389            api::StorageClass::Express => Some(StorageClass::Express),
390        }
391    }
392}
393
394impl FromStr for StorageClass {
395    type Err = ConvertError;
396
397    fn from_str(value: &str) -> Result<Self, Self::Err> {
398        match value {
399            "standard" => Ok(Self::Standard),
400            "express" => Ok(Self::Express),
401            v => Err(format!("unknown storage class: {v}").into()),
402        }
403    }
404}
405
406#[sync_docs(Age = "Age")]
407#[derive(Debug, Clone)]
408pub enum RetentionPolicy {
409    Age(Duration),
410}
411
412impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
413    fn from(value: RetentionPolicy) -> Self {
414        match value {
415            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
416        }
417    }
418}
419
420impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
421    fn from(value: api::stream_config::RetentionPolicy) -> Self {
422        match value {
423            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
424        }
425    }
426}
427
428#[sync_docs]
429#[derive(Debug, Clone, Copy, PartialEq, Eq)]
430pub enum BasinState {
431    Active,
432    Creating,
433    Deleting,
434}
435
436impl From<BasinState> for api::BasinState {
437    fn from(value: BasinState) -> Self {
438        match value {
439            BasinState::Active => Self::Active,
440            BasinState::Creating => Self::Creating,
441            BasinState::Deleting => Self::Deleting,
442        }
443    }
444}
445
446impl From<api::BasinState> for Option<BasinState> {
447    fn from(value: api::BasinState) -> Self {
448        match value {
449            api::BasinState::Unspecified => None,
450            api::BasinState::Active => Some(BasinState::Active),
451            api::BasinState::Creating => Some(BasinState::Creating),
452            api::BasinState::Deleting => Some(BasinState::Deleting),
453        }
454    }
455}
456
457impl std::fmt::Display for BasinState {
458    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459        match self {
460            BasinState::Active => write!(f, "active"),
461            BasinState::Creating => write!(f, "creating"),
462            BasinState::Deleting => write!(f, "deleting"),
463        }
464    }
465}
466
467#[sync_docs]
468#[derive(Debug, Clone)]
469pub struct BasinInfo {
470    pub name: String,
471    pub scope: Option<BasinScope>,
472    pub state: Option<BasinState>,
473}
474
475impl From<BasinInfo> for api::BasinInfo {
476    fn from(value: BasinInfo) -> Self {
477        let BasinInfo { name, scope, state } = value;
478        Self {
479            name,
480            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
481            state: state.map(api::BasinState::from).unwrap_or_default().into(),
482        }
483    }
484}
485
486impl From<api::BasinInfo> for BasinInfo {
487    fn from(value: api::BasinInfo) -> Self {
488        let scope = value.scope().into();
489        let state = value.state().into();
490        let name = value.name;
491        Self { name, scope, state }
492    }
493}
494
495impl TryFrom<api::CreateBasinResponse> for BasinInfo {
496    type Error = ConvertError;
497    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
498        let api::CreateBasinResponse { info } = value;
499        let info = info.ok_or("missing basin info")?;
500        Ok(info.into())
501    }
502}
503
504#[sync_docs]
505#[derive(Debug, Clone, Default)]
506pub struct ListStreamsRequest {
507    pub prefix: String,
508    pub start_after: String,
509    pub limit: Option<usize>,
510}
511
512impl ListStreamsRequest {
513    /// Create a new request.
514    pub fn new() -> Self {
515        Self::default()
516    }
517
518    /// Overwrite prefix.
519    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
520        Self {
521            prefix: prefix.into(),
522            ..self
523        }
524    }
525
526    /// Overwrite start after.
527    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
528        Self {
529            start_after: start_after.into(),
530            ..self
531        }
532    }
533
534    /// Overwrite limit.
535    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
536        Self {
537            limit: limit.into(),
538            ..self
539        }
540    }
541}
542
543impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
544    type Error = ConvertError;
545    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
546        let ListStreamsRequest {
547            prefix,
548            start_after,
549            limit,
550        } = value;
551        Ok(Self {
552            prefix,
553            start_after,
554            limit: limit.map(|n| n as u64),
555        })
556    }
557}
558
559#[sync_docs]
560#[derive(Debug, Clone)]
561pub struct StreamInfo {
562    pub name: String,
563    pub created_at: u32,
564    pub deleted_at: Option<u32>,
565}
566
567impl From<api::StreamInfo> for StreamInfo {
568    fn from(value: api::StreamInfo) -> Self {
569        Self {
570            name: value.name,
571            created_at: value.created_at,
572            deleted_at: value.deleted_at,
573        }
574    }
575}
576
577impl TryFrom<api::CreateStreamResponse> for StreamInfo {
578    type Error = ConvertError;
579
580    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
581        let api::CreateStreamResponse { info } = value;
582        let info = info.ok_or("missing stream info")?;
583        Ok(info.into())
584    }
585}
586
587#[sync_docs]
588#[derive(Debug, Clone)]
589pub struct ListStreamsResponse {
590    pub streams: Vec<StreamInfo>,
591    pub has_more: bool,
592}
593
594impl From<api::ListStreamsResponse> for ListStreamsResponse {
595    fn from(value: api::ListStreamsResponse) -> Self {
596        let api::ListStreamsResponse { streams, has_more } = value;
597        let streams = streams.into_iter().map(Into::into).collect();
598        Self { streams, has_more }
599    }
600}
601
602impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
603    type Error = ConvertError;
604
605    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
606        let api::GetBasinConfigResponse { config } = value;
607        let config = config.ok_or("missing basin config")?;
608        Ok(config.into())
609    }
610}
611
612impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
613    type Error = ConvertError;
614
615    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
616        let api::GetStreamConfigResponse { config } = value;
617        let config = config.ok_or("missing stream config")?;
618        Ok(config.into())
619    }
620}
621
622#[sync_docs]
623#[derive(Debug, Clone)]
624pub struct CreateStreamRequest {
625    pub stream: String,
626    pub config: Option<StreamConfig>,
627}
628
629impl CreateStreamRequest {
630    /// Create a new request with stream name.
631    pub fn new(stream: impl Into<String>) -> Self {
632        Self {
633            stream: stream.into(),
634            config: None,
635        }
636    }
637
638    /// Overwrite stream config.
639    pub fn with_config(self, config: StreamConfig) -> Self {
640        Self {
641            config: Some(config),
642            ..self
643        }
644    }
645}
646
647impl From<CreateStreamRequest> for api::CreateStreamRequest {
648    fn from(value: CreateStreamRequest) -> Self {
649        let CreateStreamRequest { stream, config } = value;
650        Self {
651            stream,
652            config: config.map(Into::into),
653        }
654    }
655}
656
657#[sync_docs]
658#[derive(Debug, Clone, Default)]
659pub struct ListBasinsRequest {
660    pub prefix: String,
661    pub start_after: String,
662    pub limit: Option<usize>,
663}
664
665impl ListBasinsRequest {
666    /// Create a new request.
667    pub fn new() -> Self {
668        Self::default()
669    }
670
671    /// Overwrite prefix.
672    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
673        Self {
674            prefix: prefix.into(),
675            ..self
676        }
677    }
678
679    /// Overwrite start after.
680    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
681        Self {
682            start_after: start_after.into(),
683            ..self
684        }
685    }
686
687    /// Overwrite limit.
688    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
689        Self {
690            limit: limit.into(),
691            ..self
692        }
693    }
694}
695
696impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
697    type Error = ConvertError;
698    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
699        let ListBasinsRequest {
700            prefix,
701            start_after,
702            limit,
703        } = value;
704        Ok(Self {
705            prefix,
706            start_after,
707            limit: limit
708                .map(TryInto::try_into)
709                .transpose()
710                .map_err(|_| "request limit does not fit into u64 bounds")?,
711        })
712    }
713}
714
715#[sync_docs]
716#[derive(Debug, Clone)]
717pub struct ListBasinsResponse {
718    pub basins: Vec<BasinInfo>,
719    pub has_more: bool,
720}
721
722impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
723    type Error = ConvertError;
724    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
725        let api::ListBasinsResponse { basins, has_more } = value;
726        Ok(Self {
727            basins: basins.into_iter().map(Into::into).collect(),
728            has_more,
729        })
730    }
731}
732
733#[sync_docs]
734#[derive(Debug, Clone)]
735pub struct DeleteBasinRequest {
736    pub basin: BasinName,
737    /// Delete basin if it exists else do nothing.
738    pub if_exists: bool,
739}
740
741impl DeleteBasinRequest {
742    /// Create a new request.
743    pub fn new(basin: BasinName) -> Self {
744        Self {
745            basin,
746            if_exists: false,
747        }
748    }
749
750    /// Overwrite the if exists parameter.
751    pub fn with_if_exists(self, if_exists: bool) -> Self {
752        Self { if_exists, ..self }
753    }
754}
755
756impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
757    fn from(value: DeleteBasinRequest) -> Self {
758        let DeleteBasinRequest { basin, .. } = value;
759        Self { basin: basin.0 }
760    }
761}
762
763#[sync_docs]
764#[derive(Debug, Clone)]
765pub struct DeleteStreamRequest {
766    pub stream: String,
767    /// Delete stream if it exists else do nothing.
768    pub if_exists: bool,
769}
770
771impl DeleteStreamRequest {
772    /// Create a new request.
773    pub fn new(stream: impl Into<String>) -> Self {
774        Self {
775            stream: stream.into(),
776            if_exists: false,
777        }
778    }
779
780    /// Overwrite the if exists parameter.
781    pub fn with_if_exists(self, if_exists: bool) -> Self {
782        Self { if_exists, ..self }
783    }
784}
785
786impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
787    fn from(value: DeleteStreamRequest) -> Self {
788        let DeleteStreamRequest { stream, .. } = value;
789        Self { stream }
790    }
791}
792
793#[sync_docs]
794#[derive(Debug, Clone)]
795pub struct ReconfigureBasinRequest {
796    pub basin: BasinName,
797    pub config: Option<BasinConfig>,
798    pub mask: Option<Vec<String>>,
799}
800
801impl ReconfigureBasinRequest {
802    /// Create a new request with basin name.
803    pub fn new(basin: BasinName) -> Self {
804        Self {
805            basin,
806            config: None,
807            mask: None,
808        }
809    }
810
811    /// Overwrite basin config.
812    pub fn with_config(self, config: BasinConfig) -> Self {
813        Self {
814            config: Some(config),
815            ..self
816        }
817    }
818
819    /// Overwrite field mask.
820    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
821        Self {
822            mask: Some(mask.into()),
823            ..self
824        }
825    }
826}
827
828impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
829    fn from(value: ReconfigureBasinRequest) -> Self {
830        let ReconfigureBasinRequest {
831            basin,
832            config,
833            mask,
834        } = value;
835        Self {
836            basin: basin.0,
837            config: config.map(Into::into),
838            mask: mask.map(|paths| prost_types::FieldMask { paths }),
839        }
840    }
841}
842
843impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
844    type Error = ConvertError;
845    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
846        let api::ReconfigureBasinResponse { config } = value;
847        let config = config.ok_or("missing basin config")?;
848        Ok(config.into())
849    }
850}
851
852#[sync_docs]
853#[derive(Debug, Clone)]
854pub struct ReconfigureStreamRequest {
855    pub stream: String,
856    pub config: Option<StreamConfig>,
857    pub mask: Option<Vec<String>>,
858}
859
860impl ReconfigureStreamRequest {
861    /// Create a new request with stream name.
862    pub fn new(stream: impl Into<String>) -> Self {
863        Self {
864            stream: stream.into(),
865            config: None,
866            mask: None,
867        }
868    }
869
870    /// Overwrite stream config.
871    pub fn with_config(self, config: StreamConfig) -> Self {
872        Self {
873            config: Some(config),
874            ..self
875        }
876    }
877
878    /// Overwrite field mask.
879    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
880        Self {
881            mask: Some(mask.into()),
882            ..self
883        }
884    }
885}
886
887impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
888    fn from(value: ReconfigureStreamRequest) -> Self {
889        let ReconfigureStreamRequest {
890            stream,
891            config,
892            mask,
893        } = value;
894        Self {
895            stream,
896            config: config.map(Into::into),
897            mask: mask.map(|paths| prost_types::FieldMask { paths }),
898        }
899    }
900}
901
902impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
903    type Error = ConvertError;
904    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
905        let api::ReconfigureStreamResponse { config } = value;
906        let config = config.ok_or("missing stream config")?;
907        Ok(config.into())
908    }
909}
910
911impl From<api::CheckTailResponse> for StreamPosition {
912    fn from(value: api::CheckTailResponse) -> Self {
913        let api::CheckTailResponse {
914            next_seq_num,
915            last_timestamp,
916        } = value;
917        StreamPosition {
918            seq_num: next_seq_num,
919            timestamp: last_timestamp,
920        }
921    }
922}
923
924/// Position of a record in a stream.
925#[derive(Debug, Clone, PartialEq, Eq)]
926pub struct StreamPosition {
927    /// Sequence number assigned by the service.
928    pub seq_num: u64,
929    /// Timestamp, which may be user-specified or assigned by the service.
930    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
931    pub timestamp: u64,
932}
933
934#[sync_docs]
935#[derive(Debug, Clone, PartialEq, Eq)]
936pub struct Header {
937    pub name: Bytes,
938    pub value: Bytes,
939}
940
941impl Header {
942    /// Create a new header from name and value.
943    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
944        Self {
945            name: name.into(),
946            value: value.into(),
947        }
948    }
949
950    /// Create a new header from value.
951    pub fn from_value(value: impl Into<Bytes>) -> Self {
952        Self {
953            name: Bytes::new(),
954            value: value.into(),
955        }
956    }
957}
958
959impl From<Header> for api::Header {
960    fn from(value: Header) -> Self {
961        let Header { name, value } = value;
962        Self { name, value }
963    }
964}
965
966impl From<api::Header> for Header {
967    fn from(value: api::Header) -> Self {
968        let api::Header { name, value } = value;
969        Self { name, value }
970    }
971}
972
973/// A fencing token can be enforced on append requests.
974///
975/// Must not be more than 36 bytes.
976#[derive(Debug, Clone, Default, PartialEq, Eq)]
977pub struct FencingToken(String);
978
979impl FencingToken {
980    const MAX_BYTES: usize = 36;
981
982    /// Generate a random alphanumeric fencing token of `n` bytes.
983    pub fn generate(n: usize) -> Result<Self, ConvertError> {
984        rand::rng()
985            .sample_iter(&rand::distr::Alphanumeric)
986            .take(n)
987            .map(char::from)
988            .collect::<String>()
989            .parse()
990    }
991}
992
993impl Deref for FencingToken {
994    type Target = str;
995
996    fn deref(&self) -> &Self::Target {
997        &self.0
998    }
999}
1000
1001impl std::fmt::Display for FencingToken {
1002    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1003        write!(f, "{}", self.0)
1004    }
1005}
1006
1007impl FromStr for FencingToken {
1008    type Err = ConvertError;
1009
1010    fn from_str(value: &str) -> Result<Self, Self::Err> {
1011        value.to_string().try_into()
1012    }
1013}
1014
1015impl TryFrom<String> for FencingToken {
1016    type Error = ConvertError;
1017
1018    fn try_from(value: String) -> Result<Self, Self::Error> {
1019        if value.len() > Self::MAX_BYTES {
1020            Err(format!("Fencing token cannot exceed {} bytes", Self::MAX_BYTES).into())
1021        } else {
1022            Ok(Self(value))
1023        }
1024    }
1025}
1026
1027impl From<FencingToken> for String {
1028    fn from(value: FencingToken) -> Self {
1029        value.0
1030    }
1031}
1032
1033/// Command to send through a `CommandRecord`.
1034#[derive(Debug, Clone)]
1035pub enum Command {
1036    /// Enforce a fencing token.
1037    ///
1038    /// Fencing is strongly consistent, and subsequent appends that specify a
1039    /// fencing token will be rejected if it does not match.
1040    Fence {
1041        /// Fencing token to enforce.
1042        ///
1043        /// Set empty to clear the token.
1044        fencing_token: FencingToken,
1045    },
1046    /// Request a trim till the sequence number.
1047    ///
1048    /// Trimming is eventually consistent, and trimmed records may be visible
1049    /// for a brief period
1050    Trim {
1051        /// Trim point.
1052        ///
1053        /// This sequence number is only allowed to advance, and any regression
1054        /// will be ignored.
1055        seq_num: u64,
1056    },
1057}
1058
1059/// A command record is a special kind of [`AppendRecord`] that can be used to
1060/// send command messages.
1061///
1062/// Such a record is signalled by a sole header with empty name. The header
1063/// value represents the operation and record body acts as the payload.
1064#[derive(Debug, Clone)]
1065pub struct CommandRecord {
1066    /// Command kind.
1067    pub command: Command,
1068    /// Timestamp for the record.
1069    pub timestamp: Option<u64>,
1070}
1071
1072impl CommandRecord {
1073    const FENCE: &[u8] = b"fence";
1074    const TRIM: &[u8] = b"trim";
1075
1076    /// Create a new fence command record.
1077    pub fn fence(fencing_token: FencingToken) -> Self {
1078        Self {
1079            command: Command::Fence { fencing_token },
1080            timestamp: None,
1081        }
1082    }
1083
1084    /// Create a new trim command record.
1085    pub fn trim(seq_num: impl Into<u64>) -> Self {
1086        Self {
1087            command: Command::Trim {
1088                seq_num: seq_num.into(),
1089            },
1090            timestamp: None,
1091        }
1092    }
1093
1094    /// Overwrite timestamp.
1095    pub fn with_timestamp(self, timestamp: u64) -> Self {
1096        Self {
1097            timestamp: Some(timestamp),
1098            ..self
1099        }
1100    }
1101}
1102
1103#[sync_docs]
1104#[derive(Debug, Clone, PartialEq, Eq)]
1105pub struct AppendRecord {
1106    timestamp: Option<u64>,
1107    headers: Vec<Header>,
1108    body: Bytes,
1109    #[cfg(test)]
1110    max_bytes: u64,
1111}
1112
1113metered_impl!(AppendRecord);
1114
1115impl AppendRecord {
1116    const MAX_BYTES: u64 = MIB_BYTES;
1117
1118    fn validated(self) -> Result<Self, ConvertError> {
1119        #[cfg(test)]
1120        let max_bytes = self.max_bytes;
1121        #[cfg(not(test))]
1122        let max_bytes = Self::MAX_BYTES;
1123
1124        if self.metered_bytes() > max_bytes {
1125            Err("AppendRecord should have metered size less than 1 MiB".into())
1126        } else {
1127            Ok(self)
1128        }
1129    }
1130
1131    /// Try creating a new append record with body.
1132    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1133        Self {
1134            timestamp: None,
1135            headers: Vec::new(),
1136            body: body.into(),
1137            #[cfg(test)]
1138            max_bytes: Self::MAX_BYTES,
1139        }
1140        .validated()
1141    }
1142
1143    #[cfg(test)]
1144    pub(crate) fn with_max_bytes(
1145        max_bytes: u64,
1146        body: impl Into<Bytes>,
1147    ) -> Result<Self, ConvertError> {
1148        Self {
1149            timestamp: None,
1150            headers: Vec::new(),
1151            body: body.into(),
1152            max_bytes,
1153        }
1154        .validated()
1155    }
1156
1157    /// Overwrite headers.
1158    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1159        Self {
1160            headers: headers.into(),
1161            ..self
1162        }
1163        .validated()
1164    }
1165
1166    /// Overwrite timestamp.
1167    pub fn with_timestamp(self, timestamp: u64) -> Self {
1168        Self {
1169            timestamp: Some(timestamp),
1170            ..self
1171        }
1172    }
1173
1174    /// Body of the record.
1175    pub fn body(&self) -> &[u8] {
1176        &self.body
1177    }
1178
1179    /// Headers of the record.
1180    pub fn headers(&self) -> &[Header] {
1181        &self.headers
1182    }
1183
1184    /// Timestamp for the record.
1185    pub fn timestamp(&self) -> Option<u64> {
1186        self.timestamp
1187    }
1188
1189    /// Consume the record and return parts.
1190    pub fn into_parts(self) -> AppendRecordParts {
1191        AppendRecordParts {
1192            timestamp: self.timestamp,
1193            headers: self.headers,
1194            body: self.body,
1195        }
1196    }
1197
1198    /// Try creating the record from parts.
1199    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1200        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1201        if let Some(timestamp) = parts.timestamp {
1202            Ok(record.with_timestamp(timestamp))
1203        } else {
1204            Ok(record)
1205        }
1206    }
1207}
1208
1209impl From<AppendRecord> for api::AppendRecord {
1210    fn from(value: AppendRecord) -> Self {
1211        Self {
1212            timestamp: value.timestamp,
1213            headers: value.headers.into_iter().map(Into::into).collect(),
1214            body: value.body,
1215        }
1216    }
1217}
1218
1219impl From<CommandRecord> for AppendRecord {
1220    fn from(value: CommandRecord) -> Self {
1221        let (header_value, body) = match value.command {
1222            Command::Fence { fencing_token } => (
1223                CommandRecord::FENCE,
1224                Bytes::copy_from_slice(fencing_token.as_bytes()),
1225            ),
1226            Command::Trim { seq_num } => (
1227                CommandRecord::TRIM,
1228                Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1229            ),
1230        };
1231        AppendRecordParts {
1232            timestamp: value.timestamp,
1233            headers: vec![Header::from_value(header_value)],
1234            body,
1235        }
1236        .try_into()
1237        .expect("command record is a valid append record")
1238    }
1239}
1240
1241#[sync_docs(AppendRecordParts = "AppendRecord")]
1242#[derive(Debug, Clone)]
1243pub struct AppendRecordParts {
1244    pub timestamp: Option<u64>,
1245    pub headers: Vec<Header>,
1246    pub body: Bytes,
1247}
1248
1249impl From<AppendRecord> for AppendRecordParts {
1250    fn from(value: AppendRecord) -> Self {
1251        value.into_parts()
1252    }
1253}
1254
1255impl TryFrom<AppendRecordParts> for AppendRecord {
1256    type Error = ConvertError;
1257
1258    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1259        Self::try_from_parts(value)
1260    }
1261}
1262
1263/// A collection of append records that can be sent together in a batch.
1264#[derive(Debug, Clone)]
1265pub struct AppendRecordBatch {
1266    records: Vec<AppendRecord>,
1267    metered_bytes: u64,
1268    max_capacity: usize,
1269    #[cfg(test)]
1270    max_bytes: u64,
1271}
1272
1273impl PartialEq for AppendRecordBatch {
1274    fn eq(&self, other: &Self) -> bool {
1275        if self.records.eq(&other.records) {
1276            assert_eq!(self.metered_bytes, other.metered_bytes);
1277            true
1278        } else {
1279            false
1280        }
1281    }
1282}
1283
1284impl Eq for AppendRecordBatch {}
1285
1286impl Default for AppendRecordBatch {
1287    fn default() -> Self {
1288        Self::new()
1289    }
1290}
1291
1292impl AppendRecordBatch {
1293    /// Maximum number of records that a batch can hold.
1294    ///
1295    /// A record batch cannot be created with a bigger capacity.
1296    pub const MAX_CAPACITY: usize = 1000;
1297
1298    /// Maximum metered bytes of the batch.
1299    pub const MAX_BYTES: u64 = MIB_BYTES;
1300
1301    /// Create an empty record batch.
1302    pub fn new() -> Self {
1303        Self::with_max_capacity(Self::MAX_CAPACITY)
1304    }
1305
1306    /// Create an empty record batch with custom max capacity.
1307    ///
1308    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1309    pub fn with_max_capacity(max_capacity: usize) -> Self {
1310        assert!(
1311            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1312            "Batch capacity must be between 1 and 1000"
1313        );
1314
1315        Self {
1316            records: Vec::with_capacity(max_capacity),
1317            metered_bytes: 0,
1318            max_capacity,
1319            #[cfg(test)]
1320            max_bytes: Self::MAX_BYTES,
1321        }
1322    }
1323
1324    #[cfg(test)]
1325    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1326        #[cfg(test)]
1327        assert!(
1328            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1329            "Batch size must be between 1 byte and 1 MiB"
1330        );
1331
1332        Self {
1333            max_bytes,
1334            ..Self::with_max_capacity(max_capacity)
1335        }
1336    }
1337
1338    /// Try creating a record batch from an iterator.
1339    ///
1340    /// If all the items of the iterator cannot be drained into the batch, the
1341    /// error returned contains a batch containing all records it could fit
1342    /// along-with the left over items from the iterator.
1343    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1344    where
1345        R: Into<AppendRecord>,
1346        T: IntoIterator<Item = R>,
1347    {
1348        let mut records = Self::new();
1349        let mut pending = Vec::new();
1350
1351        let mut iter = iter.into_iter();
1352
1353        for record in iter.by_ref() {
1354            if let Err(record) = records.push(record) {
1355                pending.push(record);
1356                break;
1357            }
1358        }
1359
1360        if pending.is_empty() {
1361            Ok(records)
1362        } else {
1363            pending.extend(iter.map(Into::into));
1364            Err((records, pending))
1365        }
1366    }
1367
1368    /// Returns true if the batch contains no records.
1369    pub fn is_empty(&self) -> bool {
1370        if self.records.is_empty() {
1371            assert_eq!(self.metered_bytes, 0);
1372            true
1373        } else {
1374            false
1375        }
1376    }
1377
1378    /// Returns the number of records contained in the batch.
1379    pub fn len(&self) -> usize {
1380        self.records.len()
1381    }
1382
1383    #[cfg(test)]
1384    fn max_bytes(&self) -> u64 {
1385        self.max_bytes
1386    }
1387
1388    #[cfg(not(test))]
1389    fn max_bytes(&self) -> u64 {
1390        Self::MAX_BYTES
1391    }
1392
1393    /// Returns true if the batch cannot fit any more records.
1394    pub fn is_full(&self) -> bool {
1395        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1396    }
1397
1398    /// Try to append a new record into the batch.
1399    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1400        assert!(self.records.len() <= self.max_capacity);
1401        assert!(self.metered_bytes <= self.max_bytes());
1402
1403        let record = record.into();
1404        let record_size = record.metered_bytes();
1405        if self.records.len() >= self.max_capacity
1406            || self.metered_bytes + record_size > self.max_bytes()
1407        {
1408            Err(record)
1409        } else {
1410            self.records.push(record);
1411            self.metered_bytes += record_size;
1412            Ok(())
1413        }
1414    }
1415}
1416
1417impl MeteredBytes for AppendRecordBatch {
1418    fn metered_bytes(&self) -> u64 {
1419        self.metered_bytes
1420    }
1421}
1422
1423impl IntoIterator for AppendRecordBatch {
1424    type Item = AppendRecord;
1425    type IntoIter = std::vec::IntoIter<Self::Item>;
1426
1427    fn into_iter(self) -> Self::IntoIter {
1428        self.records.into_iter()
1429    }
1430}
1431
1432impl<'a> IntoIterator for &'a AppendRecordBatch {
1433    type Item = &'a AppendRecord;
1434    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1435
1436    fn into_iter(self) -> Self::IntoIter {
1437        self.records.iter()
1438    }
1439}
1440
1441impl AsRef<[AppendRecord]> for AppendRecordBatch {
1442    fn as_ref(&self) -> &[AppendRecord] {
1443        &self.records
1444    }
1445}
1446
1447#[sync_docs]
1448#[derive(Debug, Default, Clone)]
1449pub struct AppendInput {
1450    pub records: AppendRecordBatch,
1451    pub match_seq_num: Option<u64>,
1452    pub fencing_token: Option<FencingToken>,
1453}
1454
1455impl MeteredBytes for AppendInput {
1456    fn metered_bytes(&self) -> u64 {
1457        self.records.metered_bytes()
1458    }
1459}
1460
1461impl AppendInput {
1462    /// Create a new append input from record batch.
1463    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1464        Self {
1465            records: records.into(),
1466            match_seq_num: None,
1467            fencing_token: None,
1468        }
1469    }
1470
1471    /// Overwrite match sequence number.
1472    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1473        Self {
1474            match_seq_num: Some(match_seq_num.into()),
1475            ..self
1476        }
1477    }
1478
1479    /// Overwrite fencing token.
1480    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1481        Self {
1482            fencing_token: Some(fencing_token),
1483            ..self
1484        }
1485    }
1486
1487    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1488        let Self {
1489            records,
1490            match_seq_num,
1491            fencing_token,
1492        } = self;
1493
1494        api::AppendInput {
1495            stream: stream.into(),
1496            records: records.into_iter().map(Into::into).collect(),
1497            match_seq_num,
1498            fencing_token: fencing_token.map(|f| f.0),
1499        }
1500    }
1501}
1502
1503/// Acknowledgment to an append request.
1504#[derive(Debug, Clone)]
1505pub struct AppendAck {
1506    /// Sequence number and timestamp of the first record that was appended.
1507    pub start: StreamPosition,
1508    /// Sequence number of the last record that was appended + 1,
1509    /// and timestamp of the last record that was appended.
1510    /// The difference between `end.seq_num` and `start.seq_num`
1511    /// will be the number of records appended.
1512    pub end: StreamPosition,
1513    /// Sequence number that will be assigned to the next record on the stream,
1514    /// and timestamp of the last record on the stream.
1515    /// This can be greater than the `end` position in case of concurrent appends.
1516    pub tail: StreamPosition,
1517}
1518
1519impl From<api::AppendOutput> for AppendAck {
1520    fn from(value: api::AppendOutput) -> Self {
1521        let api::AppendOutput {
1522            start_seq_num,
1523            start_timestamp,
1524            end_seq_num,
1525            end_timestamp,
1526            next_seq_num,
1527            last_timestamp,
1528        } = value;
1529        let start = StreamPosition {
1530            seq_num: start_seq_num,
1531            timestamp: start_timestamp,
1532        };
1533        let end = StreamPosition {
1534            seq_num: end_seq_num,
1535            timestamp: end_timestamp,
1536        };
1537        let tail = StreamPosition {
1538            seq_num: next_seq_num,
1539            timestamp: last_timestamp,
1540        };
1541        Self { start, end, tail }
1542    }
1543}
1544
1545impl TryFrom<api::AppendResponse> for AppendAck {
1546    type Error = ConvertError;
1547    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1548        let api::AppendResponse { output } = value;
1549        let output = output.ok_or("missing append output")?;
1550        Ok(output.into())
1551    }
1552}
1553
1554impl TryFrom<api::AppendSessionResponse> for AppendAck {
1555    type Error = ConvertError;
1556    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1557        let api::AppendSessionResponse { output } = value;
1558        let output = output.ok_or("missing append output")?;
1559        Ok(output.into())
1560    }
1561}
1562
1563#[sync_docs]
1564#[derive(Debug, Clone, Default)]
1565pub struct ReadLimit {
1566    pub count: Option<u64>,
1567    pub bytes: Option<u64>,
1568}
1569
1570impl ReadLimit {
1571    /// Create a new read limit.
1572    pub fn new() -> Self {
1573        Self::default()
1574    }
1575
1576    /// Overwrite count limit.
1577    pub fn with_count(self, count: u64) -> Self {
1578        Self {
1579            count: Some(count),
1580            ..self
1581        }
1582    }
1583
1584    /// Overwrite bytes limit.
1585    pub fn with_bytes(self, bytes: u64) -> Self {
1586        Self {
1587            bytes: Some(bytes),
1588            ..self
1589        }
1590    }
1591}
1592
1593/// Starting point for read requests.
1594#[derive(Debug, Clone)]
1595pub enum ReadStart {
1596    /// Sequence number.
1597    SeqNum(u64),
1598    /// Timestamp.
1599    Timestamp(u64),
1600    /// Number of records before the tail, i.e. the next sequence number.
1601    TailOffset(u64),
1602}
1603
1604impl Default for ReadStart {
1605    fn default() -> Self {
1606        Self::SeqNum(0)
1607    }
1608}
1609
1610impl From<ReadStart> for api::read_request::Start {
1611    fn from(start: ReadStart) -> Self {
1612        match start {
1613            ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1614            ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1615            ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1616        }
1617    }
1618}
1619
1620impl From<ReadStart> for api::read_session_request::Start {
1621    fn from(start: ReadStart) -> Self {
1622        match start {
1623            ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1624            ReadStart::Timestamp(timestamp) => {
1625                api::read_session_request::Start::Timestamp(timestamp)
1626            }
1627            ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1628        }
1629    }
1630}
1631
1632#[sync_docs]
1633#[derive(Debug, Clone, Default)]
1634pub struct ReadRequest {
1635    pub start: ReadStart,
1636    pub limit: ReadLimit,
1637    pub until: Option<RangeTo<u64>>,
1638    pub clamp: bool,
1639}
1640
1641impl ReadRequest {
1642    /// Create a new request with the specified starting point.
1643    pub fn new(start: ReadStart) -> Self {
1644        Self {
1645            start,
1646            ..Default::default()
1647        }
1648    }
1649
1650    /// Overwrite limit.
1651    pub fn with_limit(self, limit: ReadLimit) -> Self {
1652        Self { limit, ..self }
1653    }
1654
1655    /// Provide an `until` timestamp.
1656    pub fn with_until(self, until: RangeTo<u64>) -> Self {
1657        Self {
1658            until: Some(until),
1659            ..self
1660        }
1661    }
1662
1663    /// Clamp the start position at the tail position.
1664    pub fn with_clamp(self, clamp: bool) -> Self {
1665        Self { clamp, ..self }
1666    }
1667}
1668
1669impl ReadRequest {
1670    pub(crate) fn try_into_api_type(
1671        self,
1672        stream: impl Into<String>,
1673    ) -> Result<api::ReadRequest, ConvertError> {
1674        let Self {
1675            start,
1676            limit,
1677            until,
1678            clamp,
1679        } = self;
1680
1681        let limit = if limit.count > Some(1000) {
1682            Err("read limit: count must not exceed 1000 for unary request")
1683        } else if limit.bytes > Some(MIB_BYTES) {
1684            Err("read limit: bytes must not exceed 1MiB for unary request")
1685        } else {
1686            Ok(api::ReadLimit {
1687                count: limit.count,
1688                bytes: limit.bytes,
1689            })
1690        }?;
1691
1692        Ok(api::ReadRequest {
1693            stream: stream.into(),
1694            start: Some(start.into()),
1695            limit: Some(limit),
1696            until: until.map(|range| range.end),
1697            clamp,
1698        })
1699    }
1700}
1701
1702#[sync_docs]
1703#[derive(Debug, Clone)]
1704pub struct SequencedRecord {
1705    pub seq_num: u64,
1706    pub timestamp: u64,
1707    pub headers: Vec<Header>,
1708    pub body: Bytes,
1709}
1710
1711metered_impl!(SequencedRecord);
1712
1713impl From<api::SequencedRecord> for SequencedRecord {
1714    fn from(value: api::SequencedRecord) -> Self {
1715        let api::SequencedRecord {
1716            seq_num,
1717            timestamp,
1718            headers,
1719            body,
1720        } = value;
1721        Self {
1722            seq_num,
1723            timestamp,
1724            headers: headers.into_iter().map(Into::into).collect(),
1725            body,
1726        }
1727    }
1728}
1729
1730impl SequencedRecord {
1731    /// Try representing the sequenced record as a command record.
1732    pub fn as_command_record(&self) -> Option<CommandRecord> {
1733        if self.headers.len() != 1 {
1734            return None;
1735        }
1736
1737        let header = self.headers.first().expect("pre-validated length");
1738
1739        if !header.name.is_empty() {
1740            return None;
1741        }
1742
1743        match header.value.as_ref() {
1744            CommandRecord::FENCE => {
1745                let fencing_token = std::str::from_utf8(&self.body).ok()?.parse().ok()?;
1746                Some(CommandRecord {
1747                    command: Command::Fence { fencing_token },
1748                    timestamp: Some(self.timestamp),
1749                })
1750            }
1751            CommandRecord::TRIM => {
1752                let body: &[u8] = &self.body;
1753                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1754                Some(CommandRecord {
1755                    command: Command::Trim { seq_num },
1756                    timestamp: Some(self.timestamp),
1757                })
1758            }
1759            _ => None,
1760        }
1761    }
1762}
1763
1764#[sync_docs]
1765#[derive(Debug, Clone)]
1766pub struct SequencedRecordBatch {
1767    pub records: Vec<SequencedRecord>,
1768}
1769
1770impl MeteredBytes for SequencedRecordBatch {
1771    fn metered_bytes(&self) -> u64 {
1772        self.records.metered_bytes()
1773    }
1774}
1775
1776impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1777    fn from(value: api::SequencedRecordBatch) -> Self {
1778        let api::SequencedRecordBatch { records } = value;
1779        Self {
1780            records: records.into_iter().map(Into::into).collect(),
1781        }
1782    }
1783}
1784
1785#[sync_docs(ReadOutput = "Output")]
1786#[derive(Debug, Clone)]
1787pub enum ReadOutput {
1788    Batch(SequencedRecordBatch),
1789    NextSeqNum(u64),
1790}
1791
1792impl From<api::read_output::Output> for ReadOutput {
1793    fn from(value: api::read_output::Output) -> Self {
1794        match value {
1795            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1796            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1797        }
1798    }
1799}
1800
1801impl TryFrom<api::ReadOutput> for ReadOutput {
1802    type Error = ConvertError;
1803    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1804        let api::ReadOutput { output } = value;
1805        let output = output.ok_or("missing read output")?;
1806        Ok(output.into())
1807    }
1808}
1809
1810impl TryFrom<api::ReadResponse> for ReadOutput {
1811    type Error = ConvertError;
1812    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1813        let api::ReadResponse { output } = value;
1814        let output = output.ok_or("missing output in read response")?;
1815        output.try_into()
1816    }
1817}
1818
1819#[sync_docs]
1820#[derive(Debug, Clone, Default)]
1821pub struct ReadSessionRequest {
1822    pub start: ReadStart,
1823    pub limit: ReadLimit,
1824    pub until: Option<RangeTo<u64>>,
1825    pub clamp: bool,
1826}
1827
1828impl ReadSessionRequest {
1829    /// Create a new request with the specified starting point.
1830    pub fn new(start: ReadStart) -> Self {
1831        Self {
1832            start,
1833            ..Default::default()
1834        }
1835    }
1836
1837    /// Overwrite limit.
1838    pub fn with_limit(self, limit: ReadLimit) -> Self {
1839        Self { limit, ..self }
1840    }
1841
1842    /// Provide an `until` timestamp.
1843    pub fn with_until(self, until: RangeTo<u64>) -> Self {
1844        Self {
1845            until: Some(until),
1846            ..self
1847        }
1848    }
1849
1850    /// Clamp the start position at the tail position.
1851    pub fn with_clamp(self, clamp: bool) -> Self {
1852        Self { clamp, ..self }
1853    }
1854
1855    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1856        let Self {
1857            start,
1858            limit,
1859            until,
1860            clamp,
1861        } = self;
1862        api::ReadSessionRequest {
1863            stream: stream.into(),
1864            start: Some(start.into()),
1865            limit: Some(api::ReadLimit {
1866                count: limit.count,
1867                bytes: limit.bytes,
1868            }),
1869            heartbeats: false,
1870            until: until.map(|range| range.end),
1871            clamp,
1872        }
1873    }
1874}
1875
1876impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1877    type Error = ConvertError;
1878    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1879        let api::ReadSessionResponse { output } = value;
1880        let output = output.ok_or("missing output in read session response")?;
1881        output.try_into()
1882    }
1883}
1884
1885/// Name of a basin.
1886///
1887/// Must be between 8 and 48 characters in length. Must comprise lowercase
1888/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
1889#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1890pub struct BasinName(String);
1891
1892impl Deref for BasinName {
1893    type Target = str;
1894    fn deref(&self) -> &Self::Target {
1895        &self.0
1896    }
1897}
1898
1899impl TryFrom<String> for BasinName {
1900    type Error = ConvertError;
1901
1902    fn try_from(name: String) -> Result<Self, Self::Error> {
1903        if name.len() < 8 || name.len() > 48 {
1904            return Err("Basin name must be between 8 and 48 characters in length".into());
1905        }
1906
1907        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1908        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1909            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1910                .expect("Failed to compile basin name regex")
1911        });
1912
1913        if !regex.is_match(&name) {
1914            return Err(
1915                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1916                It cannot begin or end with a hyphen."
1917                    .into(),
1918            );
1919        }
1920
1921        Ok(Self(name))
1922    }
1923}
1924
1925impl FromStr for BasinName {
1926    type Err = ConvertError;
1927
1928    fn from_str(s: &str) -> Result<Self, Self::Err> {
1929        s.to_string().try_into()
1930    }
1931}
1932
1933impl std::fmt::Display for BasinName {
1934    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1935        f.write_str(&self.0)
1936    }
1937}
1938
1939impl From<BasinName> for String {
1940    fn from(value: BasinName) -> Self {
1941        value.0
1942    }
1943}
1944
1945/// Access token ID.
1946/// Must be between 1 and 96 characters.
1947#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1948pub struct AccessTokenId(String);
1949
1950impl Deref for AccessTokenId {
1951    type Target = str;
1952
1953    fn deref(&self) -> &Self::Target {
1954        &self.0
1955    }
1956}
1957
1958impl TryFrom<String> for AccessTokenId {
1959    type Error = ConvertError;
1960
1961    fn try_from(name: String) -> Result<Self, Self::Error> {
1962        if name.is_empty() {
1963            return Err("Access token ID must not be empty".into());
1964        }
1965
1966        if name.len() > 96 {
1967            return Err("Access token ID must not exceed 96 characters".into());
1968        }
1969
1970        Ok(Self(name))
1971    }
1972}
1973
1974impl From<AccessTokenId> for String {
1975    fn from(value: AccessTokenId) -> Self {
1976        value.0
1977    }
1978}
1979
1980impl FromStr for AccessTokenId {
1981    type Err = ConvertError;
1982
1983    fn from_str(s: &str) -> Result<Self, Self::Err> {
1984        s.to_string().try_into()
1985    }
1986}
1987
1988impl std::fmt::Display for AccessTokenId {
1989    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1990        f.write_str(&self.0)
1991    }
1992}
1993
1994impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1995    fn from(value: AccessTokenInfo) -> Self {
1996        Self {
1997            info: Some(value.into()),
1998        }
1999    }
2000}
2001
2002#[sync_docs]
2003#[derive(Debug, Clone)]
2004pub struct AccessTokenInfo {
2005    pub id: AccessTokenId,
2006    pub expires_at: Option<u32>,
2007    pub auto_prefix_streams: bool,
2008    pub scope: Option<AccessTokenScope>,
2009}
2010
2011impl AccessTokenInfo {
2012    /// Create a new access token info.
2013    pub fn new(id: AccessTokenId) -> Self {
2014        Self {
2015            id,
2016            expires_at: None,
2017            auto_prefix_streams: false,
2018            scope: None,
2019        }
2020    }
2021
2022    /// Overwrite expiration time.
2023    pub fn with_expires_at(self, expires_at: u32) -> Self {
2024        Self {
2025            expires_at: Some(expires_at),
2026            ..self
2027        }
2028    }
2029
2030    /// Overwrite auto prefix streams.
2031    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2032        Self {
2033            auto_prefix_streams,
2034            ..self
2035        }
2036    }
2037
2038    /// Overwrite scope.
2039    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2040        Self {
2041            scope: Some(scope),
2042            ..self
2043        }
2044    }
2045}
2046
2047impl From<AccessTokenInfo> for api::AccessTokenInfo {
2048    fn from(value: AccessTokenInfo) -> Self {
2049        let AccessTokenInfo {
2050            id,
2051            expires_at,
2052            auto_prefix_streams,
2053            scope,
2054        } = value;
2055        Self {
2056            id: id.into(),
2057            expires_at,
2058            auto_prefix_streams,
2059            scope: scope.map(Into::into),
2060        }
2061    }
2062}
2063
2064impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2065    type Error = ConvertError;
2066
2067    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2068        let api::AccessTokenInfo {
2069            id,
2070            expires_at,
2071            auto_prefix_streams,
2072            scope,
2073        } = value;
2074        Ok(Self {
2075            id: id.try_into()?,
2076            expires_at,
2077            auto_prefix_streams,
2078            scope: scope.map(Into::into),
2079        })
2080    }
2081}
2082
2083#[sync_docs]
2084#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2085pub enum Operation {
2086    ListBasins,
2087    CreateBasin,
2088    DeleteBasin,
2089    ReconfigureBasin,
2090    GetBasinConfig,
2091    IssueAccessToken,
2092    RevokeAccessToken,
2093    ListAccessTokens,
2094    ListStreams,
2095    CreateStream,
2096    DeleteStream,
2097    GetStreamConfig,
2098    ReconfigureStream,
2099    CheckTail,
2100    Append,
2101    Read,
2102    Trim,
2103    Fence,
2104    AccountMetrics,
2105    BasinMetrics,
2106    StreamMetrics,
2107}
2108
2109impl FromStr for Operation {
2110    type Err = ConvertError;
2111
2112    fn from_str(s: &str) -> Result<Self, Self::Err> {
2113        match s.to_lowercase().as_str() {
2114            "list-basins" => Ok(Self::ListBasins),
2115            "create-basin" => Ok(Self::CreateBasin),
2116            "delete-basin" => Ok(Self::DeleteBasin),
2117            "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2118            "get-basin-config" => Ok(Self::GetBasinConfig),
2119            "issue-access-token" => Ok(Self::IssueAccessToken),
2120            "revoke-access-token" => Ok(Self::RevokeAccessToken),
2121            "list-access-tokens" => Ok(Self::ListAccessTokens),
2122            "list-streams" => Ok(Self::ListStreams),
2123            "create-stream" => Ok(Self::CreateStream),
2124            "delete-stream" => Ok(Self::DeleteStream),
2125            "get-stream-config" => Ok(Self::GetStreamConfig),
2126            "reconfigure-stream" => Ok(Self::ReconfigureStream),
2127            "check-tail" => Ok(Self::CheckTail),
2128            "append" => Ok(Self::Append),
2129            "read" => Ok(Self::Read),
2130            "trim" => Ok(Self::Trim),
2131            "fence" => Ok(Self::Fence),
2132            "account-metrics" => Ok(Self::AccountMetrics),
2133            "basin-metrics" => Ok(Self::BasinMetrics),
2134            "stream-metrics" => Ok(Self::StreamMetrics),
2135            _ => Err("invalid operation".into()),
2136        }
2137    }
2138}
2139
2140impl From<Operation> for api::Operation {
2141    fn from(value: Operation) -> Self {
2142        match value {
2143            Operation::ListBasins => Self::ListBasins,
2144            Operation::CreateBasin => Self::CreateBasin,
2145            Operation::DeleteBasin => Self::DeleteBasin,
2146            Operation::ReconfigureBasin => Self::ReconfigureBasin,
2147            Operation::GetBasinConfig => Self::GetBasinConfig,
2148            Operation::IssueAccessToken => Self::IssueAccessToken,
2149            Operation::RevokeAccessToken => Self::RevokeAccessToken,
2150            Operation::ListAccessTokens => Self::ListAccessTokens,
2151            Operation::ListStreams => Self::ListStreams,
2152            Operation::CreateStream => Self::CreateStream,
2153            Operation::DeleteStream => Self::DeleteStream,
2154            Operation::GetStreamConfig => Self::GetStreamConfig,
2155            Operation::ReconfigureStream => Self::ReconfigureStream,
2156            Operation::CheckTail => Self::CheckTail,
2157            Operation::Append => Self::Append,
2158            Operation::Read => Self::Read,
2159            Operation::Trim => Self::Trim,
2160            Operation::Fence => Self::Fence,
2161            Operation::AccountMetrics => Self::AccountMetrics,
2162            Operation::BasinMetrics => Self::BasinMetrics,
2163            Operation::StreamMetrics => Self::StreamMetrics,
2164        }
2165    }
2166}
2167
2168impl From<api::Operation> for Option<Operation> {
2169    fn from(value: api::Operation) -> Self {
2170        match value {
2171            api::Operation::Unspecified => None,
2172            api::Operation::ListBasins => Some(Operation::ListBasins),
2173            api::Operation::CreateBasin => Some(Operation::CreateBasin),
2174            api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2175            api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2176            api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2177            api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2178            api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2179            api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2180            api::Operation::ListStreams => Some(Operation::ListStreams),
2181            api::Operation::CreateStream => Some(Operation::CreateStream),
2182            api::Operation::DeleteStream => Some(Operation::DeleteStream),
2183            api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2184            api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2185            api::Operation::CheckTail => Some(Operation::CheckTail),
2186            api::Operation::Append => Some(Operation::Append),
2187            api::Operation::Read => Some(Operation::Read),
2188            api::Operation::Trim => Some(Operation::Trim),
2189            api::Operation::Fence => Some(Operation::Fence),
2190            api::Operation::AccountMetrics => Some(Operation::AccountMetrics),
2191            api::Operation::BasinMetrics => Some(Operation::BasinMetrics),
2192            api::Operation::StreamMetrics => Some(Operation::StreamMetrics),
2193        }
2194    }
2195}
2196
2197#[sync_docs]
2198#[derive(Debug, Clone, Default)]
2199pub struct AccessTokenScope {
2200    pub basins: Option<ResourceSet>,
2201    pub streams: Option<ResourceSet>,
2202    pub access_tokens: Option<ResourceSet>,
2203    pub op_groups: Option<PermittedOperationGroups>,
2204    pub ops: HashSet<Operation>,
2205}
2206
2207impl AccessTokenScope {
2208    /// Create a new access token scope.
2209    pub fn new() -> Self {
2210        Self::default()
2211    }
2212
2213    /// Overwrite resource set for access tokens.
2214    pub fn with_basins(self, basins: ResourceSet) -> Self {
2215        Self {
2216            basins: Some(basins),
2217            ..self
2218        }
2219    }
2220
2221    /// Overwrite resource set for streams.
2222    pub fn with_streams(self, streams: ResourceSet) -> Self {
2223        Self {
2224            streams: Some(streams),
2225            ..self
2226        }
2227    }
2228
2229    /// Overwrite resource set for access tokens.
2230    pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2231        Self {
2232            access_tokens: Some(access_tokens),
2233            ..self
2234        }
2235    }
2236
2237    /// Overwrite operation groups.
2238    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2239        Self {
2240            op_groups: Some(op_groups),
2241            ..self
2242        }
2243    }
2244
2245    /// Overwrite operations.
2246    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2247        Self {
2248            ops: ops.into_iter().collect(),
2249            ..self
2250        }
2251    }
2252
2253    /// Add an operation to operations.
2254    pub fn with_op(self, op: Operation) -> Self {
2255        let mut ops = self.ops;
2256        ops.insert(op);
2257        Self { ops, ..self }
2258    }
2259}
2260
2261impl From<AccessTokenScope> for api::AccessTokenScope {
2262    fn from(value: AccessTokenScope) -> Self {
2263        let AccessTokenScope {
2264            basins,
2265            streams,
2266            access_tokens,
2267            op_groups,
2268            ops,
2269        } = value;
2270        Self {
2271            basins: basins.map(Into::into),
2272            streams: streams.map(Into::into),
2273            access_tokens: access_tokens.map(Into::into),
2274            op_groups: op_groups.map(Into::into),
2275            ops: ops
2276                .into_iter()
2277                .map(api::Operation::from)
2278                .map(Into::into)
2279                .collect(),
2280        }
2281    }
2282}
2283
2284impl From<api::AccessTokenScope> for AccessTokenScope {
2285    fn from(value: api::AccessTokenScope) -> Self {
2286        let api::AccessTokenScope {
2287            basins,
2288            streams,
2289            access_tokens,
2290            op_groups,
2291            ops,
2292        } = value;
2293        Self {
2294            basins: basins.and_then(|set| set.matching.map(Into::into)),
2295            streams: streams.and_then(|set| set.matching.map(Into::into)),
2296            access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2297            op_groups: op_groups.map(Into::into),
2298            ops: ops
2299                .into_iter()
2300                .map(api::Operation::try_from)
2301                .flat_map(Result::ok)
2302                .flat_map(<Option<Operation>>::from)
2303                .collect(),
2304        }
2305    }
2306}
2307
2308impl From<ResourceSet> for api::ResourceSet {
2309    fn from(value: ResourceSet) -> Self {
2310        Self {
2311            matching: Some(value.into()),
2312        }
2313    }
2314}
2315
2316#[sync_docs(ResourceSet = "Matching")]
2317#[derive(Debug, Clone)]
2318pub enum ResourceSet {
2319    Exact(String),
2320    Prefix(String),
2321}
2322
2323impl From<ResourceSet> for api::resource_set::Matching {
2324    fn from(value: ResourceSet) -> Self {
2325        match value {
2326            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2327            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2328        }
2329    }
2330}
2331
2332impl From<api::resource_set::Matching> for ResourceSet {
2333    fn from(value: api::resource_set::Matching) -> Self {
2334        match value {
2335            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2336            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2337        }
2338    }
2339}
2340
2341#[sync_docs]
2342#[derive(Debug, Clone, Default)]
2343pub struct PermittedOperationGroups {
2344    pub account: Option<ReadWritePermissions>,
2345    pub basin: Option<ReadWritePermissions>,
2346    pub stream: Option<ReadWritePermissions>,
2347}
2348
2349impl PermittedOperationGroups {
2350    /// Create a new permitted operation groups.
2351    pub fn new() -> Self {
2352        Self::default()
2353    }
2354
2355    /// Overwrite account read-write permissions.
2356    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2357        Self {
2358            account: Some(account),
2359            ..self
2360        }
2361    }
2362
2363    /// Overwrite basin read-write permissions.
2364    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2365        Self {
2366            basin: Some(basin),
2367            ..self
2368        }
2369    }
2370
2371    /// Overwrite stream read-write permissions.
2372    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2373        Self {
2374            stream: Some(stream),
2375            ..self
2376        }
2377    }
2378}
2379
2380impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2381    fn from(value: PermittedOperationGroups) -> Self {
2382        let PermittedOperationGroups {
2383            account,
2384            basin,
2385            stream,
2386        } = value;
2387        Self {
2388            account: account.map(Into::into),
2389            basin: basin.map(Into::into),
2390            stream: stream.map(Into::into),
2391        }
2392    }
2393}
2394
2395impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2396    fn from(value: api::PermittedOperationGroups) -> Self {
2397        let api::PermittedOperationGroups {
2398            account,
2399            basin,
2400            stream,
2401        } = value;
2402        Self {
2403            account: account.map(Into::into),
2404            basin: basin.map(Into::into),
2405            stream: stream.map(Into::into),
2406        }
2407    }
2408}
2409
2410#[sync_docs]
2411#[derive(Debug, Clone, Default)]
2412pub struct ReadWritePermissions {
2413    pub read: bool,
2414    pub write: bool,
2415}
2416
2417impl ReadWritePermissions {
2418    /// Create a new read-write permission.
2419    pub fn new() -> Self {
2420        Self::default()
2421    }
2422
2423    /// Overwrite read permission.
2424    pub fn with_read(self, read: bool) -> Self {
2425        Self { read, ..self }
2426    }
2427
2428    /// Overwrite write permission.
2429    pub fn with_write(self, write: bool) -> Self {
2430        Self { write, ..self }
2431    }
2432}
2433
2434impl From<ReadWritePermissions> for api::ReadWritePermissions {
2435    fn from(value: ReadWritePermissions) -> Self {
2436        let ReadWritePermissions { read, write } = value;
2437        Self { read, write }
2438    }
2439}
2440
2441impl From<api::ReadWritePermissions> for ReadWritePermissions {
2442    fn from(value: api::ReadWritePermissions) -> Self {
2443        let api::ReadWritePermissions { read, write } = value;
2444        Self { read, write }
2445    }
2446}
2447
2448impl From<api::IssueAccessTokenResponse> for String {
2449    fn from(value: api::IssueAccessTokenResponse) -> Self {
2450        value.access_token
2451    }
2452}
2453
2454impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2455    fn from(value: AccessTokenId) -> Self {
2456        Self { id: value.into() }
2457    }
2458}
2459
2460impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2461    type Error = ConvertError;
2462    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2463        let token_info = value.info.ok_or("access token info is missing")?;
2464        token_info.try_into()
2465    }
2466}
2467
2468#[sync_docs]
2469#[derive(Debug, Clone, Default)]
2470pub struct ListAccessTokensRequest {
2471    pub prefix: String,
2472    pub start_after: String,
2473    pub limit: Option<usize>,
2474}
2475
2476impl ListAccessTokensRequest {
2477    /// Create a new request with prefix.
2478    pub fn new() -> Self {
2479        Self::default()
2480    }
2481
2482    /// Overwrite prefix.
2483    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2484        Self {
2485            prefix: prefix.into(),
2486            ..self
2487        }
2488    }
2489
2490    /// Overwrite start after.
2491    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2492        Self {
2493            start_after: start_after.into(),
2494            ..self
2495        }
2496    }
2497
2498    /// Overwrite limit.
2499    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2500        Self {
2501            limit: limit.into(),
2502            ..self
2503        }
2504    }
2505}
2506
2507impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2508    type Error = ConvertError;
2509    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2510        let ListAccessTokensRequest {
2511            prefix,
2512            start_after,
2513            limit,
2514        } = value;
2515        Ok(Self {
2516            prefix,
2517            start_after,
2518            limit: limit
2519                .map(TryInto::try_into)
2520                .transpose()
2521                .map_err(|_| "request limit does not fit into u64 bounds")?,
2522        })
2523    }
2524}
2525
2526#[sync_docs]
2527#[derive(Debug, Clone)]
2528pub struct ListAccessTokensResponse {
2529    pub access_tokens: Vec<AccessTokenInfo>,
2530    pub has_more: bool,
2531}
2532
2533impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2534    type Error = ConvertError;
2535    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2536        let api::ListAccessTokensResponse {
2537            access_tokens,
2538            has_more,
2539        } = value;
2540        let access_tokens = access_tokens
2541            .into_iter()
2542            .map(TryInto::try_into)
2543            .collect::<Result<Vec<_>, _>>()?;
2544        Ok(Self {
2545            access_tokens,
2546            has_more,
2547        })
2548    }
2549}