s2/
types.rs

1//! Types for interacting with S2 services.
2
3use std::{
4    collections::HashSet,
5    ops::{Deref, RangeTo},
6    str::FromStr,
7    sync::OnceLock,
8    time::Duration,
9};
10
11use bytes::Bytes;
12use rand::Rng;
13use regex::Regex;
14use sync_docs::sync_docs;
15
16use crate::api;
17
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
/// Metadata key whose literal name is `retry-after-ms`.
///
/// NOTE(review): presumably carries a retry delay hint in milliseconds; the code that reads
/// this key is not visible in this part of the file, so confirm against its users.
pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
20
/// Error related to conversion from one type to another.
///
/// Wraps a human-readable message; the message is also what the error displays as.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
25
26impl<T: Into<String>> From<T> for ConvertError {
27    fn from(value: T) -> Self {
28        Self(value.into())
29    }
30}
31
/// Metered size of the object in bytes.
///
/// Bytes are calculated using the "metered bytes" formula:
///
/// ```python
/// metered_bytes = lambda record: 8 + 2 * len(record.headers) + sum((len(h.name) + len(h.value)) for h in record.headers) + len(record.body)
/// ```
pub trait MeteredBytes {
    /// Return the metered bytes of the object.
    fn metered_bytes(&self) -> u64;
}

