1use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13
/// Error produced when a value cannot be converted; wraps a message string
/// that is also used verbatim as the `Display` output.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20 fn from(value: T) -> Self {
21 Self(value.into())
22 }
23}
24
/// Types that can report a size in metered bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38 fn metered_bytes(&self) -> u64 {
39 self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40 }
41}
42
/// Implements [`MeteredBytes`] for a type that has `headers` and `body` fields.
///
/// The metered size is 8 bytes, plus 2 bytes per header, plus the length of
/// every header name and value, plus the body length.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let header_count = self.headers.len();
                let header_payload: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                (8 + 2 * header_count + header_payload + self.body.len()) as u64
            }
        }
    };
}
60
// Scope of a basin. `Unspecified` is the default variant.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BasinScope {
    #[default]
    Unspecified,
    // Spelled "aws:us-east-1" in the string form accepted by `FromStr`.
    AwsUsEast1,
}
68
69impl From<BasinScope> for api::BasinScope {
70 fn from(value: BasinScope) -> Self {
71 match value {
72 BasinScope::Unspecified => Self::Unspecified,
73 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
74 }
75 }
76}
77
78impl From<api::BasinScope> for BasinScope {
79 fn from(value: api::BasinScope) -> Self {
80 match value {
81 api::BasinScope::Unspecified => Self::Unspecified,
82 api::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
83 }
84 }
85}
86
87impl FromStr for BasinScope {
88 type Err = ConvertError;
89 fn from_str(value: &str) -> Result<Self, Self::Err> {
90 match value {
91 "unspecified" => Ok(Self::Unspecified),
92 "aws:us-east-1" => Ok(Self::AwsUsEast1),
93 _ => Err("invalid basin scope value".into()),
94 }
95 }
96}
97
98impl From<BasinScope> for i32 {
99 fn from(value: BasinScope) -> Self {
100 api::BasinScope::from(value).into()
101 }
102}
103
104impl TryFrom<i32> for BasinScope {
105 type Error = ConvertError;
106 fn try_from(value: i32) -> Result<Self, Self::Error> {
107 api::BasinScope::try_from(value)
108 .map(Into::into)
109 .map_err(|_| "invalid basin scope value".into())
110 }
111}
112
// Request to create a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateBasinRequest {
    // Name of the basin to create.
    pub basin: BasinName,
    // Optional basin configuration; `None` sends no configuration.
    pub config: Option<BasinConfig>,
    // Scope of the basin.
    pub scope: BasinScope,
}
120
121impl CreateBasinRequest {
122 pub fn new(basin: BasinName) -> Self {
124 Self {
125 basin,
126 config: None,
127 scope: BasinScope::Unspecified,
128 }
129 }
130
131 pub fn with_config(self, config: BasinConfig) -> Self {
133 Self {
134 config: Some(config),
135 ..self
136 }
137 }
138
139 pub fn with_scope(self, scope: BasinScope) -> Self {
141 Self { scope, ..self }
142 }
143}
144
145impl From<CreateBasinRequest> for api::CreateBasinRequest {
146 fn from(value: CreateBasinRequest) -> Self {
147 let CreateBasinRequest {
148 basin,
149 config,
150 scope,
151 } = value;
152 Self {
153 basin: basin.0,
154 config: config.map(Into::into),
155 scope: scope.into(),
156 }
157 }
158}
159
// Basin configuration. The derived `Default` leaves the stream config unset
// and both flags `false`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    // Optional default configuration for streams.
    pub default_stream_config: Option<StreamConfig>,
    // Flag forwarded unchanged to the API.
    pub create_stream_on_append: bool,
    // Flag forwarded unchanged to the API.
    pub create_stream_on_read: bool,
}
167
168impl BasinConfig {
169 pub fn new() -> Self {
171 Self::default()
172 }
173
174 pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
176 Self {
177 default_stream_config: Some(default_stream_config),
178 ..self
179 }
180 }
181
182 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
184 Self {
185 create_stream_on_append,
186 ..self
187 }
188 }
189
190 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
192 Self {
193 create_stream_on_read,
194 ..self
195 }
196 }
197}
198
199impl From<BasinConfig> for api::BasinConfig {
200 fn from(value: BasinConfig) -> Self {
201 let BasinConfig {
202 default_stream_config,
203 create_stream_on_append,
204 create_stream_on_read,
205 } = value;
206 Self {
207 default_stream_config: default_stream_config.map(Into::into),
208 create_stream_on_append,
209 create_stream_on_read,
210 }
211 }
212}
213
214impl TryFrom<api::BasinConfig> for BasinConfig {
215 type Error = ConvertError;
216 fn try_from(value: api::BasinConfig) -> Result<Self, Self::Error> {
217 let api::BasinConfig {
218 default_stream_config,
219 create_stream_on_append,
220 create_stream_on_read,
221 } = value;
222 Ok(Self {
223 default_stream_config: default_stream_config.map(TryInto::try_into).transpose()?,
224 create_stream_on_append,
225 create_stream_on_read,
226 })
227 }
228}
229
// Stream configuration. The derived `Default` uses the default storage class
// and leaves every optional field unset.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    // Storage class of the stream.
    pub storage_class: StorageClass,
    // Optional retention policy.
    pub retention_policy: Option<RetentionPolicy>,
    // Optional flag forwarded unchanged to the API.
    pub require_client_timestamps: Option<bool>,
    // Optional flag forwarded unchanged to the API.
    pub uncapped_client_timestamps: Option<bool>,
}
238
239impl StreamConfig {
240 pub fn new() -> Self {
242 Self::default()
243 }
244
245 pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self {
247 Self {
248 storage_class: storage_class.into(),
249 ..self
250 }
251 }
252
253 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
255 Self {
256 retention_policy: Some(retention_policy),
257 ..self
258 }
259 }
260
261 pub fn with_require_client_timestamps(self, require_client_timestamps: bool) -> Self {
263 Self {
264 require_client_timestamps: Some(require_client_timestamps),
265 ..self
266 }
267 }
268
269 pub fn with_uncapped_client_timestamps(self, uncapped_client_timestamps: bool) -> Self {
271 Self {
272 uncapped_client_timestamps: Some(uncapped_client_timestamps),
273 ..self
274 }
275 }
276}
277
278impl From<StreamConfig> for api::StreamConfig {
279 fn from(value: StreamConfig) -> Self {
280 let StreamConfig {
281 storage_class,
282 retention_policy,
283 require_client_timestamps,
284 uncapped_client_timestamps,
285 } = value;
286 Self {
287 storage_class: storage_class.into(),
288 retention_policy: retention_policy.map(Into::into),
289 require_client_timestamps,
290 uncapped_client_timestamps,
291 }
292 }
293}
294
295impl TryFrom<api::StreamConfig> for StreamConfig {
296 type Error = ConvertError;
297 fn try_from(value: api::StreamConfig) -> Result<Self, Self::Error> {
298 let api::StreamConfig {
299 storage_class,
300 retention_policy,
301 require_client_timestamps,
302 uncapped_client_timestamps,
303 } = value;
304 Ok(Self {
305 storage_class: storage_class.try_into()?,
306 retention_policy: retention_policy.map(Into::into),
307 require_client_timestamps,
308 uncapped_client_timestamps,
309 })
310 }
311}
312
// Storage class of a stream. `Unspecified` is the default variant.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StorageClass {
    #[default]
    Unspecified,
    // Spelled "standard" in the string form accepted by `FromStr`.
    Standard,
    // Spelled "express" in the string form accepted by `FromStr`.
    Express,
}
321
322impl From<StorageClass> for api::StorageClass {
323 fn from(value: StorageClass) -> Self {
324 match value {
325 StorageClass::Unspecified => Self::Unspecified,
326 StorageClass::Standard => Self::Standard,
327 StorageClass::Express => Self::Express,
328 }
329 }
330}
331
332impl From<api::StorageClass> for StorageClass {
333 fn from(value: api::StorageClass) -> Self {
334 match value {
335 api::StorageClass::Unspecified => Self::Unspecified,
336 api::StorageClass::Standard => Self::Standard,
337 api::StorageClass::Express => Self::Express,
338 }
339 }
340}
341
342impl FromStr for StorageClass {
343 type Err = ConvertError;
344 fn from_str(value: &str) -> Result<Self, Self::Err> {
345 match value {
346 "unspecified" => Ok(Self::Unspecified),
347 "standard" => Ok(Self::Standard),
348 "express" => Ok(Self::Express),
349 _ => Err("invalid storage class value".into()),
350 }
351 }
352}
353
354impl From<StorageClass> for i32 {
355 fn from(value: StorageClass) -> Self {
356 api::StorageClass::from(value).into()
357 }
358}
359
360impl TryFrom<i32> for StorageClass {
361 type Error = ConvertError;
362 fn try_from(value: i32) -> Result<Self, Self::Error> {
363 api::StorageClass::try_from(value)
364 .map(Into::into)
365 .map_err(|_| "invalid storage class value".into())
366 }
367}
368
// Retention policy of a stream.
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    // Age-based retention; converted to and from whole seconds at the API boundary.
    Age(Duration),
}
374
375impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
376 fn from(value: RetentionPolicy) -> Self {
377 match value {
378 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
379 }
380 }
381}
382
383impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
384 fn from(value: api::stream_config::RetentionPolicy) -> Self {
385 match value {
386 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
387 }
388 }
389}
390
// State of a basin. The `Display` impl prints the lowercase variant name.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    Unspecified,
    Active,
    Creating,
    Deleting,
}
399
400impl From<BasinState> for api::BasinState {
401 fn from(value: BasinState) -> Self {
402 match value {
403 BasinState::Unspecified => Self::Unspecified,
404 BasinState::Active => Self::Active,
405 BasinState::Creating => Self::Creating,
406 BasinState::Deleting => Self::Deleting,
407 }
408 }
409}
410
411impl From<api::BasinState> for BasinState {
412 fn from(value: api::BasinState) -> Self {
413 match value {
414 api::BasinState::Unspecified => Self::Unspecified,
415 api::BasinState::Active => Self::Active,
416 api::BasinState::Creating => Self::Creating,
417 api::BasinState::Deleting => Self::Deleting,
418 }
419 }
420}
421
422impl From<BasinState> for i32 {
423 fn from(value: BasinState) -> Self {
424 api::BasinState::from(value).into()
425 }
426}
427
428impl TryFrom<i32> for BasinState {
429 type Error = ConvertError;
430 fn try_from(value: i32) -> Result<Self, Self::Error> {
431 api::BasinState::try_from(value)
432 .map(Into::into)
433 .map_err(|_| "invalid basin status value".into())
434 }
435}
436
437impl std::fmt::Display for BasinState {
438 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439 match self {
440 BasinState::Unspecified => write!(f, "unspecified"),
441 BasinState::Active => write!(f, "active"),
442 BasinState::Creating => write!(f, "creating"),
443 BasinState::Deleting => write!(f, "deleting"),
444 }
445 }
446}
447
// Information about a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    // Basin name.
    pub name: String,
    // Basin scope.
    pub scope: BasinScope,
    // Basin state.
    pub state: BasinState,
}
455
456impl From<BasinInfo> for api::BasinInfo {
457 fn from(value: BasinInfo) -> Self {
458 let BasinInfo { name, scope, state } = value;
459 Self {
460 name,
461 scope: scope.into(),
462 state: state.into(),
463 }
464 }
465}
466
467impl TryFrom<api::BasinInfo> for BasinInfo {
468 type Error = ConvertError;
469 fn try_from(value: api::BasinInfo) -> Result<Self, Self::Error> {
470 let api::BasinInfo { name, scope, state } = value;
471 Ok(Self {
472 name,
473 scope: scope.try_into()?,
474 state: state.try_into()?,
475 })
476 }
477}
478
479impl TryFrom<api::CreateBasinResponse> for BasinInfo {
480 type Error = ConvertError;
481 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
482 let api::CreateBasinResponse { info } = value;
483 let info = info.ok_or("missing basin info")?;
484 info.try_into()
485 }
486}
487
// Request to list streams. The derived `Default` uses empty strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListStreamsRequest {
    // Prefix string forwarded to the API.
    pub prefix: String,
    // `start_after` string forwarded to the API.
    pub start_after: String,
    // Optional limit; `None` sends no limit.
    pub limit: Option<usize>,
}
495
496impl ListStreamsRequest {
497 pub fn new() -> Self {
499 Self::default()
500 }
501
502 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
504 Self {
505 prefix: prefix.into(),
506 ..self
507 }
508 }
509
510 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
512 Self {
513 start_after: start_after.into(),
514 ..self
515 }
516 }
517
518 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
520 Self {
521 limit: limit.into(),
522 ..self
523 }
524 }
525}
526
527impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
528 type Error = ConvertError;
529 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
530 let ListStreamsRequest {
531 prefix,
532 start_after,
533 limit,
534 } = value;
535 Ok(Self {
536 prefix,
537 start_after,
538 limit: limit.map(|n| n as u64),
539 })
540 }
541}
542
// Information about a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    // Stream name.
    pub name: String,
    // Creation time, copied from the API value.
    // NOTE(review): presumably seconds since the Unix epoch; confirm against the API.
    pub created_at: u32,
    // Deletion time if the API reports one (same representation as `created_at`).
    pub deleted_at: Option<u32>,
}
550
551impl From<api::StreamInfo> for StreamInfo {
552 fn from(value: api::StreamInfo) -> Self {
553 Self {
554 name: value.name,
555 created_at: value.created_at,
556 deleted_at: value.deleted_at,
557 }
558 }
559}
560
561impl TryFrom<api::CreateStreamResponse> for StreamInfo {
562 type Error = ConvertError;
563
564 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
565 let api::CreateStreamResponse { info } = value;
566 let info = info.ok_or("missing stream info")?;
567 Ok(info.into())
568 }
569}
570
// Response to a list-streams request.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    // Streams returned by the API, in the order received.
    pub streams: Vec<StreamInfo>,
    // `has_more` flag copied from the API response.
    pub has_more: bool,
}
577
578impl From<api::ListStreamsResponse> for ListStreamsResponse {
579 fn from(value: api::ListStreamsResponse) -> Self {
580 let api::ListStreamsResponse { streams, has_more } = value;
581 let streams = streams.into_iter().map(Into::into).collect();
582 Self { streams, has_more }
583 }
584}
585
586impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
587 type Error = ConvertError;
588 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
589 let api::GetBasinConfigResponse { config } = value;
590 let config = config.ok_or("missing basin config")?;
591 config.try_into()
592 }
593}
594
595impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
596 type Error = ConvertError;
597 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
598 let api::GetStreamConfigResponse { config } = value;
599 let config = config.ok_or("missing stream config")?;
600 config.try_into()
601 }
602}
603
// Request to create a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateStreamRequest {
    // Name of the stream to create.
    pub stream: String,
    // Optional stream configuration; `None` sends no configuration.
    pub config: Option<StreamConfig>,
}
610
611impl CreateStreamRequest {
612 pub fn new(stream: impl Into<String>) -> Self {
614 Self {
615 stream: stream.into(),
616 config: None,
617 }
618 }
619
620 pub fn with_config(self, config: StreamConfig) -> Self {
622 Self {
623 config: Some(config),
624 ..self
625 }
626 }
627}
628
629impl From<CreateStreamRequest> for api::CreateStreamRequest {
630 fn from(value: CreateStreamRequest) -> Self {
631 let CreateStreamRequest { stream, config } = value;
632 Self {
633 stream,
634 config: config.map(Into::into),
635 }
636 }
637}
638
// Request to list basins. The derived `Default` uses empty strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListBasinsRequest {
    // Prefix string forwarded to the API.
    pub prefix: String,
    // `start_after` string forwarded to the API.
    pub start_after: String,
    // Optional limit; `None` sends no limit.
    pub limit: Option<usize>,
}
646
647impl ListBasinsRequest {
648 pub fn new() -> Self {
650 Self::default()
651 }
652
653 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
655 Self {
656 prefix: prefix.into(),
657 ..self
658 }
659 }
660
661 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
663 Self {
664 start_after: start_after.into(),
665 ..self
666 }
667 }
668
669 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
671 Self {
672 limit: limit.into(),
673 ..self
674 }
675 }
676}
677
678impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
679 type Error = ConvertError;
680 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
681 let ListBasinsRequest {
682 prefix,
683 start_after,
684 limit,
685 } = value;
686 Ok(Self {
687 prefix,
688 start_after,
689 limit: limit
690 .map(TryInto::try_into)
691 .transpose()
692 .map_err(|_| "request limit does not fit into u64 bounds")?,
693 })
694 }
695}
696
// Response to a list-basins request.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    // Basins returned by the API, in the order received.
    pub basins: Vec<BasinInfo>,
    // `has_more` flag copied from the API response.
    pub has_more: bool,
}
703
704impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
705 type Error = ConvertError;
706 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
707 let api::ListBasinsResponse { basins, has_more } = value;
708 Ok(Self {
709 basins: basins
710 .into_iter()
711 .map(TryInto::try_into)
712 .collect::<Result<Vec<BasinInfo>, ConvertError>>()?,
713 has_more,
714 })
715 }
716}
717
// Request to delete a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteBasinRequest {
    // Name of the basin to delete.
    pub basin: BasinName,
    // Kept on this type only: the conversion to `api::DeleteBasinRequest`
    // visible in this file does not forward it.
    pub if_exists: bool,
}
725
726impl DeleteBasinRequest {
727 pub fn new(basin: BasinName) -> Self {
729 Self {
730 basin,
731 if_exists: false,
732 }
733 }
734
735 pub fn with_if_exists(self, if_exists: bool) -> Self {
737 Self { if_exists, ..self }
738 }
739}
740
741impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
742 fn from(value: DeleteBasinRequest) -> Self {
743 let DeleteBasinRequest { basin, .. } = value;
744 Self { basin: basin.0 }
745 }
746}
747
// Request to delete a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteStreamRequest {
    // Name of the stream to delete.
    pub stream: String,
    // Kept on this type only: the conversion to `api::DeleteStreamRequest`
    // visible in this file does not forward it.
    pub if_exists: bool,
}
755
756impl DeleteStreamRequest {
757 pub fn new(stream: impl Into<String>) -> Self {
759 Self {
760 stream: stream.into(),
761 if_exists: false,
762 }
763 }
764
765 pub fn with_if_exists(self, if_exists: bool) -> Self {
767 Self { if_exists, ..self }
768 }
769}
770
771impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
772 fn from(value: DeleteStreamRequest) -> Self {
773 let DeleteStreamRequest { stream, .. } = value;
774 Self { stream }
775 }
776}
777
// Request to reconfigure a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureBasinRequest {
    // Name of the basin to reconfigure.
    pub basin: BasinName,
    // Optional new configuration.
    pub config: Option<BasinConfig>,
    // Optional list of paths; sent as the `paths` of a `prost_types::FieldMask`.
    pub mask: Option<Vec<String>>,
}
785
786impl ReconfigureBasinRequest {
787 pub fn new(basin: BasinName) -> Self {
789 Self {
790 basin,
791 config: None,
792 mask: None,
793 }
794 }
795
796 pub fn with_config(self, config: BasinConfig) -> Self {
798 Self {
799 config: Some(config),
800 ..self
801 }
802 }
803
804 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
806 Self {
807 mask: Some(mask.into()),
808 ..self
809 }
810 }
811}
812
813impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
814 fn from(value: ReconfigureBasinRequest) -> Self {
815 let ReconfigureBasinRequest {
816 basin,
817 config,
818 mask,
819 } = value;
820 Self {
821 basin: basin.0,
822 config: config.map(Into::into),
823 mask: mask.map(|paths| prost_types::FieldMask { paths }),
824 }
825 }
826}
827
828impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
829 type Error = ConvertError;
830 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
831 let api::ReconfigureBasinResponse { config } = value;
832 let config = config.ok_or("missing basin config")?;
833 config.try_into()
834 }
835}
836
// Request to reconfigure a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureStreamRequest {
    // Name of the stream to reconfigure.
    pub stream: String,
    // Optional new configuration.
    pub config: Option<StreamConfig>,
    // Optional list of paths; sent as the `paths` of a `prost_types::FieldMask`.
    pub mask: Option<Vec<String>>,
}
844
845impl ReconfigureStreamRequest {
846 pub fn new(stream: impl Into<String>) -> Self {
848 Self {
849 stream: stream.into(),
850 config: None,
851 mask: None,
852 }
853 }
854
855 pub fn with_config(self, config: StreamConfig) -> Self {
857 Self {
858 config: Some(config),
859 ..self
860 }
861 }
862
863 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
865 Self {
866 mask: Some(mask.into()),
867 ..self
868 }
869 }
870}
871
872impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
873 fn from(value: ReconfigureStreamRequest) -> Self {
874 let ReconfigureStreamRequest {
875 stream,
876 config,
877 mask,
878 } = value;
879 Self {
880 stream,
881 config: config.map(Into::into),
882 mask: mask.map(|paths| prost_types::FieldMask { paths }),
883 }
884 }
885}
886
887impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
888 type Error = ConvertError;
889 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
890 let api::ReconfigureStreamResponse { config } = value;
891 let config = config.ok_or("missing stream config")?;
892 config.try_into()
893 }
894}
895
896impl From<api::CheckTailResponse> for StreamPosition {
897 fn from(value: api::CheckTailResponse) -> Self {
898 let api::CheckTailResponse {
899 next_seq_num,
900 last_timestamp,
901 } = value;
902 StreamPosition {
903 seq_num: next_seq_num,
904 timestamp: last_timestamp,
905 }
906 }
907}
908
/// A position in a stream: a sequence number paired with a timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: u64,
    /// Timestamp as reported by the API.
    // NOTE(review): the unit is not visible in this file; confirm against the API.
    pub timestamp: u64,
}
918
// A record header: a name and value pair of raw bytes.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    // Header name; `Header::from_value` leaves it empty.
    pub name: Bytes,
    // Header value.
    pub value: Bytes,
}
925
926impl Header {
927 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
929 Self {
930 name: name.into(),
931 value: value.into(),
932 }
933 }
934
935 pub fn from_value(value: impl Into<Bytes>) -> Self {
937 Self {
938 name: Bytes::new(),
939 value: value.into(),
940 }
941 }
942}
943
944impl From<Header> for api::Header {
945 fn from(value: Header) -> Self {
946 let Header { name, value } = value;
947 Self { name, value }
948 }
949}
950
951impl From<api::Header> for Header {
952 fn from(value: api::Header) -> Self {
953 let api::Header { name, value } = value;
954 Self { name, value }
955 }
956}
957
/// A fencing token: a byte string of at most 16 bytes.
///
/// The length limit is enforced by [`FencingToken::new`]; the derived
/// `Default` is the empty token.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FencingToken(Bytes);
963
964impl FencingToken {
965 const MAX_BYTES: usize = 16;
966
967 pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
969 let bytes = bytes.into();
970 if bytes.len() > Self::MAX_BYTES {
971 Err(format!(
972 "Size of a fencing token cannot exceed {} bytes",
973 Self::MAX_BYTES
974 )
975 .into())
976 } else {
977 Ok(Self(bytes))
978 }
979 }
980
981 pub fn generate(n: usize) -> Result<Self, ConvertError> {
983 Self::new(
984 rand::rng()
985 .sample_iter(rand::distr::StandardUniform)
986 .take(n)
987 .collect::<Bytes>(),
988 )
989 }
990}
991
992impl AsRef<Bytes> for FencingToken {
993 fn as_ref(&self) -> &Bytes {
994 &self.0
995 }
996}
997
998impl AsRef<[u8]> for FencingToken {
999 fn as_ref(&self) -> &[u8] {
1000 &self.0
1001 }
1002}
1003
1004impl Deref for FencingToken {
1005 type Target = [u8];
1006
1007 fn deref(&self) -> &Self::Target {
1008 &self.0
1009 }
1010}
1011
1012impl From<FencingToken> for Bytes {
1013 fn from(value: FencingToken) -> Self {
1014 value.0
1015 }
1016}
1017
1018impl From<FencingToken> for Vec<u8> {
1019 fn from(value: FencingToken) -> Self {
1020 value.0.into()
1021 }
1022}
1023
1024impl TryFrom<Bytes> for FencingToken {
1025 type Error = ConvertError;
1026
1027 fn try_from(value: Bytes) -> Result<Self, Self::Error> {
1028 Self::new(value)
1029 }
1030}
1031
1032impl TryFrom<Vec<u8>> for FencingToken {
1033 type Error = ConvertError;
1034
1035 fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
1036 Self::new(value)
1037 }
1038}
1039
/// A command that can be turned into an [`AppendRecord`] through [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command; encoded with header value `fence` and the token bytes as body.
    Fence {
        /// Fencing token carried by the command.
        fencing_token: FencingToken,
    },
    /// Trim command; encoded with header value `trim` and the sequence number
    /// as 8 big-endian bytes in the body.
    Trim {
        /// Sequence number carried by the command.
        seq_num: u64,
    },
}
1065
/// A [`Command`] together with an optional timestamp; convertible into an
/// [`AppendRecord`].
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// The command to encode.
    pub command: Command,
    /// Optional timestamp copied onto the resulting record.
    pub timestamp: Option<u64>,
}
1078
1079impl CommandRecord {
1080 const FENCE: &[u8] = b"fence";
1081 const TRIM: &[u8] = b"trim";
1082
1083 pub fn fence(fencing_token: FencingToken) -> Self {
1085 Self {
1086 command: Command::Fence { fencing_token },
1087 timestamp: None,
1088 }
1089 }
1090
1091 pub fn trim(seq_num: impl Into<u64>) -> Self {
1093 Self {
1094 command: Command::Trim {
1095 seq_num: seq_num.into(),
1096 },
1097 timestamp: None,
1098 }
1099 }
1100
1101 pub fn with_timestamp(self, timestamp: u64) -> Self {
1103 Self {
1104 timestamp: Some(timestamp),
1105 ..self
1106 }
1107 }
1108}
1109
// A record to append. Fields are private; the constructors and
// `with_headers` validate the record's metered size.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    // Optional timestamp.
    timestamp: Option<u64>,
    // Record headers.
    headers: Vec<Header>,
    // Record body.
    body: Bytes,
    // Test-only override of the size limit used by validation.
    #[cfg(test)]
    max_bytes: u64,
}
1119
// Generates the `MeteredBytes` impl for `AppendRecord` from its headers and body.
metered_impl!(AppendRecord);
1121
1122impl AppendRecord {
1123 const MAX_BYTES: u64 = MIB_BYTES;
1124
1125 fn validated(self) -> Result<Self, ConvertError> {
1126 #[cfg(test)]
1127 let max_bytes = self.max_bytes;
1128 #[cfg(not(test))]
1129 let max_bytes = Self::MAX_BYTES;
1130
1131 if self.metered_bytes() > max_bytes {
1132 Err("AppendRecord should have metered size less than 1 MiB".into())
1133 } else {
1134 Ok(self)
1135 }
1136 }
1137
1138 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1140 Self {
1141 timestamp: None,
1142 headers: Vec::new(),
1143 body: body.into(),
1144 #[cfg(test)]
1145 max_bytes: Self::MAX_BYTES,
1146 }
1147 .validated()
1148 }
1149
1150 #[cfg(test)]
1151 pub(crate) fn with_max_bytes(
1152 max_bytes: u64,
1153 body: impl Into<Bytes>,
1154 ) -> Result<Self, ConvertError> {
1155 Self {
1156 timestamp: None,
1157 headers: Vec::new(),
1158 body: body.into(),
1159 max_bytes,
1160 }
1161 .validated()
1162 }
1163
1164 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1166 Self {
1167 headers: headers.into(),
1168 ..self
1169 }
1170 .validated()
1171 }
1172
1173 pub fn with_timestamp(self, timestamp: u64) -> Self {
1175 Self {
1176 timestamp: Some(timestamp),
1177 ..self
1178 }
1179 }
1180
1181 pub fn body(&self) -> &[u8] {
1183 &self.body
1184 }
1185
1186 pub fn headers(&self) -> &[Header] {
1188 &self.headers
1189 }
1190
1191 pub fn timestamp(&self) -> Option<u64> {
1193 self.timestamp
1194 }
1195
1196 pub fn into_parts(self) -> AppendRecordParts {
1198 AppendRecordParts {
1199 timestamp: self.timestamp,
1200 headers: self.headers,
1201 body: self.body,
1202 }
1203 }
1204
1205 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1207 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1208 if let Some(timestamp) = parts.timestamp {
1209 Ok(record.with_timestamp(timestamp))
1210 } else {
1211 Ok(record)
1212 }
1213 }
1214}
1215
1216impl From<AppendRecord> for api::AppendRecord {
1217 fn from(value: AppendRecord) -> Self {
1218 Self {
1219 timestamp: value.timestamp,
1220 headers: value.headers.into_iter().map(Into::into).collect(),
1221 body: value.body,
1222 }
1223 }
1224}
1225
1226impl From<CommandRecord> for AppendRecord {
1227 fn from(value: CommandRecord) -> Self {
1228 let (header_value, body) = match value.command {
1229 Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1230 Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1231 };
1232 AppendRecordParts {
1233 timestamp: value.timestamp,
1234 headers: vec![Header::from_value(header_value)],
1235 body: body.into(),
1236 }
1237 .try_into()
1238 .expect("command record is a valid append record")
1239 }
1240}
1241
// The public parts of an `AppendRecord`. Converting back into an
// `AppendRecord` validates the size again.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    // Optional timestamp.
    pub timestamp: Option<u64>,
    // Record headers.
    pub headers: Vec<Header>,
    // Record body.
    pub body: Bytes,
}
1249
1250impl From<AppendRecord> for AppendRecordParts {
1251 fn from(value: AppendRecord) -> Self {
1252 value.into_parts()
1253 }
1254}
1255
1256impl TryFrom<AppendRecordParts> for AppendRecord {
1257 type Error = ConvertError;
1258
1259 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1260 Self::try_from_parts(value)
1261 }
1262}
1263
/// A batch of [`AppendRecord`]s bounded by a record count and a total metered size.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    /// Records in the batch, in insertion order.
    records: Vec<AppendRecord>,
    /// Running sum of the metered sizes of `records`.
    metered_bytes: u64,
    /// Maximum number of records this batch accepts.
    max_capacity: usize,
    /// Test-only override of the total size limit.
    #[cfg(test)]
    max_bytes: u64,
}
1273
1274impl PartialEq for AppendRecordBatch {
1275 fn eq(&self, other: &Self) -> bool {
1276 if self.records.eq(&other.records) {
1277 assert_eq!(self.metered_bytes, other.metered_bytes);
1278 true
1279 } else {
1280 false
1281 }
1282 }
1283}
1284
// Marker impl; equality itself is defined by the `PartialEq` impl on the records.
impl Eq for AppendRecordBatch {}
1286
1287impl Default for AppendRecordBatch {
1288 fn default() -> Self {
1289 Self::new()
1290 }
1291}
1292
1293impl AppendRecordBatch {
1294 pub const MAX_CAPACITY: usize = 1000;
1298
1299 pub const MAX_BYTES: u64 = MIB_BYTES;
1301
1302 pub fn new() -> Self {
1304 Self::with_max_capacity(Self::MAX_CAPACITY)
1305 }
1306
1307 pub fn with_max_capacity(max_capacity: usize) -> Self {
1311 assert!(
1312 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1313 "Batch capacity must be between 1 and 1000"
1314 );
1315
1316 Self {
1317 records: Vec::with_capacity(max_capacity),
1318 metered_bytes: 0,
1319 max_capacity,
1320 #[cfg(test)]
1321 max_bytes: Self::MAX_BYTES,
1322 }
1323 }
1324
1325 #[cfg(test)]
1326 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1327 #[cfg(test)]
1328 assert!(
1329 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1330 "Batch size must be between 1 byte and 1 MiB"
1331 );
1332
1333 Self {
1334 max_bytes,
1335 ..Self::with_max_capacity(max_capacity)
1336 }
1337 }
1338
1339 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1345 where
1346 R: Into<AppendRecord>,
1347 T: IntoIterator<Item = R>,
1348 {
1349 let mut records = Self::new();
1350 let mut pending = Vec::new();
1351
1352 let mut iter = iter.into_iter();
1353
1354 for record in iter.by_ref() {
1355 if let Err(record) = records.push(record) {
1356 pending.push(record);
1357 break;
1358 }
1359 }
1360
1361 if pending.is_empty() {
1362 Ok(records)
1363 } else {
1364 pending.extend(iter.map(Into::into));
1365 Err((records, pending))
1366 }
1367 }
1368
1369 pub fn is_empty(&self) -> bool {
1371 if self.records.is_empty() {
1372 assert_eq!(self.metered_bytes, 0);
1373 true
1374 } else {
1375 false
1376 }
1377 }
1378
1379 pub fn len(&self) -> usize {
1381 self.records.len()
1382 }
1383
1384 #[cfg(test)]
1385 fn max_bytes(&self) -> u64 {
1386 self.max_bytes
1387 }
1388
1389 #[cfg(not(test))]
1390 fn max_bytes(&self) -> u64 {
1391 Self::MAX_BYTES
1392 }
1393
1394 pub fn is_full(&self) -> bool {
1396 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1397 }
1398
1399 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1401 assert!(self.records.len() <= self.max_capacity);
1402 assert!(self.metered_bytes <= self.max_bytes());
1403
1404 let record = record.into();
1405 let record_size = record.metered_bytes();
1406 if self.records.len() >= self.max_capacity
1407 || self.metered_bytes + record_size > self.max_bytes()
1408 {
1409 Err(record)
1410 } else {
1411 self.records.push(record);
1412 self.metered_bytes += record_size;
1413 Ok(())
1414 }
1415 }
1416}
1417
1418impl MeteredBytes for AppendRecordBatch {
1419 fn metered_bytes(&self) -> u64 {
1420 self.metered_bytes
1421 }
1422}
1423
1424impl IntoIterator for AppendRecordBatch {
1425 type Item = AppendRecord;
1426 type IntoIter = std::vec::IntoIter<Self::Item>;
1427
1428 fn into_iter(self) -> Self::IntoIter {
1429 self.records.into_iter()
1430 }
1431}
1432
1433impl<'a> IntoIterator for &'a AppendRecordBatch {
1434 type Item = &'a AppendRecord;
1435 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1436
1437 fn into_iter(self) -> Self::IntoIter {
1438 self.records.iter()
1439 }
1440}
1441
1442impl AsRef<[AppendRecord]> for AppendRecordBatch {
1443 fn as_ref(&self) -> &[AppendRecord] {
1444 &self.records
1445 }
1446}
1447
// Input for an append: a batch of records plus optional conditions.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    // Batch of records to append.
    pub records: AppendRecordBatch,
    // Optional sequence number forwarded to the API as `match_seq_num`.
    pub match_seq_num: Option<u64>,
    // Optional fencing token forwarded to the API.
    pub fencing_token: Option<FencingToken>,
}
1455
1456impl MeteredBytes for AppendInput {
1457 fn metered_bytes(&self) -> u64 {
1458 self.records.metered_bytes()
1459 }
1460}
1461
1462impl AppendInput {
1463 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1465 Self {
1466 records: records.into(),
1467 match_seq_num: None,
1468 fencing_token: None,
1469 }
1470 }
1471
1472 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1474 Self {
1475 match_seq_num: Some(match_seq_num.into()),
1476 ..self
1477 }
1478 }
1479
1480 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1482 Self {
1483 fencing_token: Some(fencing_token),
1484 ..self
1485 }
1486 }
1487
1488 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1489 let Self {
1490 records,
1491 match_seq_num,
1492 fencing_token,
1493 } = self;
1494
1495 api::AppendInput {
1496 stream: stream.into(),
1497 records: records.into_iter().map(Into::into).collect(),
1498 match_seq_num,
1499 fencing_token: fencing_token.map(|f| f.0),
1500 }
1501 }
1502}
1503
/// Acknowledgement of an append, expressed as three stream positions.
#[derive(Debug, Clone)]
pub struct AppendAck {
    /// Position built from the API output's `start_seq_num` and `start_timestamp`.
    pub start: StreamPosition,
    /// Position built from the API output's `end_seq_num` and `end_timestamp`.
    pub end: StreamPosition,
    /// Position built from the API output's `next_seq_num` and `last_timestamp`.
    pub tail: StreamPosition,
}
1519
1520impl From<api::AppendOutput> for AppendAck {
1521 fn from(value: api::AppendOutput) -> Self {
1522 let api::AppendOutput {
1523 start_seq_num,
1524 start_timestamp,
1525 end_seq_num,
1526 end_timestamp,
1527 next_seq_num,
1528 last_timestamp,
1529 } = value;
1530 let start = StreamPosition {
1531 seq_num: start_seq_num,
1532 timestamp: start_timestamp,
1533 };
1534 let end = StreamPosition {
1535 seq_num: end_seq_num,
1536 timestamp: end_timestamp,
1537 };
1538 let tail = StreamPosition {
1539 seq_num: next_seq_num,
1540 timestamp: last_timestamp,
1541 };
1542 Self { start, end, tail }
1543 }
1544}
1545
1546impl TryFrom<api::AppendResponse> for AppendAck {
1547 type Error = ConvertError;
1548 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1549 let api::AppendResponse { output } = value;
1550 let output = output.ok_or("missing append output")?;
1551 Ok(output.into())
1552 }
1553}
1554
1555impl TryFrom<api::AppendSessionResponse> for AppendAck {
1556 type Error = ConvertError;
1557 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1558 let api::AppendSessionResponse { output } = value;
1559 let output = output.ok_or("missing append output")?;
1560 Ok(output.into())
1561 }
1562}
1563
// Optional limits attached to a read; both default to `None`.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    // Optional record-count limit (unary reads reject values above 1000).
    pub count: Option<u64>,
    // Optional byte limit (unary reads reject values above `MIB_BYTES`).
    pub bytes: Option<u64>,
}
1570
1571impl ReadLimit {
1572 pub fn new() -> Self {
1574 Self::default()
1575 }
1576
1577 pub fn with_count(self, count: u64) -> Self {
1579 Self {
1580 count: Some(count),
1581 ..self
1582 }
1583 }
1584
1585 pub fn with_bytes(self, bytes: u64) -> Self {
1587 Self {
1588 bytes: Some(bytes),
1589 ..self
1590 }
1591 }
1592}
1593
/// Where a read begins. Each variant maps one-to-one onto the same-named
/// variant of the API `Start` oneofs.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Start at a sequence number.
    SeqNum(u64),
    /// Start at a timestamp.
    Timestamp(u64),
    /// Start at a tail offset.
    // NOTE(review): presumably a number of records back from the tail — confirm.
    TailOffset(u64),
}

