s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{
4    collections::HashSet,
5    ops::{Deref, RangeTo},
6    str::FromStr,
7    sync::OnceLock,
8    time::Duration,
9};
10
11use bytes::Bytes;
12use rand::Rng;
13use regex::Regex;
14use sync_docs::sync_docs;
15
16use crate::api;
17
18pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
19pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
20
21/// Error related to conversion from one type to another.
22#[derive(Debug, Clone, thiserror::Error)]
23#[error("{0}")]
24pub struct ConvertError(String);
25
26impl<T: Into<String>> From<T> for ConvertError {
27    fn from(value: T) -> Self {
28        Self(value.into())
29    }
30}
31
32/// Metered size of the object in bytes.
33///
34/// Bytes are calculated using the "metered bytes" formula:
35///
36/// ```python
37/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.key) + len(h.value)) for h in record.headers) + len(record.body)
38/// ```
39pub trait MeteredBytes {
40    /// Return the metered bytes of the object.
41    fn metered_bytes(&self) -> u64;
42}
43
44impl<T: MeteredBytes> MeteredBytes for Vec<T> {
45    fn metered_bytes(&self) -> u64 {
46        self.iter().fold(0, |acc, item| acc + item.metered_bytes())
47    }
48}
49
50macro_rules! metered_impl {
51    ($ty:ty) => {
52        impl MeteredBytes for $ty {
53            fn metered_bytes(&self) -> u64 {
54                let bytes = 8
55                    + (2 * self.headers.len())
56                    + self
57                        .headers
58                        .iter()
59                        .map(|h| h.name.len() + h.value.len())
60                        .sum::<usize>()
61                    + self.body.len();
62                bytes as u64
63            }
64        }
65    };
66}
67
68#[sync_docs]
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum BasinScope {
71    AwsUsEast1,
72}
73
74impl From<BasinScope> for api::BasinScope {
75    fn from(value: BasinScope) -> Self {
76        match value {
77            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
78        }
79    }
80}
81
82impl From<api::BasinScope> for Option<BasinScope> {
83    fn from(value: api::BasinScope) -> Self {
84        match value {
85            api::BasinScope::Unspecified => None,
86            api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
87        }
88    }
89}
90
91impl FromStr for BasinScope {
92    type Err = ConvertError;
93    fn from_str(value: &str) -> Result<Self, Self::Err> {
94        match value {
95            "aws:us-east-1" => Ok(Self::AwsUsEast1),
96            _ => Err("invalid basin scope value".into()),
97        }
98    }
99}
100
101#[sync_docs]
102#[derive(Debug, Clone)]
103pub struct CreateBasinRequest {
104    pub basin: BasinName,
105    pub config: Option<BasinConfig>,
106    pub scope: Option<BasinScope>,
107}
108
109impl CreateBasinRequest {
110    /// Create a new request with basin name.
111    pub fn new(basin: BasinName) -> Self {
112        Self {
113            basin,
114            config: None,
115            scope: None,
116        }
117    }
118
119    /// Overwrite basin configuration.
120    pub fn with_config(self, config: BasinConfig) -> Self {
121        Self {
122            config: Some(config),
123            ..self
124        }
125    }
126
127    /// Overwrite basin scope.
128    pub fn with_scope(self, scope: BasinScope) -> Self {
129        Self {
130            scope: Some(scope),
131            ..self
132        }
133    }
134}
135
136impl From<CreateBasinRequest> for api::CreateBasinRequest {
137    fn from(value: CreateBasinRequest) -> Self {
138        let CreateBasinRequest {
139            basin,
140            config,
141            scope,
142        } = value;
143        Self {
144            basin: basin.0,
145            config: config.map(Into::into),
146            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
147        }
148    }
149}
150
151#[sync_docs]
152#[derive(Debug, Clone, Default)]
153pub struct BasinConfig {
154    pub default_stream_config: Option<StreamConfig>,
155    pub create_stream_on_append: bool,
156    pub create_stream_on_read: bool,
157}
158
159impl BasinConfig {
160    /// Create a new basin config.
161    pub fn new() -> Self {
162        Self::default()
163    }
164
165    /// Overwrite the default stream config.
166    pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
167        Self {
168            default_stream_config: Some(default_stream_config),
169            ..self
170        }
171    }
172
173    /// Overwrite `create_stream_on_append`.
174    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
175        Self {
176            create_stream_on_append,
177            ..self
178        }
179    }
180
181    /// Overwrite `create_stream_on_read`.
182    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
183        Self {
184            create_stream_on_read,
185            ..self
186        }
187    }
188}
189
190impl From<BasinConfig> for api::BasinConfig {
191    fn from(value: BasinConfig) -> Self {
192        let BasinConfig {
193            default_stream_config,
194            create_stream_on_append,
195            create_stream_on_read,
196        } = value;
197        Self {
198            default_stream_config: default_stream_config.map(Into::into),
199            create_stream_on_append,
200            create_stream_on_read,
201        }
202    }
203}
204
205impl From<api::BasinConfig> for BasinConfig {
206    fn from(value: api::BasinConfig) -> Self {
207        let api::BasinConfig {
208            default_stream_config,
209            create_stream_on_append,
210            create_stream_on_read,
211        } = value;
212        Self {
213            default_stream_config: default_stream_config.map(Into::into),
214            create_stream_on_append,
215            create_stream_on_read,
216        }
217    }
218}
219
220#[sync_docs]
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub enum TimestampingMode {
223    ClientPrefer,
224    ClientRequire,
225    Arrival,
226}
227
228impl From<TimestampingMode> for api::TimestampingMode {
229    fn from(value: TimestampingMode) -> Self {
230        match value {
231            TimestampingMode::ClientPrefer => Self::ClientPrefer,
232            TimestampingMode::ClientRequire => Self::ClientRequire,
233            TimestampingMode::Arrival => Self::Arrival,
234        }
235    }
236}
237
238impl From<api::TimestampingMode> for Option<TimestampingMode> {
239    fn from(value: api::TimestampingMode) -> Self {
240        match value {
241            api::TimestampingMode::Unspecified => None,
242            api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
243            api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
244            api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
245        }
246    }
247}
248
249#[sync_docs(TimestampingConfig = "Timestamping")]
250#[derive(Debug, Clone, Default)]
251/// Timestamping behavior.
252pub struct TimestampingConfig {
253    pub mode: Option<TimestampingMode>,
254    pub uncapped: Option<bool>,
255}
256
257impl TimestampingConfig {
258    /// Create a new timestamping config.
259    pub fn new() -> Self {
260        Self::default()
261    }
262
263    /// Overwrite timestamping mode.
264    pub fn with_mode(self, mode: TimestampingMode) -> Self {
265        Self {
266            mode: Some(mode),
267            ..self
268        }
269    }
270
271    /// Overwrite the uncapped knob.
272    pub fn with_uncapped(self, uncapped: bool) -> Self {
273        Self {
274            uncapped: Some(uncapped),
275            ..self
276        }
277    }
278}
279
280impl From<TimestampingConfig> for api::stream_config::Timestamping {
281    fn from(value: TimestampingConfig) -> Self {
282        Self {
283            mode: value
284                .mode
285                .map(api::TimestampingMode::from)
286                .unwrap_or_default()
287                .into(),
288            uncapped: value.uncapped,
289        }
290    }
291}
292
293impl From<api::stream_config::Timestamping> for TimestampingConfig {
294    fn from(value: api::stream_config::Timestamping) -> Self {
295        let mode = value.mode().into();
296        let uncapped = value.uncapped;
297        Self { mode, uncapped }
298    }
299}
300
301#[sync_docs(DeleteOnEmptyConfig = "DeleteOnEmpty", min_age = "min_age_secs")]
302#[derive(Debug, Clone, Default)]
303/// Delete-on-empty config.
304pub struct DeleteOnEmptyConfig {
305    pub min_age: Duration,
306}
307
308impl DeleteOnEmptyConfig {
309    /// Create a new delete-on-empty config.
310    pub fn new() -> Self {
311        Self::default()
312    }
313
314    /// Overwrite min age.
315    pub fn with_min_age(self, min_age: Duration) -> Self {
316        Self { min_age }
317    }
318}
319
320impl From<DeleteOnEmptyConfig> for api::stream_config::DeleteOnEmpty {
321    fn from(value: DeleteOnEmptyConfig) -> Self {
322        Self {
323            min_age_secs: value.min_age.as_secs(),
324        }
325    }
326}
327
328impl From<api::stream_config::DeleteOnEmpty> for DeleteOnEmptyConfig {
329    fn from(value: api::stream_config::DeleteOnEmpty) -> Self {
330        Self {
331            min_age: Duration::from_secs(value.min_age_secs),
332        }
333    }
334}
335
336#[sync_docs]
337#[derive(Debug, Clone, Default)]
338pub struct StreamConfig {
339    pub storage_class: Option<StorageClass>,
340    pub retention_policy: Option<RetentionPolicy>,
341    pub timestamping: Option<TimestampingConfig>,
342    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
343}
344
345impl StreamConfig {
346    /// Create a new stream config.
347    pub fn new() -> Self {
348        Self::default()
349    }
350
351    /// Overwrite storage class.
352    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
353        Self {
354            storage_class: Some(storage_class),
355            ..self
356        }
357    }
358
359    /// Overwrite retention policy.
360    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
361        Self {
362            retention_policy: Some(retention_policy),
363            ..self
364        }
365    }
366
367    /// Overwrite timestamping config.
368    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
369        Self {
370            timestamping: Some(timestamping),
371            ..self
372        }
373    }
374
375    /// Overwrite delete-on-empty config.
376    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
377        Self {
378            delete_on_empty: Some(delete_on_empty),
379            ..self
380        }
381    }
382}
383
384impl From<StreamConfig> for api::StreamConfig {
385    fn from(value: StreamConfig) -> Self {
386        let StreamConfig {
387            storage_class,
388            retention_policy,
389            timestamping,
390            delete_on_empty,
391        } = value;
392        Self {
393            storage_class: storage_class
394                .map(api::StorageClass::from)
395                .unwrap_or_default()
396                .into(),
397            retention_policy: retention_policy.map(Into::into),
398            timestamping: timestamping.map(Into::into),
399            delete_on_empty: delete_on_empty.map(Into::into),
400        }
401    }
402}
403
404impl From<api::StreamConfig> for StreamConfig {
405    fn from(value: api::StreamConfig) -> Self {
406        Self {
407            storage_class: value.storage_class().into(),
408            retention_policy: value.retention_policy.map(Into::into),
409            timestamping: value.timestamping.map(Into::into),
410            delete_on_empty: value.delete_on_empty.map(Into::into),
411        }
412    }
413}
414
415#[sync_docs]
416#[derive(Debug, Clone, Copy, PartialEq, Eq)]
417pub enum StorageClass {
418    Standard,
419    Express,
420}
421
422impl From<StorageClass> for api::StorageClass {
423    fn from(value: StorageClass) -> Self {
424        match value {
425            StorageClass::Standard => Self::Standard,
426            StorageClass::Express => Self::Express,
427        }
428    }
429}
430
431impl From<api::StorageClass> for Option<StorageClass> {
432    fn from(value: api::StorageClass) -> Self {
433        match value {
434            api::StorageClass::Unspecified => None,
435            api::StorageClass::Standard => Some(StorageClass::Standard),
436            api::StorageClass::Express => Some(StorageClass::Express),
437        }
438    }
439}
440
441impl FromStr for StorageClass {
442    type Err = ConvertError;
443
444    fn from_str(value: &str) -> Result<Self, Self::Err> {
445        match value {
446            "standard" => Ok(Self::Standard),
447            "express" => Ok(Self::Express),
448            v => Err(format!("unknown storage class: {v}").into()),
449        }
450    }
451}
452
453#[sync_docs(Age = "Age")]
454#[derive(Debug, Clone)]
455pub enum RetentionPolicy {
456    Age(Duration),
457}
458
459impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
460    fn from(value: RetentionPolicy) -> Self {
461        match value {
462            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
463        }
464    }
465}
466
467impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
468    fn from(value: api::stream_config::RetentionPolicy) -> Self {
469        match value {
470            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
471        }
472    }
473}
474
475#[sync_docs]
476#[derive(Debug, Clone, Copy, PartialEq, Eq)]
477pub enum BasinState {
478    Active,
479    Creating,
480    Deleting,
481}
482
483impl From<BasinState> for api::BasinState {
484    fn from(value: BasinState) -> Self {
485        match value {
486            BasinState::Active => Self::Active,
487            BasinState::Creating => Self::Creating,
488            BasinState::Deleting => Self::Deleting,
489        }
490    }
491}
492
493impl From<api::BasinState> for Option<BasinState> {
494    fn from(value: api::BasinState) -> Self {
495        match value {
496            api::BasinState::Unspecified => None,
497            api::BasinState::Active => Some(BasinState::Active),
498            api::BasinState::Creating => Some(BasinState::Creating),
499            api::BasinState::Deleting => Some(BasinState::Deleting),
500        }
501    }
502}
503
504impl std::fmt::Display for BasinState {
505    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
506        match self {
507            BasinState::Active => write!(f, "active"),
508            BasinState::Creating => write!(f, "creating"),
509            BasinState::Deleting => write!(f, "deleting"),
510        }
511    }
512}
513
514#[sync_docs]
515#[derive(Debug, Clone)]
516pub struct BasinInfo {
517    pub name: String,
518    pub scope: Option<BasinScope>,
519    pub state: Option<BasinState>,
520}
521
522impl From<BasinInfo> for api::BasinInfo {
523    fn from(value: BasinInfo) -> Self {
524        let BasinInfo { name, scope, state } = value;
525        Self {
526            name,
527            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
528            state: state.map(api::BasinState::from).unwrap_or_default().into(),
529        }
530    }
531}
532
533impl From<api::BasinInfo> for BasinInfo {
534    fn from(value: api::BasinInfo) -> Self {
535        let scope = value.scope().into();
536        let state = value.state().into();
537        let name = value.name;
538        Self { name, scope, state }
539    }
540}
541
542impl TryFrom<api::CreateBasinResponse> for BasinInfo {
543    type Error = ConvertError;
544    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
545        let api::CreateBasinResponse { info } = value;
546        let info = info.ok_or("missing basin info")?;
547        Ok(info.into())
548    }
549}
550
551#[sync_docs]
552#[derive(Debug, Clone, Default)]
553pub struct ListStreamsRequest {
554    pub prefix: String,
555    pub start_after: String,
556    pub limit: Option<usize>,
557}
558
559impl ListStreamsRequest {
560    /// Create a new request.
561    pub fn new() -> Self {
562        Self::default()
563    }
564
565    /// Overwrite prefix.
566    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
567        Self {
568            prefix: prefix.into(),
569            ..self
570        }
571    }
572
573    /// Overwrite start after.
574    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
575        Self {
576            start_after: start_after.into(),
577            ..self
578        }
579    }
580
581    /// Overwrite limit.
582    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
583        Self {
584            limit: limit.into(),
585            ..self
586        }
587    }
588}
589
590impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
591    type Error = ConvertError;
592    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
593        let ListStreamsRequest {
594            prefix,
595            start_after,
596            limit,
597        } = value;
598        Ok(Self {
599            prefix,
600            start_after,
601            limit: limit.map(|n| n as u64),
602        })
603    }
604}
605
606#[sync_docs]
607#[derive(Debug, Clone)]
608pub struct StreamInfo {
609    pub name: String,
610    pub created_at: u32,
611    pub deleted_at: Option<u32>,
612}
613
614impl From<api::StreamInfo> for StreamInfo {
615    fn from(value: api::StreamInfo) -> Self {
616        Self {
617            name: value.name,
618            created_at: value.created_at,
619            deleted_at: value.deleted_at,
620        }
621    }
622}
623
624impl TryFrom<api::CreateStreamResponse> for StreamInfo {
625    type Error = ConvertError;
626
627    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
628        let api::CreateStreamResponse { info } = value;
629        let info = info.ok_or("missing stream info")?;
630        Ok(info.into())
631    }
632}
633
634#[sync_docs]
635#[derive(Debug, Clone)]
636pub struct ListStreamsResponse {
637    pub streams: Vec<StreamInfo>,
638    pub has_more: bool,
639}
640
641impl From<api::ListStreamsResponse> for ListStreamsResponse {
642    fn from(value: api::ListStreamsResponse) -> Self {
643        let api::ListStreamsResponse { streams, has_more } = value;
644        let streams = streams.into_iter().map(Into::into).collect();
645        Self { streams, has_more }
646    }
647}
648
649impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
650    type Error = ConvertError;
651
652    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
653        let api::GetBasinConfigResponse { config } = value;
654        let config = config.ok_or("missing basin config")?;
655        Ok(config.into())
656    }
657}
658
659impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
660    type Error = ConvertError;
661
662    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
663        let api::GetStreamConfigResponse { config } = value;
664        let config = config.ok_or("missing stream config")?;
665        Ok(config.into())
666    }
667}
668
669#[sync_docs]
670#[derive(Debug, Clone)]
671pub struct CreateStreamRequest {
672    pub stream: String,
673    pub config: Option<StreamConfig>,
674}
675
676impl CreateStreamRequest {
677    /// Create a new request with stream name.
678    pub fn new(stream: impl Into<String>) -> Self {
679        Self {
680            stream: stream.into(),
681            config: None,
682        }
683    }
684
685    /// Overwrite stream config.
686    pub fn with_config(self, config: StreamConfig) -> Self {
687        Self {
688            config: Some(config),
689            ..self
690        }
691    }
692}
693
694impl From<CreateStreamRequest> for api::CreateStreamRequest {
695    fn from(value: CreateStreamRequest) -> Self {
696        let CreateStreamRequest { stream, config } = value;
697        Self {
698            stream,
699            config: config.map(Into::into),
700        }
701    }
702}
703
704#[sync_docs]
705#[derive(Debug, Clone, Default)]
706pub struct ListBasinsRequest {
707    pub prefix: String,
708    pub start_after: String,
709    pub limit: Option<usize>,
710}
711
712impl ListBasinsRequest {
713    /// Create a new request.
714    pub fn new() -> Self {
715        Self::default()
716    }
717
718    /// Overwrite prefix.
719    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
720        Self {
721            prefix: prefix.into(),
722            ..self
723        }
724    }
725
726    /// Overwrite start after.
727    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
728        Self {
729            start_after: start_after.into(),
730            ..self
731        }
732    }
733
734    /// Overwrite limit.
735    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
736        Self {
737            limit: limit.into(),
738            ..self
739        }
740    }
741}
742
743impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
744    type Error = ConvertError;
745    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
746        let ListBasinsRequest {
747            prefix,
748            start_after,
749            limit,
750        } = value;
751        Ok(Self {
752            prefix,
753            start_after,
754            limit: limit
755                .map(TryInto::try_into)
756                .transpose()
757                .map_err(|_| "request limit does not fit into u64 bounds")?,
758        })
759    }
760}
761
762#[sync_docs]
763#[derive(Debug, Clone)]
764pub struct ListBasinsResponse {
765    pub basins: Vec<BasinInfo>,
766    pub has_more: bool,
767}
768
769impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
770    type Error = ConvertError;
771    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
772        let api::ListBasinsResponse { basins, has_more } = value;
773        Ok(Self {
774            basins: basins.into_iter().map(Into::into).collect(),
775            has_more,
776        })
777    }
778}
779
780#[sync_docs]
781#[derive(Debug, Clone)]
782pub struct DeleteBasinRequest {
783    pub basin: BasinName,
784    /// Delete basin if it exists else do nothing.
785    pub if_exists: bool,
786}
787
788impl DeleteBasinRequest {
789    /// Create a new request.
790    pub fn new(basin: BasinName) -> Self {
791        Self {
792            basin,
793            if_exists: false,
794        }
795    }
796
797    /// Overwrite the if exists parameter.
798    pub fn with_if_exists(self, if_exists: bool) -> Self {
799        Self { if_exists, ..self }
800    }
801}
802
803impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
804    fn from(value: DeleteBasinRequest) -> Self {
805        let DeleteBasinRequest { basin, .. } = value;
806        Self { basin: basin.0 }
807    }
808}
809
810#[sync_docs]
811#[derive(Debug, Clone)]
812pub struct DeleteStreamRequest {
813    pub stream: String,
814    /// Delete stream if it exists else do nothing.
815    pub if_exists: bool,
816}
817
818impl DeleteStreamRequest {
819    /// Create a new request.
820    pub fn new(stream: impl Into<String>) -> Self {
821        Self {
822            stream: stream.into(),
823            if_exists: false,
824        }
825    }
826
827    /// Overwrite the if exists parameter.
828    pub fn with_if_exists(self, if_exists: bool) -> Self {
829        Self { if_exists, ..self }
830    }
831}
832
833impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
834    fn from(value: DeleteStreamRequest) -> Self {
835        let DeleteStreamRequest { stream, .. } = value;
836        Self { stream }
837    }
838}
839
840#[sync_docs]
841#[derive(Debug, Clone)]
842pub struct ReconfigureBasinRequest {
843    pub basin: BasinName,
844    pub config: Option<BasinConfig>,
845    pub mask: Option<Vec<String>>,
846}
847
848impl ReconfigureBasinRequest {
849    /// Create a new request with basin name.
850    pub fn new(basin: BasinName) -> Self {
851        Self {
852            basin,
853            config: None,
854            mask: None,
855        }
856    }
857
858    /// Overwrite basin config.
859    pub fn with_config(self, config: BasinConfig) -> Self {
860        Self {
861            config: Some(config),
862            ..self
863        }
864    }
865
866    /// Overwrite field mask.
867    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
868        Self {
869            mask: Some(mask.into()),
870            ..self
871        }
872    }
873}
874
875impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
876    fn from(value: ReconfigureBasinRequest) -> Self {
877        let ReconfigureBasinRequest {
878            basin,
879            config,
880            mask,
881        } = value;
882        Self {
883            basin: basin.0,
884            config: config.map(Into::into),
885            mask: mask.map(|paths| prost_types::FieldMask { paths }),
886        }
887    }
888}
889
890impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
891    type Error = ConvertError;
892    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
893        let api::ReconfigureBasinResponse { config } = value;
894        let config = config.ok_or("missing basin config")?;
895        Ok(config.into())
896    }
897}
898
899#[sync_docs]
900#[derive(Debug, Clone)]
901pub struct ReconfigureStreamRequest {
902    pub stream: String,
903    pub config: Option<StreamConfig>,
904    pub mask: Option<Vec<String>>,
905}
906
907impl ReconfigureStreamRequest {
908    /// Create a new request with stream name.
909    pub fn new(stream: impl Into<String>) -> Self {
910        Self {
911            stream: stream.into(),
912            config: None,
913            mask: None,
914        }
915    }
916
917    /// Overwrite stream config.
918    pub fn with_config(self, config: StreamConfig) -> Self {
919        Self {
920            config: Some(config),
921            ..self
922        }
923    }
924
925    /// Overwrite field mask.
926    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
927        Self {
928            mask: Some(mask.into()),
929            ..self
930        }
931    }
932}
933
934impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
935    fn from(value: ReconfigureStreamRequest) -> Self {
936        let ReconfigureStreamRequest {
937            stream,
938            config,
939            mask,
940        } = value;
941        Self {
942            stream,
943            config: config.map(Into::into),
944            mask: mask.map(|paths| prost_types::FieldMask { paths }),
945        }
946    }
947}
948
949impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
950    type Error = ConvertError;
951    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
952        let api::ReconfigureStreamResponse { config } = value;
953        let config = config.ok_or("missing stream config")?;
954        Ok(config.into())
955    }
956}
957
958impl From<api::CheckTailResponse> for StreamPosition {
959    fn from(value: api::CheckTailResponse) -> Self {
960        let api::CheckTailResponse {
961            next_seq_num,
962            last_timestamp,
963        } = value;
964        StreamPosition {
965            seq_num: next_seq_num,
966            timestamp: last_timestamp,
967        }
968    }
969}
970
971/// Position of a record in a stream.
972#[derive(Debug, Clone, PartialEq, Eq)]
973pub struct StreamPosition {
974    /// Sequence number assigned by the service.
975    pub seq_num: u64,
976    /// Timestamp, which may be user-specified or assigned by the service.
977    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
978    pub timestamp: u64,
979}
980
981#[sync_docs]
982#[derive(Debug, Clone, PartialEq, Eq)]
983pub struct Header {
984    pub name: Bytes,
985    pub value: Bytes,
986}
987
988impl Header {
989    /// Create a new header from name and value.
990    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
991        Self {
992            name: name.into(),
993            value: value.into(),
994        }
995    }
996
997    /// Create a new header from value.
998    pub fn from_value(value: impl Into<Bytes>) -> Self {
999        Self {
1000            name: Bytes::new(),
1001            value: value.into(),
1002        }
1003    }
1004}
1005
1006impl From<Header> for api::Header {
1007    fn from(value: Header) -> Self {
1008        let Header { name, value } = value;
1009        Self { name, value }
1010    }
1011}
1012
1013impl From<api::Header> for Header {
1014    fn from(value: api::Header) -> Self {
1015        let api::Header { name, value } = value;
1016        Self { name, value }
1017    }
1018}
1019
1020/// A fencing token can be enforced on append requests.
1021///
1022/// Must not be more than 36 bytes.
1023#[derive(Debug, Clone, Default, PartialEq, Eq)]
1024pub struct FencingToken(String);
1025
1026impl FencingToken {
1027    const MAX_BYTES: usize = 36;
1028
1029    /// Generate a random alphanumeric fencing token of `n` bytes.
1030    pub fn generate(n: usize) -> Result<Self, ConvertError> {
1031        rand::rng()
1032            .sample_iter(&rand::distr::Alphanumeric)
1033            .take(n)
1034            .map(char::from)
1035            .collect::<String>()
1036            .parse()
1037    }
1038}
1039
1040impl Deref for FencingToken {
1041    type Target = str;
1042
1043    fn deref(&self) -> &Self::Target {
1044        &self.0
1045    }
1046}
1047
1048impl std::fmt::Display for FencingToken {
1049    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1050        write!(f, "{}", self.0)
1051    }
1052}
1053
1054impl FromStr for FencingToken {
1055    type Err = ConvertError;
1056
1057    fn from_str(value: &str) -> Result<Self, Self::Err> {
1058        value.to_string().try_into()
1059    }
1060}
1061
1062impl TryFrom<String> for FencingToken {
1063    type Error = ConvertError;
1064
1065    fn try_from(value: String) -> Result<Self, Self::Error> {
1066        if value.len() > Self::MAX_BYTES {
1067            Err(format!("Fencing token cannot exceed {} bytes", Self::MAX_BYTES).into())
1068        } else {
1069            Ok(Self(value))
1070        }
1071    }
1072}
1073
1074impl From<FencingToken> for String {
1075    fn from(value: FencingToken) -> Self {
1076        value.0
1077    }
1078}
1079
1080/// Command to send through a `CommandRecord`.
1081#[derive(Debug, Clone)]
1082pub enum Command {
1083    /// Enforce a fencing token.
1084    ///
1085    /// Fencing is strongly consistent, and subsequent appends that specify a
1086    /// fencing token will be rejected if it does not match.
1087    Fence {
1088        /// Fencing token to enforce.
1089        ///
1090        /// Set empty to clear the token.
1091        fencing_token: FencingToken,
1092    },
1093    /// Request a trim till the sequence number.
1094    ///
1095    /// Trimming is eventually consistent, and trimmed records may be visible
1096    /// for a brief period
1097    Trim {
1098        /// Trim point.
1099        ///
1100        /// This sequence number is only allowed to advance, and any regression
1101        /// will be ignored.
1102        seq_num: u64,
1103    },
1104}
1105
1106/// A command record is a special kind of [`AppendRecord`] that can be used to
1107/// send command messages.
1108///
1109/// Such a record is signalled by a sole header with empty name. The header
1110/// value represents the operation and record body acts as the payload.
1111#[derive(Debug, Clone)]
1112pub struct CommandRecord {
1113    /// Command kind.
1114    pub command: Command,
1115    /// Timestamp for the record.
1116    pub timestamp: Option<u64>,
1117}
1118
1119impl CommandRecord {
1120    const FENCE: &[u8] = b"fence";
1121    const TRIM: &[u8] = b"trim";
1122
1123    /// Create a new fence command record.
1124    pub fn fence(fencing_token: FencingToken) -> Self {
1125        Self {
1126            command: Command::Fence { fencing_token },
1127            timestamp: None,
1128        }
1129    }
1130
1131    /// Create a new trim command record.
1132    pub fn trim(seq_num: impl Into<u64>) -> Self {
1133        Self {
1134            command: Command::Trim {
1135                seq_num: seq_num.into(),
1136            },
1137            timestamp: None,
1138        }
1139    }
1140
1141    /// Overwrite timestamp.
1142    pub fn with_timestamp(self, timestamp: u64) -> Self {
1143        Self {
1144            timestamp: Some(timestamp),
1145            ..self
1146        }
1147    }
1148}
1149
1150#[sync_docs]
1151#[derive(Debug, Clone, PartialEq, Eq)]
1152pub struct AppendRecord {
1153    timestamp: Option<u64>,
1154    headers: Vec<Header>,
1155    body: Bytes,
1156    #[cfg(test)]
1157    max_bytes: u64,
1158}
1159
1160metered_impl!(AppendRecord);
1161
1162impl AppendRecord {
1163    const MAX_BYTES: u64 = MIB_BYTES;
1164
1165    fn validated(self) -> Result<Self, ConvertError> {
1166        #[cfg(test)]
1167        let max_bytes = self.max_bytes;
1168        #[cfg(not(test))]
1169        let max_bytes = Self::MAX_BYTES;
1170
1171        if self.metered_bytes() > max_bytes {
1172            Err("AppendRecord should have metered size less than 1 MiB".into())
1173        } else {
1174            Ok(self)
1175        }
1176    }
1177
1178    /// Try creating a new append record with body.
1179    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1180        Self {
1181            timestamp: None,
1182            headers: Vec::new(),
1183            body: body.into(),
1184            #[cfg(test)]
1185            max_bytes: Self::MAX_BYTES,
1186        }
1187        .validated()
1188    }
1189
1190    #[cfg(test)]
1191    pub(crate) fn with_max_bytes(
1192        max_bytes: u64,
1193        body: impl Into<Bytes>,
1194    ) -> Result<Self, ConvertError> {
1195        Self {
1196            timestamp: None,
1197            headers: Vec::new(),
1198            body: body.into(),
1199            max_bytes,
1200        }
1201        .validated()
1202    }
1203
1204    /// Overwrite headers.
1205    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1206        Self {
1207            headers: headers.into(),
1208            ..self
1209        }
1210        .validated()
1211    }
1212
1213    /// Overwrite timestamp.
1214    pub fn with_timestamp(self, timestamp: u64) -> Self {
1215        Self {
1216            timestamp: Some(timestamp),
1217            ..self
1218        }
1219    }
1220
1221    /// Body of the record.
1222    pub fn body(&self) -> &[u8] {
1223        &self.body
1224    }
1225
1226    /// Headers of the record.
1227    pub fn headers(&self) -> &[Header] {
1228        &self.headers
1229    }
1230
1231    /// Timestamp for the record.
1232    pub fn timestamp(&self) -> Option<u64> {
1233        self.timestamp
1234    }
1235
1236    /// Consume the record and return parts.
1237    pub fn into_parts(self) -> AppendRecordParts {
1238        AppendRecordParts {
1239            timestamp: self.timestamp,
1240            headers: self.headers,
1241            body: self.body,
1242        }
1243    }
1244
1245    /// Try creating the record from parts.
1246    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1247        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1248        if let Some(timestamp) = parts.timestamp {
1249            Ok(record.with_timestamp(timestamp))
1250        } else {
1251            Ok(record)
1252        }
1253    }
1254}
1255
1256impl From<AppendRecord> for api::AppendRecord {
1257    fn from(value: AppendRecord) -> Self {
1258        Self {
1259            timestamp: value.timestamp,
1260            headers: value.headers.into_iter().map(Into::into).collect(),
1261            body: value.body,
1262        }
1263    }
1264}
1265
1266impl From<CommandRecord> for AppendRecord {
1267    fn from(value: CommandRecord) -> Self {
1268        let (header_value, body) = match value.command {
1269            Command::Fence { fencing_token } => (
1270                CommandRecord::FENCE,
1271                Bytes::copy_from_slice(fencing_token.as_bytes()),
1272            ),
1273            Command::Trim { seq_num } => (
1274                CommandRecord::TRIM,
1275                Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1276            ),
1277        };
1278        AppendRecordParts {
1279            timestamp: value.timestamp,
1280            headers: vec![Header::from_value(header_value)],
1281            body,
1282        }
1283        .try_into()
1284        .expect("command record is a valid append record")
1285    }
1286}
1287
1288#[sync_docs(AppendRecordParts = "AppendRecord")]
1289#[derive(Debug, Clone)]
1290pub struct AppendRecordParts {
1291    pub timestamp: Option<u64>,
1292    pub headers: Vec<Header>,
1293    pub body: Bytes,
1294}
1295
1296impl From<AppendRecord> for AppendRecordParts {
1297    fn from(value: AppendRecord) -> Self {
1298        value.into_parts()
1299    }
1300}
1301
1302impl TryFrom<AppendRecordParts> for AppendRecord {
1303    type Error = ConvertError;
1304
1305    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1306        Self::try_from_parts(value)
1307    }
1308}
1309
1310/// A collection of append records that can be sent together in a batch.
1311#[derive(Debug, Clone)]
1312pub struct AppendRecordBatch {
1313    records: Vec<AppendRecord>,
1314    metered_bytes: u64,
1315    max_capacity: usize,
1316    #[cfg(test)]
1317    max_bytes: u64,
1318}
1319
1320impl PartialEq for AppendRecordBatch {
1321    fn eq(&self, other: &Self) -> bool {
1322        if self.records.eq(&other.records) {
1323            assert_eq!(self.metered_bytes, other.metered_bytes);
1324            true
1325        } else {
1326            false
1327        }
1328    }
1329}
1330
1331impl Eq for AppendRecordBatch {}
1332
1333impl Default for AppendRecordBatch {
1334    fn default() -> Self {
1335        Self::new()
1336    }
1337}
1338
1339impl AppendRecordBatch {
1340    /// Maximum number of records that a batch can hold.
1341    ///
1342    /// A record batch cannot be created with a bigger capacity.
1343    pub const MAX_CAPACITY: usize = 1000;
1344
1345    /// Maximum metered bytes of the batch.
1346    pub const MAX_BYTES: u64 = MIB_BYTES;
1347
1348    /// Create an empty record batch.
1349    pub fn new() -> Self {
1350        Self::with_max_capacity(Self::MAX_CAPACITY)
1351    }
1352
1353    /// Create an empty record batch with custom max capacity.
1354    ///
1355    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1356    pub fn with_max_capacity(max_capacity: usize) -> Self {
1357        assert!(
1358            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1359            "Batch capacity must be between 1 and 1000"
1360        );
1361
1362        Self {
1363            records: Vec::with_capacity(max_capacity),
1364            metered_bytes: 0,
1365            max_capacity,
1366            #[cfg(test)]
1367            max_bytes: Self::MAX_BYTES,
1368        }
1369    }
1370
1371    #[cfg(test)]
1372    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1373        #[cfg(test)]
1374        assert!(
1375            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1376            "Batch size must be between 1 byte and 1 MiB"
1377        );
1378
1379        Self {
1380            max_bytes,
1381            ..Self::with_max_capacity(max_capacity)
1382        }
1383    }
1384
1385    /// Try creating a record batch from an iterator.
1386    ///
1387    /// If all the items of the iterator cannot be drained into the batch, the
1388    /// error returned contains a batch containing all records it could fit
1389    /// along-with the left over items from the iterator.
1390    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1391    where
1392        R: Into<AppendRecord>,
1393        T: IntoIterator<Item = R>,
1394    {
1395        let mut records = Self::new();
1396        let mut pending = Vec::new();
1397
1398        let mut iter = iter.into_iter();
1399
1400        for record in iter.by_ref() {
1401            if let Err(record) = records.push(record) {
1402                pending.push(record);
1403                break;
1404            }
1405        }
1406
1407        if pending.is_empty() {
1408            Ok(records)
1409        } else {
1410            pending.extend(iter.map(Into::into));
1411            Err((records, pending))
1412        }
1413    }
1414
1415    /// Returns true if the batch contains no records.
1416    pub fn is_empty(&self) -> bool {
1417        if self.records.is_empty() {
1418            assert_eq!(self.metered_bytes, 0);
1419            true
1420        } else {
1421            false
1422        }
1423    }
1424
1425    /// Returns the number of records contained in the batch.
1426    pub fn len(&self) -> usize {
1427        self.records.len()
1428    }
1429
1430    #[cfg(test)]
1431    fn max_bytes(&self) -> u64 {
1432        self.max_bytes
1433    }
1434
1435    #[cfg(not(test))]
1436    fn max_bytes(&self) -> u64 {
1437        Self::MAX_BYTES
1438    }
1439
1440    /// Returns true if the batch cannot fit any more records.
1441    pub fn is_full(&self) -> bool {
1442        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1443    }
1444
1445    /// Try to append a new record into the batch.
1446    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1447        assert!(self.records.len() <= self.max_capacity);
1448        assert!(self.metered_bytes <= self.max_bytes());
1449
1450        let record = record.into();
1451        let record_size = record.metered_bytes();
1452        if self.records.len() >= self.max_capacity
1453            || self.metered_bytes + record_size > self.max_bytes()
1454        {
1455            Err(record)
1456        } else {
1457            self.records.push(record);
1458            self.metered_bytes += record_size;
1459            Ok(())
1460        }
1461    }
1462}
1463
1464impl MeteredBytes for AppendRecordBatch {
1465    fn metered_bytes(&self) -> u64 {
1466        self.metered_bytes
1467    }
1468}
1469
1470impl IntoIterator for AppendRecordBatch {
1471    type Item = AppendRecord;
1472    type IntoIter = std::vec::IntoIter<Self::Item>;
1473
1474    fn into_iter(self) -> Self::IntoIter {
1475        self.records.into_iter()
1476    }
1477}
1478
1479impl<'a> IntoIterator for &'a AppendRecordBatch {
1480    type Item = &'a AppendRecord;
1481    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1482
1483    fn into_iter(self) -> Self::IntoIter {
1484        self.records.iter()
1485    }
1486}
1487
1488impl AsRef<[AppendRecord]> for AppendRecordBatch {
1489    fn as_ref(&self) -> &[AppendRecord] {
1490        &self.records
1491    }
1492}
1493
1494#[sync_docs]
1495#[derive(Debug, Default, Clone)]
1496pub struct AppendInput {
1497    pub records: AppendRecordBatch,
1498    pub match_seq_num: Option<u64>,
1499    pub fencing_token: Option<FencingToken>,
1500}
1501
1502impl MeteredBytes for AppendInput {
1503    fn metered_bytes(&self) -> u64 {
1504        self.records.metered_bytes()
1505    }
1506}
1507
1508impl AppendInput {
1509    /// Create a new append input from record batch.
1510    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1511        Self {
1512            records: records.into(),
1513            match_seq_num: None,
1514            fencing_token: None,
1515        }
1516    }
1517
1518    /// Overwrite match sequence number.
1519    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1520        Self {
1521            match_seq_num: Some(match_seq_num.into()),
1522            ..self
1523        }
1524    }
1525
1526    /// Overwrite fencing token.
1527    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1528        Self {
1529            fencing_token: Some(fencing_token),
1530            ..self
1531        }
1532    }
1533
1534    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1535        let Self {
1536            records,
1537            match_seq_num,
1538            fencing_token,
1539        } = self;
1540
1541        api::AppendInput {
1542            stream: stream.into(),
1543            records: records.into_iter().map(Into::into).collect(),
1544            match_seq_num,
1545            fencing_token: fencing_token.map(|f| f.0),
1546        }
1547    }
1548}
1549
1550/// Acknowledgment to an append request.
1551#[derive(Debug, Clone)]
1552pub struct AppendAck {
1553    /// Sequence number and timestamp of the first record that was appended.
1554    pub start: StreamPosition,
1555    /// Sequence number of the last record that was appended + 1,
1556    /// and timestamp of the last record that was appended.
1557    /// The difference between `end.seq_num` and `start.seq_num`
1558    /// will be the number of records appended.
1559    pub end: StreamPosition,
1560    /// Sequence number that will be assigned to the next record on the stream,
1561    /// and timestamp of the last record on the stream.
1562    /// This can be greater than the `end` position in case of concurrent appends.
1563    pub tail: StreamPosition,
1564}
1565
1566impl From<api::AppendOutput> for AppendAck {
1567    fn from(value: api::AppendOutput) -> Self {
1568        let api::AppendOutput {
1569            start_seq_num,
1570            start_timestamp,
1571            end_seq_num,
1572            end_timestamp,
1573            next_seq_num,
1574            last_timestamp,
1575        } = value;
1576        let start = StreamPosition {
1577            seq_num: start_seq_num,
1578            timestamp: start_timestamp,
1579        };
1580        let end = StreamPosition {
1581            seq_num: end_seq_num,
1582            timestamp: end_timestamp,
1583        };
1584        let tail = StreamPosition {
1585            seq_num: next_seq_num,
1586            timestamp: last_timestamp,
1587        };
1588        Self { start, end, tail }
1589    }
1590}
1591
1592impl TryFrom<api::AppendResponse> for AppendAck {
1593    type Error = ConvertError;
1594    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1595        let api::AppendResponse { output } = value;
1596        let output = output.ok_or("missing append output")?;
1597        Ok(output.into())
1598    }
1599}
1600
1601impl TryFrom<api::AppendSessionResponse> for AppendAck {
1602    type Error = ConvertError;
1603    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1604        let api::AppendSessionResponse { output } = value;
1605        let output = output.ok_or("missing append output")?;
1606        Ok(output.into())
1607    }
1608}
1609
1610#[sync_docs]
1611#[derive(Debug, Clone, Default)]
1612pub struct ReadLimit {
1613    pub count: Option<u64>,
1614    pub bytes: Option<u64>,
1615}
1616
1617impl ReadLimit {
1618    /// Create a new read limit.
1619    pub fn new() -> Self {
1620        Self::default()
1621    }
1622
1623    /// Overwrite count limit.
1624    pub fn with_count(self, count: u64) -> Self {
1625        Self {
1626            count: Some(count),
1627            ..self
1628        }
1629    }
1630
1631    /// Overwrite bytes limit.
1632    pub fn with_bytes(self, bytes: u64) -> Self {
1633        Self {
1634            bytes: Some(bytes),
1635            ..self
1636        }
1637    }
1638}
1639
1640/// Starting point for read requests.
1641#[derive(Debug, Clone)]
1642pub enum ReadStart {
1643    /// Sequence number.
1644    SeqNum(u64),
1645    /// Timestamp.
1646    Timestamp(u64),
1647    /// Number of records before the tail, i.e. the next sequence number.
1648    TailOffset(u64),
1649}
1650
1651impl Default for ReadStart {
1652    fn default() -> Self {
1653        Self::SeqNum(0)
1654    }
1655}
1656
1657impl From<ReadStart> for api::read_request::Start {
1658    fn from(start: ReadStart) -> Self {
1659        match start {
1660            ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1661            ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1662            ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1663        }
1664    }
1665}
1666
1667impl From<ReadStart> for api::read_session_request::Start {
1668    fn from(start: ReadStart) -> Self {
1669        match start {
1670            ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1671            ReadStart::Timestamp(timestamp) => {
1672                api::read_session_request::Start::Timestamp(timestamp)
1673            }
1674            ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1675        }
1676    }
1677}
1678
1679#[sync_docs]
1680#[derive(Debug, Clone, Default)]
1681pub struct ReadRequest {
1682    pub start: ReadStart,
1683    pub limit: ReadLimit,
1684    pub until: Option<RangeTo<u64>>,
1685    pub clamp: bool,
1686}
1687
1688impl ReadRequest {
1689    /// Create a new request with the specified starting point.
1690    pub fn new(start: ReadStart) -> Self {
1691        Self {
1692            start,
1693            ..Default::default()
1694        }
1695    }
1696
1697    /// Overwrite limit.
1698    pub fn with_limit(self, limit: ReadLimit) -> Self {
1699        Self { limit, ..self }
1700    }
1701
1702    /// Provide an `until` timestamp.
1703    pub fn with_until(self, until: RangeTo<u64>) -> Self {
1704        Self {
1705            until: Some(until),
1706            ..self
1707        }
1708    }
1709
1710    /// Clamp the start position at the tail position.
1711    pub fn with_clamp(self, clamp: bool) -> Self {
1712        Self { clamp, ..self }
1713    }
1714}
1715
1716impl ReadRequest {
1717    pub(crate) fn try_into_api_type(
1718        self,
1719        stream: impl Into<String>,
1720    ) -> Result<api::ReadRequest, ConvertError> {
1721        let Self {
1722            start,
1723            limit,
1724            until,
1725            clamp,
1726        } = self;
1727
1728        let limit = if limit.count > Some(1000) {
1729            Err("read limit: count must not exceed 1000 for unary request")
1730        } else if limit.bytes > Some(MIB_BYTES) {
1731            Err("read limit: bytes must not exceed 1MiB for unary request")
1732        } else {
1733            Ok(api::ReadLimit {
1734                count: limit.count,
1735                bytes: limit.bytes,
1736            })
1737        }?;
1738
1739        Ok(api::ReadRequest {
1740            stream: stream.into(),
1741            start: Some(start.into()),
1742            limit: Some(limit),
1743            until: until.map(|range| range.end),
1744            clamp,
1745        })
1746    }
1747}
1748
1749#[sync_docs]
1750#[derive(Debug, Clone)]
1751pub struct SequencedRecord {
1752    pub seq_num: u64,
1753    pub timestamp: u64,
1754    pub headers: Vec<Header>,
1755    pub body: Bytes,
1756}
1757
1758metered_impl!(SequencedRecord);
1759
1760impl From<api::SequencedRecord> for SequencedRecord {
1761    fn from(value: api::SequencedRecord) -> Self {
1762        let api::SequencedRecord {
1763            seq_num,
1764            timestamp,
1765            headers,
1766            body,
1767        } = value;
1768        Self {
1769            seq_num,
1770            timestamp,
1771            headers: headers.into_iter().map(Into::into).collect(),
1772            body,
1773        }
1774    }
1775}
1776
1777impl SequencedRecord {
1778    /// Try representing the sequenced record as a command record.
1779    pub fn as_command_record(&self) -> Option<CommandRecord> {
1780        if self.headers.len() != 1 {
1781            return None;
1782        }
1783
1784        let header = self.headers.first().expect("pre-validated length");
1785
1786        if !header.name.is_empty() {
1787            return None;
1788        }
1789
1790        match header.value.as_ref() {
1791            CommandRecord::FENCE => {
1792                let fencing_token = std::str::from_utf8(&self.body).ok()?.parse().ok()?;
1793                Some(CommandRecord {
1794                    command: Command::Fence { fencing_token },
1795                    timestamp: Some(self.timestamp),
1796                })
1797            }
1798            CommandRecord::TRIM => {
1799                let body: &[u8] = &self.body;
1800                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1801                Some(CommandRecord {
1802                    command: Command::Trim { seq_num },
1803                    timestamp: Some(self.timestamp),
1804                })
1805            }
1806            _ => None,
1807        }
1808    }
1809}
1810
1811#[sync_docs]
1812#[derive(Debug, Clone)]
1813pub struct SequencedRecordBatch {
1814    pub records: Vec<SequencedRecord>,
1815}
1816
1817impl MeteredBytes for SequencedRecordBatch {
1818    fn metered_bytes(&self) -> u64 {
1819        self.records.metered_bytes()
1820    }
1821}
1822
1823impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1824    fn from(value: api::SequencedRecordBatch) -> Self {
1825        let api::SequencedRecordBatch { records } = value;
1826        Self {
1827            records: records.into_iter().map(Into::into).collect(),
1828        }
1829    }
1830}
1831
1832#[sync_docs(ReadOutput = "Output")]
1833#[derive(Debug, Clone)]
1834pub enum ReadOutput {
1835    Batch(SequencedRecordBatch),
1836    NextSeqNum(u64),
1837}
1838
1839impl From<api::read_output::Output> for ReadOutput {
1840    fn from(value: api::read_output::Output) -> Self {
1841        match value {
1842            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1843            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1844        }
1845    }
1846}
1847
1848impl TryFrom<api::ReadOutput> for ReadOutput {
1849    type Error = ConvertError;
1850    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1851        let api::ReadOutput { output } = value;
1852        let output = output.ok_or("missing read output")?;
1853        Ok(output.into())
1854    }
1855}
1856
1857impl TryFrom<api::ReadResponse> for ReadOutput {
1858    type Error = ConvertError;
1859    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1860        let api::ReadResponse { output } = value;
1861        let output = output.ok_or("missing output in read response")?;
1862        output.try_into()
1863    }
1864}
1865
1866#[sync_docs]
1867#[derive(Debug, Clone, Default)]
1868pub struct ReadSessionRequest {
1869    pub start: ReadStart,
1870    pub limit: ReadLimit,
1871    pub until: Option<RangeTo<u64>>,
1872    pub clamp: bool,
1873}
1874
1875impl ReadSessionRequest {
1876    /// Create a new request with the specified starting point.
1877    pub fn new(start: ReadStart) -> Self {
1878        Self {
1879            start,
1880            ..Default::default()
1881        }
1882    }
1883
1884    /// Overwrite limit.
1885    pub fn with_limit(self, limit: ReadLimit) -> Self {
1886        Self { limit, ..self }
1887    }
1888
1889    /// Provide an `until` timestamp.
1890    pub fn with_until(self, until: RangeTo<u64>) -> Self {
1891        Self {
1892            until: Some(until),
1893            ..self
1894        }
1895    }
1896
1897    /// Clamp the start position at the tail position.
1898    pub fn with_clamp(self, clamp: bool) -> Self {
1899        Self { clamp, ..self }
1900    }
1901
1902    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1903        let Self {
1904            start,
1905            limit,
1906            until,
1907            clamp,
1908        } = self;
1909        api::ReadSessionRequest {
1910            stream: stream.into(),
1911            start: Some(start.into()),
1912            limit: Some(api::ReadLimit {
1913                count: limit.count,
1914                bytes: limit.bytes,
1915            }),
1916            heartbeats: false,
1917            until: until.map(|range| range.end),
1918            clamp,
1919        }
1920    }
1921}
1922
1923impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1924    type Error = ConvertError;
1925    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1926        let api::ReadSessionResponse { output } = value;
1927        let output = output.ok_or("missing output in read session response")?;
1928        output.try_into()
1929    }
1930}
1931
1932/// Name of a basin.
1933///
1934/// Must be between 8 and 48 characters in length. Must comprise lowercase
1935/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
1936#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1937pub struct BasinName(String);
1938
1939impl Deref for BasinName {
1940    type Target = str;
1941    fn deref(&self) -> &Self::Target {
1942        &self.0
1943    }
1944}
1945
1946impl TryFrom<String> for BasinName {
1947    type Error = ConvertError;
1948
1949    fn try_from(name: String) -> Result<Self, Self::Error> {
1950        if name.len() < 8 || name.len() > 48 {
1951            return Err("Basin name must be between 8 and 48 characters in length".into());
1952        }
1953
1954        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1955        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1956            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1957                .expect("Failed to compile basin name regex")
1958        });
1959
1960        if !regex.is_match(&name) {
1961            return Err(
1962                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1963                It cannot begin or end with a hyphen."
1964                    .into(),
1965            );
1966        }
1967
1968        Ok(Self(name))
1969    }
1970}
1971
1972impl FromStr for BasinName {
1973    type Err = ConvertError;
1974
1975    fn from_str(s: &str) -> Result<Self, Self::Err> {
1976        s.to_string().try_into()
1977    }
1978}
1979
1980impl std::fmt::Display for BasinName {
1981    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1982        f.write_str(&self.0)
1983    }
1984}
1985
1986impl From<BasinName> for String {
1987    fn from(value: BasinName) -> Self {
1988        value.0
1989    }
1990}
1991
1992/// Access token ID.
1993/// Must be between 1 and 96 characters.
1994#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1995pub struct AccessTokenId(String);
1996
1997impl Deref for AccessTokenId {
1998    type Target = str;
1999
2000    fn deref(&self) -> &Self::Target {
2001        &self.0
2002    }
2003}
2004
2005impl TryFrom<String> for AccessTokenId {
2006    type Error = ConvertError;
2007
2008    fn try_from(name: String) -> Result<Self, Self::Error> {
2009        if name.is_empty() {
2010            return Err("Access token ID must not be empty".into());
2011        }
2012
2013        if name.len() > 96 {
2014            return Err("Access token ID must not exceed 96 characters".into());
2015        }
2016
2017        Ok(Self(name))
2018    }
2019}
2020
2021impl From<AccessTokenId> for String {
2022    fn from(value: AccessTokenId) -> Self {
2023        value.0
2024    }
2025}
2026
2027impl FromStr for AccessTokenId {
2028    type Err = ConvertError;
2029
2030    fn from_str(s: &str) -> Result<Self, Self::Err> {
2031        s.to_string().try_into()
2032    }
2033}
2034
2035impl std::fmt::Display for AccessTokenId {
2036    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2037        f.write_str(&self.0)
2038    }
2039}
2040
2041impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
2042    fn from(value: AccessTokenInfo) -> Self {
2043        Self {
2044            info: Some(value.into()),
2045        }
2046    }
2047}
2048
2049#[sync_docs]
2050#[derive(Debug, Clone)]
2051pub struct AccessTokenInfo {
2052    pub id: AccessTokenId,
2053    pub expires_at: Option<u32>,
2054    pub auto_prefix_streams: bool,
2055    pub scope: Option<AccessTokenScope>,
2056}
2057
2058impl AccessTokenInfo {
2059    /// Create a new access token info.
2060    pub fn new(id: AccessTokenId) -> Self {
2061        Self {
2062            id,
2063            expires_at: None,
2064            auto_prefix_streams: false,
2065            scope: None,
2066        }
2067    }
2068
2069    /// Overwrite expiration time.
2070    pub fn with_expires_at(self, expires_at: u32) -> Self {
2071        Self {
2072            expires_at: Some(expires_at),
2073            ..self
2074        }
2075    }
2076
2077    /// Overwrite auto prefix streams.
2078    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2079        Self {
2080            auto_prefix_streams,
2081            ..self
2082        }
2083    }
2084
2085    /// Overwrite scope.
2086    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2087        Self {
2088            scope: Some(scope),
2089            ..self
2090        }
2091    }
2092}
2093
2094impl From<AccessTokenInfo> for api::AccessTokenInfo {
2095    fn from(value: AccessTokenInfo) -> Self {
2096        let AccessTokenInfo {
2097            id,
2098            expires_at,
2099            auto_prefix_streams,
2100            scope,
2101        } = value;
2102        Self {
2103            id: id.into(),
2104            expires_at,
2105            auto_prefix_streams,
2106            scope: scope.map(Into::into),
2107        }
2108    }
2109}
2110
2111impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2112    type Error = ConvertError;
2113
2114    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2115        let api::AccessTokenInfo {
2116            id,
2117            expires_at,
2118            auto_prefix_streams,
2119            scope,
2120        } = value;
2121        Ok(Self {
2122            id: id.try_into()?,
2123            expires_at,
2124            auto_prefix_streams,
2125            scope: scope.map(Into::into),
2126        })
2127    }
2128}
2129
2130#[sync_docs]
2131#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2132pub enum Operation {
2133    ListBasins,
2134    CreateBasin,
2135    DeleteBasin,
2136    ReconfigureBasin,
2137    GetBasinConfig,
2138    IssueAccessToken,
2139    RevokeAccessToken,
2140    ListAccessTokens,
2141    ListStreams,
2142    CreateStream,
2143    DeleteStream,
2144    GetStreamConfig,
2145    ReconfigureStream,
2146    CheckTail,
2147    Append,
2148    Read,
2149    Trim,
2150    Fence,
2151    AccountMetrics,
2152    BasinMetrics,
2153    StreamMetrics,
2154}
2155
2156impl FromStr for Operation {
2157    type Err = ConvertError;
2158
2159    fn from_str(s: &str) -> Result<Self, Self::Err> {
2160        match s.to_lowercase().as_str() {
2161            "list-basins" => Ok(Self::ListBasins),
2162            "create-basin" => Ok(Self::CreateBasin),
2163            "delete-basin" => Ok(Self::DeleteBasin),
2164            "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2165            "get-basin-config" => Ok(Self::GetBasinConfig),
2166            "issue-access-token" => Ok(Self::IssueAccessToken),
2167            "revoke-access-token" => Ok(Self::RevokeAccessToken),
2168            "list-access-tokens" => Ok(Self::ListAccessTokens),
2169            "list-streams" => Ok(Self::ListStreams),
2170            "create-stream" => Ok(Self::CreateStream),
2171            "delete-stream" => Ok(Self::DeleteStream),
2172            "get-stream-config" => Ok(Self::GetStreamConfig),
2173            "reconfigure-stream" => Ok(Self::ReconfigureStream),
2174            "check-tail" => Ok(Self::CheckTail),
2175            "append" => Ok(Self::Append),
2176            "read" => Ok(Self::Read),
2177            "trim" => Ok(Self::Trim),
2178            "fence" => Ok(Self::Fence),
2179            "account-metrics" => Ok(Self::AccountMetrics),
2180            "basin-metrics" => Ok(Self::BasinMetrics),
2181            "stream-metrics" => Ok(Self::StreamMetrics),
2182            _ => Err("invalid operation".into()),
2183        }
2184    }
2185}
2186
2187impl From<Operation> for api::Operation {
2188    fn from(value: Operation) -> Self {
2189        match value {
2190            Operation::ListBasins => Self::ListBasins,
2191            Operation::CreateBasin => Self::CreateBasin,
2192            Operation::DeleteBasin => Self::DeleteBasin,
2193            Operation::ReconfigureBasin => Self::ReconfigureBasin,
2194            Operation::GetBasinConfig => Self::GetBasinConfig,
2195            Operation::IssueAccessToken => Self::IssueAccessToken,
2196            Operation::RevokeAccessToken => Self::RevokeAccessToken,
2197            Operation::ListAccessTokens => Self::ListAccessTokens,
2198            Operation::ListStreams => Self::ListStreams,
2199            Operation::CreateStream => Self::CreateStream,
2200            Operation::DeleteStream => Self::DeleteStream,
2201            Operation::GetStreamConfig => Self::GetStreamConfig,
2202            Operation::ReconfigureStream => Self::ReconfigureStream,
2203            Operation::CheckTail => Self::CheckTail,
2204            Operation::Append => Self::Append,
2205            Operation::Read => Self::Read,
2206            Operation::Trim => Self::Trim,
2207            Operation::Fence => Self::Fence,
2208            Operation::AccountMetrics => Self::AccountMetrics,
2209            Operation::BasinMetrics => Self::BasinMetrics,
2210            Operation::StreamMetrics => Self::StreamMetrics,
2211        }
2212    }
2213}
2214
2215impl From<api::Operation> for Option<Operation> {
2216    fn from(value: api::Operation) -> Self {
2217        match value {
2218            api::Operation::Unspecified => None,
2219            api::Operation::ListBasins => Some(Operation::ListBasins),
2220            api::Operation::CreateBasin => Some(Operation::CreateBasin),
2221            api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2222            api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2223            api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2224            api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2225            api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2226            api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2227            api::Operation::ListStreams => Some(Operation::ListStreams),
2228            api::Operation::CreateStream => Some(Operation::CreateStream),
2229            api::Operation::DeleteStream => Some(Operation::DeleteStream),
2230            api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2231            api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2232            api::Operation::CheckTail => Some(Operation::CheckTail),
2233            api::Operation::Append => Some(Operation::Append),
2234            api::Operation::Read => Some(Operation::Read),
2235            api::Operation::Trim => Some(Operation::Trim),
2236            api::Operation::Fence => Some(Operation::Fence),
2237            api::Operation::AccountMetrics => Some(Operation::AccountMetrics),
2238            api::Operation::BasinMetrics => Some(Operation::BasinMetrics),
2239            api::Operation::StreamMetrics => Some(Operation::StreamMetrics),
2240        }
2241    }
2242}
2243
2244#[sync_docs]
2245#[derive(Debug, Clone, Default)]
2246pub struct AccessTokenScope {
2247    pub basins: Option<ResourceSet>,
2248    pub streams: Option<ResourceSet>,
2249    pub access_tokens: Option<ResourceSet>,
2250    pub op_groups: Option<PermittedOperationGroups>,
2251    pub ops: HashSet<Operation>,
2252}
2253
2254impl AccessTokenScope {
2255    /// Create a new access token scope.
2256    pub fn new() -> Self {
2257        Self::default()
2258    }
2259
2260    /// Overwrite resource set for access tokens.
2261    pub fn with_basins(self, basins: ResourceSet) -> Self {
2262        Self {
2263            basins: Some(basins),
2264            ..self
2265        }
2266    }
2267
2268    /// Overwrite resource set for streams.
2269    pub fn with_streams(self, streams: ResourceSet) -> Self {
2270        Self {
2271            streams: Some(streams),
2272            ..self
2273        }
2274    }
2275
2276    /// Overwrite resource set for access tokens.
2277    pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2278        Self {
2279            access_tokens: Some(access_tokens),
2280            ..self
2281        }
2282    }
2283
2284    /// Overwrite operation groups.
2285    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2286        Self {
2287            op_groups: Some(op_groups),
2288            ..self
2289        }
2290    }
2291
2292    /// Overwrite operations.
2293    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2294        Self {
2295            ops: ops.into_iter().collect(),
2296            ..self
2297        }
2298    }
2299
2300    /// Add an operation to operations.
2301    pub fn with_op(self, op: Operation) -> Self {
2302        let mut ops = self.ops;
2303        ops.insert(op);
2304        Self { ops, ..self }
2305    }
2306}
2307
2308impl From<AccessTokenScope> for api::AccessTokenScope {
2309    fn from(value: AccessTokenScope) -> Self {
2310        let AccessTokenScope {
2311            basins,
2312            streams,
2313            access_tokens,
2314            op_groups,
2315            ops,
2316        } = value;
2317        Self {
2318            basins: basins.map(Into::into),
2319            streams: streams.map(Into::into),
2320            access_tokens: access_tokens.map(Into::into),
2321            op_groups: op_groups.map(Into::into),
2322            ops: ops
2323                .into_iter()
2324                .map(api::Operation::from)
2325                .map(Into::into)
2326                .collect(),
2327        }
2328    }
2329}
2330
2331impl From<api::AccessTokenScope> for AccessTokenScope {
2332    fn from(value: api::AccessTokenScope) -> Self {
2333        let api::AccessTokenScope {
2334            basins,
2335            streams,
2336            access_tokens,
2337            op_groups,
2338            ops,
2339        } = value;
2340        Self {
2341            basins: basins.and_then(|set| set.matching.map(Into::into)),
2342            streams: streams.and_then(|set| set.matching.map(Into::into)),
2343            access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2344            op_groups: op_groups.map(Into::into),
2345            ops: ops
2346                .into_iter()
2347                .map(api::Operation::try_from)
2348                .flat_map(Result::ok)
2349                .flat_map(<Option<Operation>>::from)
2350                .collect(),
2351        }
2352    }
2353}
2354
2355impl From<ResourceSet> for api::ResourceSet {
2356    fn from(value: ResourceSet) -> Self {
2357        Self {
2358            matching: Some(value.into()),
2359        }
2360    }
2361}
2362
2363#[sync_docs(ResourceSet = "Matching")]
2364#[derive(Debug, Clone)]
2365pub enum ResourceSet {
2366    Exact(String),
2367    Prefix(String),
2368}
2369
2370impl From<ResourceSet> for api::resource_set::Matching {
2371    fn from(value: ResourceSet) -> Self {
2372        match value {
2373            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2374            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2375        }
2376    }
2377}
2378
2379impl From<api::resource_set::Matching> for ResourceSet {
2380    fn from(value: api::resource_set::Matching) -> Self {
2381        match value {
2382            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2383            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2384        }
2385    }
2386}
2387
2388#[sync_docs]
2389#[derive(Debug, Clone, Default)]
2390pub struct PermittedOperationGroups {
2391    pub account: Option<ReadWritePermissions>,
2392    pub basin: Option<ReadWritePermissions>,
2393    pub stream: Option<ReadWritePermissions>,
2394}
2395
2396impl PermittedOperationGroups {
2397    /// Create a new permitted operation groups.
2398    pub fn new() -> Self {
2399        Self::default()
2400    }
2401
2402    /// Overwrite account read-write permissions.
2403    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2404        Self {
2405            account: Some(account),
2406            ..self
2407        }
2408    }
2409
2410    /// Overwrite basin read-write permissions.
2411    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2412        Self {
2413            basin: Some(basin),
2414            ..self
2415        }
2416    }
2417
2418    /// Overwrite stream read-write permissions.
2419    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2420        Self {
2421            stream: Some(stream),
2422            ..self
2423        }
2424    }
2425}
2426
2427impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2428    fn from(value: PermittedOperationGroups) -> Self {
2429        let PermittedOperationGroups {
2430            account,
2431            basin,
2432            stream,
2433        } = value;
2434        Self {
2435            account: account.map(Into::into),
2436            basin: basin.map(Into::into),
2437            stream: stream.map(Into::into),
2438        }
2439    }
2440}
2441
2442impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2443    fn from(value: api::PermittedOperationGroups) -> Self {
2444        let api::PermittedOperationGroups {
2445            account,
2446            basin,
2447            stream,
2448        } = value;
2449        Self {
2450            account: account.map(Into::into),
2451            basin: basin.map(Into::into),
2452            stream: stream.map(Into::into),
2453        }
2454    }
2455}
2456
2457#[sync_docs]
2458#[derive(Debug, Clone, Default)]
2459pub struct ReadWritePermissions {
2460    pub read: bool,
2461    pub write: bool,
2462}
2463
2464impl ReadWritePermissions {
2465    /// Create a new read-write permission.
2466    pub fn new() -> Self {
2467        Self::default()
2468    }
2469
2470    /// Overwrite read permission.
2471    pub fn with_read(self, read: bool) -> Self {
2472        Self { read, ..self }
2473    }
2474
2475    /// Overwrite write permission.
2476    pub fn with_write(self, write: bool) -> Self {
2477        Self { write, ..self }
2478    }
2479}
2480
2481impl From<ReadWritePermissions> for api::ReadWritePermissions {
2482    fn from(value: ReadWritePermissions) -> Self {
2483        let ReadWritePermissions { read, write } = value;
2484        Self { read, write }
2485    }
2486}
2487
2488impl From<api::ReadWritePermissions> for ReadWritePermissions {
2489    fn from(value: api::ReadWritePermissions) -> Self {
2490        let api::ReadWritePermissions { read, write } = value;
2491        Self { read, write }
2492    }
2493}
2494
2495impl From<api::IssueAccessTokenResponse> for String {
2496    fn from(value: api::IssueAccessTokenResponse) -> Self {
2497        value.access_token
2498    }
2499}
2500
2501impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2502    fn from(value: AccessTokenId) -> Self {
2503        Self { id: value.into() }
2504    }
2505}
2506
2507impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2508    type Error = ConvertError;
2509    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2510        let token_info = value.info.ok_or("access token info is missing")?;
2511        token_info.try_into()
2512    }
2513}
2514
2515#[sync_docs]
2516#[derive(Debug, Clone, Default)]
2517pub struct ListAccessTokensRequest {
2518    pub prefix: String,
2519    pub start_after: String,
2520    pub limit: Option<usize>,
2521}
2522
2523impl ListAccessTokensRequest {
2524    /// Create a new request with prefix.
2525    pub fn new() -> Self {
2526        Self::default()
2527    }
2528
2529    /// Overwrite prefix.
2530    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2531        Self {
2532            prefix: prefix.into(),
2533            ..self
2534        }
2535    }
2536
2537    /// Overwrite start after.
2538    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2539        Self {
2540            start_after: start_after.into(),
2541            ..self
2542        }
2543    }
2544
2545    /// Overwrite limit.
2546    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2547        Self {
2548            limit: limit.into(),
2549            ..self
2550        }
2551    }
2552}
2553
2554impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2555    type Error = ConvertError;
2556    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2557        let ListAccessTokensRequest {
2558            prefix,
2559            start_after,
2560            limit,
2561        } = value;
2562        Ok(Self {
2563            prefix,
2564            start_after,
2565            limit: limit
2566                .map(TryInto::try_into)
2567                .transpose()
2568                .map_err(|_| "request limit does not fit into u64 bounds")?,
2569        })
2570    }
2571}
2572
2573#[sync_docs]
2574#[derive(Debug, Clone)]
2575pub struct ListAccessTokensResponse {
2576    pub access_tokens: Vec<AccessTokenInfo>,
2577    pub has_more: bool,
2578}
2579
2580impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2581    type Error = ConvertError;
2582    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2583        let api::ListAccessTokensResponse {
2584            access_tokens,
2585            has_more,
2586        } = value;
2587        let access_tokens = access_tokens
2588            .into_iter()
2589            .map(TryInto::try_into)
2590            .collect::<Result<Vec<_>, _>>()?;
2591        Ok(Self {
2592            access_tokens,
2593            has_more,
2594        })
2595    }
2596}