/// A vector's metered size is the total of its elements' metered sizes (zero when empty).
impl<T: MeteredBytes> MeteredBytes for Vec<T> {
    fn metered_bytes(&self) -> u64 {
        self.iter().map(MeteredBytes::metered_bytes).sum()
    }
}
49
// Implements `MeteredBytes` for a record-like type exposing `headers` (items with `name` and
// `value`) and `body`, following the formula documented on the trait:
// 8 + 2 * header count + total header name/value lengths + body length.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let per_header_overhead = 2 * self.headers.len();
                let header_payload: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                let total = 8 + per_header_overhead + header_payload + self.body.len();
                total as u64
            }
        }
    };
}
67
68#[sync_docs]
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum BasinScope {
71    AwsUsEast1,
72}
73
74impl From<BasinScope> for api::BasinScope {
75    fn from(value: BasinScope) -> Self {
76        match value {
77            BasinScope::AwsUsEast1 => Self::AwsUsEast1,
78        }
79    }
80}
81
82impl From<api::BasinScope> for Option<BasinScope> {
83    fn from(value: api::BasinScope) -> Self {
84        match value {
85            api::BasinScope::Unspecified => None,
86            api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
87        }
88    }
89}
90
91impl FromStr for BasinScope {
92    type Err = ConvertError;
93    fn from_str(value: &str) -> Result<Self, Self::Err> {
94        match value {
95            "aws:us-east-1" => Ok(Self::AwsUsEast1),
96            _ => Err("invalid basin scope value".into()),
97        }
98    }
99}
100
101#[sync_docs]
102#[derive(Debug, Clone)]
103pub struct CreateBasinRequest {
104    pub basin: BasinName,
105    pub config: Option<BasinConfig>,
106    pub scope: Option<BasinScope>,
107}
108
109impl CreateBasinRequest {
110    /// Create a new request with basin name.
111    pub fn new(basin: BasinName) -> Self {
112        Self {
113            basin,
114            config: None,
115            scope: None,
116        }
117    }
118
119    /// Overwrite basin configuration.
120    pub fn with_config(self, config: BasinConfig) -> Self {
121        Self {
122            config: Some(config),
123            ..self
124        }
125    }
126
127    /// Overwrite basin scope.
128    pub fn with_scope(self, scope: BasinScope) -> Self {
129        Self {
130            scope: Some(scope),
131            ..self
132        }
133    }
134}
135
136impl From<CreateBasinRequest> for api::CreateBasinRequest {
137    fn from(value: CreateBasinRequest) -> Self {
138        let CreateBasinRequest {
139            basin,
140            config,
141            scope,
142        } = value;
143        Self {
144            basin: basin.0,
145            config: config.map(Into::into),
146            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
147        }
148    }
149}
150
151#[sync_docs]
152#[derive(Debug, Clone, Default)]
153pub struct BasinConfig {
154    pub default_stream_config: Option<StreamConfig>,
155    pub create_stream_on_append: bool,
156    pub create_stream_on_read: bool,
157}
158
159impl BasinConfig {
160    /// Create a new basin config.
161    pub fn new() -> Self {
162        Self::default()
163    }
164
165    /// Overwrite the default stream config.
166    pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
167        Self {
168            default_stream_config: Some(default_stream_config),
169            ..self
170        }
171    }
172
173    /// Overwrite `create_stream_on_append`.
174    pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
175        Self {
176            create_stream_on_append,
177            ..self
178        }
179    }
180
181    /// Overwrite `create_stream_on_read`.
182    pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
183        Self {
184            create_stream_on_read,
185            ..self
186        }
187    }
188}
189
190impl From<BasinConfig> for api::BasinConfig {
191    fn from(value: BasinConfig) -> Self {
192        let BasinConfig {
193            default_stream_config,
194            create_stream_on_append,
195            create_stream_on_read,
196        } = value;
197        Self {
198            default_stream_config: default_stream_config.map(Into::into),
199            create_stream_on_append,
200            create_stream_on_read,
201        }
202    }
203}
204
205impl From<api::BasinConfig> for BasinConfig {
206    fn from(value: api::BasinConfig) -> Self {
207        let api::BasinConfig {
208            default_stream_config,
209            create_stream_on_append,
210            create_stream_on_read,
211        } = value;
212        Self {
213            default_stream_config: default_stream_config.map(Into::into),
214            create_stream_on_append,
215            create_stream_on_read,
216        }
217    }
218}
219
220#[sync_docs]
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub enum TimestampingMode {
223    ClientPrefer,
224    ClientRequire,
225    Arrival,
226}
227
228impl From<TimestampingMode> for api::TimestampingMode {
229    fn from(value: TimestampingMode) -> Self {
230        match value {
231            TimestampingMode::ClientPrefer => Self::ClientPrefer,
232            TimestampingMode::ClientRequire => Self::ClientRequire,
233            TimestampingMode::Arrival => Self::Arrival,
234        }
235    }
236}
237
238impl From<api::TimestampingMode> for Option<TimestampingMode> {
239    fn from(value: api::TimestampingMode) -> Self {
240        match value {
241            api::TimestampingMode::Unspecified => None,
242            api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
243            api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
244            api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
245        }
246    }
247}
248
249#[sync_docs(TimestampingConfig = "Timestamping")]
250#[derive(Debug, Clone, Default)]
251/// Timestamping behavior.
252pub struct TimestampingConfig {
253    pub mode: Option<TimestampingMode>,
254    pub uncapped: Option<bool>,
255}
256
257impl TimestampingConfig {
258    /// Create a new timestamping config.
259    pub fn new() -> Self {
260        Self::default()
261    }
262
263    /// Overwrite timestamping mode.
264    pub fn with_mode(self, mode: TimestampingMode) -> Self {
265        Self {
266            mode: Some(mode),
267            ..self
268        }
269    }
270
271    /// Overwrite the uncapped knob.
272    pub fn with_uncapped(self, uncapped: bool) -> Self {
273        Self {
274            uncapped: Some(uncapped),
275            ..self
276        }
277    }
278}
279
280impl From<TimestampingConfig> for api::stream_config::Timestamping {
281    fn from(value: TimestampingConfig) -> Self {
282        Self {
283            mode: value
284                .mode
285                .map(api::TimestampingMode::from)
286                .unwrap_or_default()
287                .into(),
288            uncapped: value.uncapped,
289        }
290    }
291}
292
293impl From<api::stream_config::Timestamping> for TimestampingConfig {
294    fn from(value: api::stream_config::Timestamping) -> Self {
295        let mode = value.mode().into();
296        let uncapped = value.uncapped;
297        Self { mode, uncapped }
298    }
299}
300
301#[sync_docs(DeleteOnEmptyConfig = "DeleteOnEmpty", min_age = "min_age_secs")]
302#[derive(Debug, Clone, Default)]
303/// Delete-on-empty config.
304pub struct DeleteOnEmptyConfig {
305    pub min_age: Duration,
306}
307
308impl DeleteOnEmptyConfig {
309    /// Create a new delete-on-empty config.
310    pub fn new() -> Self {
311        Self::default()
312    }
313
314    /// Overwrite min age.
315    pub fn with_min_age(self, min_age: Duration) -> Self {
316        Self { min_age }
317    }
318}
319
320impl From<DeleteOnEmptyConfig> for api::stream_config::DeleteOnEmpty {
321    fn from(value: DeleteOnEmptyConfig) -> Self {
322        Self {
323            min_age_secs: value.min_age.as_secs(),
324        }
325    }
326}
327
328impl From<api::stream_config::DeleteOnEmpty> for DeleteOnEmptyConfig {
329    fn from(value: api::stream_config::DeleteOnEmpty) -> Self {
330        Self {
331            min_age: Duration::from_secs(value.min_age_secs),
332        }
333    }
334}
335
336#[sync_docs]
337#[derive(Debug, Clone, Default)]
338pub struct StreamConfig {
339    pub storage_class: Option<StorageClass>,
340    pub retention_policy: Option<RetentionPolicy>,
341    pub timestamping: Option<TimestampingConfig>,
342    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
343}
344
345impl StreamConfig {
346    /// Create a new stream config.
347    pub fn new() -> Self {
348        Self::default()
349    }
350
351    /// Overwrite storage class.
352    pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
353        Self {
354            storage_class: Some(storage_class),
355            ..self
356        }
357    }
358
359    /// Overwrite retention policy.
360    pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
361        Self {
362            retention_policy: Some(retention_policy),
363            ..self
364        }
365    }
366
367    /// Overwrite timestamping config.
368    pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
369        Self {
370            timestamping: Some(timestamping),
371            ..self
372        }
373    }
374
375    /// Overwrite delete-on-empty config.
376    pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
377        Self {
378            delete_on_empty: Some(delete_on_empty),
379            ..self
380        }
381    }
382}
383
384impl From<StreamConfig> for api::StreamConfig {
385    fn from(value: StreamConfig) -> Self {
386        let StreamConfig {
387            storage_class,
388            retention_policy,
389            timestamping,
390            delete_on_empty,
391        } = value;
392        Self {
393            storage_class: storage_class
394                .map(api::StorageClass::from)
395                .unwrap_or_default()
396                .into(),
397            retention_policy: retention_policy.map(Into::into),
398            timestamping: timestamping.map(Into::into),
399            delete_on_empty: delete_on_empty.map(Into::into),
400        }
401    }
402}
403
404impl From<api::StreamConfig> for StreamConfig {
405    fn from(value: api::StreamConfig) -> Self {
406        Self {
407            storage_class: value.storage_class().into(),
408            retention_policy: value.retention_policy.map(Into::into),
409            timestamping: value.timestamping.map(Into::into),
410            delete_on_empty: value.delete_on_empty.map(Into::into),
411        }
412    }
413}
414
415#[sync_docs]
416#[derive(Debug, Clone, Copy, PartialEq, Eq)]
417pub enum StorageClass {
418    Standard,
419    Express,
420}
421
422impl From<StorageClass> for api::StorageClass {
423    fn from(value: StorageClass) -> Self {
424        match value {
425            StorageClass::Standard => Self::Standard,
426            StorageClass::Express => Self::Express,
427        }
428    }
429}
430
431impl From<api::StorageClass> for Option<StorageClass> {
432    fn from(value: api::StorageClass) -> Self {
433        match value {
434            api::StorageClass::Unspecified => None,
435            api::StorageClass::Standard => Some(StorageClass::Standard),
436            api::StorageClass::Express => Some(StorageClass::Express),
437        }
438    }
439}
440
441impl FromStr for StorageClass {
442    type Err = ConvertError;
443
444    fn from_str(value: &str) -> Result<Self, Self::Err> {
445        match value {
446            "standard" => Ok(Self::Standard),
447            "express" => Ok(Self::Express),
448            v => Err(format!("unknown storage class: {v}").into()),
449        }
450    }
451}
452
453#[sync_docs(Age = "Age")]
454#[derive(Debug, Clone)]
455pub enum RetentionPolicy {
456    Age(Duration),
457    Infinite(()),
458}
459
460impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
461    fn from(value: RetentionPolicy) -> Self {
462        match value {
463            RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
464            RetentionPolicy::Infinite(_) => {
465                Self::Infinite(api::stream_config::InfiniteRetention {})
466            }
467        }
468    }
469}
470
471impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
472    fn from(value: api::stream_config::RetentionPolicy) -> Self {
473        match value {
474            api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
475            api::stream_config::RetentionPolicy::Infinite(_) => Self::Infinite(()),
476        }
477    }
478}
479
480#[sync_docs]
481#[derive(Debug, Clone, Copy, PartialEq, Eq)]
482pub enum BasinState {
483    Active,
484    Creating,
485    Deleting,
486}
487
488impl From<BasinState> for api::BasinState {
489    fn from(value: BasinState) -> Self {
490        match value {
491            BasinState::Active => Self::Active,
492            BasinState::Creating => Self::Creating,
493            BasinState::Deleting => Self::Deleting,
494        }
495    }
496}
497
498impl From<api::BasinState> for Option<BasinState> {
499    fn from(value: api::BasinState) -> Self {
500        match value {
501            api::BasinState::Unspecified => None,
502            api::BasinState::Active => Some(BasinState::Active),
503            api::BasinState::Creating => Some(BasinState::Creating),
504            api::BasinState::Deleting => Some(BasinState::Deleting),
505        }
506    }
507}
508
509impl std::fmt::Display for BasinState {
510    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511        match self {
512            BasinState::Active => write!(f, "active"),
513            BasinState::Creating => write!(f, "creating"),
514            BasinState::Deleting => write!(f, "deleting"),
515        }
516    }
517}
518
519#[sync_docs]
520#[derive(Debug, Clone)]
521pub struct BasinInfo {
522    pub name: String,
523    pub scope: Option<BasinScope>,
524    pub state: Option<BasinState>,
525}
526
527impl From<BasinInfo> for api::BasinInfo {
528    fn from(value: BasinInfo) -> Self {
529        let BasinInfo { name, scope, state } = value;
530        Self {
531            name,
532            scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
533            state: state.map(api::BasinState::from).unwrap_or_default().into(),
534        }
535    }
536}
537
538impl From<api::BasinInfo> for BasinInfo {
539    fn from(value: api::BasinInfo) -> Self {
540        let scope = value.scope().into();
541        let state = value.state().into();
542        let name = value.name;
543        Self { name, scope, state }
544    }
545}
546
547impl TryFrom<api::CreateBasinResponse> for BasinInfo {
548    type Error = ConvertError;
549    fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
550        let api::CreateBasinResponse { info } = value;
551        let info = info.ok_or("missing basin info")?;
552        Ok(info.into())
553    }
554}
555
556#[sync_docs]
557#[derive(Debug, Clone, Default)]
558pub struct ListStreamsRequest {
559    pub prefix: String,
560    pub start_after: String,
561    pub limit: Option<usize>,
562}
563
564impl ListStreamsRequest {
565    /// Create a new request.
566    pub fn new() -> Self {
567        Self::default()
568    }
569
570    /// Overwrite prefix.
571    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
572        Self {
573            prefix: prefix.into(),
574            ..self
575        }
576    }
577
578    /// Overwrite start after.
579    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
580        Self {
581            start_after: start_after.into(),
582            ..self
583        }
584    }
585
586    /// Overwrite limit.
587    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
588        Self {
589            limit: limit.into(),
590            ..self
591        }
592    }
593}
594
595impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
596    type Error = ConvertError;
597    fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
598        let ListStreamsRequest {
599            prefix,
600            start_after,
601            limit,
602        } = value;
603        Ok(Self {
604            prefix,
605            start_after,
606            limit: limit.map(|n| n as u64),
607        })
608    }
609}
610
611#[sync_docs]
612#[derive(Debug, Clone)]
613pub struct StreamInfo {
614    pub name: String,
615    pub created_at: u32,
616    pub deleted_at: Option<u32>,
617}
618
619impl From<api::StreamInfo> for StreamInfo {
620    fn from(value: api::StreamInfo) -> Self {
621        Self {
622            name: value.name,
623            created_at: value.created_at,
624            deleted_at: value.deleted_at,
625        }
626    }
627}
628
629impl TryFrom<api::CreateStreamResponse> for StreamInfo {
630    type Error = ConvertError;
631
632    fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
633        let api::CreateStreamResponse { info } = value;
634        let info = info.ok_or("missing stream info")?;
635        Ok(info.into())
636    }
637}
638
639#[sync_docs]
640#[derive(Debug, Clone)]
641pub struct ListStreamsResponse {
642    pub streams: Vec<StreamInfo>,
643    pub has_more: bool,
644}
645
646impl From<api::ListStreamsResponse> for ListStreamsResponse {
647    fn from(value: api::ListStreamsResponse) -> Self {
648        let api::ListStreamsResponse { streams, has_more } = value;
649        let streams = streams.into_iter().map(Into::into).collect();
650        Self { streams, has_more }
651    }
652}
653
654impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
655    type Error = ConvertError;
656
657    fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
658        let api::GetBasinConfigResponse { config } = value;
659        let config = config.ok_or("missing basin config")?;
660        Ok(config.into())
661    }
662}
663
664impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
665    type Error = ConvertError;
666
667    fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
668        let api::GetStreamConfigResponse { config } = value;
669        let config = config.ok_or("missing stream config")?;
670        Ok(config.into())
671    }
672}
673
674#[sync_docs]
675#[derive(Debug, Clone)]
676pub struct CreateStreamRequest {
677    pub stream: String,
678    pub config: Option<StreamConfig>,
679}
680
681impl CreateStreamRequest {
682    /// Create a new request with stream name.
683    pub fn new(stream: impl Into<String>) -> Self {
684        Self {
685            stream: stream.into(),
686            config: None,
687        }
688    }
689
690    /// Overwrite stream config.
691    pub fn with_config(self, config: StreamConfig) -> Self {
692        Self {
693            config: Some(config),
694            ..self
695        }
696    }
697}
698
699impl From<CreateStreamRequest> for api::CreateStreamRequest {
700    fn from(value: CreateStreamRequest) -> Self {
701        let CreateStreamRequest { stream, config } = value;
702        Self {
703            stream,
704            config: config.map(Into::into),
705        }
706    }
707}
708
709#[sync_docs]
710#[derive(Debug, Clone, Default)]
711pub struct ListBasinsRequest {
712    pub prefix: String,
713    pub start_after: String,
714    pub limit: Option<usize>,
715}
716
717impl ListBasinsRequest {
718    /// Create a new request.
719    pub fn new() -> Self {
720        Self::default()
721    }
722
723    /// Overwrite prefix.
724    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
725        Self {
726            prefix: prefix.into(),
727            ..self
728        }
729    }
730
731    /// Overwrite start after.
732    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
733        Self {
734            start_after: start_after.into(),
735            ..self
736        }
737    }
738
739    /// Overwrite limit.
740    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
741        Self {
742            limit: limit.into(),
743            ..self
744        }
745    }
746}
747
748impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
749    type Error = ConvertError;
750    fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
751        let ListBasinsRequest {
752            prefix,
753            start_after,
754            limit,
755        } = value;
756        Ok(Self {
757            prefix,
758            start_after,
759            limit: limit
760                .map(TryInto::try_into)
761                .transpose()
762                .map_err(|_| "request limit does not fit into u64 bounds")?,
763        })
764    }
765}
766
767#[sync_docs]
768#[derive(Debug, Clone)]
769pub struct ListBasinsResponse {
770    pub basins: Vec<BasinInfo>,
771    pub has_more: bool,
772}
773
774impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
775    type Error = ConvertError;
776    fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
777        let api::ListBasinsResponse { basins, has_more } = value;
778        Ok(Self {
779            basins: basins.into_iter().map(Into::into).collect(),
780            has_more,
781        })
782    }
783}
784
785#[sync_docs]
786#[derive(Debug, Clone)]
787pub struct DeleteBasinRequest {
788    pub basin: BasinName,
789    /// Delete basin if it exists else do nothing.
790    pub if_exists: bool,
791}
792
793impl DeleteBasinRequest {
794    /// Create a new request.
795    pub fn new(basin: BasinName) -> Self {
796        Self {
797            basin,
798            if_exists: false,
799        }
800    }
801
802    /// Overwrite the if exists parameter.
803    pub fn with_if_exists(self, if_exists: bool) -> Self {
804        Self { if_exists, ..self }
805    }
806}
807
808impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
809    fn from(value: DeleteBasinRequest) -> Self {
810        let DeleteBasinRequest { basin, .. } = value;
811        Self { basin: basin.0 }
812    }
813}
814
815#[sync_docs]
816#[derive(Debug, Clone)]
817pub struct DeleteStreamRequest {
818    pub stream: String,
819    /// Delete stream if it exists else do nothing.
820    pub if_exists: bool,
821}
822
823impl DeleteStreamRequest {
824    /// Create a new request.
825    pub fn new(stream: impl Into<String>) -> Self {
826        Self {
827            stream: stream.into(),
828            if_exists: false,
829        }
830    }
831
832    /// Overwrite the if exists parameter.
833    pub fn with_if_exists(self, if_exists: bool) -> Self {
834        Self { if_exists, ..self }
835    }
836}
837
838impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
839    fn from(value: DeleteStreamRequest) -> Self {
840        let DeleteStreamRequest { stream, .. } = value;
841        Self { stream }
842    }
843}
844
845#[sync_docs]
846#[derive(Debug, Clone)]
847pub struct ReconfigureBasinRequest {
848    pub basin: BasinName,
849    pub config: Option<BasinConfig>,
850    pub mask: Option<Vec<String>>,
851}
852
853impl ReconfigureBasinRequest {
854    /// Create a new request with basin name.
855    pub fn new(basin: BasinName) -> Self {
856        Self {
857            basin,
858            config: None,
859            mask: None,
860        }
861    }
862
863    /// Overwrite basin config.
864    pub fn with_config(self, config: BasinConfig) -> Self {
865        Self {
866            config: Some(config),
867            ..self
868        }
869    }
870
871    /// Overwrite field mask.
872    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
873        Self {
874            mask: Some(mask.into()),
875            ..self
876        }
877    }
878}
879
880impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
881    fn from(value: ReconfigureBasinRequest) -> Self {
882        let ReconfigureBasinRequest {
883            basin,
884            config,
885            mask,
886        } = value;
887        Self {
888            basin: basin.0,
889            config: config.map(Into::into),
890            mask: mask.map(|paths| prost_types::FieldMask { paths }),
891        }
892    }
893}
894
895impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
896    type Error = ConvertError;
897    fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
898        let api::ReconfigureBasinResponse { config } = value;
899        let config = config.ok_or("missing basin config")?;
900        Ok(config.into())
901    }
902}
903
904#[sync_docs]
905#[derive(Debug, Clone)]
906pub struct ReconfigureStreamRequest {
907    pub stream: String,
908    pub config: Option<StreamConfig>,
909    pub mask: Option<Vec<String>>,
910}
911
912impl ReconfigureStreamRequest {
913    /// Create a new request with stream name.
914    pub fn new(stream: impl Into<String>) -> Self {
915        Self {
916            stream: stream.into(),
917            config: None,
918            mask: None,
919        }
920    }
921
922    /// Overwrite stream config.
923    pub fn with_config(self, config: StreamConfig) -> Self {
924        Self {
925            config: Some(config),
926            ..self
927        }
928    }
929
930    /// Overwrite field mask.
931    pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
932        Self {
933            mask: Some(mask.into()),
934            ..self
935        }
936    }
937}
938
939impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
940    fn from(value: ReconfigureStreamRequest) -> Self {
941        let ReconfigureStreamRequest {
942            stream,
943            config,
944            mask,
945        } = value;
946        Self {
947            stream,
948            config: config.map(Into::into),
949            mask: mask.map(|paths| prost_types::FieldMask { paths }),
950        }
951    }
952}
953
954impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
955    type Error = ConvertError;
956    fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
957        let api::ReconfigureStreamResponse { config } = value;
958        let config = config.ok_or("missing stream config")?;
959        Ok(config.into())
960    }
961}
962
963impl From<api::CheckTailResponse> for StreamPosition {
964    fn from(value: api::CheckTailResponse) -> Self {
965        let api::CheckTailResponse {
966            next_seq_num,
967            last_timestamp,
968        } = value;
969        StreamPosition {
970            seq_num: next_seq_num,
971            timestamp: last_timestamp,
972        }
973    }
974}
975
/// Position of a record in a stream.
///
/// Also produced from an `api::CheckTailResponse`, in which case `seq_num` holds the
/// response's `next_seq_num` and `timestamp` holds its `last_timestamp`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamPosition {
    /// Sequence number assigned by the service.
    pub seq_num: u64,
    /// Timestamp, which may be user-specified or assigned by the service.
    /// If it is assigned by the service, it will represent milliseconds since Unix epoch.
    pub timestamp: u64,
}
985
986#[sync_docs]
987#[derive(Debug, Clone, PartialEq, Eq)]
988pub struct Header {
989    pub name: Bytes,
990    pub value: Bytes,
991}
992
993impl Header {
994    /// Create a new header from name and value.
995    pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
996        Self {
997            name: name.into(),
998            value: value.into(),
999        }
1000    }
1001
1002    /// Create a new header from value.
1003    pub fn from_value(value: impl Into<Bytes>) -> Self {
1004        Self {
1005            name: Bytes::new(),
1006            value: value.into(),
1007        }
1008    }
1009}
1010
1011impl From<Header> for api::Header {
1012    fn from(value: Header) -> Self {
1013        let Header { name, value } = value;
1014        Self { name, value }
1015    }
1016}
1017
1018impl From<api::Header> for Header {
1019    fn from(value: api::Header) -> Self {
1020        let api::Header { name, value } = value;
1021        Self { name, value }
1022    }
1023}
1024
1025/// A fencing token can be enforced on append requests.
1026///
1027/// Must not be more than 36 bytes.
1028#[derive(Debug, Clone, Default, PartialEq, Eq)]
1029pub struct FencingToken(String);
1030
1031impl FencingToken {
1032    const MAX_BYTES: usize = 36;
1033
1034    /// Generate a random alphanumeric fencing token of `n` bytes.
1035    pub fn generate(n: usize) -> Result<Self, ConvertError> {
1036        rand::rng()
1037            .sample_iter(&rand::distr::Alphanumeric)
1038            .take(n)
1039            .map(char::from)
1040            .collect::<String>()
1041            .parse()
1042    }
1043}
1044
1045impl Deref for FencingToken {
1046    type Target = str;
1047
1048    fn deref(&self) -> &Self::Target {
1049        &self.0
1050    }
1051}
1052
1053impl std::fmt::Display for FencingToken {
1054    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1055        write!(f, "{}", self.0)
1056    }
1057}
1058
1059impl FromStr for FencingToken {
1060    type Err = ConvertError;
1061
1062    fn from_str(value: &str) -> Result<Self, Self::Err> {
1063        value.to_string().try_into()
1064    }
1065}
1066
1067impl TryFrom<String> for FencingToken {
1068    type Error = ConvertError;
1069
1070    fn try_from(value: String) -> Result<Self, Self::Error> {
1071        if value.len() > Self::MAX_BYTES {
1072            Err(format!("Fencing token cannot exceed {} bytes", Self::MAX_BYTES).into())
1073        } else {
1074            Ok(Self(value))
1075        }
1076    }
1077}
1078
1079impl From<FencingToken> for String {
1080    fn from(value: FencingToken) -> Self {
1081        value.0
1082    }
1083}
1084
/// Command to send through a `CommandRecord`.
#[derive(Debug, Clone)]
pub enum Command {
    /// Enforce a fencing token.
    ///
    /// Fencing is strongly consistent, and subsequent appends that specify a
    /// fencing token will be rejected if it does not match.
    Fence {
        /// Fencing token to enforce.
        ///
        /// Set empty to clear the token.
        fencing_token: FencingToken,
    },
    /// Request a trim till the sequence number.
    ///
    /// Trimming is eventually consistent, and trimmed records may be visible
    /// for a brief period.
    Trim {
        /// Trim point.
        ///
        /// This sequence number is only allowed to advance, and any regression
        /// will be ignored.
        seq_num: u64,
    },
}
1110
/// A command record is a special kind of [`AppendRecord`] that can be used to
/// send command messages.
///
/// Such a record is signalled by a sole header with empty name. The header
/// value represents the operation and record body acts as the payload.
///
/// Use [`CommandRecord::fence`] or [`CommandRecord::trim`] to construct one.
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// Command kind.
    pub command: Command,
    /// Timestamp for the record.
    ///
    /// `None` for records built with [`CommandRecord::fence`] or
    /// [`CommandRecord::trim`] until set via [`CommandRecord::with_timestamp`].
    pub timestamp: Option<u64>,
}
1123
1124impl CommandRecord {
1125    const FENCE: &[u8] = b"fence";
1126    const TRIM: &[u8] = b"trim";
1127
1128    /// Create a new fence command record.
1129    pub fn fence(fencing_token: FencingToken) -> Self {
1130        Self {
1131            command: Command::Fence { fencing_token },
1132            timestamp: None,
1133        }
1134    }
1135
1136    /// Create a new trim command record.
1137    pub fn trim(seq_num: impl Into<u64>) -> Self {
1138        Self {
1139            command: Command::Trim {
1140                seq_num: seq_num.into(),
1141            },
1142            timestamp: None,
1143        }
1144    }
1145
1146    /// Overwrite timestamp.
1147    pub fn with_timestamp(self, timestamp: u64) -> Self {
1148        Self {
1149            timestamp: Some(timestamp),
1150            ..self
1151        }
1152    }
1153}
1154
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    // Optional timestamp; `None` until set through `with_timestamp`.
    timestamp: Option<u64>,
    // Headers attached to the record; counted towards its metered size.
    headers: Vec<Header>,
    // Record payload; counted towards its metered size.
    body: Bytes,
    // Test-only metered-size limit checked by `validated` in place of
    // `MAX_BYTES`.
    #[cfg(test)]
    max_bytes: u64,
}