/// The default start is sequence number `0`.
impl Default for ReadStart {
    fn default() -> Self {
        const DEFAULT_SEQ_NUM: u64 = 0;
        ReadStart::SeqNum(DEFAULT_SEQ_NUM)
    }
}
1610
1611impl From<ReadStart> for api::read_request::Start {
1612 fn from(start: ReadStart) -> Self {
1613 match start {
1614 ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1615 ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1616 ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1617 }
1618 }
1619}
1620
1621impl From<ReadStart> for api::read_session_request::Start {
1622 fn from(start: ReadStart) -> Self {
1623 match start {
1624 ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1625 ReadStart::Timestamp(timestamp) => {
1626 api::read_session_request::Start::Timestamp(timestamp)
1627 }
1628 ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1629 }
1630 }
1631}
1632
// Parameters of a unary read: where to start and what limits to apply.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    // Starting point; defaults to `ReadStart::SeqNum(0)`.
    pub start: ReadStart,
    // Limits; default to unset.
    pub limit: ReadLimit,
}
1639
1640impl ReadRequest {
1641 pub fn new(start: ReadStart) -> Self {
1643 Self {
1644 start,
1645 ..Default::default()
1646 }
1647 }
1648
1649 pub fn with_limit(self, limit: ReadLimit) -> Self {
1651 Self { limit, ..self }
1652 }
1653}
1654
1655impl ReadRequest {
1656 pub(crate) fn try_into_api_type(
1657 self,
1658 stream: impl Into<String>,
1659 ) -> Result<api::ReadRequest, ConvertError> {
1660 let Self { start, limit } = self;
1661
1662 let limit = if limit.count > Some(1000) {
1663 Err("read limit: count must not exceed 1000 for unary request")
1664 } else if limit.bytes > Some(MIB_BYTES) {
1665 Err("read limit: bytes must not exceed 1MiB for unary request")
1666 } else {
1667 Ok(api::ReadLimit {
1668 count: limit.count,
1669 bytes: limit.bytes,
1670 })
1671 }?;
1672
1673 Ok(api::ReadRequest {
1674 stream: stream.into(),
1675 start: Some(start.into()),
1676 limit: Some(limit),
1677 })
1678 }
1679}
1680
// A record as read back from a stream: sequence number, timestamp, headers and body.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    pub seq_num: u64,
    // NOTE(review): time unit is not visible in this file section — confirm.
    pub timestamp: u64,
    pub headers: Vec<Header>,
    pub body: Bytes,
}

