s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{
4    collections::HashSet,
5    ops::{Deref, RangeTo},
6    str::FromStr,
7    sync::OnceLock,
8    time::Duration,
9};
10
11use bytes::Bytes;
12use rand::Rng;
13use regex::Regex;
14use sync_docs::sync_docs;
15
16use crate::api;
17
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
/// Metadata key string `"retry-after-ms"`.
// NOTE(review): presumably looked up in response metadata to learn a retry delay in
// milliseconds — confirm at the use site.
pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
20
/// Error related to conversion from one type to another.
///
/// Wraps a human-readable message; its `Display` output is exactly that message.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
25
26impl<T: Into<String>> From<T> for ConvertError {
27    fn from(value: T) -> Self {
28        Self(value.into())
29    }
30}
31
/// Metered size of the object in bytes.
///
/// Bytes are calculated using the "metered bytes" formula:
///
/// ```python
/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.key) + len(h.value)) for h in record.headers) + len(record.body)
/// ```
pub trait MeteredBytes {
    /// Return the metered bytes of the object.
    fn metered_bytes(&self) -> u64;
}

/// A vector's metered size is the total of its elements' metered sizes
/// (zero for an empty vector).
impl<T> MeteredBytes for Vec<T>
where
    T: MeteredBytes,
{
    fn metered_bytes(&self) -> u64 {
        let mut total = 0;
        for item in self {
            total += item.metered_bytes();
        }
        total
    }
}
49
// Implements `MeteredBytes` for a type exposing `headers` (items with `name` and
// `value`) and `body`, each supporting `.len()`.
//
// Result: 8 + 2 * header count + total header name/value lengths + body length.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let header_count = self.headers.len();
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                (8 + 2 * header_count + header_bytes + self.body.len()) as u64
            }
        }
    };
}
67
// Scope a basin can be placed in; the only variant defined here is AWS us-east-1.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinScope {
    // Parsed from the string "aws:us-east-1" by the `FromStr` impl.
    AwsUsEast1,
}
73
74impl From<BasinScope> for api::BasinScope {
75    fn from(value: BasinScope) -> Self {
76        match value {
77            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
78        }
79    }
80}
81
82impl From<api::BasinScope> for Option<BasinScope> {
83    fn from(value: api::BasinScope) -> Self {
84        match value {
85            api::BasinScope::Unspecified => None,
86            api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
87        }
88    }
89}
90
91impl FromStr for BasinScope {
92    type Err = ConvertError;
93    fn from_str(value: &str) -> Result<Self, Self::Err> {
94        match value {
95            "aws:us-east-1" => Ok(Self::AwsUsEast1),
96            _ => Err("invalid basin scope value".into()),
97        }
98    }
99}
100
// Parameters for creating a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateBasinRequest {
    // Name of the basin to create.
    pub basin: BasinName,
    // Optional basin configuration; `None` unless set via `with_config`.
    pub config: Option<BasinConfig>,
    // Optional scope; when `None`, the `api` conversion sends the default scope value.
    pub scope: Option<BasinScope>,
}
108
109impl CreateBasinRequest {
110    /// Create a new request with basin name.
111    pub fn new(basin: BasinName) -> Self {
112        Self {
113            basin,
114            config: None,
115            scope: None,
116        }
117    }
118
119    /// Overwrite basin configuration.
120    pub fn with_config(self, config: BasinConfig) -> Self {
121        Self {
122            config: Some(config),
123            ..self
124        }
125    }
126
127    /// Overwrite basin scope.
128    pub fn with_scope(self, scope: BasinScope) -> Self {
129        Self {
130            scope: Some(scope),
131            ..self
132        }
133    }
134}
135
136impl From<CreateBasinRequest> for api::CreateBasinRequest {
137    fn from(value: CreateBasinRequest) -> Self {
138        let CreateBasinRequest {
139            basin,
140            config,
141            scope,
142        } = value;
143        Self {
144            basin: basin.0,
145            config: config.map(Into::into),
146            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
147        }
148    }
149}
150
// Basin-level configuration.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    // Stream configuration used as the basin default; `None` when not set.
    pub default_stream_config: Option<StreamConfig>,
    // Forwarded unchanged to/from `api::BasinConfig`; defaults to `false`.
    // NOTE(review): presumably lets an append create a missing stream — confirm.
    pub create_stream_on_append: bool,
    // Forwarded unchanged to/from `api::BasinConfig`; defaults to `false`.
    // NOTE(review): presumably lets a read create a missing stream — confirm.
    pub create_stream_on_read: bool,
}
158
159impl BasinConfig {
160    /// Create a new basin config.
161    pub fn new() -> Self {
162        Self::default()
163    }
164
165    /// Overwrite the default stream config.
166    pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
167        Self {
168            default_stream_config: Some(default_stream_config),
169            ..self
170        }
171    }
172
173    /// Overwrite `create_stream_on_append`.
174    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
175        Self {
176            create_stream_on_append,
177            ..self
178        }
179    }
180
181    /// Overwrite `create_stream_on_read`.
182    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
183        Self {
184            create_stream_on_read,
185            ..self
186        }
187    }
188}
189
190impl From<BasinConfig> for api::BasinConfig {
191    fn from(value: BasinConfig) -> Self {
192        let BasinConfig {
193            default_stream_config,
194            create_stream_on_append,
195            create_stream_on_read,
196        } = value;
197        Self {
198            default_stream_config: default_stream_config.map(Into::into),
199            create_stream_on_append,
200            create_stream_on_read,
201        }
202    }
203}
204
205impl From<api::BasinConfig> for BasinConfig {
206    fn from(value: api::BasinConfig) -> Self {
207        let api::BasinConfig {
208            default_stream_config,
209            create_stream_on_append,
210            create_stream_on_read,
211        } = value;
212        Self {
213            default_stream_config: default_stream_config.map(Into::into),
214            create_stream_on_append,
215            create_stream_on_read,
216        }
217    }
218}
219
// Timestamping mode; mirrors the non-`Unspecified` variants of `api::TimestampingMode`.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    ClientPrefer,
    ClientRequire,
    Arrival,
}
227
228impl From<TimestampingMode> for api::TimestampingMode {
229    fn from(value: TimestampingMode) -> Self {
230        match value {
231            TimestampingMode::ClientPrefer => Self::ClientPrefer,
232            TimestampingMode::ClientRequire => Self::ClientRequire,
233            TimestampingMode::Arrival => Self::Arrival,
234        }
235    }
236}
237
238impl From<api::TimestampingMode> for Option<TimestampingMode> {
239    fn from(value: api::TimestampingMode) -> Self {
240        match value {
241            api::TimestampingMode::Unspecified => None,
242            api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
243            api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
244            api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
245        }
246    }
247}
248
#[sync_docs(TimestampingConfig = "Timestamping")]
#[derive(Debug, Clone, Default)]
/// Timestamping behavior.
pub struct TimestampingConfig {
    // Timestamping mode; `None` is sent as the default `api::TimestampingMode`.
    pub mode: Option<TimestampingMode>,
    // Passed through unchanged to/from `api::stream_config::Timestamping::uncapped`.
    pub uncapped: Option<bool>,
}
256
257impl TimestampingConfig {
258    /// Create a new timestamping config.
259    pub fn new() -> Self {
260        Self::default()
261    }
262
263    /// Overwrite timestamping mode.
264    pub fn with_mode(self, mode: TimestampingMode) -> Self {
265        Self {
266            mode: Some(mode),
267            ..self
268        }
269    }
270
271    /// Overwrite the uncapped knob.
272    pub fn with_uncapped(self, uncapped: bool) -> Self {
273        Self {
274            uncapped: Some(uncapped),
275            ..self
276        }
277    }
278}
279
280impl From<TimestampingConfig> for api::stream_config::Timestamping {
281    fn from(value: TimestampingConfig) -> Self {
282        Self {
283            mode: value
284                .mode
285                .map(api::TimestampingMode::from)
286                .unwrap_or_default()
287                .into(),
288            uncapped: value.uncapped,
289        }
290    }
291}
292
293impl From<api::stream_config::Timestamping> for TimestampingConfig {
294    fn from(value: api::stream_config::Timestamping) -> Self {
295        let mode = value.mode().into();
296        let uncapped = value.uncapped;
297        Self { mode, uncapped }
298    }
299}
300
// Stream-level configuration; every field is optional and defaults to `None`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    // Storage class; `None` is sent as the default `api::StorageClass`.
    pub storage_class: Option<StorageClass>,
    // Retention policy, if any.
    pub retention_policy: Option<RetentionPolicy>,
    // Timestamping behavior, if any.
    pub timestamping: Option<TimestampingConfig>,
}
308
309impl StreamConfig {
310    /// Create a new stream config.
311    pub fn new() -> Self {
312        Self::default()
313    }
314
315    /// Overwrite storage class.
316    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
317        Self {
318            storage_class: Some(storage_class),
319            ..self
320        }
321    }
322
323    /// Overwrite retention policy.
324    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
325        Self {
326            retention_policy: Some(retention_policy),
327            ..self
328        }
329    }
330
331    /// Overwrite timestamping config.
332    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
333        Self {
334            timestamping: Some(timestamping),
335            ..self
336        }
337    }
338}
339
340impl From<StreamConfig> for api::StreamConfig {
341    fn from(value: StreamConfig) -> Self {
342        let StreamConfig {
343            storage_class,
344            retention_policy,
345            timestamping,
346        } = value;
347        Self {
348            storage_class: storage_class
349                .map(api::StorageClass::from)
350                .unwrap_or_default()
351                .into(),
352            retention_policy: retention_policy.map(Into::into),
353            timestamping: timestamping.map(Into::into),
354        }
355    }
356}
357
358impl From<api::StreamConfig> for StreamConfig {
359    fn from(value: api::StreamConfig) -> Self {
360        Self {
361            storage_class: value.storage_class().into(),
362            retention_policy: value.retention_policy.map(Into::into),
363            timestamping: value.timestamping.map(Into::into),
364        }
365    }
366}
367
// Storage class of a stream.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    // Parsed from "standard" by the `FromStr` impl.
    Standard,
    // Parsed from "express" by the `FromStr` impl.
    Express,
}
374
375impl From<StorageClass> for api::StorageClass {
376    fn from(value: StorageClass) -> Self {
377        match value {
378            StorageClass::Standard => Self::Standard,
379            StorageClass::Express => Self::Express,
380        }
381    }
382}
383
384impl From<api::StorageClass> for Option<StorageClass> {
385    fn from(value: api::StorageClass) -> Self {
386        match value {
387            api::StorageClass::Unspecified => None,
388            api::StorageClass::Standard => Some(StorageClass::Standard),
389            api::StorageClass::Express => Some(StorageClass::Express),
390        }
391    }
392}
393
394impl FromStr for StorageClass {
395    type Err = ConvertError;
396
397    fn from_str(value: &str) -> Result<Self, Self::Err> {
398        match value {
399            "standard" => Ok(Self::Standard),
400            "express" => Ok(Self::Express),
401            v => Err(format!("unknown storage class: {v}").into()),
402        }
403    }
404}
405
// Retention policy for a stream.
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    // Age-based retention. Sent to the API as whole seconds (`Duration::as_secs`),
    // so any sub-second part of the duration is dropped.
    Age(Duration),
}
411
412impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
413    fn from(value: RetentionPolicy) -> Self {
414        match value {
415            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
416        }
417    }
418}
419
420impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
421    fn from(value: api::stream_config::RetentionPolicy) -> Self {
422        match value {
423            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
424        }
425    }
426}
427
// Lifecycle state of a basin; `Display` renders the variant name in lowercase.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    Active,
    Creating,
    Deleting,
}
435
436impl From<BasinState> for api::BasinState {
437    fn from(value: BasinState) -> Self {
438        match value {
439            BasinState::Active => Self::Active,
440            BasinState::Creating => Self::Creating,
441            BasinState::Deleting => Self::Deleting,
442        }
443    }
444}
445
446impl From<api::BasinState> for Option<BasinState> {
447    fn from(value: api::BasinState) -> Self {
448        match value {
449            api::BasinState::Unspecified => None,
450            api::BasinState::Active => Some(BasinState::Active),
451            api::BasinState::Creating => Some(BasinState::Creating),
452            api::BasinState::Deleting => Some(BasinState::Deleting),
453        }
454    }
455}
456
457impl std::fmt::Display for BasinState {
458    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459        match self {
460            BasinState::Active => write!(f, "active"),
461            BasinState::Creating => write!(f, "creating"),
462            BasinState::Deleting => write!(f, "deleting"),
463        }
464    }
465}
466
// Information about a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    // Basin name, kept as a plain string.
    pub name: String,
    // Basin scope; `None` when the API reports it as unspecified.
    pub scope: Option<BasinScope>,
    // Basin state; `None` when the API reports it as unspecified.
    pub state: Option<BasinState>,
}
474
475impl From<BasinInfo> for api::BasinInfo {
476    fn from(value: BasinInfo) -> Self {
477        let BasinInfo { name, scope, state } = value;
478        Self {
479            name,
480            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
481            state: state.map(api::BasinState::from).unwrap_or_default().into(),
482        }
483    }
484}
485
486impl From<api::BasinInfo> for BasinInfo {
487    fn from(value: api::BasinInfo) -> Self {
488        let scope = value.scope().into();
489        let state = value.state().into();
490        let name = value.name;
491        Self { name, scope, state }
492    }
493}
494
495impl TryFrom<api::CreateBasinResponse> for BasinInfo {
496    type Error = ConvertError;
497    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
498        let api::CreateBasinResponse { info } = value;
499        let info = info.ok_or("missing basin info")?;
500        Ok(info.into())
501    }
502}
503
// Parameters for listing streams; all fields start out empty / `None`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListStreamsRequest {
    // Sent as-is to the API; empty by default.
    // NOTE(review): presumably restricts results to names with this prefix — confirm.
    pub prefix: String,
    // Sent as-is to the API; empty by default.
    // NOTE(review): presumably a pagination cursor (names after this one) — confirm.
    pub start_after: String,
    // Optional cap on the number of results; converted to `u64` for the API.
    pub limit: Option<usize>,
}
511
512impl ListStreamsRequest {
513    /// Create a new request.
514    pub fn new() -> Self {
515        Self::default()
516    }
517
518    /// Overwrite prefix.
519    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
520        Self {
521            prefix: prefix.into(),
522            ..self
523        }
524    }
525
526    /// Overwrite start after.
527    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
528        Self {
529            start_after: start_after.into(),
530            ..self
531        }
532    }
533
534    /// Overwrite limit.
535    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
536        Self {
537            limit: limit.into(),
538            ..self
539        }
540    }
541}
542
543impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
544    type Error = ConvertError;
545    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
546        let ListStreamsRequest {
547            prefix,
548            start_after,
549            limit,
550        } = value;
551        Ok(Self {
552            prefix,
553            start_after,
554            limit: limit.map(|n| n as u64),
555        })
556    }
557}
558
// Information about a stream, copied field-for-field from `api::StreamInfo`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    // Stream name.
    pub name: String,
    // Creation time as reported by the API.
    // NOTE(review): unit is not visible here (presumably seconds since Unix epoch) — confirm.
    pub created_at: u32,
    // Deletion time as reported by the API, if any; same unit caveat as `created_at`.
    pub deleted_at: Option<u32>,
}
566
567impl From<api::StreamInfo> for StreamInfo {
568    fn from(value: api::StreamInfo) -> Self {
569        Self {
570            name: value.name,
571            created_at: value.created_at,
572            deleted_at: value.deleted_at,
573        }
574    }
575}
576
577impl TryFrom<api::CreateStreamResponse> for StreamInfo {
578    type Error = ConvertError;
579
580    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
581        let api::CreateStreamResponse { info } = value;
582        let info = info.ok_or("missing stream info")?;
583        Ok(info.into())
584    }
585}
586
// Result of listing streams.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    // Streams returned by this call.
    pub streams: Vec<StreamInfo>,
    // Copied from the API response.
    // NOTE(review): presumably signals that further results exist beyond this page — confirm.
    pub has_more: bool,
}
593
594impl From<api::ListStreamsResponse> for ListStreamsResponse {
595    fn from(value: api::ListStreamsResponse) -> Self {
596        let api::ListStreamsResponse { streams, has_more } = value;
597        let streams = streams.into_iter().map(Into::into).collect();
598        Self { streams, has_more }
599    }
600}
601
602impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
603    type Error = ConvertError;
604
605    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
606        let api::GetBasinConfigResponse { config } = value;
607        let config = config.ok_or("missing basin config")?;
608        Ok(config.into())
609    }
610}
611
612impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
613    type Error = ConvertError;
614
615    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
616        let api::GetStreamConfigResponse { config } = value;
617        let config = config.ok_or("missing stream config")?;
618        Ok(config.into())
619    }
620}
621
// Parameters for creating a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateStreamRequest {
    // Name of the stream to create.
    pub stream: String,
    // Optional stream configuration; `None` unless set via `with_config`.
    pub config: Option<StreamConfig>,
}
628
629impl CreateStreamRequest {
630    /// Create a new request with stream name.
631    pub fn new(stream: impl Into<String>) -> Self {
632        Self {
633            stream: stream.into(),
634            config: None,
635        }
636    }
637
638    /// Overwrite stream config.
639    pub fn with_config(self, config: StreamConfig) -> Self {
640        Self {
641            config: Some(config),
642            ..self
643        }
644    }
645}
646
647impl From<CreateStreamRequest> for api::CreateStreamRequest {
648    fn from(value: CreateStreamRequest) -> Self {
649        let CreateStreamRequest { stream, config } = value;
650        Self {
651            stream,
652            config: config.map(Into::into),
653        }
654    }
655}
656
// Parameters for listing basins; all fields start out empty / `None`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListBasinsRequest {
    // Sent as-is to the API; empty by default.
    // NOTE(review): presumably restricts results to names with this prefix — confirm.
    pub prefix: String,
    // Sent as-is to the API; empty by default.
    // NOTE(review): presumably a pagination cursor (names after this one) — confirm.
    pub start_after: String,
    // Optional cap on the number of results; the `api` conversion fails if it
    // does not fit the API's integer type.
    pub limit: Option<usize>,
}
664
665impl ListBasinsRequest {
666    /// Create a new request.
667    pub fn new() -> Self {
668        Self::default()
669    }
670
671    /// Overwrite prefix.
672    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
673        Self {
674            prefix: prefix.into(),
675            ..self
676        }
677    }
678
679    /// Overwrite start after.
680    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
681        Self {
682            start_after: start_after.into(),
683            ..self
684        }
685    }
686
687    /// Overwrite limit.
688    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
689        Self {
690            limit: limit.into(),
691            ..self
692        }
693    }
694}
695
696impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
697    type Error = ConvertError;
698    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
699        let ListBasinsRequest {
700            prefix,
701            start_after,
702            limit,
703        } = value;
704        Ok(Self {
705            prefix,
706            start_after,
707            limit: limit
708                .map(TryInto::try_into)
709                .transpose()
710                .map_err(|_| "request limit does not fit into u64 bounds")?,
711        })
712    }
713}
714
// Result of listing basins.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    // Basins returned by this call.
    pub basins: Vec<BasinInfo>,
    // Copied from the API response.
    // NOTE(review): presumably signals that further results exist beyond this page — confirm.
    pub has_more: bool,
}
721
722impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
723    type Error = ConvertError;
724    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
725        let api::ListBasinsResponse { basins, has_more } = value;
726        Ok(Self {
727            basins: basins.into_iter().map(Into::into).collect(),
728            has_more,
729        })
730    }
731}
732
// Parameters for deleting a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteBasinRequest {
    // Name of the basin to delete.
    pub basin: BasinName,
    /// Delete basin if it exists else do nothing.
    // Not forwarded by the `api::DeleteBasinRequest` conversion in this file.
    pub if_exists: bool,
}
740
741impl DeleteBasinRequest {
742    /// Create a new request.
743    pub fn new(basin: BasinName) -> Self {
744        Self {
745            basin,
746            if_exists: false,
747        }
748    }
749
750    /// Overwrite the if exists parameter.
751    pub fn with_if_exists(self, if_exists: bool) -> Self {
752        Self { if_exists, ..self }
753    }
754}
755
756impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
757    fn from(value: DeleteBasinRequest) -> Self {
758        let DeleteBasinRequest { basin, .. } = value;
759        Self { basin: basin.0 }
760    }
761}
762
// Parameters for deleting a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteStreamRequest {
    // Name of the stream to delete.
    pub stream: String,
    /// Delete stream if it exists else do nothing.
    // Not forwarded by the `api::DeleteStreamRequest` conversion in this file.
    pub if_exists: bool,
}
770
771impl DeleteStreamRequest {
772    /// Create a new request.
773    pub fn new(stream: impl Into<String>) -> Self {
774        Self {
775            stream: stream.into(),
776            if_exists: false,
777        }
778    }
779
780    /// Overwrite the if exists parameter.
781    pub fn with_if_exists(self, if_exists: bool) -> Self {
782        Self { if_exists, ..self }
783    }
784}
785
786impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
787    fn from(value: DeleteStreamRequest) -> Self {
788        let DeleteStreamRequest { stream, .. } = value;
789        Self { stream }
790    }
791}
792
// Parameters for reconfiguring a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureBasinRequest {
    // Name of the basin to reconfigure.
    pub basin: BasinName,
    // New configuration values, if any.
    pub config: Option<BasinConfig>,
    // Field-mask paths; sent as a `prost_types::FieldMask` when present.
    pub mask: Option<Vec<String>>,
}
800
801impl ReconfigureBasinRequest {
802    /// Create a new request with basin name.
803    pub fn new(basin: BasinName) -> Self {
804        Self {
805            basin,
806            config: None,
807            mask: None,
808        }
809    }
810
811    /// Overwrite basin config.
812    pub fn with_config(self, config: BasinConfig) -> Self {
813        Self {
814            config: Some(config),
815            ..self
816        }
817    }
818
819    /// Overwrite field mask.
820    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
821        Self {
822            mask: Some(mask.into()),
823            ..self
824        }
825    }
826}
827
828impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
829    fn from(value: ReconfigureBasinRequest) -> Self {
830        let ReconfigureBasinRequest {
831            basin,
832            config,
833            mask,
834        } = value;
835        Self {
836            basin: basin.0,
837            config: config.map(Into::into),
838            mask: mask.map(|paths| prost_types::FieldMask { paths }),
839        }
840    }
841}
842
843impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
844    type Error = ConvertError;
845    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
846        let api::ReconfigureBasinResponse { config } = value;
847        let config = config.ok_or("missing basin config")?;
848        Ok(config.into())
849    }
850}
851
// Parameters for reconfiguring a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureStreamRequest {
    // Name of the stream to reconfigure.
    pub stream: String,
    // New configuration values, if any.
    pub config: Option<StreamConfig>,
    // Field-mask paths; sent as a `prost_types::FieldMask` when present.
    pub mask: Option<Vec<String>>,
}
859
860impl ReconfigureStreamRequest {
861    /// Create a new request with stream name.
862    pub fn new(stream: impl Into<String>) -> Self {
863        Self {
864            stream: stream.into(),
865            config: None,
866            mask: None,
867        }
868    }
869
870    /// Overwrite stream config.
871    pub fn with_config(self, config: StreamConfig) -> Self {
872        Self {
873            config: Some(config),
874            ..self
875        }
876    }
877
878    /// Overwrite field mask.
879    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
880        Self {
881            mask: Some(mask.into()),
882            ..self
883        }
884    }
885}
886
887impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
888    fn from(value: ReconfigureStreamRequest) -> Self {
889        let ReconfigureStreamRequest {
890            stream,
891            config,
892            mask,
893        } = value;
894        Self {
895            stream,
896            config: config.map(Into::into),
897            mask: mask.map(|paths| prost_types::FieldMask { paths }),
898        }
899    }
900}
901
902impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
903    type Error = ConvertError;
904    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
905        let api::ReconfigureStreamResponse { config } = value;
906        let config = config.ok_or("missing stream config")?;
907        Ok(config.into())
908    }
909}
910
911impl From<api::CheckTailResponse> for StreamPosition {
912    fn from(value: api::CheckTailResponse) -> Self {
913        let api::CheckTailResponse {
914            next_seq_num,
915            last_timestamp,
916        } = value;
917        StreamPosition {
918            seq_num: next_seq_num,
919            timestamp: last_timestamp,
920        }
921    }
922}
923
/// Position of a record in a stream.
///
/// Also produced from an `api::CheckTailResponse`, where `seq_num` carries the
/// response's `next_seq_num` and `timestamp` its `last_timestamp`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp, which may be user-specified or assigned by the service.
    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
    pub timestamp: u64,
}
933
// A name/value pair of raw bytes attached to a record.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    // Header name; empty when built with `Header::from_value`.
    pub name: Bytes,
    // Header value.
    pub value: Bytes,
}
940
941impl Header {
942    /// Create a new header from name and value.
943    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
944        Self {
945            name: name.into(),
946            value: value.into(),
947        }
948    }
949
950    /// Create a new header from value.
951    pub fn from_value(value: impl Into<Bytes>) -> Self {
952        Self {
953            name: Bytes::new(),
954            value: value.into(),
955        }
956    }
957}
958
959impl From<Header> for api::Header {
960    fn from(value: Header) -> Self {
961        let Header { name, value } = value;
962        Self { name, value }
963    }
964}
965
966impl From<api::Header> for Header {
967    fn from(value: api::Header) -> Self {
968        let api::Header { name, value } = value;
969        Self { name, value }
970    }
971}
972
/// A fencing token can be enforced on append requests.
///
/// Must not be more than 36 bytes. The length is checked when a token is built
/// from a string (`TryFrom<String>` / `FromStr`); the `Default` token is empty.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FencingToken(String);
978
979impl FencingToken {
980    const MAX_BYTES: usize = 36;
981
982    /// Generate a random alphanumeric fencing token of `n` bytes.
983    pub fn generate(n: usize) -> Result<Self, ConvertError> {
984        rand::rng()
985            .sample_iter(&rand::distr::Alphanumeric)
986            .take(n)
987            .map(char::from)
988            .collect::<String>()
989            .parse()
990    }
991}
992
993impl Deref for FencingToken {
994    type Target = str;
995
996    fn deref(&self) -> &Self::Target {
997        &self.0
998    }
999}
1000
1001impl std::fmt::Display for FencingToken {
1002    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1003        write!(f, "{}", self.0)
1004    }
1005}
1006
1007impl FromStr for FencingToken {
1008    type Err = ConvertError;
1009
1010    fn from_str(value: &str) -> Result<Self, Self::Err> {
1011        value.to_string().try_into()
1012    }
1013}
1014
1015impl TryFrom<String> for FencingToken {
1016    type Error = ConvertError;
1017
1018    fn try_from(value: String) -> Result<Self, Self::Error> {
1019        if value.len() > Self::MAX_BYTES {
1020            Err(format!("Fencing token cannot exceed {} bytes", Self::MAX_BYTES).into())
1021        } else {
1022            Ok(Self(value))
1023        }
1024    }
1025}
1026
1027impl From<FencingToken> for String {
1028    fn from(value: FencingToken) -> Self {
1029        value.0
1030    }
1031}
1032
/// Command to send through a `CommandRecord`.
#[derive(Debug, Clone)]
pub enum Command {
    /// Enforce a fencing token.
    ///
    /// Fencing is strongly consistent, and subsequent appends that specify a
    /// fencing token will be rejected if it does not match.
    Fence {
        /// Fencing token to enforce.
        ///
        /// Set empty to clear the token.
        fencing_token: FencingToken,
    },
    /// Request a trim till the sequence number.
    ///
    /// Trimming is eventually consistent, and trimmed records may be visible
    /// for a brief period.
    Trim {
        /// Trim point.
        ///
        /// This sequence number is only allowed to advance, and any regression
        /// will be ignored.
        seq_num: u64,
    },
}
1058
/// A command record is a special kind of [`AppendRecord`] that can be used to
/// send command messages.
///
/// Such a record is signalled by a sole header with empty name. The header
/// value represents the operation and record body acts as the payload.
///
/// Use [`CommandRecord::fence`] or [`CommandRecord::trim`] to build one; both
/// leave the timestamp unset.
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// Command kind.
    pub command: Command,
    /// Timestamp for the record.
    pub timestamp: Option<u64>,
}
1071
1072impl CommandRecord {
1073    const FENCE: &[u8] = b"fence";
1074    const TRIM: &[u8] = b"trim";
1075
1076    /// Create a new fence command record.
1077    pub fn fence(fencing_token: FencingToken) -> Self {
1078        Self {
1079            command: Command::Fence { fencing_token },
1080            timestamp: None,
1081        }
1082    }
1083
1084    /// Create a new trim command record.
1085    pub fn trim(seq_num: impl Into<u64>) -> Self {
1086        Self {
1087            command: Command::Trim {
1088                seq_num: seq_num.into(),
1089            },
1090            timestamp: None,
1091        }
1092    }
1093
1094    /// Overwrite timestamp.
1095    pub fn with_timestamp(self, timestamp: u64) -> Self {
1096        Self {
1097            timestamp: Some(timestamp),
1098            ..self
1099        }
1100    }
1101}
1102
// A record to append: optional timestamp, headers and body. Fields are private;
// instances are built through the constructors on `AppendRecord`, which enforce
// the metered-size limit.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    timestamp: Option<u64>,
    headers: Vec<Header>,
    body: Bytes,
    // Test builds only: per-record size limit used by `validated` in place of
    // `AppendRecord::MAX_BYTES`.
    #[cfg(test)]
    max_bytes: u64,
}