// Implements `MeteredBytes` from the `headers` and `body` fields.
metered_impl!(AppendRecord);
1166
1167impl AppendRecord {
1168    const MAX_BYTES: u64 = MIB_BYTES;
1169
1170    fn validated(self) -> Result<Self, ConvertError> {
1171        #[cfg(test)]
1172        let max_bytes = self.max_bytes;
1173        #[cfg(not(test))]
1174        let max_bytes = Self::MAX_BYTES;
1175
1176        if self.metered_bytes() > max_bytes {
1177            Err("AppendRecord should have metered size less than 1 MiB".into())
1178        } else {
1179            Ok(self)
1180        }
1181    }
1182
1183    /// Try creating a new append record with body.
1184    pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1185        Self {
1186            timestamp: None,
1187            headers: Vec::new(),
1188            body: body.into(),
1189            #[cfg(test)]
1190            max_bytes: Self::MAX_BYTES,
1191        }
1192        .validated()
1193    }
1194
1195    #[cfg(test)]
1196    pub(crate) fn with_max_bytes(
1197        max_bytes: u64,
1198        body: impl Into<Bytes>,
1199    ) -> Result<Self, ConvertError> {
1200        Self {
1201            timestamp: None,
1202            headers: Vec::new(),
1203            body: body.into(),
1204            max_bytes,
1205        }
1206        .validated()
1207    }
1208
1209    /// Overwrite headers.
1210    pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1211        Self {
1212            headers: headers.into(),
1213            ..self
1214        }
1215        .validated()
1216    }
1217
1218    /// Overwrite timestamp.
1219    pub fn with_timestamp(self, timestamp: u64) -> Self {
1220        Self {
1221            timestamp: Some(timestamp),
1222            ..self
1223        }
1224    }
1225
1226    /// Body of the record.
1227    pub fn body(&self) -> &[u8] {
1228        &self.body
1229    }
1230
1231    /// Headers of the record.
1232    pub fn headers(&self) -> &[Header] {
1233        &self.headers
1234    }
1235
1236    /// Timestamp for the record.
1237    pub fn timestamp(&self) -> Option<u64> {
1238        self.timestamp
1239    }
1240
1241    /// Consume the record and return parts.
1242    pub fn into_parts(self) -> AppendRecordParts {
1243        AppendRecordParts {
1244            timestamp: self.timestamp,
1245            headers: self.headers,
1246            body: self.body,
1247        }
1248    }
1249
1250    /// Try creating the record from parts.
1251    pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1252        let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1253        if let Some(timestamp) = parts.timestamp {
1254            Ok(record.with_timestamp(timestamp))
1255        } else {
1256            Ok(record)
1257        }
1258    }
1259}
1260
1261impl From<AppendRecord> for api::AppendRecord {
1262    fn from(value: AppendRecord) -> Self {
1263        Self {
1264            timestamp: value.timestamp,
1265            headers: value.headers.into_iter().map(Into::into).collect(),
1266            body: value.body,
1267        }
1268    }
1269}
1270
1271impl From<CommandRecord> for AppendRecord {
1272    fn from(value: CommandRecord) -> Self {
1273        let (header_value, body) = match value.command {
1274            Command::Fence { fencing_token } => (
1275                CommandRecord::FENCE,
1276                Bytes::copy_from_slice(fencing_token.as_bytes()),
1277            ),
1278            Command::Trim { seq_num } => (
1279                CommandRecord::TRIM,
1280                Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1281            ),
1282        };
1283        AppendRecordParts {
1284            timestamp: value.timestamp,
1285            headers: vec![Header::from_value(header_value)],
1286            body,
1287        }
1288        .try_into()
1289        .expect("command record is a valid append record")
1290    }
1291}
1292
// Plain-data form of an `AppendRecord` with every field public. Obtained
// through `AppendRecord::into_parts` and turned back into a validated record
// with `AppendRecord::try_from_parts`.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    // Optional timestamp for the record.
    pub timestamp: Option<u64>,
    // Headers of the record.
    pub headers: Vec<Header>,
    // Body of the record.
    pub body: Bytes,
}
1300
1301impl From<AppendRecord> for AppendRecordParts {
1302    fn from(value: AppendRecord) -> Self {
1303        value.into_parts()
1304    }
1305}
1306
1307impl TryFrom<AppendRecordParts> for AppendRecord {
1308    type Error = ConvertError;
1309
1310    fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1311        Self::try_from_parts(value)
1312    }
1313}
1314
/// A collection of append records that can be sent together in a batch.
///
/// The batch is bounded both by record count and by total metered bytes; see
/// [`AppendRecordBatch::push`].
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    /// Records held by the batch, in the order they were pushed.
    records: Vec<AppendRecord>,
    /// Running total of the metered bytes of `records`, updated on each push.
    metered_bytes: u64,
    /// Maximum number of records this batch accepts.
    max_capacity: usize,
    /// Test-only override for the maximum metered bytes of the batch.
    #[cfg(test)]
    max_bytes: u64,
}
1324
1325impl PartialEq for AppendRecordBatch {
1326    fn eq(&self, other: &Self) -> bool {
1327        if self.records.eq(&other.records) {
1328            assert_eq!(self.metered_bytes, other.metered_bytes);
1329            true
1330        } else {
1331            false
1332        }
1333    }
1334}
1335
1336impl Eq for AppendRecordBatch {}
1337
1338impl Default for AppendRecordBatch {
1339    fn default() -> Self {
1340        Self::new()
1341    }
1342}
1343
1344impl AppendRecordBatch {
1345    /// Maximum number of records that a batch can hold.
1346    ///
1347    /// A record batch cannot be created with a bigger capacity.
1348    pub const MAX_CAPACITY: usize = 1000;
1349
1350    /// Maximum metered bytes of the batch.
1351    pub const MAX_BYTES: u64 = MIB_BYTES;
1352
1353    /// Create an empty record batch.
1354    pub fn new() -> Self {
1355        Self::with_max_capacity(Self::MAX_CAPACITY)
1356    }
1357
1358    /// Create an empty record batch with custom max capacity.
1359    ///
1360    /// The capacity should not be more than [`Self::MAX_CAPACITY`].
1361    pub fn with_max_capacity(max_capacity: usize) -> Self {
1362        assert!(
1363            max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1364            "Batch capacity must be between 1 and 1000"
1365        );
1366
1367        Self {
1368            records: Vec::with_capacity(max_capacity),
1369            metered_bytes: 0,
1370            max_capacity,
1371            #[cfg(test)]
1372            max_bytes: Self::MAX_BYTES,
1373        }
1374    }
1375
1376    #[cfg(test)]
1377    pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1378        #[cfg(test)]
1379        assert!(
1380            max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1381            "Batch size must be between 1 byte and 1 MiB"
1382        );
1383
1384        Self {
1385            max_bytes,
1386            ..Self::with_max_capacity(max_capacity)
1387        }
1388    }
1389
1390    /// Try creating a record batch from an iterator.
1391    ///
1392    /// If all the items of the iterator cannot be drained into the batch, the
1393    /// error returned contains a batch containing all records it could fit
1394    /// along-with the left over items from the iterator.
1395    pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1396    where
1397        R: Into<AppendRecord>,
1398        T: IntoIterator<Item = R>,
1399    {
1400        let mut records = Self::new();
1401        let mut pending = Vec::new();
1402
1403        let mut iter = iter.into_iter();
1404
1405        for record in iter.by_ref() {
1406            if let Err(record) = records.push(record) {
1407                pending.push(record);
1408                break;
1409            }
1410        }
1411
1412        if pending.is_empty() {
1413            Ok(records)
1414        } else {
1415            pending.extend(iter.map(Into::into));
1416            Err((records, pending))
1417        }
1418    }
1419
1420    /// Returns true if the batch contains no records.
1421    pub fn is_empty(&self) -> bool {
1422        if self.records.is_empty() {
1423            assert_eq!(self.metered_bytes, 0);
1424            true
1425        } else {
1426            false
1427        }
1428    }
1429
1430    /// Returns the number of records contained in the batch.
1431    pub fn len(&self) -> usize {
1432        self.records.len()
1433    }
1434
1435    #[cfg(test)]
1436    fn max_bytes(&self) -> u64 {
1437        self.max_bytes
1438    }
1439
1440    #[cfg(not(test))]
1441    fn max_bytes(&self) -> u64 {
1442        Self::MAX_BYTES
1443    }
1444
1445    /// Returns true if the batch cannot fit any more records.
1446    pub fn is_full(&self) -> bool {
1447        self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1448    }
1449
1450    /// Try to append a new record into the batch.
1451    pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1452        assert!(self.records.len() <= self.max_capacity);
1453        assert!(self.metered_bytes <= self.max_bytes());
1454
1455        let record = record.into();
1456        let record_size = record.metered_bytes();
1457        if self.records.len() >= self.max_capacity
1458            || self.metered_bytes + record_size > self.max_bytes()
1459        {
1460            Err(record)
1461        } else {
1462            self.records.push(record);
1463            self.metered_bytes += record_size;
1464            Ok(())
1465        }
1466    }
1467}
1468
1469impl MeteredBytes for AppendRecordBatch {
1470    fn metered_bytes(&self) -> u64 {
1471        self.metered_bytes
1472    }
1473}
1474
1475impl IntoIterator for AppendRecordBatch {
1476    type Item = AppendRecord;
1477    type IntoIter = std::vec::IntoIter<Self::Item>;
1478
1479    fn into_iter(self) -> Self::IntoIter {
1480        self.records.into_iter()
1481    }
1482}
1483
1484impl<'a> IntoIterator for &'a AppendRecordBatch {
1485    type Item = &'a AppendRecord;
1486    type IntoIter = std::slice::Iter<'a, AppendRecord>;
1487
1488    fn into_iter(self) -> Self::IntoIter {
1489        self.records.iter()
1490    }
1491}
1492
1493impl AsRef<[AppendRecord]> for AppendRecordBatch {
1494    fn as_ref(&self) -> &[AppendRecord] {
1495        &self.records
1496    }
1497}
1498
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    // Batch of records to append.
    pub records: AppendRecordBatch,
    // Optional sequence number to match; `None` unless set via
    // `with_match_seq_num`.
    pub match_seq_num: Option<u64>,
    // Optional fencing token; `None` unless set via `with_fencing_token`.
    pub fencing_token: Option<FencingToken>,
}
1506
1507impl MeteredBytes for AppendInput {
1508    fn metered_bytes(&self) -> u64 {
1509        self.records.metered_bytes()
1510    }
1511}
1512
1513impl AppendInput {
1514    /// Create a new append input from record batch.
1515    pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1516        Self {
1517            records: records.into(),
1518            match_seq_num: None,
1519            fencing_token: None,
1520        }
1521    }
1522
1523    /// Overwrite match sequence number.
1524    pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1525        Self {
1526            match_seq_num: Some(match_seq_num.into()),
1527            ..self
1528        }
1529    }
1530
1531    /// Overwrite fencing token.
1532    pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1533        Self {
1534            fencing_token: Some(fencing_token),
1535            ..self
1536        }
1537    }
1538
1539    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1540        let Self {
1541            records,
1542            match_seq_num,
1543            fencing_token,
1544        } = self;
1545
1546        api::AppendInput {
1547            stream: stream.into(),
1548            records: records.into_iter().map(Into::into).collect(),
1549            match_seq_num,
1550            fencing_token: fencing_token.map(|f| f.0),
1551        }
1552    }
1553}
1554
/// Acknowledgment to an append request.
///
/// Built from the API's append output; see the `From<api::AppendOutput>`
/// conversion.
#[derive(Debug, Clone)]
pub struct AppendAck {
    /// Sequence number and timestamp of the first record that was appended.
    pub start: StreamPosition,
    /// Sequence number of the last record that was appended + 1,
    /// and timestamp of the last record that was appended.
    ///
    /// The difference between `end.seq_num` and `start.seq_num`
    /// will be the number of records appended.
    pub end: StreamPosition,
    /// Sequence number that will be assigned to the next record on the stream,
    /// and timestamp of the last record on the stream.
    ///
    /// This can be greater than the `end` position in case of concurrent appends.
    pub tail: StreamPosition,
}
1570
1571impl From<api::AppendOutput> for AppendAck {
1572    fn from(value: api::AppendOutput) -> Self {
1573        let api::AppendOutput {
1574            start_seq_num,
1575            start_timestamp,
1576            end_seq_num,
1577            end_timestamp,
1578            next_seq_num,
1579            last_timestamp,
1580        } = value;
1581        let start = StreamPosition {
1582            seq_num: start_seq_num,
1583            timestamp: start_timestamp,
1584        };
1585        let end = StreamPosition {
1586            seq_num: end_seq_num,
1587            timestamp: end_timestamp,
1588        };
1589        let tail = StreamPosition {
1590            seq_num: next_seq_num,
1591            timestamp: last_timestamp,
1592        };
1593        Self { start, end, tail }
1594    }
1595}
1596
1597impl TryFrom<api::AppendResponse> for AppendAck {
1598    type Error = ConvertError;
1599    fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1600        let api::AppendResponse { output } = value;
1601        let output = output.ok_or("missing append output")?;
1602        Ok(output.into())
1603    }
1604}
1605
1606impl TryFrom<api::AppendSessionResponse> for AppendAck {
1607    type Error = ConvertError;
1608    fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1609        let api::AppendSessionResponse { output } = value;
1610        let output = output.ok_or("missing append output")?;
1611        Ok(output.into())
1612    }
1613}
1614
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    // Optional limit on record count; `None` (the default) leaves it unset.
    pub count: Option<u64>,
    // Optional limit on bytes; `None` (the default) leaves it unset.
    pub bytes: Option<u64>,
}
1621
1622impl ReadLimit {
1623    /// Create a new read limit.
1624    pub fn new() -> Self {
1625        Self::default()
1626    }
1627
1628    /// Overwrite count limit.
1629    pub fn with_count(self, count: u64) -> Self {
1630        Self {
1631            count: Some(count),
1632            ..self
1633        }
1634    }
1635
1636    /// Overwrite bytes limit.
1637    pub fn with_bytes(self, bytes: u64) -> Self {
1638        Self {
1639            bytes: Some(bytes),
1640            ..self
1641        }
1642    }
1643}
1644
/// Starting point for read requests.
///
/// The default starting point is sequence number `0`.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Sequence number.
    SeqNum(u64),
    /// Timestamp.
    Timestamp(u64),
    /// Number of records before the tail, i.e. the next sequence number.
    TailOffset(u64),
}
1655
1656impl Default for ReadStart {
1657    fn default() -> Self {
1658        Self::SeqNum(0)
1659    }
1660}
1661
1662impl From<ReadStart> for api::read_request::Start {
1663    fn from(start: ReadStart) -> Self {
1664        match start {
1665            ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1666            ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1667            ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1668        }
1669    }
1670}
1671
1672impl From<ReadStart> for api::read_session_request::Start {
1673    fn from(start: ReadStart) -> Self {
1674        match start {
1675            ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1676            ReadStart::Timestamp(timestamp) => {
1677                api::read_session_request::Start::Timestamp(timestamp)
1678            }
1679            ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1680        }
1681    }
1682}
1683
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    // Starting point of the read.
    pub start: ReadStart,
    // Count/bytes limit; `try_into_api_type` rejects a count above 1000 or
    // bytes above 1 MiB.
    pub limit: ReadLimit,
    // Optional `until` timestamp, expressed as a `..end` range; only `end` is
    // sent in the API request.
    pub until: Option<RangeTo<u64>>,
    // Whether to clamp the start position at the tail position.
    pub clamp: bool,
}
1692
1693impl ReadRequest {
1694    /// Create a new request with the specified starting point.
1695    pub fn new(start: ReadStart) -> Self {
1696        Self {
1697            start,
1698            ..Default::default()
1699        }
1700    }
1701
1702    /// Overwrite limit.
1703    pub fn with_limit(self, limit: ReadLimit) -> Self {
1704        Self { limit, ..self }
1705    }
1706
1707    /// Provide an `until` timestamp.
1708    pub fn with_until(self, until: RangeTo<u64>) -> Self {
1709        Self {
1710            until: Some(until),
1711            ..self
1712        }
1713    }
1714
1715    /// Clamp the start position at the tail position.
1716    pub fn with_clamp(self, clamp: bool) -> Self {
1717        Self { clamp, ..self }
1718    }
1719}
1720
1721impl ReadRequest {
1722    pub(crate) fn try_into_api_type(
1723        self,
1724        stream: impl Into<String>,
1725    ) -> Result<api::ReadRequest, ConvertError> {
1726        let Self {
1727            start,
1728            limit,
1729            until,
1730            clamp,
1731        } = self;
1732
1733        let limit = if limit.count > Some(1000) {
1734            Err("read limit: count must not exceed 1000 for unary request")
1735        } else if limit.bytes > Some(MIB_BYTES) {
1736            Err("read limit: bytes must not exceed 1MiB for unary request")
1737        } else {
1738            Ok(api::ReadLimit {
1739                count: limit.count,
1740                bytes: limit.bytes,
1741            })
1742        }?;
1743
1744        Ok(api::ReadRequest {
1745            stream: stream.into(),
1746            start: Some(start.into()),
1747            limit: Some(limit),
1748            until: until.map(|range| range.end),
1749            clamp,
1750        })
1751    }
1752}
1753
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    // Sequence number of the record.
    pub seq_num: u64,
    // Timestamp of the record.
    pub timestamp: u64,
    // Headers of the record; a sole header with an empty name marks a command
    // record (see `as_command_record`).
    pub headers: Vec<Header>,
    // Body of the record.
    pub body: Bytes,
}