// Generates `MeteredBytes` for `SequencedRecord`:
// 8 + 2 * (header count) + total header name/value lengths + body length.
metered_impl!(SequencedRecord);
1691
1692impl From<api::SequencedRecord> for SequencedRecord {
1693 fn from(value: api::SequencedRecord) -> Self {
1694 let api::SequencedRecord {
1695 seq_num,
1696 timestamp,
1697 headers,
1698 body,
1699 } = value;
1700 Self {
1701 seq_num,
1702 timestamp,
1703 headers: headers.into_iter().map(Into::into).collect(),
1704 body,
1705 }
1706 }
1707}
1708
1709impl SequencedRecord {
1710 pub fn as_command_record(&self) -> Option<CommandRecord> {
1712 if self.headers.len() != 1 {
1713 return None;
1714 }
1715
1716 let header = self.headers.first().expect("pre-validated length");
1717
1718 if !header.name.is_empty() {
1719 return None;
1720 }
1721
1722 match header.value.as_ref() {
1723 CommandRecord::FENCE => {
1724 let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1725 Some(CommandRecord {
1726 command: Command::Fence { fencing_token },
1727 timestamp: Some(self.timestamp),
1728 })
1729 }
1730 CommandRecord::TRIM => {
1731 let body: &[u8] = &self.body;
1732 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1733 Some(CommandRecord {
1734 command: Command::Trim { seq_num },
1735 timestamp: Some(self.timestamp),
1736 })
1737 }
1738 _ => None,
1739 }
1740 }
1741}
1742
// A batch of sequenced records.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    pub records: Vec<SequencedRecord>,
}
1748
1749impl MeteredBytes for SequencedRecordBatch {
1750 fn metered_bytes(&self) -> u64 {
1751 self.records.metered_bytes()
1752 }
1753}
1754
1755impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1756 fn from(value: api::SequencedRecordBatch) -> Self {
1757 let api::SequencedRecordBatch { records } = value;
1758 Self {
1759 records: records.into_iter().map(Into::into).collect(),
1760 }
1761 }
1762}
1763
// Result of a read: either a batch of records or a bare next sequence number.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    // Mirrors `api::read_output::Output::Batch`.
    Batch(SequencedRecordBatch),
    // Mirrors `api::read_output::Output::NextSeqNum`.
    NextSeqNum(u64),
}
1770
1771impl From<api::read_output::Output> for ReadOutput {
1772 fn from(value: api::read_output::Output) -> Self {
1773 match value {
1774 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1775 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1776 }
1777 }
1778}
1779
1780impl TryFrom<api::ReadOutput> for ReadOutput {
1781 type Error = ConvertError;
1782 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1783 let api::ReadOutput { output } = value;
1784 let output = output.ok_or("missing read output")?;
1785 Ok(output.into())
1786 }
1787}
1788
1789impl TryFrom<api::ReadResponse> for ReadOutput {
1790 type Error = ConvertError;
1791 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1792 let api::ReadResponse { output } = value;
1793 let output = output.ok_or("missing output in read response")?;
1794 output.try_into()
1795 }
1796}
1797
// Parameters of a read session: where to start and what limits to apply.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    // Starting point; defaults to `ReadStart::SeqNum(0)`.
    pub start: ReadStart,
    // Limits; default to unset.
    pub limit: ReadLimit,
}
1804
1805impl ReadSessionRequest {
1806 pub fn new(start: ReadStart) -> Self {
1808 Self {
1809 start,
1810 ..Default::default()
1811 }
1812 }
1813
1814 pub fn with_limit(self, limit: ReadLimit) -> Self {
1816 Self { limit, ..self }
1817 }
1818
1819 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1820 let Self { start, limit } = self;
1821 api::ReadSessionRequest {
1822 stream: stream.into(),
1823 start: Some(start.into()),
1824 limit: Some(api::ReadLimit {
1825 count: limit.count,
1826 bytes: limit.bytes,
1827 }),
1828 heartbeats: false,
1829 }
1830 }
1831}
1832
1833impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1834 type Error = ConvertError;
1835 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1836 let api::ReadSessionResponse { output } = value;
1837 let output = output.ok_or("missing output in read session response")?;
1838 output.try_into()
1839 }
1840}
1841
/// A basin name, stored as an owned string.
///
/// The inner field is private, so values are built through the fallible
/// conversions implemented for this type.
#[derive(Debug, Clone)]
pub struct BasinName(String);

