1use std::{
4 collections::HashSet,
5 ops::{Deref, RangeTo},
6 str::FromStr,
7 sync::OnceLock,
8 time::Duration,
9};
10
11use bytes::Bytes;
12use rand::Rng;
13use regex::Regex;
14use sync_docs::sync_docs;
15
16use crate::api;
17
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
/// Metadata key string `"retry-after-ms"`.
// NOTE(review): presumably looked up in response metadata to obtain a retry delay in
// milliseconds — confirm at the use site.
pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
20
/// Error returned when a conversion between types fails.
///
/// Wraps a message string; the `Display` output is that message verbatim.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
25
26impl<T: Into<String>> From<T> for ConvertError {
27 fn from(value: T) -> Self {
28 Self(value.into())
29 }
30}
31
/// Types that can report their size in metered bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}
43
44impl<T: MeteredBytes> MeteredBytes for Vec<T> {
45 fn metered_bytes(&self) -> u64 {
46 self.iter().fold(0, |acc, item| acc + item.metered_bytes())
47 }
48}
49
/// Implements `MeteredBytes` for a record-like type that has `headers`
/// (each with `name` and `value`) and a `body`.
///
/// The metered size is `8 + 2 * header_count + total header name/value
/// lengths + body length`.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let header_count = self.headers.len();
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                let total = 8 + (2 * header_count) + header_bytes + self.body.len();
                total as u64
            }
        }
    };
}
67
// Scope of a basin. The only variant is `AwsUsEast1`, whose string form
// (see the `FromStr` impl) is "aws:us-east-1".
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinScope {
    AwsUsEast1,
}
73
74impl From<BasinScope> for api::BasinScope {
75 fn from(value: BasinScope) -> Self {
76 match value {
77 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
78 }
79 }
80}
81
82impl From<api::BasinScope> for Option<BasinScope> {
83 fn from(value: api::BasinScope) -> Self {
84 match value {
85 api::BasinScope::Unspecified => None,
86 api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
87 }
88 }
89}
90
91impl FromStr for BasinScope {
92 type Err = ConvertError;
93 fn from_str(value: &str) -> Result<Self, Self::Err> {
94 match value {
95 "aws:us-east-1" => Ok(Self::AwsUsEast1),
96 _ => Err("invalid basin scope value".into()),
97 }
98 }
99}
100
// Request to create a basin: the basin name plus optional config and scope.
// Converted into `api::CreateBasinRequest` by the `From` impl below.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateBasinRequest {
    pub basin: BasinName,
    // `None` means no config is sent.
    pub config: Option<BasinConfig>,
    // `None` is sent as the API enum's default value.
    pub scope: Option<BasinScope>,
}
108
109impl CreateBasinRequest {
110 pub fn new(basin: BasinName) -> Self {
112 Self {
113 basin,
114 config: None,
115 scope: None,
116 }
117 }
118
119 pub fn with_config(self, config: BasinConfig) -> Self {
121 Self {
122 config: Some(config),
123 ..self
124 }
125 }
126
127 pub fn with_scope(self, scope: BasinScope) -> Self {
129 Self {
130 scope: Some(scope),
131 ..self
132 }
133 }
134}
135
136impl From<CreateBasinRequest> for api::CreateBasinRequest {
137 fn from(value: CreateBasinRequest) -> Self {
138 let CreateBasinRequest {
139 basin,
140 config,
141 scope,
142 } = value;
143 Self {
144 basin: basin.0,
145 config: config.map(Into::into),
146 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
147 }
148 }
149}
150
// Basin configuration. `Default` yields no default stream config and both
// flags set to `false`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    pub default_stream_config: Option<StreamConfig>,
    pub create_stream_on_append: bool,
    pub create_stream_on_read: bool,
}
158
159impl BasinConfig {
160 pub fn new() -> Self {
162 Self::default()
163 }
164
165 pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
167 Self {
168 default_stream_config: Some(default_stream_config),
169 ..self
170 }
171 }
172
173 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
175 Self {
176 create_stream_on_append,
177 ..self
178 }
179 }
180
181 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
183 Self {
184 create_stream_on_read,
185 ..self
186 }
187 }
188}
189
190impl From<BasinConfig> for api::BasinConfig {
191 fn from(value: BasinConfig) -> Self {
192 let BasinConfig {
193 default_stream_config,
194 create_stream_on_append,
195 create_stream_on_read,
196 } = value;
197 Self {
198 default_stream_config: default_stream_config.map(Into::into),
199 create_stream_on_append,
200 create_stream_on_read,
201 }
202 }
203}
204
205impl From<api::BasinConfig> for BasinConfig {
206 fn from(value: api::BasinConfig) -> Self {
207 let api::BasinConfig {
208 default_stream_config,
209 create_stream_on_append,
210 create_stream_on_read,
211 } = value;
212 Self {
213 default_stream_config: default_stream_config.map(Into::into),
214 create_stream_on_append,
215 create_stream_on_read,
216 }
217 }
218}
219
// Timestamping mode; mirrors the non-`Unspecified` variants of
// `api::TimestampingMode`.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    ClientPrefer,
    ClientRequire,
    Arrival,
}
227
228impl From<TimestampingMode> for api::TimestampingMode {
229 fn from(value: TimestampingMode) -> Self {
230 match value {
231 TimestampingMode::ClientPrefer => Self::ClientPrefer,
232 TimestampingMode::ClientRequire => Self::ClientRequire,
233 TimestampingMode::Arrival => Self::Arrival,
234 }
235 }
236}
237
238impl From<api::TimestampingMode> for Option<TimestampingMode> {
239 fn from(value: api::TimestampingMode) -> Self {
240 match value {
241 api::TimestampingMode::Unspecified => None,
242 api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
243 api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
244 api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
245 }
246 }
247}
248
// Timestamping settings; the SDK counterpart of `api::stream_config::Timestamping`.
// Both fields are optional and default to `None`.
#[sync_docs(TimestampingConfig = "Timestamping")]
#[derive(Debug, Clone, Default)]
pub struct TimestampingConfig {
    // `None` is sent as the API enum's default value.
    pub mode: Option<TimestampingMode>,
    pub uncapped: Option<bool>,
}
256
257impl TimestampingConfig {
258 pub fn new() -> Self {
260 Self::default()
261 }
262
263 pub fn with_mode(self, mode: TimestampingMode) -> Self {
265 Self {
266 mode: Some(mode),
267 ..self
268 }
269 }
270
271 pub fn with_uncapped(self, uncapped: bool) -> Self {
273 Self {
274 uncapped: Some(uncapped),
275 ..self
276 }
277 }
278}
279
280impl From<TimestampingConfig> for api::stream_config::Timestamping {
281 fn from(value: TimestampingConfig) -> Self {
282 Self {
283 mode: value
284 .mode
285 .map(api::TimestampingMode::from)
286 .unwrap_or_default()
287 .into(),
288 uncapped: value.uncapped,
289 }
290 }
291}
292
293impl From<api::stream_config::Timestamping> for TimestampingConfig {
294 fn from(value: api::stream_config::Timestamping) -> Self {
295 let mode = value.mode().into();
296 let uncapped = value.uncapped;
297 Self { mode, uncapped }
298 }
299}
300
// Delete-on-empty settings; the SDK counterpart of
// `api::stream_config::DeleteOnEmpty`.
#[sync_docs(DeleteOnEmptyConfig = "DeleteOnEmpty", min_age = "min_age_secs")]
#[derive(Debug, Clone, Default)]
pub struct DeleteOnEmptyConfig {
    // Sent to the API as whole seconds (`min_age_secs`).
    pub min_age: Duration,
}
307
308impl DeleteOnEmptyConfig {
309 pub fn new() -> Self {
311 Self::default()
312 }
313
314 pub fn with_min_age(self, min_age: Duration) -> Self {
316 Self { min_age }
317 }
318}
319
320impl From<DeleteOnEmptyConfig> for api::stream_config::DeleteOnEmpty {
321 fn from(value: DeleteOnEmptyConfig) -> Self {
322 Self {
323 min_age_secs: value.min_age.as_secs(),
324 }
325 }
326}
327
328impl From<api::stream_config::DeleteOnEmpty> for DeleteOnEmptyConfig {
329 fn from(value: api::stream_config::DeleteOnEmpty) -> Self {
330 Self {
331 min_age: Duration::from_secs(value.min_age_secs),
332 }
333 }
334}
335
// Stream configuration; every field is optional and defaults to `None`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    // `None` is sent as the API enum's default value.
    pub storage_class: Option<StorageClass>,
    pub retention_policy: Option<RetentionPolicy>,
    pub timestamping: Option<TimestampingConfig>,
    pub delete_on_empty: Option<DeleteOnEmptyConfig>,
}
344
345impl StreamConfig {
346 pub fn new() -> Self {
348 Self::default()
349 }
350
351 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
353 Self {
354 storage_class: Some(storage_class),
355 ..self
356 }
357 }
358
359 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
361 Self {
362 retention_policy: Some(retention_policy),
363 ..self
364 }
365 }
366
367 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
369 Self {
370 timestamping: Some(timestamping),
371 ..self
372 }
373 }
374
375 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmptyConfig) -> Self {
377 Self {
378 delete_on_empty: Some(delete_on_empty),
379 ..self
380 }
381 }
382}
383
384impl From<StreamConfig> for api::StreamConfig {
385 fn from(value: StreamConfig) -> Self {
386 let StreamConfig {
387 storage_class,
388 retention_policy,
389 timestamping,
390 delete_on_empty,
391 } = value;
392 Self {
393 storage_class: storage_class
394 .map(api::StorageClass::from)
395 .unwrap_or_default()
396 .into(),
397 retention_policy: retention_policy.map(Into::into),
398 timestamping: timestamping.map(Into::into),
399 delete_on_empty: delete_on_empty.map(Into::into),
400 }
401 }
402}
403
404impl From<api::StreamConfig> for StreamConfig {
405 fn from(value: api::StreamConfig) -> Self {
406 Self {
407 storage_class: value.storage_class().into(),
408 retention_policy: value.retention_policy.map(Into::into),
409 timestamping: value.timestamping.map(Into::into),
410 delete_on_empty: value.delete_on_empty.map(Into::into),
411 }
412 }
413}
414
// Storage class of a stream; parsed from "standard" / "express" by the
// `FromStr` impl.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    Standard,
    Express,
}
421
422impl From<StorageClass> for api::StorageClass {
423 fn from(value: StorageClass) -> Self {
424 match value {
425 StorageClass::Standard => Self::Standard,
426 StorageClass::Express => Self::Express,
427 }
428 }
429}
430
431impl From<api::StorageClass> for Option<StorageClass> {
432 fn from(value: api::StorageClass) -> Self {
433 match value {
434 api::StorageClass::Unspecified => None,
435 api::StorageClass::Standard => Some(StorageClass::Standard),
436 api::StorageClass::Express => Some(StorageClass::Express),
437 }
438 }
439}
440
441impl FromStr for StorageClass {
442 type Err = ConvertError;
443
444 fn from_str(value: &str) -> Result<Self, Self::Err> {
445 match value {
446 "standard" => Ok(Self::Standard),
447 "express" => Ok(Self::Express),
448 v => Err(format!("unknown storage class: {v}").into()),
449 }
450 }
451}
452
// Retention policy of a stream: either age-based or infinite.
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    // Sent to the API as whole seconds.
    Age(Duration),
    Infinite(()),
}
459
460impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
461 fn from(value: RetentionPolicy) -> Self {
462 match value {
463 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
464 RetentionPolicy::Infinite(_) => {
465 Self::Infinite(api::stream_config::InfiniteRetention {})
466 }
467 }
468 }
469}
470
471impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
472 fn from(value: api::stream_config::RetentionPolicy) -> Self {
473 match value {
474 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
475 api::stream_config::RetentionPolicy::Infinite(_) => Self::Infinite(()),
476 }
477 }
478}
479
// State of a basin; displayed as "active", "creating" or "deleting".
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    Active,
    Creating,
    Deleting,
}
487
488impl From<BasinState> for api::BasinState {
489 fn from(value: BasinState) -> Self {
490 match value {
491 BasinState::Active => Self::Active,
492 BasinState::Creating => Self::Creating,
493 BasinState::Deleting => Self::Deleting,
494 }
495 }
496}
497
498impl From<api::BasinState> for Option<BasinState> {
499 fn from(value: api::BasinState) -> Self {
500 match value {
501 api::BasinState::Unspecified => None,
502 api::BasinState::Active => Some(BasinState::Active),
503 api::BasinState::Creating => Some(BasinState::Creating),
504 api::BasinState::Deleting => Some(BasinState::Deleting),
505 }
506 }
507}
508
509impl std::fmt::Display for BasinState {
510 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511 match self {
512 BasinState::Active => write!(f, "active"),
513 BasinState::Creating => write!(f, "creating"),
514 BasinState::Deleting => write!(f, "deleting"),
515 }
516 }
517}
518
// Information about a basin: its name plus optional scope and state.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    pub name: String,
    // `None` corresponds to the API's `Unspecified` scope.
    pub scope: Option<BasinScope>,
    // `None` corresponds to the API's `Unspecified` state.
    pub state: Option<BasinState>,
}
526
527impl From<BasinInfo> for api::BasinInfo {
528 fn from(value: BasinInfo) -> Self {
529 let BasinInfo { name, scope, state } = value;
530 Self {
531 name,
532 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
533 state: state.map(api::BasinState::from).unwrap_or_default().into(),
534 }
535 }
536}
537
538impl From<api::BasinInfo> for BasinInfo {
539 fn from(value: api::BasinInfo) -> Self {
540 let scope = value.scope().into();
541 let state = value.state().into();
542 let name = value.name;
543 Self { name, scope, state }
544 }
545}
546
547impl TryFrom<api::CreateBasinResponse> for BasinInfo {
548 type Error = ConvertError;
549 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
550 let api::CreateBasinResponse { info } = value;
551 let info = info.ok_or("missing basin info")?;
552 Ok(info.into())
553 }
554}
555
// Request to list streams. `Default` yields empty `prefix` / `start_after`
// strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListStreamsRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
563
564impl ListStreamsRequest {
565 pub fn new() -> Self {
567 Self::default()
568 }
569
570 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
572 Self {
573 prefix: prefix.into(),
574 ..self
575 }
576 }
577
578 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
580 Self {
581 start_after: start_after.into(),
582 ..self
583 }
584 }
585
586 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
588 Self {
589 limit: limit.into(),
590 ..self
591 }
592 }
593}
594
595impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
596 type Error = ConvertError;
597 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
598 let ListStreamsRequest {
599 prefix,
600 start_after,
601 limit,
602 } = value;
603 Ok(Self {
604 prefix,
605 start_after,
606 limit: limit.map(|n| n as u64),
607 })
608 }
609}
610
// Information about a stream, copied field-for-field from `api::StreamInfo`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    pub name: String,
    // NOTE(review): unit/epoch of these timestamps is not visible here — confirm
    // against the API definition.
    pub created_at: u32,
    pub deleted_at: Option<u32>,
}
618
619impl From<api::StreamInfo> for StreamInfo {
620 fn from(value: api::StreamInfo) -> Self {
621 Self {
622 name: value.name,
623 created_at: value.created_at,
624 deleted_at: value.deleted_at,
625 }
626 }
627}
628
629impl TryFrom<api::CreateStreamResponse> for StreamInfo {
630 type Error = ConvertError;
631
632 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
633 let api::CreateStreamResponse { info } = value;
634 let info = info.ok_or("missing stream info")?;
635 Ok(info.into())
636 }
637}
638
// Response to a list-streams request: the returned streams and the API's
// `has_more` flag.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    pub streams: Vec<StreamInfo>,
    pub has_more: bool,
}
645
646impl From<api::ListStreamsResponse> for ListStreamsResponse {
647 fn from(value: api::ListStreamsResponse) -> Self {
648 let api::ListStreamsResponse { streams, has_more } = value;
649 let streams = streams.into_iter().map(Into::into).collect();
650 Self { streams, has_more }
651 }
652}
653
654impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
655 type Error = ConvertError;
656
657 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
658 let api::GetBasinConfigResponse { config } = value;
659 let config = config.ok_or("missing basin config")?;
660 Ok(config.into())
661 }
662}
663
664impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
665 type Error = ConvertError;
666
667 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
668 let api::GetStreamConfigResponse { config } = value;
669 let config = config.ok_or("missing stream config")?;
670 Ok(config.into())
671 }
672}
673
// Request to create a stream: the stream name plus an optional config.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateStreamRequest {
    pub stream: String,
    // `None` means no config is sent.
    pub config: Option<StreamConfig>,
}
680
681impl CreateStreamRequest {
682 pub fn new(stream: impl Into<String>) -> Self {
684 Self {
685 stream: stream.into(),
686 config: None,
687 }
688 }
689
690 pub fn with_config(self, config: StreamConfig) -> Self {
692 Self {
693 config: Some(config),
694 ..self
695 }
696 }
697}
698
699impl From<CreateStreamRequest> for api::CreateStreamRequest {
700 fn from(value: CreateStreamRequest) -> Self {
701 let CreateStreamRequest { stream, config } = value;
702 Self {
703 stream,
704 config: config.map(Into::into),
705 }
706 }
707}
708
// Request to list basins. `Default` yields empty `prefix` / `start_after`
// strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListBasinsRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
716
717impl ListBasinsRequest {
718 pub fn new() -> Self {
720 Self::default()
721 }
722
723 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
725 Self {
726 prefix: prefix.into(),
727 ..self
728 }
729 }
730
731 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
733 Self {
734 start_after: start_after.into(),
735 ..self
736 }
737 }
738
739 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
741 Self {
742 limit: limit.into(),
743 ..self
744 }
745 }
746}
747
748impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
749 type Error = ConvertError;
750 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
751 let ListBasinsRequest {
752 prefix,
753 start_after,
754 limit,
755 } = value;
756 Ok(Self {
757 prefix,
758 start_after,
759 limit: limit
760 .map(TryInto::try_into)
761 .transpose()
762 .map_err(|_| "request limit does not fit into u64 bounds")?,
763 })
764 }
765}
766
// Response to a list-basins request: the returned basins and the API's
// `has_more` flag.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    pub basins: Vec<BasinInfo>,
    pub has_more: bool,
}
773
774impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
775 type Error = ConvertError;
776 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
777 let api::ListBasinsResponse { basins, has_more } = value;
778 Ok(Self {
779 basins: basins.into_iter().map(Into::into).collect(),
780 has_more,
781 })
782 }
783}
784
// Request to delete a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteBasinRequest {
    pub basin: BasinName,
    // Not forwarded into `api::DeleteBasinRequest` by the `From` impl in this file.
    // NOTE(review): presumably handled by the caller to tolerate a missing basin — confirm.
    pub if_exists: bool,
}
792
793impl DeleteBasinRequest {
794 pub fn new(basin: BasinName) -> Self {
796 Self {
797 basin,
798 if_exists: false,
799 }
800 }
801
802 pub fn with_if_exists(self, if_exists: bool) -> Self {
804 Self { if_exists, ..self }
805 }
806}
807
808impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
809 fn from(value: DeleteBasinRequest) -> Self {
810 let DeleteBasinRequest { basin, .. } = value;
811 Self { basin: basin.0 }
812 }
813}
814
// Request to delete a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteStreamRequest {
    pub stream: String,
    // Not forwarded into `api::DeleteStreamRequest` by the `From` impl in this file.
    // NOTE(review): presumably handled by the caller to tolerate a missing stream — confirm.
    pub if_exists: bool,
}
822
823impl DeleteStreamRequest {
824 pub fn new(stream: impl Into<String>) -> Self {
826 Self {
827 stream: stream.into(),
828 if_exists: false,
829 }
830 }
831
832 pub fn with_if_exists(self, if_exists: bool) -> Self {
834 Self { if_exists, ..self }
835 }
836}
837
838impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
839 fn from(value: DeleteStreamRequest) -> Self {
840 let DeleteStreamRequest { stream, .. } = value;
841 Self { stream }
842 }
843}
844
// Request to reconfigure a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureBasinRequest {
    pub basin: BasinName,
    pub config: Option<BasinConfig>,
    // Field paths; sent as the `paths` of a `prost_types::FieldMask`.
    pub mask: Option<Vec<String>>,
}
852
853impl ReconfigureBasinRequest {
854 pub fn new(basin: BasinName) -> Self {
856 Self {
857 basin,
858 config: None,
859 mask: None,
860 }
861 }
862
863 pub fn with_config(self, config: BasinConfig) -> Self {
865 Self {
866 config: Some(config),
867 ..self
868 }
869 }
870
871 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
873 Self {
874 mask: Some(mask.into()),
875 ..self
876 }
877 }
878}
879
880impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
881 fn from(value: ReconfigureBasinRequest) -> Self {
882 let ReconfigureBasinRequest {
883 basin,
884 config,
885 mask,
886 } = value;
887 Self {
888 basin: basin.0,
889 config: config.map(Into::into),
890 mask: mask.map(|paths| prost_types::FieldMask { paths }),
891 }
892 }
893}
894
895impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
896 type Error = ConvertError;
897 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
898 let api::ReconfigureBasinResponse { config } = value;
899 let config = config.ok_or("missing basin config")?;
900 Ok(config.into())
901 }
902}
903
// Request to reconfigure a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureStreamRequest {
    pub stream: String,
    pub config: Option<StreamConfig>,
    // Field paths; sent as the `paths` of a `prost_types::FieldMask`.
    pub mask: Option<Vec<String>>,
}
911
912impl ReconfigureStreamRequest {
913 pub fn new(stream: impl Into<String>) -> Self {
915 Self {
916 stream: stream.into(),
917 config: None,
918 mask: None,
919 }
920 }
921
922 pub fn with_config(self, config: StreamConfig) -> Self {
924 Self {
925 config: Some(config),
926 ..self
927 }
928 }
929
930 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
932 Self {
933 mask: Some(mask.into()),
934 ..self
935 }
936 }
937}
938
939impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
940 fn from(value: ReconfigureStreamRequest) -> Self {
941 let ReconfigureStreamRequest {
942 stream,
943 config,
944 mask,
945 } = value;
946 Self {
947 stream,
948 config: config.map(Into::into),
949 mask: mask.map(|paths| prost_types::FieldMask { paths }),
950 }
951 }
952}
953
954impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
955 type Error = ConvertError;
956 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
957 let api::ReconfigureStreamResponse { config } = value;
958 let config = config.ok_or("missing stream config")?;
959 Ok(config.into())
960 }
961}
962
963impl From<api::CheckTailResponse> for StreamPosition {
964 fn from(value: api::CheckTailResponse) -> Self {
965 let api::CheckTailResponse {
966 next_seq_num,
967 last_timestamp,
968 } = value;
969 StreamPosition {
970 seq_num: next_seq_num,
971 timestamp: last_timestamp,
972 }
973 }
974}
975
/// A position in a stream, given as a sequence number and a timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: u64,
    /// Timestamp.
    // NOTE(review): unit/epoch is not visible in this chunk — confirm against the API docs.
    pub timestamp: u64,
}
985
// A record header: a name/value pair of byte strings.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    pub name: Bytes,
    pub value: Bytes,
}
992
993impl Header {
994 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
996 Self {
997 name: name.into(),
998 value: value.into(),
999 }
1000 }
1001
1002 pub fn from_value(value: impl Into<Bytes>) -> Self {
1004 Self {
1005 name: Bytes::new(),
1006 value: value.into(),
1007 }
1008 }
1009}
1010
1011impl From<Header> for api::Header {
1012 fn from(value: Header) -> Self {
1013 let Header { name, value } = value;
1014 Self { name, value }
1015 }
1016}
1017
1018impl From<api::Header> for Header {
1019 fn from(value: api::Header) -> Self {
1020 let api::Header { name, value } = value;
1021 Self { name, value }
1022 }
1023}
1024
/// A fencing token: a string wrapper whose fallible constructors
/// (`TryFrom<String>`, `FromStr`) reject values longer than 36 bytes.
///
/// The `Default` value is the empty token.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FencingToken(String);
1030
1031impl FencingToken {
1032 const MAX_BYTES: usize = 36;
1033
1034 pub fn generate(n: usize) -> Result<Self, ConvertError> {
1036 rand::rng()
1037 .sample_iter(&rand::distr::Alphanumeric)
1038 .take(n)
1039 .map(char::from)
1040 .collect::<String>()
1041 .parse()
1042 }
1043}
1044
1045impl Deref for FencingToken {
1046 type Target = str;
1047
1048 fn deref(&self) -> &Self::Target {
1049 &self.0
1050 }
1051}
1052
1053impl std::fmt::Display for FencingToken {
1054 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1055 write!(f, "{}", self.0)
1056 }
1057}
1058
1059impl FromStr for FencingToken {
1060 type Err = ConvertError;
1061
1062 fn from_str(value: &str) -> Result<Self, Self::Err> {
1063 value.to_string().try_into()
1064 }
1065}
1066
1067impl TryFrom<String> for FencingToken {
1068 type Error = ConvertError;
1069
1070 fn try_from(value: String) -> Result<Self, Self::Error> {
1071 if value.len() > Self::MAX_BYTES {
1072 Err(format!("Fencing token cannot exceed {} bytes", Self::MAX_BYTES).into())
1073 } else {
1074 Ok(Self(value))
1075 }
1076 }
1077}
1078
1079impl From<FencingToken> for String {
1080 fn from(value: FencingToken) -> Self {
1081 value.0
1082 }
1083}
1084
/// A command carried by a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command.
    Fence {
        /// Token whose bytes become the body of the resulting append record.
        fencing_token: FencingToken,
    },
    /// Trim command.
    Trim {
        /// Sequence number; encoded big-endian as the body of the resulting append record.
        // NOTE(review): presumably the trim point of the stream — confirm exact semantics.
        seq_num: u64,
    },
}
1110
/// A command together with an optional timestamp; convertible into an
/// [`AppendRecord`].
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// The command to carry.
    pub command: Command,
    /// Optional timestamp, copied onto the resulting append record.
    pub timestamp: Option<u64>,
}
1123
1124impl CommandRecord {
1125 const FENCE: &[u8] = b"fence";
1126 const TRIM: &[u8] = b"trim";
1127
1128 pub fn fence(fencing_token: FencingToken) -> Self {
1130 Self {
1131 command: Command::Fence { fencing_token },
1132 timestamp: None,
1133 }
1134 }
1135
1136 pub fn trim(seq_num: impl Into<u64>) -> Self {
1138 Self {
1139 command: Command::Trim {
1140 seq_num: seq_num.into(),
1141 },
1142 timestamp: None,
1143 }
1144 }
1145
1146 pub fn with_timestamp(self, timestamp: u64) -> Self {
1148 Self {
1149 timestamp: Some(timestamp),
1150 ..self
1151 }
1152 }
1153}
1154
// A record to append. Fields are private; its constructors check the metered
// size against a byte limit.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    timestamp: Option<u64>,
    headers: Vec<Header>,
    body: Bytes,
    // Test builds only: per-record size limit used instead of `Self::MAX_BYTES`.
    #[cfg(test)]
    max_bytes: u64,
}
1164
// Implements `MeteredBytes` for `AppendRecord` from its headers and body.
metered_impl!(AppendRecord);
1166
1167impl AppendRecord {
1168 const MAX_BYTES: u64 = MIB_BYTES;
1169
1170 fn validated(self) -> Result<Self, ConvertError> {
1171 #[cfg(test)]
1172 let max_bytes = self.max_bytes;
1173 #[cfg(not(test))]
1174 let max_bytes = Self::MAX_BYTES;
1175
1176 if self.metered_bytes() > max_bytes {
1177 Err("AppendRecord should have metered size less than 1 MiB".into())
1178 } else {
1179 Ok(self)
1180 }
1181 }
1182
1183 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1185 Self {
1186 timestamp: None,
1187 headers: Vec::new(),
1188 body: body.into(),
1189 #[cfg(test)]
1190 max_bytes: Self::MAX_BYTES,
1191 }
1192 .validated()
1193 }
1194
1195 #[cfg(test)]
1196 pub(crate) fn with_max_bytes(
1197 max_bytes: u64,
1198 body: impl Into<Bytes>,
1199 ) -> Result<Self, ConvertError> {
1200 Self {
1201 timestamp: None,
1202 headers: Vec::new(),
1203 body: body.into(),
1204 max_bytes,
1205 }
1206 .validated()
1207 }
1208
1209 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1211 Self {
1212 headers: headers.into(),
1213 ..self
1214 }
1215 .validated()
1216 }
1217
1218 pub fn with_timestamp(self, timestamp: u64) -> Self {
1220 Self {
1221 timestamp: Some(timestamp),
1222 ..self
1223 }
1224 }
1225
1226 pub fn body(&self) -> &[u8] {
1228 &self.body
1229 }
1230
1231 pub fn headers(&self) -> &[Header] {
1233 &self.headers
1234 }
1235
1236 pub fn timestamp(&self) -> Option<u64> {
1238 self.timestamp
1239 }
1240
1241 pub fn into_parts(self) -> AppendRecordParts {
1243 AppendRecordParts {
1244 timestamp: self.timestamp,
1245 headers: self.headers,
1246 body: self.body,
1247 }
1248 }
1249
1250 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1252 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1253 if let Some(timestamp) = parts.timestamp {
1254 Ok(record.with_timestamp(timestamp))
1255 } else {
1256 Ok(record)
1257 }
1258 }
1259}
1260
1261impl From<AppendRecord> for api::AppendRecord {
1262 fn from(value: AppendRecord) -> Self {
1263 Self {
1264 timestamp: value.timestamp,
1265 headers: value.headers.into_iter().map(Into::into).collect(),
1266 body: value.body,
1267 }
1268 }
1269}
1270
1271impl From<CommandRecord> for AppendRecord {
1272 fn from(value: CommandRecord) -> Self {
1273 let (header_value, body) = match value.command {
1274 Command::Fence { fencing_token } => (
1275 CommandRecord::FENCE,
1276 Bytes::copy_from_slice(fencing_token.as_bytes()),
1277 ),
1278 Command::Trim { seq_num } => (
1279 CommandRecord::TRIM,
1280 Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1281 ),
1282 };
1283 AppendRecordParts {
1284 timestamp: value.timestamp,
1285 headers: vec![Header::from_value(header_value)],
1286 body,
1287 }
1288 .try_into()
1289 .expect("command record is a valid append record")
1290 }
1291}
1292
// The public parts of an `AppendRecord`: timestamp, headers and body.
// Unlike `AppendRecord`, this type carries no size validation.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    pub timestamp: Option<u64>,
    pub headers: Vec<Header>,
    pub body: Bytes,
}
1300
1301impl From<AppendRecord> for AppendRecordParts {
1302 fn from(value: AppendRecord) -> Self {
1303 value.into_parts()
1304 }
1305}
1306
1307impl TryFrom<AppendRecordParts> for AppendRecord {
1308 type Error = ConvertError;
1309
1310 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1311 Self::try_from_parts(value)
1312 }
1313}
1314
/// A batch of append records, bounded by a record count and a total metered size.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    /// Records accepted so far.
    records: Vec<AppendRecord>,
    /// Running total of the metered bytes of `records`.
    metered_bytes: u64,
    /// Maximum number of records this batch accepts.
    max_capacity: usize,
    /// Test builds only: byte limit used instead of `Self::MAX_BYTES`.
    #[cfg(test)]
    max_bytes: u64,
}
1324
1325impl PartialEq for AppendRecordBatch {
1326 fn eq(&self, other: &Self) -> bool {
1327 if self.records.eq(&other.records) {
1328 assert_eq!(self.metered_bytes, other.metered_bytes);
1329 true
1330 } else {
1331 false
1332 }
1333 }
1334}
1335
// Marker impl: the manual `PartialEq` above compares the record vectors.
impl Eq for AppendRecordBatch {}
1337
1338impl Default for AppendRecordBatch {
1339 fn default() -> Self {
1340 Self::new()
1341 }
1342}
1343
1344impl AppendRecordBatch {
1345 pub const MAX_CAPACITY: usize = 1000;
1349
1350 pub const MAX_BYTES: u64 = MIB_BYTES;
1352
1353 pub fn new() -> Self {
1355 Self::with_max_capacity(Self::MAX_CAPACITY)
1356 }
1357
1358 pub fn with_max_capacity(max_capacity: usize) -> Self {
1362 assert!(
1363 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1364 "Batch capacity must be between 1 and 1000"
1365 );
1366
1367 Self {
1368 records: Vec::with_capacity(max_capacity),
1369 metered_bytes: 0,
1370 max_capacity,
1371 #[cfg(test)]
1372 max_bytes: Self::MAX_BYTES,
1373 }
1374 }
1375
1376 #[cfg(test)]
1377 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1378 #[cfg(test)]
1379 assert!(
1380 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1381 "Batch size must be between 1 byte and 1 MiB"
1382 );
1383
1384 Self {
1385 max_bytes,
1386 ..Self::with_max_capacity(max_capacity)
1387 }
1388 }
1389
1390 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1396 where
1397 R: Into<AppendRecord>,
1398 T: IntoIterator<Item = R>,
1399 {
1400 let mut records = Self::new();
1401 let mut pending = Vec::new();
1402
1403 let mut iter = iter.into_iter();
1404
1405 for record in iter.by_ref() {
1406 if let Err(record) = records.push(record) {
1407 pending.push(record);
1408 break;
1409 }
1410 }
1411
1412 if pending.is_empty() {
1413 Ok(records)
1414 } else {
1415 pending.extend(iter.map(Into::into));
1416 Err((records, pending))
1417 }
1418 }
1419
1420 pub fn is_empty(&self) -> bool {
1422 if self.records.is_empty() {
1423 assert_eq!(self.metered_bytes, 0);
1424 true
1425 } else {
1426 false
1427 }
1428 }
1429
1430 pub fn len(&self) -> usize {
1432 self.records.len()
1433 }
1434
1435 #[cfg(test)]
1436 fn max_bytes(&self) -> u64 {
1437 self.max_bytes
1438 }
1439
1440 #[cfg(not(test))]
1441 fn max_bytes(&self) -> u64 {
1442 Self::MAX_BYTES
1443 }
1444
1445 pub fn is_full(&self) -> bool {
1447 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1448 }
1449
1450 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1452 assert!(self.records.len() <= self.max_capacity);
1453 assert!(self.metered_bytes <= self.max_bytes());
1454
1455 let record = record.into();
1456 let record_size = record.metered_bytes();
1457 if self.records.len() >= self.max_capacity
1458 || self.metered_bytes + record_size > self.max_bytes()
1459 {
1460 Err(record)
1461 } else {
1462 self.records.push(record);
1463 self.metered_bytes += record_size;
1464 Ok(())
1465 }
1466 }
1467}
1468
1469impl MeteredBytes for AppendRecordBatch {
1470 fn metered_bytes(&self) -> u64 {
1471 self.metered_bytes
1472 }
1473}
1474
1475impl IntoIterator for AppendRecordBatch {
1476 type Item = AppendRecord;
1477 type IntoIter = std::vec::IntoIter<Self::Item>;
1478
1479 fn into_iter(self) -> Self::IntoIter {
1480 self.records.into_iter()
1481 }
1482}
1483
1484impl<'a> IntoIterator for &'a AppendRecordBatch {
1485 type Item = &'a AppendRecord;
1486 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1487
1488 fn into_iter(self) -> Self::IntoIter {
1489 self.records.iter()
1490 }
1491}
1492
1493impl AsRef<[AppendRecord]> for AppendRecordBatch {
1494 fn as_ref(&self) -> &[AppendRecord] {
1495 &self.records
1496 }
1497}
1498
// Input for an append: a record batch plus two optional fields.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm
// before adding `///` comments here.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    // Records to append; `into_api_type` converts each one with `Into`.
    pub records: AppendRecordBatch,
    // Passed through unchanged to `api::AppendInput::match_seq_num`.
    pub match_seq_num: Option<u64>,
    // Sent to the API as the token's inner value (`.0`).
    pub fencing_token: Option<FencingToken>,
}
1506
1507impl MeteredBytes for AppendInput {
1508 fn metered_bytes(&self) -> u64 {
1509 self.records.metered_bytes()
1510 }
1511}
1512
1513impl AppendInput {
1514 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1516 Self {
1517 records: records.into(),
1518 match_seq_num: None,
1519 fencing_token: None,
1520 }
1521 }
1522
1523 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1525 Self {
1526 match_seq_num: Some(match_seq_num.into()),
1527 ..self
1528 }
1529 }
1530
1531 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1533 Self {
1534 fencing_token: Some(fencing_token),
1535 ..self
1536 }
1537 }
1538
1539 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1540 let Self {
1541 records,
1542 match_seq_num,
1543 fencing_token,
1544 } = self;
1545
1546 api::AppendInput {
1547 stream: stream.into(),
1548 records: records.into_iter().map(Into::into).collect(),
1549 match_seq_num,
1550 fencing_token: fencing_token.map(|f| f.0),
1551 }
1552 }
1553}
1554
/// Positions reported back for an append.
///
/// Built from `api::AppendOutput`; each field pairs one sequence number with
/// one timestamp from that output.
#[derive(Debug, Clone)]
pub struct AppendAck {
    /// `start_seq_num` / `start_timestamp` of the append output.
    pub start: StreamPosition,
    /// `end_seq_num` / `end_timestamp` of the append output.
    pub end: StreamPosition,
    /// `next_seq_num` / `last_timestamp` of the append output.
    pub tail: StreamPosition,
}
1570
1571impl From<api::AppendOutput> for AppendAck {
1572 fn from(value: api::AppendOutput) -> Self {
1573 let api::AppendOutput {
1574 start_seq_num,
1575 start_timestamp,
1576 end_seq_num,
1577 end_timestamp,
1578 next_seq_num,
1579 last_timestamp,
1580 } = value;
1581 let start = StreamPosition {
1582 seq_num: start_seq_num,
1583 timestamp: start_timestamp,
1584 };
1585 let end = StreamPosition {
1586 seq_num: end_seq_num,
1587 timestamp: end_timestamp,
1588 };
1589 let tail = StreamPosition {
1590 seq_num: next_seq_num,
1591 timestamp: last_timestamp,
1592 };
1593 Self { start, end, tail }
1594 }
1595}
1596
1597impl TryFrom<api::AppendResponse> for AppendAck {
1598 type Error = ConvertError;
1599 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1600 let api::AppendResponse { output } = value;
1601 let output = output.ok_or("missing append output")?;
1602 Ok(output.into())
1603 }
1604}
1605
1606impl TryFrom<api::AppendSessionResponse> for AppendAck {
1607 type Error = ConvertError;
1608 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1609 let api::AppendSessionResponse { output } = value;
1610 let output = output.ok_or("missing append output")?;
1611 Ok(output.into())
1612 }
1613}
1614
// Optional caps on a read. Both fields default to `None`.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    // Forwarded to `api::ReadLimit::count`; unary reads reject values above 1000.
    pub count: Option<u64>,
    // Forwarded to `api::ReadLimit::bytes`; unary reads reject values above `MIB_BYTES`.
    pub bytes: Option<u64>,
}
1621
1622impl ReadLimit {
1623 pub fn new() -> Self {
1625 Self::default()
1626 }
1627
1628 pub fn with_count(self, count: u64) -> Self {
1630 Self {
1631 count: Some(count),
1632 ..self
1633 }
1634 }
1635
1636 pub fn with_bytes(self, bytes: u64) -> Self {
1638 Self {
1639 bytes: Some(bytes),
1640 ..self
1641 }
1642 }
1643}
1644
/// Where a read begins.
///
/// Each variant is forwarded, payload unchanged, to the same-named variant of
/// the API `Start` types. The `Default` is `SeqNum(0)`.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Start from the given sequence number.
    SeqNum(u64),
    /// Start from the given timestamp (unit not visible here — confirm against the API).
    Timestamp(u64),
    /// Start at an offset relative to the tail (exact semantics defined by the API — confirm).
    TailOffset(u64),
}
1655
1656impl Default for ReadStart {
1657 fn default() -> Self {
1658 Self::SeqNum(0)
1659 }
1660}
1661
1662impl From<ReadStart> for api::read_request::Start {
1663 fn from(start: ReadStart) -> Self {
1664 match start {
1665 ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1666 ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1667 ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1668 }
1669 }
1670}
1671
1672impl From<ReadStart> for api::read_session_request::Start {
1673 fn from(start: ReadStart) -> Self {
1674 match start {
1675 ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1676 ReadStart::Timestamp(timestamp) => {
1677 api::read_session_request::Start::Timestamp(timestamp)
1678 }
1679 ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1680 }
1681 }
1682}
1683
// Parameters of a unary read.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    // Starting point; defaults to `ReadStart::SeqNum(0)`.
    pub start: ReadStart,
    // Count/byte caps; validated in `try_into_api_type`.
    pub limit: ReadLimit,
    // Only the range's `end` is sent to the API.
    pub until: Option<RangeTo<u64>>,
    // Forwarded unchanged to `api::ReadRequest::clamp`.
    pub clamp: bool,
}
1692
1693impl ReadRequest {
1694 pub fn new(start: ReadStart) -> Self {
1696 Self {
1697 start,
1698 ..Default::default()
1699 }
1700 }
1701
1702 pub fn with_limit(self, limit: ReadLimit) -> Self {
1704 Self { limit, ..self }
1705 }
1706
1707 pub fn with_until(self, until: RangeTo<u64>) -> Self {
1709 Self {
1710 until: Some(until),
1711 ..self
1712 }
1713 }
1714
1715 pub fn with_clamp(self, clamp: bool) -> Self {
1717 Self { clamp, ..self }
1718 }
1719}
1720
1721impl ReadRequest {
1722 pub(crate) fn try_into_api_type(
1723 self,
1724 stream: impl Into<String>,
1725 ) -> Result<api::ReadRequest, ConvertError> {
1726 let Self {
1727 start,
1728 limit,
1729 until,
1730 clamp,
1731 } = self;
1732
1733 let limit = if limit.count > Some(1000) {
1734 Err("read limit: count must not exceed 1000 for unary request")
1735 } else if limit.bytes > Some(MIB_BYTES) {
1736 Err("read limit: bytes must not exceed 1MiB for unary request")
1737 } else {
1738 Ok(api::ReadLimit {
1739 count: limit.count,
1740 bytes: limit.bytes,
1741 })
1742 }?;
1743
1744 Ok(api::ReadRequest {
1745 stream: stream.into(),
1746 start: Some(start.into()),
1747 limit: Some(limit),
1748 until: until.map(|range| range.end),
1749 clamp,
1750 })
1751 }
1752}
1753
// A record as returned by a read, carrying its sequence number and timestamp.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    // Copied from `api::SequencedRecord::seq_num`.
    pub seq_num: u64,
    // Copied from `api::SequencedRecord::timestamp`.
    pub timestamp: u64,
    // Each API header is converted with `Into`.
    pub headers: Vec<Header>,
    // Copied from `api::SequencedRecord::body`.
    pub body: Bytes,
}