// Implements `MeteredBytes` for `AppendRecord` using the formula in `metered_impl!`.
metered_impl!(AppendRecord);
1114
1115impl AppendRecord {
1116    const MAX_BYTES: u64 = MIB_BYTES;
1117
1118    fn validated(self) -> Result<Self, ConvertError> {
1119        #[cfg(test)]
1120        let max_bytes = self.max_bytes;
1121        #[cfg(not(test))]
1122        let max_bytes = Self::MAX_BYTES;
1123
1124        if self.metered_bytes() > max_bytes {
1125            Err("AppendRecord should have metered size less than 1 MiB".into())
1126        } else {
1127            Ok(self)
1128        }
1129    }
1130
1131    /// Try creating a new append record with body.
1132    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1133        Self {
1134            timestamp: None,
1135            headers: Vec::new(),
1136            body: body.into(),
1137            #[cfg(test)]
1138            max_bytes: Self::MAX_BYTES,
1139        }
1140        .validated()
1141    }
1142
1143    #[cfg(test)]
1144    pub(crate) fn with_max_bytes(
1145        max_bytes: u64,
1146        body: impl Into<Bytes>,
1147    ) -> Result<Self, ConvertError> {
1148        Self {
1149            timestamp: None,
1150            headers: Vec::new(),
1151            body: body.into(),
1152            max_bytes,
1153        }
1154        .validated()
1155    }
1156
1157    /// Overwrite headers.
1158    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1159        Self {
1160            headers: headers.into(),
1161            ..self
1162        }
1163        .validated()
1164    }
1165
1166    /// Overwrite timestamp.
1167    pub fn with_timestamp(self, timestamp: u64) -> Self {
1168        Self {
1169            timestamp: Some(timestamp),
1170            ..self
1171        }
1172    }
1173
1174    /// Body of the record.
1175    pub fn body(&self) -> &[u8] {
1176        &self.body
1177    }
1178
1179    /// Headers of the record.
1180    pub fn headers(&self) -> &[Header] {
1181        &self.headers
1182    }
1183
1184    /// Timestamp for the record.
1185    pub fn timestamp(&self) -> Option<u64> {
1186        self.timestamp
1187    }
1188
1189    /// Consume the record and return parts.
1190    pub fn into_parts(self) -> AppendRecordParts {
1191        AppendRecordParts {
1192            timestamp: self.timestamp,
1193            headers: self.headers,
1194            body: self.body,
1195        }
1196    }
1197
1198    /// Try creating the record from parts.
1199    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1200        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1201        if let Some(timestamp) = parts.timestamp {
1202            Ok(record.with_timestamp(timestamp))
1203        } else {
1204            Ok(record)
1205        }
1206    }
1207}
1208
1209impl From<AppendRecord> for api::AppendRecord {
1210    fn from(value: AppendRecord) -> Self {
1211        Self {
1212            timestamp: value.timestamp,
1213            headers: value.headers.into_iter().map(Into::into).collect(),
1214            body: value.body,
1215        }
1216    }
1217}
1218
1219impl From<CommandRecord> for AppendRecord {
1220    fn from(value: CommandRecord) -> Self {
1221        let (header_value, body) = match value.command {
1222            Command::Fence { fencing_token } => (
1223                CommandRecord::FENCE,
1224                Bytes::copy_from_slice(fencing_token.as_bytes()),
1225            ),
1226            Command::Trim { seq_num } => (
1227                CommandRecord::TRIM,
1228                Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1229            ),
1230        };
1231        AppendRecordParts {
1232            timestamp: value.timestamp,
1233            headers: vec![Header::from_value(header_value)],
1234            body,
1235        }
1236        .try_into()
1237        .expect("command record is a valid append record")
1238    }
1239}
1240
1241#[sync_docs(AppendRecordParts = "AppendRecord")]
1242#[derive(Debug, Clone)]
1243pub struct AppendRecordParts {
1244    pub timestamp: Option<u64>,
1245    pub headers: Vec<Header>,
1246    pub body: Bytes,
1247}
1248
1249impl From<AppendRecord> for AppendRecordParts {
1250    fn from(value: AppendRecord) -> Self {
1251        value.into_parts()
1252    }
1253}
1254
1255impl TryFrom<AppendRecordParts> for AppendRecord {
1256    type Error = ConvertError;
1257
1258    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1259        Self::try_from_parts(value)
1260    }
1261}
1262
1263/// A collection of append records that can be sent together in a batch.
1264#[derive(Debug, Clone)]
1265pub struct AppendRecordBatch {
1266    records: Vec<AppendRecord>,
1267    metered_bytes: u64,
1268    max_capacity: usize,
1269    #[cfg(test)]
1270    max_bytes: u64,
1271}
1272
1273impl PartialEq for AppendRecordBatch {
1274    fn eq(&self, other: &Self) -> bool {
1275        if self.records.eq(&other.records) {
1276            assert_eq!(self.metered_bytes, other.metered_bytes);
1277            true
1278        } else {
1279            false
1280        }
1281    }
1282}
1283
1284impl Eq for AppendRecordBatch {}
1285
1286impl Default for AppendRecordBatch {
1287    fn default() -> Self {
1288        Self::new()
1289    }
1290}
1291
1292impl AppendRecordBatch {
1293    /// Maximum number of records that a batch can hold.
1294    ///
1295    /// A record batch cannot be created with a bigger capacity.
1296    pub const MAX_CAPACITY: usize = 1000;
1297
1298    /// Maximum metered bytes of the batch.
1299    pub const MAX_BYTES: u64 = MIB_BYTES;
1300
1301    /// Create an empty record batch.
1302    pub fn new() -> Self {
1303        Self::with_max_capacity(Self::MAX_CAPACITY)
1304    }
1305
1306    /// Create an empty record batch with custom max capacity.
1307    ///
1308    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1309    pub fn with_max_capacity(max_capacity: usize) -> Self {
1310        assert!(
1311            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1312            "Batch capacity must be between 1 and 1000"
1313        );
1314
1315        Self {
1316            records: Vec::with_capacity(max_capacity),
1317            metered_bytes: 0,
1318            max_capacity,
1319            #[cfg(test)]
1320            max_bytes: Self::MAX_BYTES,
1321        }
1322    }
1323
1324    #[cfg(test)]
1325    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1326        #[cfg(test)]
1327        assert!(
1328            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1329            "Batch size must be between 1 byte and 1 MiB"
1330        );
1331
1332        Self {
1333            max_bytes,
1334            ..Self::with_max_capacity(max_capacity)
1335        }
1336    }
1337
1338    /// Try creating a record batch from an iterator.
1339    ///
1340    /// If all the items of the iterator cannot be drained into the batch, the
1341    /// error returned contains a batch containing all records it could fit
1342    /// along-with the left over items from the iterator.
1343    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1344    where
1345        R: Into<AppendRecord>,
1346        T: IntoIterator<Item = R>,
1347    {
1348        let mut records = Self::new();
1349        let mut pending = Vec::new();
1350
1351        let mut iter = iter.into_iter();
1352
1353        for record in iter.by_ref() {
1354            if let Err(record) = records.push(record) {
1355                pending.push(record);
1356                break;
1357            }
1358        }
1359
1360        if pending.is_empty() {
1361            Ok(records)
1362        } else {
1363            pending.extend(iter.map(Into::into));
1364            Err((records, pending))
1365        }
1366    }
1367
1368    /// Returns true if the batch contains no records.
1369    pub fn is_empty(&self) -> bool {
1370        if self.records.is_empty() {
1371            assert_eq!(self.metered_bytes, 0);
1372            true
1373        } else {
1374            false
1375        }
1376    }
1377
1378    /// Returns the number of records contained in the batch.
1379    pub fn len(&self) -> usize {
1380        self.records.len()
1381    }
1382
1383    #[cfg(test)]
1384    fn max_bytes(&self) -> u64 {
1385        self.max_bytes
1386    }
1387
1388    #[cfg(not(test))]
1389    fn max_bytes(&self) -> u64 {
1390        Self::MAX_BYTES
1391    }
1392
1393    /// Returns true if the batch cannot fit any more records.
1394    pub fn is_full(&self) -> bool {
1395        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1396    }
1397
1398    /// Try to append a new record into the batch.
1399    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1400        assert!(self.records.len() <= self.max_capacity);
1401        assert!(self.metered_bytes <= self.max_bytes());
1402
1403        let record = record.into();
1404        let record_size = record.metered_bytes();
1405        if self.records.len() >= self.max_capacity
1406            || self.metered_bytes + record_size > self.max_bytes()
1407        {
1408            Err(record)
1409        } else {
1410            self.records.push(record);
1411            self.metered_bytes += record_size;
1412            Ok(())
1413        }
1414    }
1415}
1416
1417impl MeteredBytes for AppendRecordBatch {
1418    fn metered_bytes(&self) -> u64 {
1419        self.metered_bytes
1420    }
1421}
1422
1423impl IntoIterator for AppendRecordBatch {
1424    type Item = AppendRecord;
1425    type IntoIter = std::vec::IntoIter<Self::Item>;
1426
1427    fn into_iter(self) -> Self::IntoIter {
1428        self.records.into_iter()
1429    }
1430}
1431
1432impl<'a> IntoIterator for &'a AppendRecordBatch {
1433    type Item = &'a AppendRecord;
1434    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1435
1436    fn into_iter(self) -> Self::IntoIter {
1437        self.records.iter()
1438    }
1439}
1440
1441impl AsRef<[AppendRecord]> for AppendRecordBatch {
1442    fn as_ref(&self) -> &[AppendRecord] {
1443        &self.records
1444    }
1445}
1446
1447#[sync_docs]
1448#[derive(Debug, Default, Clone)]
1449pub struct AppendInput {
1450    pub records: AppendRecordBatch,
1451    pub match_seq_num: Option<u64>,
1452    pub fencing_token: Option<FencingToken>,
1453}
1454
1455impl MeteredBytes for AppendInput {
1456    fn metered_bytes(&self) -> u64 {
1457        self.records.metered_bytes()
1458    }
1459}
1460
1461impl AppendInput {
1462    /// Create a new append input from record batch.
1463    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1464        Self {
1465            records: records.into(),
1466            match_seq_num: None,
1467            fencing_token: None,
1468        }
1469    }
1470
1471    /// Overwrite match sequence number.
1472    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1473        Self {
1474            match_seq_num: Some(match_seq_num.into()),
1475            ..self
1476        }
1477    }
1478
1479    /// Overwrite fencing token.
1480    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1481        Self {
1482            fencing_token: Some(fencing_token),
1483            ..self
1484        }
1485    }
1486
1487    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1488        let Self {
1489            records,
1490            match_seq_num,
1491            fencing_token,
1492        } = self;
1493
1494        api::AppendInput {
1495            stream: stream.into(),
1496            records: records.into_iter().map(Into::into).collect(),
1497            match_seq_num,
1498            fencing_token: fencing_token.map(|f| f.0),
1499        }
1500    }
1501}
1502
1503/// Acknowledgment to an append request.
1504#[derive(Debug, Clone)]
1505pub struct AppendAck {
1506    /// Sequence number and timestamp of the first record that was appended.
1507    pub start: StreamPosition,
1508    /// Sequence number of the last record that was appended + 1,
1509    /// and timestamp of the last record that was appended.
1510    /// The difference between `end.seq_num` and `start.seq_num`
1511    /// will be the number of records appended.
1512    pub end: StreamPosition,
1513    /// Sequence number that will be assigned to the next record on the stream,
1514    /// and timestamp of the last record on the stream.
1515    /// This can be greater than the `end` position in case of concurrent appends.
1516    pub tail: StreamPosition,
1517}
1518
1519impl From<api::AppendOutput> for AppendAck {
1520    fn from(value: api::AppendOutput) -> Self {
1521        let api::AppendOutput {
1522            start_seq_num,
1523            start_timestamp,
1524            end_seq_num,
1525            end_timestamp,
1526            next_seq_num,
1527            last_timestamp,
1528        } = value;
1529        let start = StreamPosition {
1530            seq_num: start_seq_num,
1531            timestamp: start_timestamp,
1532        };
1533        let end = StreamPosition {
1534            seq_num: end_seq_num,
1535            timestamp: end_timestamp,
1536        };
1537        let tail = StreamPosition {
1538            seq_num: next_seq_num,
1539            timestamp: last_timestamp,
1540        };
1541        Self { start, end, tail }
1542    }
1543}
1544
1545impl TryFrom<api::AppendResponse> for AppendAck {
1546    type Error = ConvertError;
1547    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1548        let api::AppendResponse { output } = value;
1549        let output = output.ok_or("missing append output")?;
1550        Ok(output.into())
1551    }
1552}
1553
1554impl TryFrom<api::AppendSessionResponse> for AppendAck {
1555    type Error = ConvertError;
1556    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1557        let api::AppendSessionResponse { output } = value;
1558        let output = output.ok_or("missing append output")?;
1559        Ok(output.into())
1560    }
1561}
1562
1563#[sync_docs]
1564#[derive(Debug, Clone, Default)]
1565pub struct ReadLimit {
1566    pub count: Option<u64>,
1567    pub bytes: Option<u64>,
1568}
1569
1570impl ReadLimit {
1571    /// Create a new read limit.
1572    pub fn new() -> Self {
1573        Self::default()
1574    }
1575
1576    /// Overwrite count limit.
1577    pub fn with_count(self, count: u64) -> Self {
1578        Self {
1579            count: Some(count),
1580            ..self
1581        }
1582    }
1583
1584    /// Overwrite bytes limit.
1585    pub fn with_bytes(self, bytes: u64) -> Self {
1586        Self {
1587            bytes: Some(bytes),
1588            ..self
1589        }
1590    }
1591}
1592
/// Where a read request starts from.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Start at a sequence number.
    SeqNum(u64),
    /// Start at a timestamp.
    Timestamp(u64),
    /// Start this many records before the tail, i.e. before the next
    /// sequence number.
    TailOffset(u64),
}