/// Borrows the name as a string slice.
impl AsRef<str> for BasinName {
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}

/// Lets a `BasinName` be used wherever a `&str` is expected.
impl Deref for BasinName {
    type Target = str;

    fn deref(&self) -> &str {
        let Self(inner) = self;
        inner
    }
}
1861
1862impl TryFrom<String> for BasinName {
1863 type Error = ConvertError;
1864
1865 fn try_from(name: String) -> Result<Self, Self::Error> {
1866 if name.len() < 8 || name.len() > 48 {
1867 return Err("Basin name must be between 8 and 48 characters in length".into());
1868 }
1869
1870 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1871 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1872 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1873 .expect("Failed to compile basin name regex")
1874 });
1875
1876 if !regex.is_match(&name) {
1877 return Err(
1878 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1879 It cannot begin or end with a hyphen."
1880 .into(),
1881 );
1882 }
1883
1884 Ok(Self(name))
1885 }
1886}
1887
1888impl FromStr for BasinName {
1889 type Err = ConvertError;
1890
1891 fn from_str(s: &str) -> Result<Self, Self::Err> {
1892 s.to_string().try_into()
1893 }
1894}
1895
1896impl std::fmt::Display for BasinName {
1897 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1898 f.write_str(&self.0)
1899 }
1900}
1901
/// An access token ID, stored as an owned string.
///
/// The inner field is private, so values are built through the fallible
/// conversions implemented for this type.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);