// Implements `MeteredBytes` from the `headers` and `body` fields.
metered_impl!(SequencedRecord);
1764
1765impl From<api::SequencedRecord> for SequencedRecord {
1766    fn from(value: api::SequencedRecord) -> Self {
1767        let api::SequencedRecord {
1768            seq_num,
1769            timestamp,
1770            headers,
1771            body,
1772        } = value;
1773        Self {
1774            seq_num,
1775            timestamp,
1776            headers: headers.into_iter().map(Into::into).collect(),
1777            body,
1778        }
1779    }
1780}
1781
1782impl SequencedRecord {
1783    /// Try representing the sequenced record as a command record.
1784    pub fn as_command_record(&self) -> Option<CommandRecord> {
1785        if self.headers.len() != 1 {
1786            return None;
1787        }
1788
1789        let header = self.headers.first().expect("pre-validated length");
1790
1791        if !header.name.is_empty() {
1792            return None;
1793        }
1794
1795        match header.value.as_ref() {
1796            CommandRecord::FENCE => {
1797                let fencing_token = std::str::from_utf8(&self.body).ok()?.parse().ok()?;
1798                Some(CommandRecord {
1799                    command: Command::Fence { fencing_token },
1800                    timestamp: Some(self.timestamp),
1801                })
1802            }
1803            CommandRecord::TRIM => {
1804                let body: &[u8] = &self.body;
1805                let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1806                Some(CommandRecord {
1807                    command: Command::Trim { seq_num },
1808                    timestamp: Some(self.timestamp),
1809                })
1810            }
1811            _ => None,
1812        }
1813    }
1814}
1815
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    // Records contained in the batch.
    pub records: Vec<SequencedRecord>,
}
1821
1822impl MeteredBytes for SequencedRecordBatch {
1823    fn metered_bytes(&self) -> u64 {
1824        self.records.metered_bytes()
1825    }
1826}
1827
1828impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1829    fn from(value: api::SequencedRecordBatch) -> Self {
1830        let api::SequencedRecordBatch { records } = value;
1831        Self {
1832            records: records.into_iter().map(Into::into).collect(),
1833        }
1834    }
1835}
1836
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    // Batch of sequenced records; mirrors the API's `Output::Batch`.
    Batch(SequencedRecordBatch),
    // Sequence number carried by the API's `Output::NextSeqNum`.
    NextSeqNum(u64),
}
1843
1844impl From<api::read_output::Output> for ReadOutput {
1845    fn from(value: api::read_output::Output) -> Self {
1846        match value {
1847            api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1848            api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1849        }
1850    }
1851}
1852
1853impl TryFrom<api::ReadOutput> for ReadOutput {
1854    type Error = ConvertError;
1855    fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1856        let api::ReadOutput { output } = value;
1857        let output = output.ok_or("missing read output")?;
1858        Ok(output.into())
1859    }
1860}
1861
1862impl TryFrom<api::ReadResponse> for ReadOutput {
1863    type Error = ConvertError;
1864    fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1865        let api::ReadResponse { output } = value;
1866        let output = output.ok_or("missing output in read response")?;
1867        output.try_into()
1868    }
1869}
1870
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    // Starting point of the read session.
    pub start: ReadStart,
    // Count/bytes limit, passed through to the API request as-is.
    pub limit: ReadLimit,
    // Optional `until` timestamp, expressed as a `..end` range; only `end` is
    // sent in the API request.
    pub until: Option<RangeTo<u64>>,
    // Whether to clamp the start position at the tail position.
    pub clamp: bool,
}
1879
1880impl ReadSessionRequest {
1881    /// Create a new request with the specified starting point.
1882    pub fn new(start: ReadStart) -> Self {
1883        Self {
1884            start,
1885            ..Default::default()
1886        }
1887    }
1888
1889    /// Overwrite limit.
1890    pub fn with_limit(self, limit: ReadLimit) -> Self {
1891        Self { limit, ..self }
1892    }
1893
1894    /// Provide an `until` timestamp.
1895    pub fn with_until(self, until: RangeTo<u64>) -> Self {
1896        Self {
1897            until: Some(until),
1898            ..self
1899        }
1900    }
1901
1902    /// Clamp the start position at the tail position.
1903    pub fn with_clamp(self, clamp: bool) -> Self {
1904        Self { clamp, ..self }
1905    }
1906
1907    pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1908        let Self {
1909            start,
1910            limit,
1911            until,
1912            clamp,
1913        } = self;
1914        api::ReadSessionRequest {
1915            stream: stream.into(),
1916            start: Some(start.into()),
1917            limit: Some(api::ReadLimit {
1918                count: limit.count,
1919                bytes: limit.bytes,
1920            }),
1921            heartbeats: false,
1922            until: until.map(|range| range.end),
1923            clamp,
1924        }
1925    }
1926}
1927
1928impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1929    type Error = ConvertError;
1930    fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1931        let api::ReadSessionResponse { output } = value;
1932        let output = output.ok_or("missing output in read session response")?;
1933        output.try_into()
1934    }
1935}
1936
/// Name of a basin.
///
/// Must be between 8 and 48 characters in length. Must comprise lowercase
/// letters, numbers, and hyphens. Cannot begin or end with a hyphen.
///
/// These rules are enforced by the `TryFrom<String>` and [`FromStr`]
/// conversions.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BasinName(String);
1943
1944impl Deref for BasinName {
1945    type Target = str;
1946    fn deref(&self) -> &Self::Target {
1947        &self.0
1948    }
1949}
1950
1951impl TryFrom<String> for BasinName {
1952    type Error = ConvertError;
1953
1954    fn try_from(name: String) -> Result<Self, Self::Error> {
1955        if name.len() < 8 || name.len() > 48 {
1956            return Err("Basin name must be between 8 and 48 characters in length".into());
1957        }
1958
1959        static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1960        let regex = BASIN_NAME_REGEX.get_or_init(|| {
1961            Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1962                .expect("Failed to compile basin name regex")
1963        });
1964
1965        if !regex.is_match(&name) {
1966            return Err(
1967                "Basin name must comprise lowercase letters, numbers, and hyphens. \
1968                It cannot begin or end with a hyphen."
1969                    .into(),
1970            );
1971        }
1972
1973        Ok(Self(name))
1974    }
1975}
1976
1977impl FromStr for BasinName {
1978    type Err = ConvertError;
1979
1980    fn from_str(s: &str) -> Result<Self, Self::Err> {
1981        s.to_string().try_into()
1982    }
1983}
1984
1985impl std::fmt::Display for BasinName {
1986    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1987        f.write_str(&self.0)
1988    }
1989}
1990
1991impl From<BasinName> for String {
1992    fn from(value: BasinName) -> Self {
1993        value.0
1994    }
1995}
1996
/// Access token ID.
///
/// Must be between 1 and 96 characters.
///
/// The `TryFrom<String>` conversion enforces this using the byte length of the
/// string.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AccessTokenId(String);
2001
2002impl Deref for AccessTokenId {
2003    type Target = str;
2004
2005    fn deref(&self) -> &Self::Target {
2006        &self.0
2007    }
2008}
2009
2010impl TryFrom<String> for AccessTokenId {
2011    type Error = ConvertError;
2012
2013    fn try_from(name: String) -> Result<Self, Self::Error> {
2014        if name.is_empty() {
2015            return Err("Access token ID must not be empty".into());
2016        }
2017
2018        if name.len() > 96 {
2019            return Err("Access token ID must not exceed 96 characters".into());
2020        }
2021
2022        Ok(Self(name))
2023    }
2024}
2025
2026impl From<AccessTokenId> for String {
2027    fn from(value: AccessTokenId) -> Self {
2028        value.0
2029    }
2030}
2031
2032impl FromStr for AccessTokenId {
2033    type Err = ConvertError;
2034
2035    fn from_str(s: &str) -> Result<Self, Self::Err> {
2036        s.to_string().try_into()
2037    }
2038}
2039
2040impl std::fmt::Display for AccessTokenId {
2041    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2042        f.write_str(&self.0)
2043    }
2044}
2045
2046impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
2047    fn from(value: AccessTokenInfo) -> Self {
2048        Self {
2049            info: Some(value.into()),
2050        }
2051    }
2052}
2053
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AccessTokenInfo {
    // Identifier of the access token.
    pub id: AccessTokenId,
    // Optional expiration time; `None` unless set via `with_expires_at`.
    // NOTE(review): unit/epoch is not visible here — confirm against the API.
    pub expires_at: Option<u32>,
    // Auto prefix streams flag; `false` unless set via
    // `with_auto_prefix_streams`.
    pub auto_prefix_streams: bool,
    // Optional scope of the token; `None` unless set via `with_scope`.
    pub scope: Option<AccessTokenScope>,
}
2062
2063impl AccessTokenInfo {
2064    /// Create a new access token info.
2065    pub fn new(id: AccessTokenId) -> Self {
2066        Self {
2067            id,
2068            expires_at: None,
2069            auto_prefix_streams: false,
2070            scope: None,
2071        }
2072    }
2073
2074    /// Overwrite expiration time.
2075    pub fn with_expires_at(self, expires_at: u32) -> Self {
2076        Self {
2077            expires_at: Some(expires_at),
2078            ..self
2079        }
2080    }
2081
2082    /// Overwrite auto prefix streams.
2083    pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2084        Self {
2085            auto_prefix_streams,
2086            ..self
2087        }
2088    }
2089
2090    /// Overwrite scope.
2091    pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2092        Self {
2093            scope: Some(scope),
2094            ..self
2095        }
2096    }
2097}
2098
2099impl From<AccessTokenInfo> for api::AccessTokenInfo {
2100    fn from(value: AccessTokenInfo) -> Self {
2101        let AccessTokenInfo {
2102            id,
2103            expires_at,
2104            auto_prefix_streams,
2105            scope,
2106        } = value;
2107        Self {
2108            id: id.into(),
2109            expires_at,
2110            auto_prefix_streams,
2111            scope: scope.map(Into::into),
2112        }
2113    }
2114}
2115
2116impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2117    type Error = ConvertError;
2118
2119    fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2120        let api::AccessTokenInfo {
2121            id,
2122            expires_at,
2123            auto_prefix_streams,
2124            scope,
2125        } = value;
2126        Ok(Self {
2127            id: id.try_into()?,
2128            expires_at,
2129            auto_prefix_streams,
2130            scope: scope.map(Into::into),
2131        })
2132    }
2133}
2134
// Each variant parses from its lowercase, hyphen-separated name through
// `FromStr` (e.g. `ListBasins` from "list-basins"), matched
// case-insensitively.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
    AccountMetrics,
    BasinMetrics,
    StreamMetrics,
}
2160
2161impl FromStr for Operation {
2162    type Err = ConvertError;
2163
2164    fn from_str(s: &str) -> Result<Self, Self::Err> {
2165        match s.to_lowercase().as_str() {
2166            "list-basins" => Ok(Self::ListBasins),
2167            "create-basin" => Ok(Self::CreateBasin),
2168            "delete-basin" => Ok(Self::DeleteBasin),
2169            "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2170            "get-basin-config" => Ok(Self::GetBasinConfig),
2171            "issue-access-token" => Ok(Self::IssueAccessToken),
2172            "revoke-access-token" => Ok(Self::RevokeAccessToken),
2173            "list-access-tokens" => Ok(Self::ListAccessTokens),
2174            "list-streams" => Ok(Self::ListStreams),
2175            "create-stream" => Ok(Self::CreateStream),
2176            "delete-stream" => Ok(Self::DeleteStream),
2177            "get-stream-config" => Ok(Self::GetStreamConfig),
2178            "reconfigure-stream" => Ok(Self::ReconfigureStream),
2179            "check-tail" => Ok(Self::CheckTail),
2180            "append" => Ok(Self::Append),
2181            "read" => Ok(Self::Read),
2182            "trim" => Ok(Self::Trim),
2183            "fence" => Ok(Self::Fence),
2184            "account-metrics" => Ok(Self::AccountMetrics),
2185            "basin-metrics" => Ok(Self::BasinMetrics),
2186            "stream-metrics" => Ok(Self::StreamMetrics),
2187            _ => Err("invalid operation".into()),
2188        }
2189    }
2190}
2191
2192impl From<Operation> for api::Operation {
2193    fn from(value: Operation) -> Self {
2194        match value {
2195            Operation::ListBasins => Self::ListBasins,
2196            Operation::CreateBasin => Self::CreateBasin,
2197            Operation::DeleteBasin => Self::DeleteBasin,
2198            Operation::ReconfigureBasin => Self::ReconfigureBasin,
2199            Operation::GetBasinConfig => Self::GetBasinConfig,
2200            Operation::IssueAccessToken => Self::IssueAccessToken,
2201            Operation::RevokeAccessToken => Self::RevokeAccessToken,
2202            Operation::ListAccessTokens => Self::ListAccessTokens,
2203            Operation::ListStreams => Self::ListStreams,
2204            Operation::CreateStream => Self::CreateStream,
2205            Operation::DeleteStream => Self::DeleteStream,
2206            Operation::GetStreamConfig => Self::GetStreamConfig,
2207            Operation::ReconfigureStream => Self::ReconfigureStream,
2208            Operation::CheckTail => Self::CheckTail,
2209            Operation::Append => Self::Append,
2210            Operation::Read => Self::Read,
2211            Operation::Trim => Self::Trim,
2212            Operation::Fence => Self::Fence,
2213            Operation::AccountMetrics => Self::AccountMetrics,
2214            Operation::BasinMetrics => Self::BasinMetrics,
2215            Operation::StreamMetrics => Self::StreamMetrics,
2216        }
2217    }
2218}
2219
2220impl From<api::Operation> for Option<Operation> {
2221    fn from(value: api::Operation) -> Self {
2222        match value {
2223            api::Operation::Unspecified => None,
2224            api::Operation::ListBasins => Some(Operation::ListBasins),
2225            api::Operation::CreateBasin => Some(Operation::CreateBasin),
2226            api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2227            api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2228            api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2229            api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2230            api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2231            api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2232            api::Operation::ListStreams => Some(Operation::ListStreams),
2233            api::Operation::CreateStream => Some(Operation::CreateStream),
2234            api::Operation::DeleteStream => Some(Operation::DeleteStream),
2235            api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2236            api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2237            api::Operation::CheckTail => Some(Operation::CheckTail),
2238            api::Operation::Append => Some(Operation::Append),
2239            api::Operation::Read => Some(Operation::Read),
2240            api::Operation::Trim => Some(Operation::Trim),
2241            api::Operation::Fence => Some(Operation::Fence),
2242            api::Operation::AccountMetrics => Some(Operation::AccountMetrics),
2243            api::Operation::BasinMetrics => Some(Operation::BasinMetrics),
2244            api::Operation::StreamMetrics => Some(Operation::StreamMetrics),
2245        }
2246    }
2247}
2248
// Scope attached to an access token: optional resource sets per resource
// kind, optional read/write operation groups, and a set of individual
// operations.
// NOTE(review): item- and field-level rustdoc is presumably injected by
// `#[sync_docs]` — confirm before adding `///` comments here.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct AccessTokenScope {
    // Resource set for basins; `None` when not set.
    pub basins: Option<ResourceSet>,
    // Resource set for streams; `None` when not set.
    pub streams: Option<ResourceSet>,
    // Resource set for access tokens; `None` when not set.
    pub access_tokens: Option<ResourceSet>,
    // Read/write permissions at the account, basin and stream level.
    pub op_groups: Option<PermittedOperationGroups>,
    // Individually listed operations; empty by default.
    pub ops: HashSet<Operation>,
}
2258
2259impl AccessTokenScope {
2260    /// Create a new access token scope.
2261    pub fn new() -> Self {
2262        Self::default()
2263    }
2264
2265    /// Overwrite resource set for access tokens.
2266    pub fn with_basins(self, basins: ResourceSet) -> Self {
2267        Self {
2268            basins: Some(basins),
2269            ..self
2270        }
2271    }
2272
2273    /// Overwrite resource set for streams.
2274    pub fn with_streams(self, streams: ResourceSet) -> Self {
2275        Self {
2276            streams: Some(streams),
2277            ..self
2278        }
2279    }
2280
2281    /// Overwrite resource set for access tokens.
2282    pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2283        Self {
2284            access_tokens: Some(access_tokens),
2285            ..self
2286        }
2287    }
2288
2289    /// Overwrite operation groups.
2290    pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2291        Self {
2292            op_groups: Some(op_groups),
2293            ..self
2294        }
2295    }
2296
2297    /// Overwrite operations.
2298    pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2299        Self {
2300            ops: ops.into_iter().collect(),
2301            ..self
2302        }
2303    }
2304
2305    /// Add an operation to operations.
2306    pub fn with_op(self, op: Operation) -> Self {
2307        let mut ops = self.ops;
2308        ops.insert(op);
2309        Self { ops, ..self }
2310    }
2311}
2312
2313impl From<AccessTokenScope> for api::AccessTokenScope {
2314    fn from(value: AccessTokenScope) -> Self {
2315        let AccessTokenScope {
2316            basins,
2317            streams,
2318            access_tokens,
2319            op_groups,
2320            ops,
2321        } = value;
2322        Self {
2323            basins: basins.map(Into::into),
2324            streams: streams.map(Into::into),
2325            access_tokens: access_tokens.map(Into::into),
2326            op_groups: op_groups.map(Into::into),
2327            ops: ops
2328                .into_iter()
2329                .map(api::Operation::from)
2330                .map(Into::into)
2331                .collect(),
2332        }
2333    }
2334}
2335
2336impl From<api::AccessTokenScope> for AccessTokenScope {
2337    fn from(value: api::AccessTokenScope) -> Self {
2338        let api::AccessTokenScope {
2339            basins,
2340            streams,
2341            access_tokens,
2342            op_groups,
2343            ops,
2344        } = value;
2345        Self {
2346            basins: basins.and_then(|set| set.matching.map(Into::into)),
2347            streams: streams.and_then(|set| set.matching.map(Into::into)),
2348            access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2349            op_groups: op_groups.map(Into::into),
2350            ops: ops
2351                .into_iter()
2352                .map(api::Operation::try_from)
2353                .flat_map(Result::ok)
2354                .flat_map(<Option<Operation>>::from)
2355                .collect(),
2356        }
2357    }
2358}
2359
2360impl From<ResourceSet> for api::ResourceSet {
2361    fn from(value: ResourceSet) -> Self {
2362        Self {
2363            matching: Some(value.into()),
2364        }
2365    }
2366}
2367
// A set of resources selected by name. Converts to and from
// `api::resource_set::Matching`, which has the same two variants.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` from the API
// `Matching` type — confirm before adding `///` comments here.
#[sync_docs(ResourceSet = "Matching")]
#[derive(Debug, Clone)]
pub enum ResourceSet {
    // Presumably matches a single resource by its exact name — verify against the API docs.
    Exact(String),
    // Presumably matches every resource whose name starts with the prefix — verify against the API docs.
    Prefix(String),
}
2374
2375impl From<ResourceSet> for api::resource_set::Matching {
2376    fn from(value: ResourceSet) -> Self {
2377        match value {
2378            ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2379            ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2380        }
2381    }
2382}
2383
2384impl From<api::resource_set::Matching> for ResourceSet {
2385    fn from(value: api::resource_set::Matching) -> Self {
2386        match value {
2387            api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2388            api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2389        }
2390    }
2391}
2392
// Read/write permissions grouped by level: account, basin and stream.
// Each group is optional and defaults to `None`.
// NOTE(review): item- and field-level rustdoc is presumably injected by
// `#[sync_docs]` — confirm before adding `///` comments here.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct PermittedOperationGroups {
    // Account-level read/write permissions.
    pub account: Option<ReadWritePermissions>,
    // Basin-level read/write permissions.
    pub basin: Option<ReadWritePermissions>,
    // Stream-level read/write permissions.
    pub stream: Option<ReadWritePermissions>,
}
2400
2401impl PermittedOperationGroups {
2402    /// Create a new permitted operation groups.
2403    pub fn new() -> Self {
2404        Self::default()
2405    }
2406
2407    /// Overwrite account read-write permissions.
2408    pub fn with_account(self, account: ReadWritePermissions) -> Self {
2409        Self {
2410            account: Some(account),
2411            ..self
2412        }
2413    }
2414
2415    /// Overwrite basin read-write permissions.
2416    pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2417        Self {
2418            basin: Some(basin),
2419            ..self
2420        }
2421    }
2422
2423    /// Overwrite stream read-write permissions.
2424    pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2425        Self {
2426            stream: Some(stream),
2427            ..self
2428        }
2429    }
2430}
2431
2432impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2433    fn from(value: PermittedOperationGroups) -> Self {
2434        let PermittedOperationGroups {
2435            account,
2436            basin,
2437            stream,
2438        } = value;
2439        Self {
2440            account: account.map(Into::into),
2441            basin: basin.map(Into::into),
2442            stream: stream.map(Into::into),
2443        }
2444    }
2445}
2446
2447impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2448    fn from(value: api::PermittedOperationGroups) -> Self {
2449        let api::PermittedOperationGroups {
2450            account,
2451            basin,
2452            stream,
2453        } = value;
2454        Self {
2455            account: account.map(Into::into),
2456            basin: basin.map(Into::into),
2457            stream: stream.map(Into::into),
2458        }
2459    }
2460}
2461
// A pair of read/write permission flags. Both default to `false` via the
// derived `Default`.
// NOTE(review): item- and field-level rustdoc is presumably injected by
// `#[sync_docs]` — confirm before adding `///` comments here.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadWritePermissions {
    // Whether read access is permitted.
    pub read: bool,
    // Whether write access is permitted.
    pub write: bool,
}
2468
2469impl ReadWritePermissions {
2470    /// Create a new read-write permission.
2471    pub fn new() -> Self {
2472        Self::default()
2473    }
2474
2475    /// Overwrite read permission.
2476    pub fn with_read(self, read: bool) -> Self {
2477        Self { read, ..self }
2478    }
2479
2480    /// Overwrite write permission.
2481    pub fn with_write(self, write: bool) -> Self {
2482        Self { write, ..self }
2483    }
2484}
2485
2486impl From<ReadWritePermissions> for api::ReadWritePermissions {
2487    fn from(value: ReadWritePermissions) -> Self {
2488        let ReadWritePermissions { read, write } = value;
2489        Self { read, write }
2490    }
2491}
2492
2493impl From<api::ReadWritePermissions> for ReadWritePermissions {
2494    fn from(value: api::ReadWritePermissions) -> Self {
2495        let api::ReadWritePermissions { read, write } = value;
2496        Self { read, write }
2497    }
2498}
2499
2500impl From<api::IssueAccessTokenResponse> for String {
2501    fn from(value: api::IssueAccessTokenResponse) -> Self {
2502        value.access_token
2503    }
2504}
2505
2506impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2507    fn from(value: AccessTokenId) -> Self {
2508        Self { id: value.into() }
2509    }
2510}
2511
2512impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2513    type Error = ConvertError;
2514    fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2515        let token_info = value.info.ok_or("access token info is missing")?;
2516        token_info.try_into()
2517    }
2518}
2519
// Request parameters for listing access tokens. All fields default to
// empty / `None` via the derived `Default`.
// NOTE(review): item- and field-level rustdoc is presumably injected by
// `#[sync_docs]` — confirm before adding `///` comments here.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListAccessTokensRequest {
    // Passed through unchanged to the API request's `prefix`.
    pub prefix: String,
    // Passed through unchanged to the API request's `start_after`.
    pub start_after: String,
    // Optional limit; converted to the API's integer type when the API
    // request is built, which fails with a `ConvertError` if it does not fit.
    pub limit: Option<usize>,
}
2527
2528impl ListAccessTokensRequest {
2529    /// Create a new request with prefix.
2530    pub fn new() -> Self {
2531        Self::default()
2532    }
2533
2534    /// Overwrite prefix.
2535    pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2536        Self {
2537            prefix: prefix.into(),
2538            ..self
2539        }
2540    }
2541
2542    /// Overwrite start after.
2543    pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2544        Self {
2545            start_after: start_after.into(),
2546            ..self
2547        }
2548    }
2549
2550    /// Overwrite limit.
2551    pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2552        Self {
2553            limit: limit.into(),
2554            ..self
2555        }
2556    }
2557}
2558
2559impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2560    type Error = ConvertError;
2561    fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2562        let ListAccessTokensRequest {
2563            prefix,
2564            start_after,
2565            limit,
2566        } = value;
2567        Ok(Self {
2568            prefix,
2569            start_after,
2570            limit: limit
2571                .map(TryInto::try_into)
2572                .transpose()
2573                .map_err(|_| "request limit does not fit into u64 bounds")?,
2574        })
2575    }
2576}
2577
// Response of a list-access-tokens call, built from
// `api::ListAccessTokensResponse`.
// NOTE(review): item- and field-level rustdoc is presumably injected by
// `#[sync_docs]` — confirm before adding `///` comments here.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListAccessTokensResponse {
    // Access token infos converted from the API response's entries.
    pub access_tokens: Vec<AccessTokenInfo>,
    // Copied unchanged from the API response; presumably signals that further
    // pages exist — verify against the API docs.
    pub has_more: bool,
}
2584
2585impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2586    type Error = ConvertError;
2587    fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2588        let api::ListAccessTokensResponse {
2589            access_tokens,
2590            has_more,
2591        } = value;
2592        let access_tokens = access_tokens
2593            .into_iter()
2594            .map(TryInto::try_into)
2595            .collect::<Result<Vec<_>, _>>()?;
2596        Ok(Self {
2597            access_tokens,
2598            has_more,
2599        })
2600    }
2601}