impl Default for ReadStart {
    /// Defaults to sequence number 0.
    fn default() -> Self {
        ReadStart::SeqNum(0)
    }
}
1609
1610impl From<ReadStart> for api::read_request::Start {
1611    fn from(start: ReadStart) -> Self {
1612        match start {
1613            ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1614            ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1615            ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1616        }
1617    }
1618}
1619
1620impl From<ReadStart> for api::read_session_request::Start {
1621    fn from(start: ReadStart) -> Self {
1622        match start {
1623            ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1624            ReadStart::Timestamp(timestamp) => {
1625                api::read_session_request::Start::Timestamp(timestamp)
1626            }
1627            ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1628        }
1629    }
1630}
1631
1632#[sync_docs]
1633#[derive(Debug, Clone, Default)]
1634pub struct ReadRequest {
1635    pub start: ReadStart,
1636    pub limit: ReadLimit,
1637    pub until: Option<RangeTo<u64>>,
1638}
1639
1640impl ReadRequest {
1641    /// Create a new request with the specified starting point.
1642    pub fn new(start: ReadStart) -> Self {
1643        Self {
1644            start,
1645            ..Default::default()
1646        }
1647    }
1648
1649    /// Overwrite limit.
1650    pub fn with_limit(self, limit: ReadLimit) -> Self {
1651        Self { limit, ..self }
1652    }
1653
1654    /// Provide an `until` timestamp.
1655    pub fn with_until(self, until: RangeTo<u64>) -> Self {
1656        Self {
1657            until: Some(until),
1658            ..self
1659        }
1660    }
1661}
1662
1663impl ReadRequest {
1664    pub(crate) fn try_into_api_type(
1665        self,
1666        stream: impl Into<String>,
1667    ) -> Result<api::ReadRequest, ConvertError> {
1668        let Self {
1669            start,
1670            limit,
1671            until,
1672        } = self;
1673
1674        let limit = if limit.count > Some(1000) {
1675            Err("read limit: count must not exceed 1000 for unary request")
1676        } else if limit.bytes > Some(MIB_BYTES) {
1677            Err("read limit: bytes must not exceed 1MiB for unary request")
1678        } else {
1679            Ok(api::ReadLimit {
1680                count: limit.count,
1681                bytes: limit.bytes,
1682            })
1683        }?;
1684
1685        Ok(api::ReadRequest {
1686            stream: stream.into(),
1687            start: Some(start.into()),
1688            limit: Some(limit),
1689            until: until.map(|range| range.end),
1690        })
1691    }
1692}
1693
1694#[sync_docs]
1695#[derive(Debug, Clone)]
1696pub struct SequencedRecord {
1697    pub seq_num: u64,
1698    pub timestamp: u64,
1699    pub headers: Vec<Header>,
1700    pub body: Bytes,
1701}
1702
1703metered_impl!(SequencedRecord);
1704
1705impl From<api::SequencedRecord> for SequencedRecord {
1706    fn from(value: api::SequencedRecord) -> Self {
1707        let api::SequencedRecord {
1708            seq_num,
1709            timestamp,
1710            headers,
1711            body,
1712        } = value;
1713        Self {
1714            seq_num,
1715            timestamp,
1716            headers: headers.into_iter().map(Into::into).collect(),
1717            body,
1718        }
1719    }
1720}
1721
1722impl SequencedRecord {
1723    /// Try representing the sequenced record as a command record.
1724    pub fn as_command_record(&self) -> Option<CommandRecord> {
1725        if self.headers.len() != 1 {
1726            return None;
1727        }
1728
1729        let header = self.headers.first().expect("pre-validated length");
1730
1731        if !header.name.is_empty() {
1732            return None;
1733        }
1734
1735        match header.value.as_ref() {
1736            CommandRecord::FENCE => {
1737                let fencing_token = std::str::from_utf8(&self.body).ok()?.parse().ok()?;
1738                Some(CommandRecord {
1739                    command: Command::Fence { fencing_token },
1740                    timestamp: Some(self.timestamp),
1741                })
1742            }
1743            CommandRecord::TRIM => {
1744                let body: &[u8] = &self.body;
1745                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1746                Some(CommandRecord {
1747                    command: Command::Trim { seq_num },
1748                    timestamp: Some(self.timestamp),
1749                })
1750            }
1751            _ => None,
1752        }
1753    }
1754}
1755
1756#[sync_docs]
1757#[derive(Debug, Clone)]
1758pub struct SequencedRecordBatch {
1759    pub records: Vec<SequencedRecord>,
1760}
1761
1762impl MeteredBytes for SequencedRecordBatch {
1763    fn metered_bytes(&self) -> u64 {
1764        self.records.metered_bytes()
1765    }
1766}
1767
1768impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1769    fn from(value: api::SequencedRecordBatch) -> Self {
1770        let api::SequencedRecordBatch { records } = value;
1771        Self {
1772            records: records.into_iter().map(Into::into).collect(),
1773        }
1774    }
1775}
1776
1777#[sync_docs(ReadOutput = "Output")]
1778#[derive(Debug, Clone)]
1779pub enum ReadOutput {
1780    Batch(SequencedRecordBatch),
1781    NextSeqNum(u64),
1782}
1783
1784impl From<api::read_output::Output> for ReadOutput {
1785    fn from(value: api::read_output::Output) -> Self {
1786        match value {
1787            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1788            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1789        }
1790    }
1791}
1792
1793impl TryFrom<api::ReadOutput> for ReadOutput {
1794    type Error = ConvertError;
1795    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1796        let api::ReadOutput { output } = value;
1797        let output = output.ok_or("missing read output")?;
1798        Ok(output.into())
1799    }
1800}
1801
1802impl TryFrom<api::ReadResponse> for ReadOutput {
1803    type Error = ConvertError;
1804    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1805        let api::ReadResponse { output } = value;
1806        let output = output.ok_or("missing output in read response")?;
1807        output.try_into()
1808    }
1809}
1810
1811#[sync_docs]
1812#[derive(Debug, Clone, Default)]
1813pub struct ReadSessionRequest {
1814    pub start: ReadStart,
1815    pub limit: ReadLimit,
1816    pub until: Option<RangeTo<u64>>,
1817}
1818
1819impl ReadSessionRequest {
1820    /// Create a new request with the specified starting point.
1821    pub fn new(start: ReadStart) -> Self {
1822        Self {
1823            start,
1824            ..Default::default()
1825        }
1826    }
1827
1828    /// Overwrite limit.
1829    pub fn with_limit(self, limit: ReadLimit) -> Self {
1830        Self { limit, ..self }
1831    }
1832
1833    /// Provide an `until` timestamp.
1834    pub fn with_until(self, until: RangeTo<u64>) -> Self {
1835        Self {
1836            until: Some(until),
1837            ..self
1838        }
1839    }
1840
1841    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1842        let Self {
1843            start,
1844            limit,
1845            until,
1846        } = self;
1847        api::ReadSessionRequest {
1848            stream: stream.into(),
1849            start: Some(start.into()),
1850            limit: Some(api::ReadLimit {
1851                count: limit.count,
1852                bytes: limit.bytes,
1853            }),
1854            heartbeats: false,
1855            until: until.map(|range| range.end),
1856        }
1857    }
1858}
1859
1860impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1861    type Error = ConvertError;
1862    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1863        let api::ReadSessionResponse { output } = value;
1864        let output = output.ok_or("missing output in read session response")?;
1865        output.try_into()
1866    }
1867}
1868
/// Name of a basin.
///
/// A valid name is 8 to 48 characters long, is made up of lowercase letters,
/// numbers and hyphens, and neither begins nor ends with a hyphen.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BasinName(String);