// Implements `MeteredBytes` via the `metered_impl!` macro defined earlier in this file:
// 8 + 2 * header count + total header name/value lengths + body length.
metered_impl!(SequencedRecord);
1764
1765impl From<api::SequencedRecord> for SequencedRecord {
1766 fn from(value: api::SequencedRecord) -> Self {
1767 let api::SequencedRecord {
1768 seq_num,
1769 timestamp,
1770 headers,
1771 body,
1772 } = value;
1773 Self {
1774 seq_num,
1775 timestamp,
1776 headers: headers.into_iter().map(Into::into).collect(),
1777 body,
1778 }
1779 }
1780}
1781
1782impl SequencedRecord {
1783 pub fn as_command_record(&self) -> Option<CommandRecord> {
1785 if self.headers.len() != 1 {
1786 return None;
1787 }
1788
1789 let header = self.headers.first().expect("pre-validated length");
1790
1791 if !header.name.is_empty() {
1792 return None;
1793 }
1794
1795 match header.value.as_ref() {
1796 CommandRecord::FENCE => {
1797 let fencing_token = std::str::from_utf8(&self.body).ok()?.parse().ok()?;
1798 Some(CommandRecord {
1799 command: Command::Fence { fencing_token },
1800 timestamp: Some(self.timestamp),
1801 })
1802 }
1803 CommandRecord::TRIM => {
1804 let body: &[u8] = &self.body;
1805 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1806 Some(CommandRecord {
1807 command: Command::Trim { seq_num },
1808 timestamp: Some(self.timestamp),
1809 })
1810 }
1811 _ => None,
1812 }
1813 }
1814}
1815
// A batch of sequenced records, as carried by `ReadOutput::Batch`.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    // Records in the order they appear in `api::SequencedRecordBatch::records`.
    pub records: Vec<SequencedRecord>,
}
1821
1822impl MeteredBytes for SequencedRecordBatch {
1823 fn metered_bytes(&self) -> u64 {
1824 self.records.metered_bytes()
1825 }
1826}
1827
1828impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1829 fn from(value: api::SequencedRecordBatch) -> Self {
1830 let api::SequencedRecordBatch { records } = value;
1831 Self {
1832 records: records.into_iter().map(Into::into).collect(),
1833 }
1834 }
1835}
1836
// Result of a read: either a batch of records or a bare sequence number.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    // Converted from `api::read_output::Output::Batch`.
    Batch(SequencedRecordBatch),
    // Copied from `api::read_output::Output::NextSeqNum`.
    NextSeqNum(u64),
}
1843
1844impl From<api::read_output::Output> for ReadOutput {
1845 fn from(value: api::read_output::Output) -> Self {
1846 match value {
1847 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1848 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1849 }
1850 }
1851}
1852
1853impl TryFrom<api::ReadOutput> for ReadOutput {
1854 type Error = ConvertError;
1855 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1856 let api::ReadOutput { output } = value;
1857 let output = output.ok_or("missing read output")?;
1858 Ok(output.into())
1859 }
1860}
1861
1862impl TryFrom<api::ReadResponse> for ReadOutput {
1863 type Error = ConvertError;
1864 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1865 let api::ReadResponse { output } = value;
1866 let output = output.ok_or("missing output in read response")?;
1867 output.try_into()
1868 }
1869}
1870
// Parameters of a read session.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    // Starting point; defaults to `ReadStart::SeqNum(0)`.
    pub start: ReadStart,
    // Count/byte caps; forwarded without validation by `into_api_type`.
    pub limit: ReadLimit,
    // Only the range's `end` is sent to the API.
    pub until: Option<RangeTo<u64>>,
    // Forwarded unchanged to `api::ReadSessionRequest::clamp`.
    pub clamp: bool,
}
1879
1880impl ReadSessionRequest {
1881 pub fn new(start: ReadStart) -> Self {
1883 Self {
1884 start,
1885 ..Default::default()
1886 }
1887 }
1888
1889 pub fn with_limit(self, limit: ReadLimit) -> Self {
1891 Self { limit, ..self }
1892 }
1893
1894 pub fn with_until(self, until: RangeTo<u64>) -> Self {
1896 Self {
1897 until: Some(until),
1898 ..self
1899 }
1900 }
1901
1902 pub fn with_clamp(self, clamp: bool) -> Self {
1904 Self { clamp, ..self }
1905 }
1906
1907 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1908 let Self {
1909 start,
1910 limit,
1911 until,
1912 clamp,
1913 } = self;
1914 api::ReadSessionRequest {
1915 stream: stream.into(),
1916 start: Some(start.into()),
1917 limit: Some(api::ReadLimit {
1918 count: limit.count,
1919 bytes: limit.bytes,
1920 }),
1921 heartbeats: false,
1922 until: until.map(|range| range.end),
1923 clamp,
1924 }
1925 }
1926}
1927
1928impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1929 type Error = ConvertError;
1930 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1931 let api::ReadSessionResponse { output } = value;
1932 let output = output.ok_or("missing output in read session response")?;
1933 output.try_into()
1934 }
1935}
1936
/// A validated basin name.
///
/// The inner string is private; values are created through `TryFrom<String>`
/// or `FromStr`, which require a length of 8 to 48 bytes, only lowercase
/// ASCII letters, digits and hyphens, and no hyphen at either end.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BasinName(String);
1943
1944impl Deref for BasinName {
1945 type Target = str;
1946 fn deref(&self) -> &Self::Target {
1947 &self.0
1948 }
1949}
1950
1951impl TryFrom<String> for BasinName {
1952 type Error = ConvertError;
1953
1954 fn try_from(name: String) -> Result<Self, Self::Error> {
1955 if name.len() < 8 || name.len() > 48 {
1956 return Err("Basin name must be between 8 and 48 characters in length".into());
1957 }
1958
1959 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1960 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1961 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1962 .expect("Failed to compile basin name regex")
1963 });
1964
1965 if !regex.is_match(&name) {
1966 return Err(
1967 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1968 It cannot begin or end with a hyphen."
1969 .into(),
1970 );
1971 }
1972
1973 Ok(Self(name))
1974 }
1975}
1976
1977impl FromStr for BasinName {
1978 type Err = ConvertError;
1979
1980 fn from_str(s: &str) -> Result<Self, Self::Err> {
1981 s.to_string().try_into()
1982 }
1983}
1984
1985impl std::fmt::Display for BasinName {
1986 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1987 f.write_str(&self.0)
1988 }
1989}
1990
1991impl From<BasinName> for String {
1992 fn from(value: BasinName) -> Self {
1993 value.0
1994 }
1995}
1996
/// A validated access token ID.
///
/// The inner string is private; values are created through `TryFrom<String>`
/// or `FromStr`, which require a non-empty string of at most 96 bytes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AccessTokenId(String);
2001
2002impl Deref for AccessTokenId {
2003 type Target = str;
2004
2005 fn deref(&self) -> &Self::Target {
2006 &self.0
2007 }
2008}
2009
2010impl TryFrom<String> for AccessTokenId {
2011 type Error = ConvertError;
2012
2013 fn try_from(name: String) -> Result<Self, Self::Error> {
2014 if name.is_empty() {
2015 return Err("Access token ID must not be empty".into());
2016 }
2017
2018 if name.len() > 96 {
2019 return Err("Access token ID must not exceed 96 characters".into());
2020 }
2021
2022 Ok(Self(name))
2023 }
2024}
2025
2026impl From<AccessTokenId> for String {
2027 fn from(value: AccessTokenId) -> Self {
2028 value.0
2029 }
2030}
2031
2032impl FromStr for AccessTokenId {
2033 type Err = ConvertError;
2034
2035 fn from_str(s: &str) -> Result<Self, Self::Err> {
2036 s.to_string().try_into()
2037 }
2038}
2039
2040impl std::fmt::Display for AccessTokenId {
2041 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2042 f.write_str(&self.0)
2043 }
2044}
2045
2046impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
2047 fn from(value: AccessTokenInfo) -> Self {
2048 Self {
2049 info: Some(value.into()),
2050 }
2051 }
2052}
2053
// Description of an access token: its ID plus optional expiry and scope.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AccessTokenInfo {
    // Validated token ID; sent to the API as a plain string.
    pub id: AccessTokenId,
    // Forwarded unchanged to the API (unit/epoch not visible here — confirm).
    pub expires_at: Option<u32>,
    // Forwarded unchanged to the API; `new` sets it to `false`.
    pub auto_prefix_streams: bool,
    // Converted to/from `api::AccessTokenScope` when present.
    pub scope: Option<AccessTokenScope>,
}
2062
2063impl AccessTokenInfo {
2064 pub fn new(id: AccessTokenId) -> Self {
2066 Self {
2067 id,
2068 expires_at: None,
2069 auto_prefix_streams: false,
2070 scope: None,
2071 }
2072 }
2073
2074 pub fn with_expires_at(self, expires_at: u32) -> Self {
2076 Self {
2077 expires_at: Some(expires_at),
2078 ..self
2079 }
2080 }
2081
2082 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2084 Self {
2085 auto_prefix_streams,
2086 ..self
2087 }
2088 }
2089
2090 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2092 Self {
2093 scope: Some(scope),
2094 ..self
2095 }
2096 }
2097}
2098
2099impl From<AccessTokenInfo> for api::AccessTokenInfo {
2100 fn from(value: AccessTokenInfo) -> Self {
2101 let AccessTokenInfo {
2102 id,
2103 expires_at,
2104 auto_prefix_streams,
2105 scope,
2106 } = value;
2107 Self {
2108 id: id.into(),
2109 expires_at,
2110 auto_prefix_streams,
2111 scope: scope.map(Into::into),
2112 }
2113 }
2114}
2115
2116impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2117 type Error = ConvertError;
2118
2119 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2120 let api::AccessTokenInfo {
2121 id,
2122 expires_at,
2123 auto_prefix_streams,
2124 scope,
2125 } = value;
2126 Ok(Self {
2127 id: id.try_into()?,
2128 expires_at,
2129 auto_prefix_streams,
2130 scope: scope.map(Into::into),
2131 })
2132 }
2133}
2134
// Operations that can be named in an access token scope.
// Each variant maps one-to-one onto the same-named `api::Operation` variant, and parses
// from its kebab-case name (e.g. "list-basins") via `FromStr`.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
    AccountMetrics,
    BasinMetrics,
    StreamMetrics,
}
2160
2161impl FromStr for Operation {
2162 type Err = ConvertError;
2163
2164 fn from_str(s: &str) -> Result<Self, Self::Err> {
2165 match s.to_lowercase().as_str() {
2166 "list-basins" => Ok(Self::ListBasins),
2167 "create-basin" => Ok(Self::CreateBasin),
2168 "delete-basin" => Ok(Self::DeleteBasin),
2169 "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2170 "get-basin-config" => Ok(Self::GetBasinConfig),
2171 "issue-access-token" => Ok(Self::IssueAccessToken),
2172 "revoke-access-token" => Ok(Self::RevokeAccessToken),
2173 "list-access-tokens" => Ok(Self::ListAccessTokens),
2174 "list-streams" => Ok(Self::ListStreams),
2175 "create-stream" => Ok(Self::CreateStream),
2176 "delete-stream" => Ok(Self::DeleteStream),
2177 "get-stream-config" => Ok(Self::GetStreamConfig),
2178 "reconfigure-stream" => Ok(Self::ReconfigureStream),
2179 "check-tail" => Ok(Self::CheckTail),
2180 "append" => Ok(Self::Append),
2181 "read" => Ok(Self::Read),
2182 "trim" => Ok(Self::Trim),
2183 "fence" => Ok(Self::Fence),
2184 "account-metrics" => Ok(Self::AccountMetrics),
2185 "basin-metrics" => Ok(Self::BasinMetrics),
2186 "stream-metrics" => Ok(Self::StreamMetrics),
2187 _ => Err("invalid operation".into()),
2188 }
2189 }
2190}
2191
2192impl From<Operation> for api::Operation {
2193 fn from(value: Operation) -> Self {
2194 match value {
2195 Operation::ListBasins => Self::ListBasins,
2196 Operation::CreateBasin => Self::CreateBasin,
2197 Operation::DeleteBasin => Self::DeleteBasin,
2198 Operation::ReconfigureBasin => Self::ReconfigureBasin,
2199 Operation::GetBasinConfig => Self::GetBasinConfig,
2200 Operation::IssueAccessToken => Self::IssueAccessToken,
2201 Operation::RevokeAccessToken => Self::RevokeAccessToken,
2202 Operation::ListAccessTokens => Self::ListAccessTokens,
2203 Operation::ListStreams => Self::ListStreams,
2204 Operation::CreateStream => Self::CreateStream,
2205 Operation::DeleteStream => Self::DeleteStream,
2206 Operation::GetStreamConfig => Self::GetStreamConfig,
2207 Operation::ReconfigureStream => Self::ReconfigureStream,
2208 Operation::CheckTail => Self::CheckTail,
2209 Operation::Append => Self::Append,
2210 Operation::Read => Self::Read,
2211 Operation::Trim => Self::Trim,
2212 Operation::Fence => Self::Fence,
2213 Operation::AccountMetrics => Self::AccountMetrics,
2214 Operation::BasinMetrics => Self::BasinMetrics,
2215 Operation::StreamMetrics => Self::StreamMetrics,
2216 }
2217 }
2218}
2219
2220impl From<api::Operation> for Option<Operation> {
2221 fn from(value: api::Operation) -> Self {
2222 match value {
2223 api::Operation::Unspecified => None,
2224 api::Operation::ListBasins => Some(Operation::ListBasins),
2225 api::Operation::CreateBasin => Some(Operation::CreateBasin),
2226 api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2227 api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2228 api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2229 api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2230 api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2231 api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2232 api::Operation::ListStreams => Some(Operation::ListStreams),
2233 api::Operation::CreateStream => Some(Operation::CreateStream),
2234 api::Operation::DeleteStream => Some(Operation::DeleteStream),
2235 api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2236 api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2237 api::Operation::CheckTail => Some(Operation::CheckTail),
2238 api::Operation::Append => Some(Operation::Append),
2239 api::Operation::Read => Some(Operation::Read),
2240 api::Operation::Trim => Some(Operation::Trim),
2241 api::Operation::Fence => Some(Operation::Fence),
2242 api::Operation::AccountMetrics => Some(Operation::AccountMetrics),
2243 api::Operation::BasinMetrics => Some(Operation::BasinMetrics),
2244 api::Operation::StreamMetrics => Some(Operation::StreamMetrics),
2245 }
2246 }
2247}
2248
// Scope of an access token: resource sets, operation groups and individual operations.
// Every field defaults to unset / empty.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct AccessTokenScope {
    // Resource set for basins, if any.
    pub basins: Option<ResourceSet>,
    // Resource set for streams, if any.
    pub streams: Option<ResourceSet>,
    // Resource set for access tokens, if any.
    pub access_tokens: Option<ResourceSet>,
    // Read/write permissions per operation group, if any.
    pub op_groups: Option<PermittedOperationGroups>,
    // Individual operations; sent to the API in unspecified (hash set) order.
    pub ops: HashSet<Operation>,
}
2258
2259impl AccessTokenScope {
2260 pub fn new() -> Self {
2262 Self::default()
2263 }
2264
2265 pub fn with_basins(self, basins: ResourceSet) -> Self {
2267 Self {
2268 basins: Some(basins),
2269 ..self
2270 }
2271 }
2272
2273 pub fn with_streams(self, streams: ResourceSet) -> Self {
2275 Self {
2276 streams: Some(streams),
2277 ..self
2278 }
2279 }
2280
2281 pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2283 Self {
2284 access_tokens: Some(access_tokens),
2285 ..self
2286 }
2287 }
2288
2289 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2291 Self {
2292 op_groups: Some(op_groups),
2293 ..self
2294 }
2295 }
2296
2297 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2299 Self {
2300 ops: ops.into_iter().collect(),
2301 ..self
2302 }
2303 }
2304
2305 pub fn with_op(self, op: Operation) -> Self {
2307 let mut ops = self.ops;
2308 ops.insert(op);
2309 Self { ops, ..self }
2310 }
2311}
2312
2313impl From<AccessTokenScope> for api::AccessTokenScope {
2314 fn from(value: AccessTokenScope) -> Self {
2315 let AccessTokenScope {
2316 basins,
2317 streams,
2318 access_tokens,
2319 op_groups,
2320 ops,
2321 } = value;
2322 Self {
2323 basins: basins.map(Into::into),
2324 streams: streams.map(Into::into),
2325 access_tokens: access_tokens.map(Into::into),
2326 op_groups: op_groups.map(Into::into),
2327 ops: ops
2328 .into_iter()
2329 .map(api::Operation::from)
2330 .map(Into::into)
2331 .collect(),
2332 }
2333 }
2334}
2335
2336impl From<api::AccessTokenScope> for AccessTokenScope {
2337 fn from(value: api::AccessTokenScope) -> Self {
2338 let api::AccessTokenScope {
2339 basins,
2340 streams,
2341 access_tokens,
2342 op_groups,
2343 ops,
2344 } = value;
2345 Self {
2346 basins: basins.and_then(|set| set.matching.map(Into::into)),
2347 streams: streams.and_then(|set| set.matching.map(Into::into)),
2348 access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2349 op_groups: op_groups.map(Into::into),
2350 ops: ops
2351 .into_iter()
2352 .map(api::Operation::try_from)
2353 .flat_map(Result::ok)
2354 .flat_map(<Option<Operation>>::from)
2355 .collect(),
2356 }
2357 }
2358}
2359
2360impl From<ResourceSet> for api::ResourceSet {
2361 fn from(value: ResourceSet) -> Self {
2362 Self {
2363 matching: Some(value.into()),
2364 }
2365 }
2366}
2367
// How a set of resources is selected, mirroring `api::resource_set::Matching`.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs(ResourceSet = "Matching")]
#[derive(Debug, Clone)]
pub enum ResourceSet {
    // Converts to/from `Matching::Exact` with the same string.
    Exact(String),
    // Converts to/from `Matching::Prefix` with the same string.
    Prefix(String),
}
2374
2375impl From<ResourceSet> for api::resource_set::Matching {
2376 fn from(value: ResourceSet) -> Self {
2377 match value {
2378 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2379 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2380 }
2381 }
2382}
2383
2384impl From<api::resource_set::Matching> for ResourceSet {
2385 fn from(value: api::resource_set::Matching) -> Self {
2386 match value {
2387 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2388 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2389 }
2390 }
2391}
2392
// Optional read/write permissions for three operation groups. All default to `None`.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct PermittedOperationGroups {
    // Permissions for the account group, if set.
    pub account: Option<ReadWritePermissions>,
    // Permissions for the basin group, if set.
    pub basin: Option<ReadWritePermissions>,
    // Permissions for the stream group, if set.
    pub stream: Option<ReadWritePermissions>,
}
2400
2401impl PermittedOperationGroups {
2402 pub fn new() -> Self {
2404 Self::default()
2405 }
2406
2407 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2409 Self {
2410 account: Some(account),
2411 ..self
2412 }
2413 }
2414
2415 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2417 Self {
2418 basin: Some(basin),
2419 ..self
2420 }
2421 }
2422
2423 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2425 Self {
2426 stream: Some(stream),
2427 ..self
2428 }
2429 }
2430}
2431
2432impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2433 fn from(value: PermittedOperationGroups) -> Self {
2434 let PermittedOperationGroups {
2435 account,
2436 basin,
2437 stream,
2438 } = value;
2439 Self {
2440 account: account.map(Into::into),
2441 basin: basin.map(Into::into),
2442 stream: stream.map(Into::into),
2443 }
2444 }
2445}
2446
2447impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2448 fn from(value: api::PermittedOperationGroups) -> Self {
2449 let api::PermittedOperationGroups {
2450 account,
2451 basin,
2452 stream,
2453 } = value;
2454 Self {
2455 account: account.map(Into::into),
2456 basin: basin.map(Into::into),
2457 stream: stream.map(Into::into),
2458 }
2459 }
2460}
2461
// A pair of read/write flags. Both default to `false`.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadWritePermissions {
    // Copied to/from `api::ReadWritePermissions::read`.
    pub read: bool,
    // Copied to/from `api::ReadWritePermissions::write`.
    pub write: bool,
}
2468
2469impl ReadWritePermissions {
2470 pub fn new() -> Self {
2472 Self::default()
2473 }
2474
2475 pub fn with_read(self, read: bool) -> Self {
2477 Self { read, ..self }
2478 }
2479
2480 pub fn with_write(self, write: bool) -> Self {
2482 Self { write, ..self }
2483 }
2484}
2485
2486impl From<ReadWritePermissions> for api::ReadWritePermissions {
2487 fn from(value: ReadWritePermissions) -> Self {
2488 let ReadWritePermissions { read, write } = value;
2489 Self { read, write }
2490 }
2491}
2492
2493impl From<api::ReadWritePermissions> for ReadWritePermissions {
2494 fn from(value: api::ReadWritePermissions) -> Self {
2495 let api::ReadWritePermissions { read, write } = value;
2496 Self { read, write }
2497 }
2498}
2499
2500impl From<api::IssueAccessTokenResponse> for String {
2501 fn from(value: api::IssueAccessTokenResponse) -> Self {
2502 value.access_token
2503 }
2504}
2505
2506impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2507 fn from(value: AccessTokenId) -> Self {
2508 Self { id: value.into() }
2509 }
2510}
2511
2512impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2513 type Error = ConvertError;
2514 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2515 let token_info = value.info.ok_or("access token info is missing")?;
2516 token_info.try_into()
2517 }
2518}
2519
// Parameters for listing access tokens. Defaults: empty strings and no limit.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListAccessTokensRequest {
    // Forwarded unchanged to the API request's `prefix`.
    pub prefix: String,
    // Forwarded unchanged to the API request's `start_after`.
    pub start_after: String,
    // Converted with `TryInto` for the API request; conversion failure is an error.
    pub limit: Option<usize>,
}
2527
2528impl ListAccessTokensRequest {
2529 pub fn new() -> Self {
2531 Self::default()
2532 }
2533
2534 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2536 Self {
2537 prefix: prefix.into(),
2538 ..self
2539 }
2540 }
2541
2542 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2544 Self {
2545 start_after: start_after.into(),
2546 ..self
2547 }
2548 }
2549
2550 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2552 Self {
2553 limit: limit.into(),
2554 ..self
2555 }
2556 }
2557}
2558
2559impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2560 type Error = ConvertError;
2561 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2562 let ListAccessTokensRequest {
2563 prefix,
2564 start_after,
2565 limit,
2566 } = value;
2567 Ok(Self {
2568 prefix,
2569 start_after,
2570 limit: limit
2571 .map(TryInto::try_into)
2572 .transpose()
2573 .map_err(|_| "request limit does not fit into u64 bounds")?,
2574 })
2575 }
2576}
2577
// One page of an access token listing.
// NOTE(review): rustdoc for this type presumably comes from `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListAccessTokensResponse {
    // Token infos converted from the API response, in the same order.
    pub access_tokens: Vec<AccessTokenInfo>,
    // Copied from the API response's `has_more`.
    pub has_more: bool,
}
2584
2585impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2586 type Error = ConvertError;
2587 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2588 let api::ListAccessTokensResponse {
2589 access_tokens,
2590 has_more,
2591 } = value;
2592 let access_tokens = access_tokens
2593 .into_iter()
2594 .map(TryInto::try_into)
2595 .collect::<Result<Vec<_>, _>>()?;
2596 Ok(Self {
2597 access_tokens,
2598 has_more,
2599 })
2600 }
2601}