/// Borrows the ID as a string slice.
impl AsRef<str> for AccessTokenId {
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}

/// Lets an `AccessTokenId` be used wherever a `&str` is expected.
impl Deref for AccessTokenId {
    type Target = str;

    fn deref(&self) -> &str {
        let Self(inner) = self;
        inner
    }
}
1920
1921impl TryFrom<String> for AccessTokenId {
1922 type Error = ConvertError;
1923
1924 fn try_from(name: String) -> Result<Self, Self::Error> {
1925 if name.is_empty() {
1926 return Err("Access token ID must not be empty".into());
1927 }
1928
1929 if name.len() > 50 {
1930 return Err("Access token ID must not exceed 50 characters".into());
1931 }
1932
1933 Ok(Self(name))
1934 }
1935}
1936
1937impl From<AccessTokenId> for String {
1938 fn from(value: AccessTokenId) -> Self {
1939 value.0
1940 }
1941}
1942
1943impl FromStr for AccessTokenId {
1944 type Err = ConvertError;
1945
1946 fn from_str(s: &str) -> Result<Self, Self::Err> {
1947 s.to_string().try_into()
1948 }
1949}
1950
1951impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1952 fn from(value: AccessTokenInfo) -> Self {
1953 Self {
1954 info: Some(value.into()),
1955 }
1956 }
1957}
1958
// Description of an access token: ID plus optional expiry, prefixing flag and scope.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AccessTokenInfo {
    pub id: AccessTokenId,
    // NOTE(review): unit/epoch of this value is not visible here — confirm.
    pub expires_at: Option<u32>,
    pub auto_prefix_streams: bool,
    pub scope: Option<AccessTokenScope>,
}
1967
1968impl AccessTokenInfo {
1969 pub fn new(id: AccessTokenId) -> Self {
1971 Self {
1972 id,
1973 expires_at: None,
1974 auto_prefix_streams: false,
1975 scope: None,
1976 }
1977 }
1978
1979 pub fn with_expires_at(self, expires_at: u32) -> Self {
1981 Self {
1982 expires_at: Some(expires_at),
1983 ..self
1984 }
1985 }
1986
1987 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1989 Self {
1990 auto_prefix_streams,
1991 ..self
1992 }
1993 }
1994
1995 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
1997 Self {
1998 scope: Some(scope),
1999 ..self
2000 }
2001 }
2002}
2003
2004impl From<AccessTokenInfo> for api::AccessTokenInfo {
2005 fn from(value: AccessTokenInfo) -> Self {
2006 let AccessTokenInfo {
2007 id,
2008 expires_at,
2009 auto_prefix_streams,
2010 scope,
2011 } = value;
2012 Self {
2013 id: id.into(),
2014 expires_at,
2015 auto_prefix_streams,
2016 scope: scope.map(Into::into),
2017 }
2018 }
2019}
2020
2021impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2022 type Error = ConvertError;
2023 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2024 let api::AccessTokenInfo {
2025 id,
2026 expires_at,
2027 auto_prefix_streams,
2028 scope,
2029 } = value;
2030 Ok(Self {
2031 id: id.try_into()?,
2032 expires_at,
2033 auto_prefix_streams,
2034 scope: scope.map(TryInto::try_into).transpose()?,
2035 })
2036 }
2037}
2038
// Operations that can be named in an access token scope. Each variant has a
// same-named counterpart in `api::Operation`.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    Unspecified,
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
2062
2063impl FromStr for Operation {
2064 type Err = ConvertError;
2065
2066 fn from_str(s: &str) -> Result<Self, Self::Err> {
2067 match s.to_lowercase().as_str() {
2068 "unspecified" => Ok(Self::Unspecified),
2069 "list-basins" => Ok(Self::ListBasins),
2070 "create-basin" => Ok(Self::CreateBasin),
2071 "delete-basin" => Ok(Self::DeleteBasin),
2072 "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2073 "get-basin-config" => Ok(Self::GetBasinConfig),
2074 "issue-access-token" => Ok(Self::IssueAccessToken),
2075 "revoke-access-token" => Ok(Self::RevokeAccessToken),
2076 "list-access-tokens" => Ok(Self::ListAccessTokens),
2077 "list-streams" => Ok(Self::ListStreams),
2078 "create-stream" => Ok(Self::CreateStream),
2079 "delete-stream" => Ok(Self::DeleteStream),
2080 "get-stream-config" => Ok(Self::GetStreamConfig),
2081 "reconfigure-stream" => Ok(Self::ReconfigureStream),
2082 "check-tail" => Ok(Self::CheckTail),
2083 "append" => Ok(Self::Append),
2084 "read" => Ok(Self::Read),
2085 "trim" => Ok(Self::Trim),
2086 "fence" => Ok(Self::Fence),
2087 _ => Err("invalid operation".into()),
2088 }
2089 }
2090}
2091
2092impl From<Operation> for i32 {
2093 fn from(value: Operation) -> Self {
2094 api::Operation::from(value).into()
2095 }
2096}
2097
2098impl TryFrom<i32> for Operation {
2099 type Error = ConvertError;
2100 fn try_from(value: i32) -> Result<Self, Self::Error> {
2101 api::Operation::try_from(value)
2102 .map_err(|_| "invalid operation".into())
2103 .map(Into::into)
2104 }
2105}
2106
2107impl From<Operation> for api::Operation {
2108 fn from(value: Operation) -> Self {
2109 match value {
2110 Operation::Unspecified => Self::Unspecified,
2111 Operation::ListBasins => Self::ListBasins,
2112 Operation::CreateBasin => Self::CreateBasin,
2113 Operation::DeleteBasin => Self::DeleteBasin,
2114 Operation::ReconfigureBasin => Self::ReconfigureBasin,
2115 Operation::GetBasinConfig => Self::GetBasinConfig,
2116 Operation::IssueAccessToken => Self::IssueAccessToken,
2117 Operation::RevokeAccessToken => Self::RevokeAccessToken,
2118 Operation::ListAccessTokens => Self::ListAccessTokens,
2119 Operation::ListStreams => Self::ListStreams,
2120 Operation::CreateStream => Self::CreateStream,
2121 Operation::DeleteStream => Self::DeleteStream,
2122 Operation::GetStreamConfig => Self::GetStreamConfig,
2123 Operation::ReconfigureStream => Self::ReconfigureStream,
2124 Operation::CheckTail => Self::CheckTail,
2125 Operation::Append => Self::Append,
2126 Operation::Read => Self::Read,
2127 Operation::Trim => Self::Trim,
2128 Operation::Fence => Self::Fence,
2129 }
2130 }
2131}
2132
2133impl From<api::Operation> for Operation {
2134 fn from(value: api::Operation) -> Self {
2135 match value {
2136 api::Operation::Unspecified => Self::Unspecified,
2137 api::Operation::ListBasins => Self::ListBasins,
2138 api::Operation::CreateBasin => Self::CreateBasin,
2139 api::Operation::DeleteBasin => Self::DeleteBasin,
2140 api::Operation::ReconfigureBasin => Self::ReconfigureBasin,
2141 api::Operation::GetBasinConfig => Self::GetBasinConfig,
2142 api::Operation::IssueAccessToken => Self::IssueAccessToken,
2143 api::Operation::RevokeAccessToken => Self::RevokeAccessToken,
2144 api::Operation::ListAccessTokens => Self::ListAccessTokens,
2145 api::Operation::ListStreams => Self::ListStreams,
2146 api::Operation::CreateStream => Self::CreateStream,
2147 api::Operation::DeleteStream => Self::DeleteStream,
2148 api::Operation::GetStreamConfig => Self::GetStreamConfig,
2149 api::Operation::ReconfigureStream => Self::ReconfigureStream,
2150 api::Operation::CheckTail => Self::CheckTail,
2151 api::Operation::Append => Self::Append,
2152 api::Operation::Read => Self::Read,
2153 api::Operation::Trim => Self::Trim,
2154 api::Operation::Fence => Self::Fence,
2155 }
2156 }
2157}
2158
// Scope of an access token: optional resource sets, optional operation groups,
// and a set of individual operations. Everything defaults to unset/empty.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct AccessTokenScope {
    pub basins: Option<ResourceSet>,
    pub streams: Option<ResourceSet>,
    pub access_tokens: Option<ResourceSet>,
    pub op_groups: Option<PermittedOperationGroups>,
    // A set, so each operation appears at most once.
    pub ops: HashSet<Operation>,
}
2168
2169impl AccessTokenScope {
2170 pub fn new() -> Self {
2172 Self::default()
2173 }
2174
2175 pub fn with_basins(self, basins: ResourceSet) -> Self {
2177 Self {
2178 basins: Some(basins),
2179 ..self
2180 }
2181 }
2182
2183 pub fn with_streams(self, streams: ResourceSet) -> Self {
2185 Self {
2186 streams: Some(streams),
2187 ..self
2188 }
2189 }
2190
2191 pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2193 Self {
2194 access_tokens: Some(access_tokens),
2195 ..self
2196 }
2197 }
2198
2199 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2201 Self {
2202 op_groups: Some(op_groups),
2203 ..self
2204 }
2205 }
2206
2207 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2209 Self {
2210 ops: ops.into_iter().collect(),
2211 ..self
2212 }
2213 }
2214
2215 pub fn with_op(self, op: Operation) -> Self {
2217 let mut ops = self.ops;
2218 ops.insert(op);
2219 Self { ops, ..self }
2220 }
2221}
2222
2223impl From<AccessTokenScope> for api::AccessTokenScope {
2224 fn from(value: AccessTokenScope) -> Self {
2225 let AccessTokenScope {
2226 basins,
2227 streams,
2228 access_tokens,
2229 op_groups,
2230 ops,
2231 } = value;
2232 Self {
2233 basins: basins.map(Into::into),
2234 streams: streams.map(Into::into),
2235 access_tokens: access_tokens.map(Into::into),
2236 op_groups: op_groups.map(Into::into),
2237 ops: ops.into_iter().map(Into::into).collect(),
2238 }
2239 }
2240}
2241
2242impl TryFrom<api::AccessTokenScope> for AccessTokenScope {
2243 type Error = ConvertError;
2244 fn try_from(value: api::AccessTokenScope) -> Result<Self, Self::Error> {
2245 let api::AccessTokenScope {
2246 basins,
2247 streams,
2248 access_tokens,
2249 op_groups,
2250 ops,
2251 } = value;
2252 Ok(Self {
2253 basins: basins.and_then(|set| set.matching.map(Into::into)),
2254 streams: streams.and_then(|set| set.matching.map(Into::into)),
2255 access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2256 op_groups: op_groups.map(Into::into),
2257 ops: ops
2258 .into_iter()
2259 .map(Operation::try_from)
2260 .collect::<Result<HashSet<_>, _>>()?,
2261 })
2262 }
2263}
2264
2265impl From<ResourceSet> for api::ResourceSet {
2266 fn from(value: ResourceSet) -> Self {
2267 Self {
2268 matching: Some(value.into()),
2269 }
2270 }
2271}
2272
// How a set of resources is selected: by exact name or by prefix.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs(ResourceSet = "Matching")]
#[derive(Debug, Clone)]
pub enum ResourceSet {
    // Mirrors `api::resource_set::Matching::Exact`.
    Exact(String),
    // Mirrors `api::resource_set::Matching::Prefix`.
    Prefix(String),
}
2279
2280impl From<ResourceSet> for api::resource_set::Matching {
2281 fn from(value: ResourceSet) -> Self {
2282 match value {
2283 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2284 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2285 }
2286 }
2287}
2288
2289impl From<api::resource_set::Matching> for ResourceSet {
2290 fn from(value: api::resource_set::Matching) -> Self {
2291 match value {
2292 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2293 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2294 }
2295 }
2296}
2297
// Optional read/write permissions at three levels: account, basin and stream.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct PermittedOperationGroups {
    pub account: Option<ReadWritePermissions>,
    pub basin: Option<ReadWritePermissions>,
    pub stream: Option<ReadWritePermissions>,
}
2305
2306impl PermittedOperationGroups {
2307 pub fn new() -> Self {
2309 Self::default()
2310 }
2311
2312 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2314 Self {
2315 account: Some(account),
2316 ..self
2317 }
2318 }
2319
2320 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2322 Self {
2323 basin: Some(basin),
2324 ..self
2325 }
2326 }
2327
2328 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2330 Self {
2331 stream: Some(stream),
2332 ..self
2333 }
2334 }
2335}
2336
2337impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2338 fn from(value: PermittedOperationGroups) -> Self {
2339 let PermittedOperationGroups {
2340 account,
2341 basin,
2342 stream,
2343 } = value;
2344 Self {
2345 account: account.map(Into::into),
2346 basin: basin.map(Into::into),
2347 stream: stream.map(Into::into),
2348 }
2349 }
2350}
2351
2352impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2353 fn from(value: api::PermittedOperationGroups) -> Self {
2354 let api::PermittedOperationGroups {
2355 account,
2356 basin,
2357 stream,
2358 } = value;
2359 Self {
2360 account: account.map(Into::into),
2361 basin: basin.map(Into::into),
2362 stream: stream.map(Into::into),
2363 }
2364 }
2365}
2366
// A pair of read/write flags; both default to `false`.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadWritePermissions {
    pub read: bool,
    pub write: bool,
}
2373
2374impl ReadWritePermissions {
2375 pub fn new() -> Self {
2377 Self::default()
2378 }
2379
2380 pub fn with_read(self, read: bool) -> Self {
2382 Self { read, ..self }
2383 }
2384
2385 pub fn with_write(self, write: bool) -> Self {
2387 Self { write, ..self }
2388 }
2389}
2390
2391impl From<ReadWritePermissions> for api::ReadWritePermissions {
2392 fn from(value: ReadWritePermissions) -> Self {
2393 let ReadWritePermissions { read, write } = value;
2394 Self { read, write }
2395 }
2396}
2397
2398impl From<api::ReadWritePermissions> for ReadWritePermissions {
2399 fn from(value: api::ReadWritePermissions) -> Self {
2400 let api::ReadWritePermissions { read, write } = value;
2401 Self { read, write }
2402 }
2403}
2404
2405impl From<api::IssueAccessTokenResponse> for String {
2406 fn from(value: api::IssueAccessTokenResponse) -> Self {
2407 value.access_token
2408 }
2409}
2410
2411impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2412 fn from(value: AccessTokenId) -> Self {
2413 Self { id: value.into() }
2414 }
2415}
2416
2417impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2418 type Error = ConvertError;
2419 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2420 let token_info = value.info.ok_or("access token info is missing")?;
2421 token_info.try_into()
2422 }
2423}
2424
// Parameters for listing access tokens. Defaults: empty `prefix`, empty
// `start_after`, no `limit`.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListAccessTokensRequest {
    pub prefix: String,
    pub start_after: String,
    // Converted to the API's integer type when the request is sent;
    // conversion fails if the value does not fit.
    pub limit: Option<usize>,
}
2432
2433impl ListAccessTokensRequest {
2434 pub fn new() -> Self {
2436 Self::default()
2437 }
2438
2439 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2441 Self {
2442 prefix: prefix.into(),
2443 ..self
2444 }
2445 }
2446
2447 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2449 Self {
2450 start_after: start_after.into(),
2451 ..self
2452 }
2453 }
2454
2455 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2457 Self {
2458 limit: limit.into(),
2459 ..self
2460 }
2461 }
2462}
2463
2464impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2465 type Error = ConvertError;
2466 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2467 let ListAccessTokensRequest {
2468 prefix,
2469 start_after,
2470 limit,
2471 } = value;
2472 Ok(Self {
2473 prefix,
2474 start_after,
2475 limit: limit
2476 .map(TryInto::try_into)
2477 .transpose()
2478 .map_err(|_| "request limit does not fit into u64 bounds")?,
2479 })
2480 }
2481}
2482
// One page of access-token listings.
// NOTE(review): rustdoc for this item is presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListAccessTokensResponse {
    pub access_tokens: Vec<AccessTokenInfo>,
    // Copied verbatim from the API response.
    pub has_more: bool,
}
2489
2490impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2491 type Error = ConvertError;
2492 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2493 let api::ListAccessTokensResponse {
2494 access_tokens,
2495 has_more,
2496 } = value;
2497 let access_tokens = access_tokens
2498 .into_iter()
2499 .map(TryInto::try_into)
2500 .collect::<Result<Vec<_>, _>>()?;
2501 Ok(Self {
2502 access_tokens,
2503 has_more,
2504 })
2505 }
2506}