impl Deref for BasinName {
    type Target = str;

    /// Borrow the name as a string slice.
    fn deref(&self) -> &Self::Target {
        self.0.as_str()
    }
}
1882
1883impl TryFrom<String> for BasinName {
1884    type Error = ConvertError;
1885
1886    fn try_from(name: String) -> Result<Self, Self::Error> {
1887        if name.len() < 8 || name.len() > 48 {
1888            return Err("Basin name must be between 8 and 48 characters in length".into());
1889        }
1890
1891        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1892        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1893            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1894                .expect("Failed to compile basin name regex")
1895        });
1896
1897        if !regex.is_match(&name) {
1898            return Err(
1899                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1900                It cannot begin or end with a hyphen."
1901                    .into(),
1902            );
1903        }
1904
1905        Ok(Self(name))
1906    }
1907}
1908
1909impl FromStr for BasinName {
1910    type Err = ConvertError;
1911
1912    fn from_str(s: &str) -> Result<Self, Self::Err> {
1913        s.to_string().try_into()
1914    }
1915}
1916
1917impl std::fmt::Display for BasinName {
1918    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1919        f.write_str(&self.0)
1920    }
1921}
1922
1923impl From<BasinName> for String {
1924    fn from(value: BasinName) -> Self {
1925        value.0
1926    }
1927}
1928
1929/// Access token ID.
1930/// Must be between 1 and 96 characters.
1931#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1932pub struct AccessTokenId(String);
1933
1934impl Deref for AccessTokenId {
1935    type Target = str;
1936
1937    fn deref(&self) -> &Self::Target {
1938        &self.0
1939    }
1940}
1941
1942impl TryFrom<String> for AccessTokenId {
1943    type Error = ConvertError;
1944
1945    fn try_from(name: String) -> Result<Self, Self::Error> {
1946        if name.is_empty() {
1947            return Err("Access token ID must not be empty".into());
1948        }
1949
1950        if name.len() > 96 {
1951            return Err("Access token ID must not exceed 96 characters".into());
1952        }
1953
1954        Ok(Self(name))
1955    }
1956}
1957
1958impl From<AccessTokenId> for String {
1959    fn from(value: AccessTokenId) -> Self {
1960        value.0
1961    }
1962}
1963
1964impl FromStr for AccessTokenId {
1965    type Err = ConvertError;
1966
1967    fn from_str(s: &str) -> Result<Self, Self::Err> {
1968        s.to_string().try_into()
1969    }
1970}
1971
1972impl std::fmt::Display for AccessTokenId {
1973    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1974        f.write_str(&self.0)
1975    }
1976}
1977
1978impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1979    fn from(value: AccessTokenInfo) -> Self {
1980        Self {
1981            info: Some(value.into()),
1982        }
1983    }
1984}
1985
1986#[sync_docs]
1987#[derive(Debug, Clone)]
1988pub struct AccessTokenInfo {
1989    pub id: AccessTokenId,
1990    pub expires_at: Option<u32>,
1991    pub auto_prefix_streams: bool,
1992    pub scope: Option<AccessTokenScope>,
1993}
1994
1995impl AccessTokenInfo {
1996    /// Create a new access token info.
1997    pub fn new(id: AccessTokenId) -> Self {
1998        Self {
1999            id,
2000            expires_at: None,
2001            auto_prefix_streams: false,
2002            scope: None,
2003        }
2004    }
2005
2006    /// Overwrite expiration time.
2007    pub fn with_expires_at(self, expires_at: u32) -> Self {
2008        Self {
2009            expires_at: Some(expires_at),
2010            ..self
2011        }
2012    }
2013
2014    /// Overwrite auto prefix streams.
2015    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2016        Self {
2017            auto_prefix_streams,
2018            ..self
2019        }
2020    }
2021
2022    /// Overwrite scope.
2023    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2024        Self {
2025            scope: Some(scope),
2026            ..self
2027        }
2028    }
2029}
2030
2031impl From<AccessTokenInfo> for api::AccessTokenInfo {
2032    fn from(value: AccessTokenInfo) -> Self {
2033        let AccessTokenInfo {
2034            id,
2035            expires_at,
2036            auto_prefix_streams,
2037            scope,
2038        } = value;
2039        Self {
2040            id: id.into(),
2041            expires_at,
2042            auto_prefix_streams,
2043            scope: scope.map(Into::into),
2044        }
2045    }
2046}
2047
2048impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2049    type Error = ConvertError;
2050
2051    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2052        let api::AccessTokenInfo {
2053            id,
2054            expires_at,
2055            auto_prefix_streams,
2056            scope,
2057        } = value;
2058        Ok(Self {
2059            id: id.try_into()?,
2060            expires_at,
2061            auto_prefix_streams,
2062            scope: scope.map(Into::into),
2063        })
2064    }
2065}
2066
// Each variant has a kebab-case string form accepted by the `FromStr`
// implementation below and a same-named counterpart in `api::Operation`.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
    AccountMetrics,
    BasinMetrics,
    StreamMetrics,
}
2092
2093impl FromStr for Operation {
2094    type Err = ConvertError;
2095
2096    fn from_str(s: &str) -> Result<Self, Self::Err> {
2097        match s.to_lowercase().as_str() {
2098            "list-basins" => Ok(Self::ListBasins),
2099            "create-basin" => Ok(Self::CreateBasin),
2100            "delete-basin" => Ok(Self::DeleteBasin),
2101            "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2102            "get-basin-config" => Ok(Self::GetBasinConfig),
2103            "issue-access-token" => Ok(Self::IssueAccessToken),
2104            "revoke-access-token" => Ok(Self::RevokeAccessToken),
2105            "list-access-tokens" => Ok(Self::ListAccessTokens),
2106            "list-streams" => Ok(Self::ListStreams),
2107            "create-stream" => Ok(Self::CreateStream),
2108            "delete-stream" => Ok(Self::DeleteStream),
2109            "get-stream-config" => Ok(Self::GetStreamConfig),
2110            "reconfigure-stream" => Ok(Self::ReconfigureStream),
2111            "check-tail" => Ok(Self::CheckTail),
2112            "append" => Ok(Self::Append),
2113            "read" => Ok(Self::Read),
2114            "trim" => Ok(Self::Trim),
2115            "fence" => Ok(Self::Fence),
2116            "account-metrics" => Ok(Self::AccountMetrics),
2117            "basin-metrics" => Ok(Self::BasinMetrics),
2118            "stream-metrics" => Ok(Self::StreamMetrics),
2119            _ => Err("invalid operation".into()),
2120        }
2121    }
2122}
2123
2124impl From<Operation> for api::Operation {
2125    fn from(value: Operation) -> Self {
2126        match value {
2127            Operation::ListBasins => Self::ListBasins,
2128            Operation::CreateBasin => Self::CreateBasin,
2129            Operation::DeleteBasin => Self::DeleteBasin,
2130            Operation::ReconfigureBasin => Self::ReconfigureBasin,
2131            Operation::GetBasinConfig => Self::GetBasinConfig,
2132            Operation::IssueAccessToken => Self::IssueAccessToken,
2133            Operation::RevokeAccessToken => Self::RevokeAccessToken,
2134            Operation::ListAccessTokens => Self::ListAccessTokens,
2135            Operation::ListStreams => Self::ListStreams,
2136            Operation::CreateStream => Self::CreateStream,
2137            Operation::DeleteStream => Self::DeleteStream,
2138            Operation::GetStreamConfig => Self::GetStreamConfig,
2139            Operation::ReconfigureStream => Self::ReconfigureStream,
2140            Operation::CheckTail => Self::CheckTail,
2141            Operation::Append => Self::Append,
2142            Operation::Read => Self::Read,
2143            Operation::Trim => Self::Trim,
2144            Operation::Fence => Self::Fence,
2145            Operation::AccountMetrics => Self::AccountMetrics,
2146            Operation::BasinMetrics => Self::BasinMetrics,
2147            Operation::StreamMetrics => Self::StreamMetrics,
2148        }
2149    }
2150}
2151
2152impl From<api::Operation> for Option<Operation> {
2153    fn from(value: api::Operation) -> Self {
2154        match value {
2155            api::Operation::Unspecified => None,
2156            api::Operation::ListBasins => Some(Operation::ListBasins),
2157            api::Operation::CreateBasin => Some(Operation::CreateBasin),
2158            api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2159            api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2160            api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2161            api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2162            api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2163            api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2164            api::Operation::ListStreams => Some(Operation::ListStreams),
2165            api::Operation::CreateStream => Some(Operation::CreateStream),
2166            api::Operation::DeleteStream => Some(Operation::DeleteStream),
2167            api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2168            api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2169            api::Operation::CheckTail => Some(Operation::CheckTail),
2170            api::Operation::Append => Some(Operation::Append),
2171            api::Operation::Read => Some(Operation::Read),
2172            api::Operation::Trim => Some(Operation::Trim),
2173            api::Operation::Fence => Some(Operation::Fence),
2174            api::Operation::AccountMetrics => Some(Operation::AccountMetrics),
2175            api::Operation::BasinMetrics => Some(Operation::BasinMetrics),
2176            api::Operation::StreamMetrics => Some(Operation::StreamMetrics),
2177        }
2178    }
2179}
2180
2181#[sync_docs]
2182#[derive(Debug, Clone, Default)]
2183pub struct AccessTokenScope {
2184    pub basins: Option<ResourceSet>,
2185    pub streams: Option<ResourceSet>,
2186    pub access_tokens: Option<ResourceSet>,
2187    pub op_groups: Option<PermittedOperationGroups>,
2188    pub ops: HashSet<Operation>,
2189}
2190
2191impl AccessTokenScope {
2192    /// Create a new access token scope.
2193    pub fn new() -> Self {
2194        Self::default()
2195    }
2196
2197    /// Overwrite resource set for access tokens.
2198    pub fn with_basins(self, basins: ResourceSet) -> Self {
2199        Self {
2200            basins: Some(basins),
2201            ..self
2202        }
2203    }
2204
2205    /// Overwrite resource set for streams.
2206    pub fn with_streams(self, streams: ResourceSet) -> Self {
2207        Self {
2208            streams: Some(streams),
2209            ..self
2210        }
2211    }
2212
2213    /// Overwrite resource set for access tokens.
2214    pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2215        Self {
2216            access_tokens: Some(access_tokens),
2217            ..self
2218        }
2219    }
2220
2221    /// Overwrite operation groups.
2222    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2223        Self {
2224            op_groups: Some(op_groups),
2225            ..self
2226        }
2227    }
2228
2229    /// Overwrite operations.
2230    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2231        Self {
2232            ops: ops.into_iter().collect(),
2233            ..self
2234        }
2235    }
2236
2237    /// Add an operation to operations.
2238    pub fn with_op(self, op: Operation) -> Self {
2239        let mut ops = self.ops;
2240        ops.insert(op);
2241        Self { ops, ..self }
2242    }
2243}
2244
2245impl From<AccessTokenScope> for api::AccessTokenScope {
2246    fn from(value: AccessTokenScope) -> Self {
2247        let AccessTokenScope {
2248            basins,
2249            streams,
2250            access_tokens,
2251            op_groups,
2252            ops,
2253        } = value;
2254        Self {
2255            basins: basins.map(Into::into),
2256            streams: streams.map(Into::into),
2257            access_tokens: access_tokens.map(Into::into),
2258            op_groups: op_groups.map(Into::into),
2259            ops: ops
2260                .into_iter()
2261                .map(api::Operation::from)
2262                .map(Into::into)
2263                .collect(),
2264        }
2265    }
2266}
2267
2268impl From<api::AccessTokenScope> for AccessTokenScope {
2269    fn from(value: api::AccessTokenScope) -> Self {
2270        let api::AccessTokenScope {
2271            basins,
2272            streams,
2273            access_tokens,
2274            op_groups,
2275            ops,
2276        } = value;
2277        Self {
2278            basins: basins.and_then(|set| set.matching.map(Into::into)),
2279            streams: streams.and_then(|set| set.matching.map(Into::into)),
2280            access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2281            op_groups: op_groups.map(Into::into),
2282            ops: ops
2283                .into_iter()
2284                .map(api::Operation::try_from)
2285                .flat_map(Result::ok)
2286                .flat_map(<Option<Operation>>::from)
2287                .collect(),
2288        }
2289    }
2290}
2291
2292impl From<ResourceSet> for api::ResourceSet {
2293    fn from(value: ResourceSet) -> Self {
2294        Self {
2295            matching: Some(value.into()),
2296        }
2297    }
2298}
2299
2300#[sync_docs(ResourceSet = "Matching")]
2301#[derive(Debug, Clone)]
2302pub enum ResourceSet {
2303    Exact(String),
2304    Prefix(String),
2305}
2306
2307impl From<ResourceSet> for api::resource_set::Matching {
2308    fn from(value: ResourceSet) -> Self {
2309        match value {
2310            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2311            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2312        }
2313    }
2314}
2315
2316impl From<api::resource_set::Matching> for ResourceSet {
2317    fn from(value: api::resource_set::Matching) -> Self {
2318        match value {
2319            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2320            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2321        }
2322    }
2323}
2324
2325#[sync_docs]
2326#[derive(Debug, Clone, Default)]
2327pub struct PermittedOperationGroups {
2328    pub account: Option<ReadWritePermissions>,
2329    pub basin: Option<ReadWritePermissions>,
2330    pub stream: Option<ReadWritePermissions>,
2331}
2332
2333impl PermittedOperationGroups {
2334    /// Create a new permitted operation groups.
2335    pub fn new() -> Self {
2336        Self::default()
2337    }
2338
2339    /// Overwrite account read-write permissions.
2340    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2341        Self {
2342            account: Some(account),
2343            ..self
2344        }
2345    }
2346
2347    /// Overwrite basin read-write permissions.
2348    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2349        Self {
2350            basin: Some(basin),
2351            ..self
2352        }
2353    }
2354
2355    /// Overwrite stream read-write permissions.
2356    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2357        Self {
2358            stream: Some(stream),
2359            ..self
2360        }
2361    }
2362}
2363
2364impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2365    fn from(value: PermittedOperationGroups) -> Self {
2366        let PermittedOperationGroups {
2367            account,
2368            basin,
2369            stream,
2370        } = value;
2371        Self {
2372            account: account.map(Into::into),
2373            basin: basin.map(Into::into),
2374            stream: stream.map(Into::into),
2375        }
2376    }
2377}
2378
2379impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2380    fn from(value: api::PermittedOperationGroups) -> Self {
2381        let api::PermittedOperationGroups {
2382            account,
2383            basin,
2384            stream,
2385        } = value;
2386        Self {
2387            account: account.map(Into::into),
2388            basin: basin.map(Into::into),
2389            stream: stream.map(Into::into),
2390        }
2391    }
2392}
2393
2394#[sync_docs]
2395#[derive(Debug, Clone, Default)]
2396pub struct ReadWritePermissions {
2397    pub read: bool,
2398    pub write: bool,
2399}
2400
2401impl ReadWritePermissions {
2402    /// Create a new read-write permission.
2403    pub fn new() -> Self {
2404        Self::default()
2405    }
2406
2407    /// Overwrite read permission.
2408    pub fn with_read(self, read: bool) -> Self {
2409        Self { read, ..self }
2410    }
2411
2412    /// Overwrite write permission.
2413    pub fn with_write(self, write: bool) -> Self {
2414        Self { write, ..self }
2415    }
2416}
2417
2418impl From<ReadWritePermissions> for api::ReadWritePermissions {
2419    fn from(value: ReadWritePermissions) -> Self {
2420        let ReadWritePermissions { read, write } = value;
2421        Self { read, write }
2422    }
2423}
2424
2425impl From<api::ReadWritePermissions> for ReadWritePermissions {
2426    fn from(value: api::ReadWritePermissions) -> Self {
2427        let api::ReadWritePermissions { read, write } = value;
2428        Self { read, write }
2429    }
2430}
2431
2432impl From<api::IssueAccessTokenResponse> for String {
2433    fn from(value: api::IssueAccessTokenResponse) -> Self {
2434        value.access_token
2435    }
2436}
2437
2438impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2439    fn from(value: AccessTokenId) -> Self {
2440        Self { id: value.into() }
2441    }
2442}
2443
2444impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2445    type Error = ConvertError;
2446    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2447        let token_info = value.info.ok_or("access token info is missing")?;
2448        token_info.try_into()
2449    }
2450}
2451
2452#[sync_docs]
2453#[derive(Debug, Clone, Default)]
2454pub struct ListAccessTokensRequest {
2455    pub prefix: String,
2456    pub start_after: String,
2457    pub limit: Option<usize>,
2458}
2459
2460impl ListAccessTokensRequest {
2461    /// Create a new request with prefix.
2462    pub fn new() -> Self {
2463        Self::default()
2464    }
2465
2466    /// Overwrite prefix.
2467    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2468        Self {
2469            prefix: prefix.into(),
2470            ..self
2471        }
2472    }
2473
2474    /// Overwrite start after.
2475    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2476        Self {
2477            start_after: start_after.into(),
2478            ..self
2479        }
2480    }
2481
2482    /// Overwrite limit.
2483    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2484        Self {
2485            limit: limit.into(),
2486            ..self
2487        }
2488    }
2489}
2490
2491impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2492    type Error = ConvertError;
2493    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2494        let ListAccessTokensRequest {
2495            prefix,
2496            start_after,
2497            limit,
2498        } = value;
2499        Ok(Self {
2500            prefix,
2501            start_after,
2502            limit: limit
2503                .map(TryInto::try_into)
2504                .transpose()
2505                .map_err(|_| "request limit does not fit into u64 bounds")?,
2506        })
2507    }
2508}
2509
2510#[sync_docs]
2511#[derive(Debug, Clone)]
2512pub struct ListAccessTokensResponse {
2513    pub access_tokens: Vec<AccessTokenInfo>,
2514    pub has_more: bool,
2515}
2516
2517impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2518    type Error = ConvertError;
2519    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2520        let api::ListAccessTokensResponse {
2521            access_tokens,
2522            has_more,
2523        } = value;
2524        let access_tokens = access_tokens
2525            .into_iter()
2526            .map(TryInto::try_into)
2527            .collect::<Result<Vec<_>, _>>()?;
2528        Ok(Self {
2529            access_tokens,
2530            has_more,
2531        })
2532    }
2533}