1use std::{
4 collections::HashSet,
5 ops::{Deref, RangeTo},
6 str::FromStr,
7 sync::OnceLock,
8 time::Duration,
9};
10
11use bytes::Bytes;
12use rand::Rng;
13use regex::Regex;
14use sync_docs::sync_docs;
15
16use crate::api;
17
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
/// Metadata key spelled `retry-after-ms`.
/// NOTE(review): presumably names a metadata entry carrying a retry delay in
/// milliseconds; confirm at the use site.
pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
20
21#[derive(Debug, Clone, thiserror::Error)]
23#[error("{0}")]
24pub struct ConvertError(String);
25
26impl<T: Into<String>> From<T> for ConvertError {
27 fn from(value: T) -> Self {
28 Self(value.into())
29 }
30}
31
/// Types that can report their size in metered bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}

/// A vector's metered size is the sum of its elements' metered sizes
/// (zero for an empty vector).
impl<T: MeteredBytes> MeteredBytes for Vec<T> {
    fn metered_bytes(&self) -> u64 {
        self.iter().map(T::metered_bytes).sum()
    }
}
49
/// Implements [`MeteredBytes`] for a record-like type exposing `headers`
/// (items with `name` and `value`) and `body`, all of which have `len()`.
///
/// Metered size = 8 bytes fixed, plus for every header 2 bytes and the lengths
/// of its name and value, plus the body length.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| 2 + header.name.len() + header.value.len())
                    .sum();
                let total = 8 + header_bytes + self.body.len();
                total as u64
            }
        }
    };
}
67
68#[sync_docs]
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum BasinScope {
71 AwsUsEast1,
72}
73
74impl From<BasinScope> for api::BasinScope {
75 fn from(value: BasinScope) -> Self {
76 match value {
77 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
78 }
79 }
80}
81
82impl From<api::BasinScope> for Option<BasinScope> {
83 fn from(value: api::BasinScope) -> Self {
84 match value {
85 api::BasinScope::Unspecified => None,
86 api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
87 }
88 }
89}
90
91impl FromStr for BasinScope {
92 type Err = ConvertError;
93 fn from_str(value: &str) -> Result<Self, Self::Err> {
94 match value {
95 "aws:us-east-1" => Ok(Self::AwsUsEast1),
96 _ => Err("invalid basin scope value".into()),
97 }
98 }
99}
100
101#[sync_docs]
102#[derive(Debug, Clone)]
103pub struct CreateBasinRequest {
104 pub basin: BasinName,
105 pub config: Option<BasinConfig>,
106 pub scope: Option<BasinScope>,
107}
108
109impl CreateBasinRequest {
110 pub fn new(basin: BasinName) -> Self {
112 Self {
113 basin,
114 config: None,
115 scope: None,
116 }
117 }
118
119 pub fn with_config(self, config: BasinConfig) -> Self {
121 Self {
122 config: Some(config),
123 ..self
124 }
125 }
126
127 pub fn with_scope(self, scope: BasinScope) -> Self {
129 Self {
130 scope: Some(scope),
131 ..self
132 }
133 }
134}
135
136impl From<CreateBasinRequest> for api::CreateBasinRequest {
137 fn from(value: CreateBasinRequest) -> Self {
138 let CreateBasinRequest {
139 basin,
140 config,
141 scope,
142 } = value;
143 Self {
144 basin: basin.0,
145 config: config.map(Into::into),
146 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
147 }
148 }
149}
150
151#[sync_docs]
152#[derive(Debug, Clone, Default)]
153pub struct BasinConfig {
154 pub default_stream_config: Option<StreamConfig>,
155 pub create_stream_on_append: bool,
156 pub create_stream_on_read: bool,
157}
158
159impl BasinConfig {
160 pub fn new() -> Self {
162 Self::default()
163 }
164
165 pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
167 Self {
168 default_stream_config: Some(default_stream_config),
169 ..self
170 }
171 }
172
173 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
175 Self {
176 create_stream_on_append,
177 ..self
178 }
179 }
180
181 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
183 Self {
184 create_stream_on_read,
185 ..self
186 }
187 }
188}
189
190impl From<BasinConfig> for api::BasinConfig {
191 fn from(value: BasinConfig) -> Self {
192 let BasinConfig {
193 default_stream_config,
194 create_stream_on_append,
195 create_stream_on_read,
196 } = value;
197 Self {
198 default_stream_config: default_stream_config.map(Into::into),
199 create_stream_on_append,
200 create_stream_on_read,
201 }
202 }
203}
204
205impl From<api::BasinConfig> for BasinConfig {
206 fn from(value: api::BasinConfig) -> Self {
207 let api::BasinConfig {
208 default_stream_config,
209 create_stream_on_append,
210 create_stream_on_read,
211 } = value;
212 Self {
213 default_stream_config: default_stream_config.map(Into::into),
214 create_stream_on_append,
215 create_stream_on_read,
216 }
217 }
218}
219
220#[sync_docs]
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub enum TimestampingMode {
223 ClientPrefer,
224 ClientRequire,
225 Arrival,
226}
227
228impl From<TimestampingMode> for api::TimestampingMode {
229 fn from(value: TimestampingMode) -> Self {
230 match value {
231 TimestampingMode::ClientPrefer => Self::ClientPrefer,
232 TimestampingMode::ClientRequire => Self::ClientRequire,
233 TimestampingMode::Arrival => Self::Arrival,
234 }
235 }
236}
237
238impl From<api::TimestampingMode> for Option<TimestampingMode> {
239 fn from(value: api::TimestampingMode) -> Self {
240 match value {
241 api::TimestampingMode::Unspecified => None,
242 api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
243 api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
244 api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
245 }
246 }
247}
248
249#[sync_docs(TimestampingConfig = "Timestamping")]
250#[derive(Debug, Clone, Default)]
251pub struct TimestampingConfig {
253 pub mode: Option<TimestampingMode>,
254 pub uncapped: Option<bool>,
255}
256
257impl TimestampingConfig {
258 pub fn new() -> Self {
260 Self::default()
261 }
262
263 pub fn with_mode(self, mode: TimestampingMode) -> Self {
265 Self {
266 mode: Some(mode),
267 ..self
268 }
269 }
270
271 pub fn with_uncapped(self, uncapped: bool) -> Self {
273 Self {
274 uncapped: Some(uncapped),
275 ..self
276 }
277 }
278}
279
280impl From<TimestampingConfig> for api::stream_config::Timestamping {
281 fn from(value: TimestampingConfig) -> Self {
282 Self {
283 mode: value
284 .mode
285 .map(api::TimestampingMode::from)
286 .unwrap_or_default()
287 .into(),
288 uncapped: value.uncapped,
289 }
290 }
291}
292
293impl From<api::stream_config::Timestamping> for TimestampingConfig {
294 fn from(value: api::stream_config::Timestamping) -> Self {
295 let mode = value.mode().into();
296 let uncapped = value.uncapped;
297 Self { mode, uncapped }
298 }
299}
300
301#[sync_docs(min_age = "min_age_secs")]
302#[derive(Debug, Clone, Default)]
303pub struct DeleteOnEmpty {
305 pub min_age: Duration,
306}
307
308impl DeleteOnEmpty {
309 pub fn new() -> Self {
311 Self::default()
312 }
313
314 pub fn with_min_age(self, min_age: Duration) -> Self {
316 Self { min_age }
317 }
318}
319
320impl From<DeleteOnEmpty> for api::stream_config::DeleteOnEmpty {
321 fn from(value: DeleteOnEmpty) -> Self {
322 Self {
323 min_age_secs: value.min_age.as_secs(),
324 }
325 }
326}
327
328impl From<api::stream_config::DeleteOnEmpty> for DeleteOnEmpty {
329 fn from(value: api::stream_config::DeleteOnEmpty) -> Self {
330 Self {
331 min_age: Duration::from_secs(value.min_age_secs),
332 }
333 }
334}
335
336#[sync_docs]
337#[derive(Debug, Clone, Default)]
338pub struct StreamConfig {
339 pub storage_class: Option<StorageClass>,
340 pub retention_policy: Option<RetentionPolicy>,
341 pub timestamping: Option<TimestampingConfig>,
342 pub delete_on_empty: Option<DeleteOnEmpty>,
343}
344
345impl StreamConfig {
346 pub fn new() -> Self {
348 Self::default()
349 }
350
351 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
353 Self {
354 storage_class: Some(storage_class),
355 ..self
356 }
357 }
358
359 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
361 Self {
362 retention_policy: Some(retention_policy),
363 ..self
364 }
365 }
366
367 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
369 Self {
370 timestamping: Some(timestamping),
371 ..self
372 }
373 }
374
375 pub fn with_delete_on_empty(self, delete_on_empty: DeleteOnEmpty) -> Self {
377 Self {
378 delete_on_empty: Some(delete_on_empty),
379 ..self
380 }
381 }
382}
383
384impl From<StreamConfig> for api::StreamConfig {
385 fn from(value: StreamConfig) -> Self {
386 let StreamConfig {
387 storage_class,
388 retention_policy,
389 timestamping,
390 delete_on_empty,
391 } = value;
392 Self {
393 storage_class: storage_class
394 .map(api::StorageClass::from)
395 .unwrap_or_default()
396 .into(),
397 retention_policy: retention_policy.map(Into::into),
398 timestamping: timestamping.map(Into::into),
399 delete_on_empty: delete_on_empty.map(Into::into),
400 }
401 }
402}
403
404impl From<api::StreamConfig> for StreamConfig {
405 fn from(value: api::StreamConfig) -> Self {
406 Self {
407 storage_class: value.storage_class().into(),
408 retention_policy: value.retention_policy.map(Into::into),
409 timestamping: value.timestamping.map(Into::into),
410 delete_on_empty: value.delete_on_empty.map(Into::into),
411 }
412 }
413}
414
415#[sync_docs]
416#[derive(Debug, Clone, Copy, PartialEq, Eq)]
417pub enum StorageClass {
418 Standard,
419 Express,
420}
421
422impl From<StorageClass> for api::StorageClass {
423 fn from(value: StorageClass) -> Self {
424 match value {
425 StorageClass::Standard => Self::Standard,
426 StorageClass::Express => Self::Express,
427 }
428 }
429}
430
431impl From<api::StorageClass> for Option<StorageClass> {
432 fn from(value: api::StorageClass) -> Self {
433 match value {
434 api::StorageClass::Unspecified => None,
435 api::StorageClass::Standard => Some(StorageClass::Standard),
436 api::StorageClass::Express => Some(StorageClass::Express),
437 }
438 }
439}
440
441impl FromStr for StorageClass {
442 type Err = ConvertError;
443
444 fn from_str(value: &str) -> Result<Self, Self::Err> {
445 match value {
446 "standard" => Ok(Self::Standard),
447 "express" => Ok(Self::Express),
448 v => Err(format!("unknown storage class: {v}").into()),
449 }
450 }
451}
452
453#[sync_docs(Age = "Age")]
454#[derive(Debug, Clone)]
455pub enum RetentionPolicy {
456 Age(Duration),
457}
458
459impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
460 fn from(value: RetentionPolicy) -> Self {
461 match value {
462 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
463 }
464 }
465}
466
467impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
468 fn from(value: api::stream_config::RetentionPolicy) -> Self {
469 match value {
470 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
471 }
472 }
473}
474
475#[sync_docs]
476#[derive(Debug, Clone, Copy, PartialEq, Eq)]
477pub enum BasinState {
478 Active,
479 Creating,
480 Deleting,
481}
482
483impl From<BasinState> for api::BasinState {
484 fn from(value: BasinState) -> Self {
485 match value {
486 BasinState::Active => Self::Active,
487 BasinState::Creating => Self::Creating,
488 BasinState::Deleting => Self::Deleting,
489 }
490 }
491}
492
493impl From<api::BasinState> for Option<BasinState> {
494 fn from(value: api::BasinState) -> Self {
495 match value {
496 api::BasinState::Unspecified => None,
497 api::BasinState::Active => Some(BasinState::Active),
498 api::BasinState::Creating => Some(BasinState::Creating),
499 api::BasinState::Deleting => Some(BasinState::Deleting),
500 }
501 }
502}
503
504impl std::fmt::Display for BasinState {
505 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
506 match self {
507 BasinState::Active => write!(f, "active"),
508 BasinState::Creating => write!(f, "creating"),
509 BasinState::Deleting => write!(f, "deleting"),
510 }
511 }
512}
513
514#[sync_docs]
515#[derive(Debug, Clone)]
516pub struct BasinInfo {
517 pub name: String,
518 pub scope: Option<BasinScope>,
519 pub state: Option<BasinState>,
520}
521
522impl From<BasinInfo> for api::BasinInfo {
523 fn from(value: BasinInfo) -> Self {
524 let BasinInfo { name, scope, state } = value;
525 Self {
526 name,
527 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
528 state: state.map(api::BasinState::from).unwrap_or_default().into(),
529 }
530 }
531}
532
533impl From<api::BasinInfo> for BasinInfo {
534 fn from(value: api::BasinInfo) -> Self {
535 let scope = value.scope().into();
536 let state = value.state().into();
537 let name = value.name;
538 Self { name, scope, state }
539 }
540}
541
542impl TryFrom<api::CreateBasinResponse> for BasinInfo {
543 type Error = ConvertError;
544 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
545 let api::CreateBasinResponse { info } = value;
546 let info = info.ok_or("missing basin info")?;
547 Ok(info.into())
548 }
549}
550
551#[sync_docs]
552#[derive(Debug, Clone, Default)]
553pub struct ListStreamsRequest {
554 pub prefix: String,
555 pub start_after: String,
556 pub limit: Option<usize>,
557}
558
559impl ListStreamsRequest {
560 pub fn new() -> Self {
562 Self::default()
563 }
564
565 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
567 Self {
568 prefix: prefix.into(),
569 ..self
570 }
571 }
572
573 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
575 Self {
576 start_after: start_after.into(),
577 ..self
578 }
579 }
580
581 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
583 Self {
584 limit: limit.into(),
585 ..self
586 }
587 }
588}
589
590impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
591 type Error = ConvertError;
592 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
593 let ListStreamsRequest {
594 prefix,
595 start_after,
596 limit,
597 } = value;
598 Ok(Self {
599 prefix,
600 start_after,
601 limit: limit.map(|n| n as u64),
602 })
603 }
604}
605
606#[sync_docs]
607#[derive(Debug, Clone)]
608pub struct StreamInfo {
609 pub name: String,
610 pub created_at: u32,
611 pub deleted_at: Option<u32>,
612}
613
614impl From<api::StreamInfo> for StreamInfo {
615 fn from(value: api::StreamInfo) -> Self {
616 Self {
617 name: value.name,
618 created_at: value.created_at,
619 deleted_at: value.deleted_at,
620 }
621 }
622}
623
624impl TryFrom<api::CreateStreamResponse> for StreamInfo {
625 type Error = ConvertError;
626
627 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
628 let api::CreateStreamResponse { info } = value;
629 let info = info.ok_or("missing stream info")?;
630 Ok(info.into())
631 }
632}
633
634#[sync_docs]
635#[derive(Debug, Clone)]
636pub struct ListStreamsResponse {
637 pub streams: Vec<StreamInfo>,
638 pub has_more: bool,
639}
640
641impl From<api::ListStreamsResponse> for ListStreamsResponse {
642 fn from(value: api::ListStreamsResponse) -> Self {
643 let api::ListStreamsResponse { streams, has_more } = value;
644 let streams = streams.into_iter().map(Into::into).collect();
645 Self { streams, has_more }
646 }
647}
648
649impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
650 type Error = ConvertError;
651
652 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
653 let api::GetBasinConfigResponse { config } = value;
654 let config = config.ok_or("missing basin config")?;
655 Ok(config.into())
656 }
657}
658
659impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
660 type Error = ConvertError;
661
662 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
663 let api::GetStreamConfigResponse { config } = value;
664 let config = config.ok_or("missing stream config")?;
665 Ok(config.into())
666 }
667}
668
669#[sync_docs]
670#[derive(Debug, Clone)]
671pub struct CreateStreamRequest {
672 pub stream: String,
673 pub config: Option<StreamConfig>,
674}
675
676impl CreateStreamRequest {
677 pub fn new(stream: impl Into<String>) -> Self {
679 Self {
680 stream: stream.into(),
681 config: None,
682 }
683 }
684
685 pub fn with_config(self, config: StreamConfig) -> Self {
687 Self {
688 config: Some(config),
689 ..self
690 }
691 }
692}
693
694impl From<CreateStreamRequest> for api::CreateStreamRequest {
695 fn from(value: CreateStreamRequest) -> Self {
696 let CreateStreamRequest { stream, config } = value;
697 Self {
698 stream,
699 config: config.map(Into::into),
700 }
701 }
702}
703
704#[sync_docs]
705#[derive(Debug, Clone, Default)]
706pub struct ListBasinsRequest {
707 pub prefix: String,
708 pub start_after: String,
709 pub limit: Option<usize>,
710}
711
712impl ListBasinsRequest {
713 pub fn new() -> Self {
715 Self::default()
716 }
717
718 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
720 Self {
721 prefix: prefix.into(),
722 ..self
723 }
724 }
725
726 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
728 Self {
729 start_after: start_after.into(),
730 ..self
731 }
732 }
733
734 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
736 Self {
737 limit: limit.into(),
738 ..self
739 }
740 }
741}
742
743impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
744 type Error = ConvertError;
745 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
746 let ListBasinsRequest {
747 prefix,
748 start_after,
749 limit,
750 } = value;
751 Ok(Self {
752 prefix,
753 start_after,
754 limit: limit
755 .map(TryInto::try_into)
756 .transpose()
757 .map_err(|_| "request limit does not fit into u64 bounds")?,
758 })
759 }
760}
761
762#[sync_docs]
763#[derive(Debug, Clone)]
764pub struct ListBasinsResponse {
765 pub basins: Vec<BasinInfo>,
766 pub has_more: bool,
767}
768
769impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
770 type Error = ConvertError;
771 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
772 let api::ListBasinsResponse { basins, has_more } = value;
773 Ok(Self {
774 basins: basins.into_iter().map(Into::into).collect(),
775 has_more,
776 })
777 }
778}
779
780#[sync_docs]
781#[derive(Debug, Clone)]
782pub struct DeleteBasinRequest {
783 pub basin: BasinName,
784 pub if_exists: bool,
786}
787
788impl DeleteBasinRequest {
789 pub fn new(basin: BasinName) -> Self {
791 Self {
792 basin,
793 if_exists: false,
794 }
795 }
796
797 pub fn with_if_exists(self, if_exists: bool) -> Self {
799 Self { if_exists, ..self }
800 }
801}
802
803impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
804 fn from(value: DeleteBasinRequest) -> Self {
805 let DeleteBasinRequest { basin, .. } = value;
806 Self { basin: basin.0 }
807 }
808}
809
810#[sync_docs]
811#[derive(Debug, Clone)]
812pub struct DeleteStreamRequest {
813 pub stream: String,
814 pub if_exists: bool,
816}
817
818impl DeleteStreamRequest {
819 pub fn new(stream: impl Into<String>) -> Self {
821 Self {
822 stream: stream.into(),
823 if_exists: false,
824 }
825 }
826
827 pub fn with_if_exists(self, if_exists: bool) -> Self {
829 Self { if_exists, ..self }
830 }
831}
832
833impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
834 fn from(value: DeleteStreamRequest) -> Self {
835 let DeleteStreamRequest { stream, .. } = value;
836 Self { stream }
837 }
838}
839
840#[sync_docs]
841#[derive(Debug, Clone)]
842pub struct ReconfigureBasinRequest {
843 pub basin: BasinName,
844 pub config: Option<BasinConfig>,
845 pub mask: Option<Vec<String>>,
846}
847
848impl ReconfigureBasinRequest {
849 pub fn new(basin: BasinName) -> Self {
851 Self {
852 basin,
853 config: None,
854 mask: None,
855 }
856 }
857
858 pub fn with_config(self, config: BasinConfig) -> Self {
860 Self {
861 config: Some(config),
862 ..self
863 }
864 }
865
866 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
868 Self {
869 mask: Some(mask.into()),
870 ..self
871 }
872 }
873}
874
875impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
876 fn from(value: ReconfigureBasinRequest) -> Self {
877 let ReconfigureBasinRequest {
878 basin,
879 config,
880 mask,
881 } = value;
882 Self {
883 basin: basin.0,
884 config: config.map(Into::into),
885 mask: mask.map(|paths| prost_types::FieldMask { paths }),
886 }
887 }
888}
889
890impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
891 type Error = ConvertError;
892 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
893 let api::ReconfigureBasinResponse { config } = value;
894 let config = config.ok_or("missing basin config")?;
895 Ok(config.into())
896 }
897}
898
899#[sync_docs]
900#[derive(Debug, Clone)]
901pub struct ReconfigureStreamRequest {
902 pub stream: String,
903 pub config: Option<StreamConfig>,
904 pub mask: Option<Vec<String>>,
905}
906
907impl ReconfigureStreamRequest {
908 pub fn new(stream: impl Into<String>) -> Self {
910 Self {
911 stream: stream.into(),
912 config: None,
913 mask: None,
914 }
915 }
916
917 pub fn with_config(self, config: StreamConfig) -> Self {
919 Self {
920 config: Some(config),
921 ..self
922 }
923 }
924
925 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
927 Self {
928 mask: Some(mask.into()),
929 ..self
930 }
931 }
932}
933
934impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
935 fn from(value: ReconfigureStreamRequest) -> Self {
936 let ReconfigureStreamRequest {
937 stream,
938 config,
939 mask,
940 } = value;
941 Self {
942 stream,
943 config: config.map(Into::into),
944 mask: mask.map(|paths| prost_types::FieldMask { paths }),
945 }
946 }
947}
948
949impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
950 type Error = ConvertError;
951 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
952 let api::ReconfigureStreamResponse { config } = value;
953 let config = config.ok_or("missing stream config")?;
954 Ok(config.into())
955 }
956}
957
958impl From<api::CheckTailResponse> for StreamPosition {
959 fn from(value: api::CheckTailResponse) -> Self {
960 let api::CheckTailResponse {
961 next_seq_num,
962 last_timestamp,
963 } = value;
964 StreamPosition {
965 seq_num: next_seq_num,
966 timestamp: last_timestamp,
967 }
968 }
969}
970
971#[derive(Debug, Clone, PartialEq, Eq)]
973pub struct StreamPosition {
974 pub seq_num: u64,
976 pub timestamp: u64,
979}
980
981#[sync_docs]
982#[derive(Debug, Clone, PartialEq, Eq)]
983pub struct Header {
984 pub name: Bytes,
985 pub value: Bytes,
986}
987
988impl Header {
989 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
991 Self {
992 name: name.into(),
993 value: value.into(),
994 }
995 }
996
997 pub fn from_value(value: impl Into<Bytes>) -> Self {
999 Self {
1000 name: Bytes::new(),
1001 value: value.into(),
1002 }
1003 }
1004}
1005
1006impl From<Header> for api::Header {
1007 fn from(value: Header) -> Self {
1008 let Header { name, value } = value;
1009 Self { name, value }
1010 }
1011}
1012
1013impl From<api::Header> for Header {
1014 fn from(value: api::Header) -> Self {
1015 let api::Header { name, value } = value;
1016 Self { name, value }
1017 }
1018}
1019
1020#[derive(Debug, Clone, Default, PartialEq, Eq)]
1024pub struct FencingToken(String);
1025
1026impl FencingToken {
1027 const MAX_BYTES: usize = 36;
1028
1029 pub fn generate(n: usize) -> Result<Self, ConvertError> {
1031 rand::rng()
1032 .sample_iter(&rand::distr::Alphanumeric)
1033 .take(n)
1034 .map(char::from)
1035 .collect::<String>()
1036 .parse()
1037 }
1038}
1039
1040impl Deref for FencingToken {
1041 type Target = str;
1042
1043 fn deref(&self) -> &Self::Target {
1044 &self.0
1045 }
1046}
1047
1048impl std::fmt::Display for FencingToken {
1049 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1050 write!(f, "{}", self.0)
1051 }
1052}
1053
1054impl FromStr for FencingToken {
1055 type Err = ConvertError;
1056
1057 fn from_str(value: &str) -> Result<Self, Self::Err> {
1058 value.to_string().try_into()
1059 }
1060}
1061
1062impl TryFrom<String> for FencingToken {
1063 type Error = ConvertError;
1064
1065 fn try_from(value: String) -> Result<Self, Self::Error> {
1066 if value.len() > Self::MAX_BYTES {
1067 Err(format!("Fencing token cannot exceed {} bytes", Self::MAX_BYTES).into())
1068 } else {
1069 Ok(Self(value))
1070 }
1071 }
1072}
1073
1074impl From<FencingToken> for String {
1075 fn from(value: FencingToken) -> Self {
1076 value.0
1077 }
1078}
1079
1080#[derive(Debug, Clone)]
1082pub enum Command {
1083 Fence {
1088 fencing_token: FencingToken,
1092 },
1093 Trim {
1098 seq_num: u64,
1103 },
1104}
1105
1106#[derive(Debug, Clone)]
1112pub struct CommandRecord {
1113 pub command: Command,
1115 pub timestamp: Option<u64>,
1117}
1118
1119impl CommandRecord {
1120 const FENCE: &[u8] = b"fence";
1121 const TRIM: &[u8] = b"trim";
1122
1123 pub fn fence(fencing_token: FencingToken) -> Self {
1125 Self {
1126 command: Command::Fence { fencing_token },
1127 timestamp: None,
1128 }
1129 }
1130
1131 pub fn trim(seq_num: impl Into<u64>) -> Self {
1133 Self {
1134 command: Command::Trim {
1135 seq_num: seq_num.into(),
1136 },
1137 timestamp: None,
1138 }
1139 }
1140
1141 pub fn with_timestamp(self, timestamp: u64) -> Self {
1143 Self {
1144 timestamp: Some(timestamp),
1145 ..self
1146 }
1147 }
1148}
1149
// A record to append: optional timestamp, headers, and body.
// Fields are private; instances are built through the constructors in
// `impl AppendRecord`, which validate the metered size.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    timestamp: Option<u64>,
    headers: Vec<Header>,
    body: Bytes,
    // Test-only: size limit used by `validated` instead of `MAX_BYTES`.
    #[cfg(test)]
    max_bytes: u64,
}

// Implements `MeteredBytes` for `AppendRecord` via the `metered_impl!` macro.
metered_impl!(AppendRecord);
1161
1162impl AppendRecord {
1163 const MAX_BYTES: u64 = MIB_BYTES;
1164
1165 fn validated(self) -> Result<Self, ConvertError> {
1166 #[cfg(test)]
1167 let max_bytes = self.max_bytes;
1168 #[cfg(not(test))]
1169 let max_bytes = Self::MAX_BYTES;
1170
1171 if self.metered_bytes() > max_bytes {
1172 Err("AppendRecord should have metered size less than 1 MiB".into())
1173 } else {
1174 Ok(self)
1175 }
1176 }
1177
1178 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1180 Self {
1181 timestamp: None,
1182 headers: Vec::new(),
1183 body: body.into(),
1184 #[cfg(test)]
1185 max_bytes: Self::MAX_BYTES,
1186 }
1187 .validated()
1188 }
1189
1190 #[cfg(test)]
1191 pub(crate) fn with_max_bytes(
1192 max_bytes: u64,
1193 body: impl Into<Bytes>,
1194 ) -> Result<Self, ConvertError> {
1195 Self {
1196 timestamp: None,
1197 headers: Vec::new(),
1198 body: body.into(),
1199 max_bytes,
1200 }
1201 .validated()
1202 }
1203
1204 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1206 Self {
1207 headers: headers.into(),
1208 ..self
1209 }
1210 .validated()
1211 }
1212
1213 pub fn with_timestamp(self, timestamp: u64) -> Self {
1215 Self {
1216 timestamp: Some(timestamp),
1217 ..self
1218 }
1219 }
1220
1221 pub fn body(&self) -> &[u8] {
1223 &self.body
1224 }
1225
1226 pub fn headers(&self) -> &[Header] {
1228 &self.headers
1229 }
1230
1231 pub fn timestamp(&self) -> Option<u64> {
1233 self.timestamp
1234 }
1235
1236 pub fn into_parts(self) -> AppendRecordParts {
1238 AppendRecordParts {
1239 timestamp: self.timestamp,
1240 headers: self.headers,
1241 body: self.body,
1242 }
1243 }
1244
1245 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1247 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1248 if let Some(timestamp) = parts.timestamp {
1249 Ok(record.with_timestamp(timestamp))
1250 } else {
1251 Ok(record)
1252 }
1253 }
1254}
1255
1256impl From<AppendRecord> for api::AppendRecord {
1257 fn from(value: AppendRecord) -> Self {
1258 Self {
1259 timestamp: value.timestamp,
1260 headers: value.headers.into_iter().map(Into::into).collect(),
1261 body: value.body,
1262 }
1263 }
1264}
1265
1266impl From<CommandRecord> for AppendRecord {
1267 fn from(value: CommandRecord) -> Self {
1268 let (header_value, body) = match value.command {
1269 Command::Fence { fencing_token } => (
1270 CommandRecord::FENCE,
1271 Bytes::copy_from_slice(fencing_token.as_bytes()),
1272 ),
1273 Command::Trim { seq_num } => (
1274 CommandRecord::TRIM,
1275 Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1276 ),
1277 };
1278 AppendRecordParts {
1279 timestamp: value.timestamp,
1280 headers: vec![Header::from_value(header_value)],
1281 body,
1282 }
1283 .try_into()
1284 .expect("command record is a valid append record")
1285 }
1286}
1287
1288#[sync_docs(AppendRecordParts = "AppendRecord")]
1289#[derive(Debug, Clone)]
1290pub struct AppendRecordParts {
1291 pub timestamp: Option<u64>,
1292 pub headers: Vec<Header>,
1293 pub body: Bytes,
1294}
1295
1296impl From<AppendRecord> for AppendRecordParts {
1297 fn from(value: AppendRecord) -> Self {
1298 value.into_parts()
1299 }
1300}
1301
1302impl TryFrom<AppendRecordParts> for AppendRecord {
1303 type Error = ConvertError;
1304
1305 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1306 Self::try_from_parts(value)
1307 }
1308}
1309
1310#[derive(Debug, Clone)]
1312pub struct AppendRecordBatch {
1313 records: Vec<AppendRecord>,
1314 metered_bytes: u64,
1315 max_capacity: usize,
1316 #[cfg(test)]
1317 max_bytes: u64,
1318}
1319
1320impl PartialEq for AppendRecordBatch {
1321 fn eq(&self, other: &Self) -> bool {
1322 if self.records.eq(&other.records) {
1323 assert_eq!(self.metered_bytes, other.metered_bytes);
1324 true
1325 } else {
1326 false
1327 }
1328 }
1329}
1330
1331impl Eq for AppendRecordBatch {}
1332
1333impl Default for AppendRecordBatch {
1334 fn default() -> Self {
1335 Self::new()
1336 }
1337}
1338
1339impl AppendRecordBatch {
1340 pub const MAX_CAPACITY: usize = 1000;
1344
1345 pub const MAX_BYTES: u64 = MIB_BYTES;
1347
1348 pub fn new() -> Self {
1350 Self::with_max_capacity(Self::MAX_CAPACITY)
1351 }
1352
1353 pub fn with_max_capacity(max_capacity: usize) -> Self {
1357 assert!(
1358 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1359 "Batch capacity must be between 1 and 1000"
1360 );
1361
1362 Self {
1363 records: Vec::with_capacity(max_capacity),
1364 metered_bytes: 0,
1365 max_capacity,
1366 #[cfg(test)]
1367 max_bytes: Self::MAX_BYTES,
1368 }
1369 }
1370
1371 #[cfg(test)]
1372 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1373 #[cfg(test)]
1374 assert!(
1375 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1376 "Batch size must be between 1 byte and 1 MiB"
1377 );
1378
1379 Self {
1380 max_bytes,
1381 ..Self::with_max_capacity(max_capacity)
1382 }
1383 }
1384
1385 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1391 where
1392 R: Into<AppendRecord>,
1393 T: IntoIterator<Item = R>,
1394 {
1395 let mut records = Self::new();
1396 let mut pending = Vec::new();
1397
1398 let mut iter = iter.into_iter();
1399
1400 for record in iter.by_ref() {
1401 if let Err(record) = records.push(record) {
1402 pending.push(record);
1403 break;
1404 }
1405 }
1406
1407 if pending.is_empty() {
1408 Ok(records)
1409 } else {
1410 pending.extend(iter.map(Into::into));
1411 Err((records, pending))
1412 }
1413 }
1414
1415 pub fn is_empty(&self) -> bool {
1417 if self.records.is_empty() {
1418 assert_eq!(self.metered_bytes, 0);
1419 true
1420 } else {
1421 false
1422 }
1423 }
1424
1425 pub fn len(&self) -> usize {
1427 self.records.len()
1428 }
1429
1430 #[cfg(test)]
1431 fn max_bytes(&self) -> u64 {
1432 self.max_bytes
1433 }
1434
1435 #[cfg(not(test))]
1436 fn max_bytes(&self) -> u64 {
1437 Self::MAX_BYTES
1438 }
1439
1440 pub fn is_full(&self) -> bool {
1442 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1443 }
1444
1445 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1447 assert!(self.records.len() <= self.max_capacity);
1448 assert!(self.metered_bytes <= self.max_bytes());
1449
1450 let record = record.into();
1451 let record_size = record.metered_bytes();
1452 if self.records.len() >= self.max_capacity
1453 || self.metered_bytes + record_size > self.max_bytes()
1454 {
1455 Err(record)
1456 } else {
1457 self.records.push(record);
1458 self.metered_bytes += record_size;
1459 Ok(())
1460 }
1461 }
1462}
1463
1464impl MeteredBytes for AppendRecordBatch {
1465 fn metered_bytes(&self) -> u64 {
1466 self.metered_bytes
1467 }
1468}
1469
1470impl IntoIterator for AppendRecordBatch {
1471 type Item = AppendRecord;
1472 type IntoIter = std::vec::IntoIter<Self::Item>;
1473
1474 fn into_iter(self) -> Self::IntoIter {
1475 self.records.into_iter()
1476 }
1477}
1478
1479impl<'a> IntoIterator for &'a AppendRecordBatch {
1480 type Item = &'a AppendRecord;
1481 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1482
1483 fn into_iter(self) -> Self::IntoIter {
1484 self.records.iter()
1485 }
1486}
1487
1488impl AsRef<[AppendRecord]> for AppendRecordBatch {
1489 fn as_ref(&self) -> &[AppendRecord] {
1490 &self.records
1491 }
1492}
1493
1494#[sync_docs]
1495#[derive(Debug, Default, Clone)]
1496pub struct AppendInput {
1497 pub records: AppendRecordBatch,
1498 pub match_seq_num: Option<u64>,
1499 pub fencing_token: Option<FencingToken>,
1500}
1501
1502impl MeteredBytes for AppendInput {
1503 fn metered_bytes(&self) -> u64 {
1504 self.records.metered_bytes()
1505 }
1506}
1507
1508impl AppendInput {
1509 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1511 Self {
1512 records: records.into(),
1513 match_seq_num: None,
1514 fencing_token: None,
1515 }
1516 }
1517
1518 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1520 Self {
1521 match_seq_num: Some(match_seq_num.into()),
1522 ..self
1523 }
1524 }
1525
1526 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1528 Self {
1529 fencing_token: Some(fencing_token),
1530 ..self
1531 }
1532 }
1533
1534 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1535 let Self {
1536 records,
1537 match_seq_num,
1538 fencing_token,
1539 } = self;
1540
1541 api::AppendInput {
1542 stream: stream.into(),
1543 records: records.into_iter().map(Into::into).collect(),
1544 match_seq_num,
1545 fencing_token: fencing_token.map(|f| f.0),
1546 }
1547 }
1548}
1549
1550#[derive(Debug, Clone)]
1552pub struct AppendAck {
1553 pub start: StreamPosition,
1555 pub end: StreamPosition,
1560 pub tail: StreamPosition,
1564}
1565
1566impl From<api::AppendOutput> for AppendAck {
1567 fn from(value: api::AppendOutput) -> Self {
1568 let api::AppendOutput {
1569 start_seq_num,
1570 start_timestamp,
1571 end_seq_num,
1572 end_timestamp,
1573 next_seq_num,
1574 last_timestamp,
1575 } = value;
1576 let start = StreamPosition {
1577 seq_num: start_seq_num,
1578 timestamp: start_timestamp,
1579 };
1580 let end = StreamPosition {
1581 seq_num: end_seq_num,
1582 timestamp: end_timestamp,
1583 };
1584 let tail = StreamPosition {
1585 seq_num: next_seq_num,
1586 timestamp: last_timestamp,
1587 };
1588 Self { start, end, tail }
1589 }
1590}
1591
1592impl TryFrom<api::AppendResponse> for AppendAck {
1593 type Error = ConvertError;
1594 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1595 let api::AppendResponse { output } = value;
1596 let output = output.ok_or("missing append output")?;
1597 Ok(output.into())
1598 }
1599}
1600
1601impl TryFrom<api::AppendSessionResponse> for AppendAck {
1602 type Error = ConvertError;
1603 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1604 let api::AppendSessionResponse { output } = value;
1605 let output = output.ok_or("missing append output")?;
1606 Ok(output.into())
1607 }
1608}
1609
1610#[sync_docs]
1611#[derive(Debug, Clone, Default)]
1612pub struct ReadLimit {
1613 pub count: Option<u64>,
1614 pub bytes: Option<u64>,
1615}
1616
1617impl ReadLimit {
1618 pub fn new() -> Self {
1620 Self::default()
1621 }
1622
1623 pub fn with_count(self, count: u64) -> Self {
1625 Self {
1626 count: Some(count),
1627 ..self
1628 }
1629 }
1630
1631 pub fn with_bytes(self, bytes: u64) -> Self {
1633 Self {
1634 bytes: Some(bytes),
1635 ..self
1636 }
1637 }
1638}
1639
/// Where a read should start.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Start at a sequence number.
    SeqNum(u64),
    /// Start at a timestamp.
    Timestamp(u64),
    /// Start at an offset relative to the tail.
    TailOffset(u64),
}
1650
1651impl Default for ReadStart {
1652 fn default() -> Self {
1653 Self::SeqNum(0)
1654 }
1655}
1656
1657impl From<ReadStart> for api::read_request::Start {
1658 fn from(start: ReadStart) -> Self {
1659 match start {
1660 ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1661 ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1662 ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1663 }
1664 }
1665}
1666
1667impl From<ReadStart> for api::read_session_request::Start {
1668 fn from(start: ReadStart) -> Self {
1669 match start {
1670 ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1671 ReadStart::Timestamp(timestamp) => {
1672 api::read_session_request::Start::Timestamp(timestamp)
1673 }
1674 ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1675 }
1676 }
1677}
1678
1679#[sync_docs]
1680#[derive(Debug, Clone, Default)]
1681pub struct ReadRequest {
1682 pub start: ReadStart,
1683 pub limit: ReadLimit,
1684 pub until: Option<RangeTo<u64>>,
1685 pub clamp: bool,
1686}
1687
1688impl ReadRequest {
1689 pub fn new(start: ReadStart) -> Self {
1691 Self {
1692 start,
1693 ..Default::default()
1694 }
1695 }
1696
1697 pub fn with_limit(self, limit: ReadLimit) -> Self {
1699 Self { limit, ..self }
1700 }
1701
1702 pub fn with_until(self, until: RangeTo<u64>) -> Self {
1704 Self {
1705 until: Some(until),
1706 ..self
1707 }
1708 }
1709
1710 pub fn with_clamp(self, clamp: bool) -> Self {
1712 Self { clamp, ..self }
1713 }
1714}
1715
1716impl ReadRequest {
1717 pub(crate) fn try_into_api_type(
1718 self,
1719 stream: impl Into<String>,
1720 ) -> Result<api::ReadRequest, ConvertError> {
1721 let Self {
1722 start,
1723 limit,
1724 until,
1725 clamp,
1726 } = self;
1727
1728 let limit = if limit.count > Some(1000) {
1729 Err("read limit: count must not exceed 1000 for unary request")
1730 } else if limit.bytes > Some(MIB_BYTES) {
1731 Err("read limit: bytes must not exceed 1MiB for unary request")
1732 } else {
1733 Ok(api::ReadLimit {
1734 count: limit.count,
1735 bytes: limit.bytes,
1736 })
1737 }?;
1738
1739 Ok(api::ReadRequest {
1740 stream: stream.into(),
1741 start: Some(start.into()),
1742 limit: Some(limit),
1743 until: until.map(|range| range.end),
1744 clamp,
1745 })
1746 }
1747}
1748
1749#[sync_docs]
1750#[derive(Debug, Clone)]
1751pub struct SequencedRecord {
1752 pub seq_num: u64,
1753 pub timestamp: u64,
1754 pub headers: Vec<Header>,
1755 pub body: Bytes,
1756}
1757
1758metered_impl!(SequencedRecord);
1759
1760impl From<api::SequencedRecord> for SequencedRecord {
1761 fn from(value: api::SequencedRecord) -> Self {
1762 let api::SequencedRecord {
1763 seq_num,
1764 timestamp,
1765 headers,
1766 body,
1767 } = value;
1768 Self {
1769 seq_num,
1770 timestamp,
1771 headers: headers.into_iter().map(Into::into).collect(),
1772 body,
1773 }
1774 }
1775}
1776
1777impl SequencedRecord {
1778 pub fn as_command_record(&self) -> Option<CommandRecord> {
1780 if self.headers.len() != 1 {
1781 return None;
1782 }
1783
1784 let header = self.headers.first().expect("pre-validated length");
1785
1786 if !header.name.is_empty() {
1787 return None;
1788 }
1789
1790 match header.value.as_ref() {
1791 CommandRecord::FENCE => {
1792 let fencing_token = std::str::from_utf8(&self.body).ok()?.parse().ok()?;
1793 Some(CommandRecord {
1794 command: Command::Fence { fencing_token },
1795 timestamp: Some(self.timestamp),
1796 })
1797 }
1798 CommandRecord::TRIM => {
1799 let body: &[u8] = &self.body;
1800 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1801 Some(CommandRecord {
1802 command: Command::Trim { seq_num },
1803 timestamp: Some(self.timestamp),
1804 })
1805 }
1806 _ => None,
1807 }
1808 }
1809}
1810
1811#[sync_docs]
1812#[derive(Debug, Clone)]
1813pub struct SequencedRecordBatch {
1814 pub records: Vec<SequencedRecord>,
1815}
1816
1817impl MeteredBytes for SequencedRecordBatch {
1818 fn metered_bytes(&self) -> u64 {
1819 self.records.metered_bytes()
1820 }
1821}
1822
1823impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1824 fn from(value: api::SequencedRecordBatch) -> Self {
1825 let api::SequencedRecordBatch { records } = value;
1826 Self {
1827 records: records.into_iter().map(Into::into).collect(),
1828 }
1829 }
1830}
1831
1832#[sync_docs(ReadOutput = "Output")]
1833#[derive(Debug, Clone)]
1834pub enum ReadOutput {
1835 Batch(SequencedRecordBatch),
1836 NextSeqNum(u64),
1837}
1838
1839impl From<api::read_output::Output> for ReadOutput {
1840 fn from(value: api::read_output::Output) -> Self {
1841 match value {
1842 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1843 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1844 }
1845 }
1846}
1847
1848impl TryFrom<api::ReadOutput> for ReadOutput {
1849 type Error = ConvertError;
1850 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1851 let api::ReadOutput { output } = value;
1852 let output = output.ok_or("missing read output")?;
1853 Ok(output.into())
1854 }
1855}
1856
1857impl TryFrom<api::ReadResponse> for ReadOutput {
1858 type Error = ConvertError;
1859 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1860 let api::ReadResponse { output } = value;
1861 let output = output.ok_or("missing output in read response")?;
1862 output.try_into()
1863 }
1864}
1865
1866#[sync_docs]
1867#[derive(Debug, Clone, Default)]
1868pub struct ReadSessionRequest {
1869 pub start: ReadStart,
1870 pub limit: ReadLimit,
1871 pub until: Option<RangeTo<u64>>,
1872 pub clamp: bool,
1873}
1874
1875impl ReadSessionRequest {
1876 pub fn new(start: ReadStart) -> Self {
1878 Self {
1879 start,
1880 ..Default::default()
1881 }
1882 }
1883
1884 pub fn with_limit(self, limit: ReadLimit) -> Self {
1886 Self { limit, ..self }
1887 }
1888
1889 pub fn with_until(self, until: RangeTo<u64>) -> Self {
1891 Self {
1892 until: Some(until),
1893 ..self
1894 }
1895 }
1896
1897 pub fn with_clamp(self, clamp: bool) -> Self {
1899 Self { clamp, ..self }
1900 }
1901
1902 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1903 let Self {
1904 start,
1905 limit,
1906 until,
1907 clamp,
1908 } = self;
1909 api::ReadSessionRequest {
1910 stream: stream.into(),
1911 start: Some(start.into()),
1912 limit: Some(api::ReadLimit {
1913 count: limit.count,
1914 bytes: limit.bytes,
1915 }),
1916 heartbeats: false,
1917 until: until.map(|range| range.end),
1918 clamp,
1919 }
1920 }
1921}
1922
1923impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1924 type Error = ConvertError;
1925 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1926 let api::ReadSessionResponse { output } = value;
1927 let output = output.ok_or("missing output in read session response")?;
1928 output.try_into()
1929 }
1930}
1931
/// Name of a basin.
///
/// The string conversions in this module only accept names of 8 to 48 bytes
/// made of lowercase ASCII letters, digits and hyphens, that neither begin
/// nor end with a hyphen.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BasinName(String);
1938
1939impl Deref for BasinName {
1940 type Target = str;
1941 fn deref(&self) -> &Self::Target {
1942 &self.0
1943 }
1944}
1945
1946impl TryFrom<String> for BasinName {
1947 type Error = ConvertError;
1948
1949 fn try_from(name: String) -> Result<Self, Self::Error> {
1950 if name.len() < 8 || name.len() > 48 {
1951 return Err("Basin name must be between 8 and 48 characters in length".into());
1952 }
1953
1954 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1955 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1956 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1957 .expect("Failed to compile basin name regex")
1958 });
1959
1960 if !regex.is_match(&name) {
1961 return Err(
1962 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1963 It cannot begin or end with a hyphen."
1964 .into(),
1965 );
1966 }
1967
1968 Ok(Self(name))
1969 }
1970}
1971
1972impl FromStr for BasinName {
1973 type Err = ConvertError;
1974
1975 fn from_str(s: &str) -> Result<Self, Self::Err> {
1976 s.to_string().try_into()
1977 }
1978}
1979
1980impl std::fmt::Display for BasinName {
1981 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1982 f.write_str(&self.0)
1983 }
1984}
1985
1986impl From<BasinName> for String {
1987 fn from(value: BasinName) -> Self {
1988 value.0
1989 }
1990}
1991
/// Identifier of an access token.
///
/// The string conversions in this module only accept non-empty IDs of at most 96 bytes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AccessTokenId(String);
1996
1997impl Deref for AccessTokenId {
1998 type Target = str;
1999
2000 fn deref(&self) -> &Self::Target {
2001 &self.0
2002 }
2003}
2004
2005impl TryFrom<String> for AccessTokenId {
2006 type Error = ConvertError;
2007
2008 fn try_from(name: String) -> Result<Self, Self::Error> {
2009 if name.is_empty() {
2010 return Err("Access token ID must not be empty".into());
2011 }
2012
2013 if name.len() > 96 {
2014 return Err("Access token ID must not exceed 96 characters".into());
2015 }
2016
2017 Ok(Self(name))
2018 }
2019}
2020
2021impl From<AccessTokenId> for String {
2022 fn from(value: AccessTokenId) -> Self {
2023 value.0
2024 }
2025}
2026
2027impl FromStr for AccessTokenId {
2028 type Err = ConvertError;
2029
2030 fn from_str(s: &str) -> Result<Self, Self::Err> {
2031 s.to_string().try_into()
2032 }
2033}
2034
2035impl std::fmt::Display for AccessTokenId {
2036 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2037 f.write_str(&self.0)
2038 }
2039}
2040
2041impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
2042 fn from(value: AccessTokenInfo) -> Self {
2043 Self {
2044 info: Some(value.into()),
2045 }
2046 }
2047}
2048
2049#[sync_docs]
2050#[derive(Debug, Clone)]
2051pub struct AccessTokenInfo {
2052 pub id: AccessTokenId,
2053 pub expires_at: Option<u32>,
2054 pub auto_prefix_streams: bool,
2055 pub scope: Option<AccessTokenScope>,
2056}
2057
2058impl AccessTokenInfo {
2059 pub fn new(id: AccessTokenId) -> Self {
2061 Self {
2062 id,
2063 expires_at: None,
2064 auto_prefix_streams: false,
2065 scope: None,
2066 }
2067 }
2068
2069 pub fn with_expires_at(self, expires_at: u32) -> Self {
2071 Self {
2072 expires_at: Some(expires_at),
2073 ..self
2074 }
2075 }
2076
2077 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2079 Self {
2080 auto_prefix_streams,
2081 ..self
2082 }
2083 }
2084
2085 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2087 Self {
2088 scope: Some(scope),
2089 ..self
2090 }
2091 }
2092}
2093
2094impl From<AccessTokenInfo> for api::AccessTokenInfo {
2095 fn from(value: AccessTokenInfo) -> Self {
2096 let AccessTokenInfo {
2097 id,
2098 expires_at,
2099 auto_prefix_streams,
2100 scope,
2101 } = value;
2102 Self {
2103 id: id.into(),
2104 expires_at,
2105 auto_prefix_streams,
2106 scope: scope.map(Into::into),
2107 }
2108 }
2109}
2110
2111impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2112 type Error = ConvertError;
2113
2114 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2115 let api::AccessTokenInfo {
2116 id,
2117 expires_at,
2118 auto_prefix_streams,
2119 scope,
2120 } = value;
2121 Ok(Self {
2122 id: id.try_into()?,
2123 expires_at,
2124 auto_prefix_streams,
2125 scope: scope.map(Into::into),
2126 })
2127 }
2128}
2129
2130#[sync_docs]
2131#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2132pub enum Operation {
2133 ListBasins,
2134 CreateBasin,
2135 DeleteBasin,
2136 ReconfigureBasin,
2137 GetBasinConfig,
2138 IssueAccessToken,
2139 RevokeAccessToken,
2140 ListAccessTokens,
2141 ListStreams,
2142 CreateStream,
2143 DeleteStream,
2144 GetStreamConfig,
2145 ReconfigureStream,
2146 CheckTail,
2147 Append,
2148 Read,
2149 Trim,
2150 Fence,
2151 AccountMetrics,
2152 BasinMetrics,
2153 StreamMetrics,
2154}
2155
2156impl FromStr for Operation {
2157 type Err = ConvertError;
2158
2159 fn from_str(s: &str) -> Result<Self, Self::Err> {
2160 match s.to_lowercase().as_str() {
2161 "list-basins" => Ok(Self::ListBasins),
2162 "create-basin" => Ok(Self::CreateBasin),
2163 "delete-basin" => Ok(Self::DeleteBasin),
2164 "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2165 "get-basin-config" => Ok(Self::GetBasinConfig),
2166 "issue-access-token" => Ok(Self::IssueAccessToken),
2167 "revoke-access-token" => Ok(Self::RevokeAccessToken),
2168 "list-access-tokens" => Ok(Self::ListAccessTokens),
2169 "list-streams" => Ok(Self::ListStreams),
2170 "create-stream" => Ok(Self::CreateStream),
2171 "delete-stream" => Ok(Self::DeleteStream),
2172 "get-stream-config" => Ok(Self::GetStreamConfig),
2173 "reconfigure-stream" => Ok(Self::ReconfigureStream),
2174 "check-tail" => Ok(Self::CheckTail),
2175 "append" => Ok(Self::Append),
2176 "read" => Ok(Self::Read),
2177 "trim" => Ok(Self::Trim),
2178 "fence" => Ok(Self::Fence),
2179 "account-metrics" => Ok(Self::AccountMetrics),
2180 "basin-metrics" => Ok(Self::BasinMetrics),
2181 "stream-metrics" => Ok(Self::StreamMetrics),
2182 _ => Err("invalid operation".into()),
2183 }
2184 }
2185}
2186
2187impl From<Operation> for api::Operation {
2188 fn from(value: Operation) -> Self {
2189 match value {
2190 Operation::ListBasins => Self::ListBasins,
2191 Operation::CreateBasin => Self::CreateBasin,
2192 Operation::DeleteBasin => Self::DeleteBasin,
2193 Operation::ReconfigureBasin => Self::ReconfigureBasin,
2194 Operation::GetBasinConfig => Self::GetBasinConfig,
2195 Operation::IssueAccessToken => Self::IssueAccessToken,
2196 Operation::RevokeAccessToken => Self::RevokeAccessToken,
2197 Operation::ListAccessTokens => Self::ListAccessTokens,
2198 Operation::ListStreams => Self::ListStreams,
2199 Operation::CreateStream => Self::CreateStream,
2200 Operation::DeleteStream => Self::DeleteStream,
2201 Operation::GetStreamConfig => Self::GetStreamConfig,
2202 Operation::ReconfigureStream => Self::ReconfigureStream,
2203 Operation::CheckTail => Self::CheckTail,
2204 Operation::Append => Self::Append,
2205 Operation::Read => Self::Read,
2206 Operation::Trim => Self::Trim,
2207 Operation::Fence => Self::Fence,
2208 Operation::AccountMetrics => Self::AccountMetrics,
2209 Operation::BasinMetrics => Self::BasinMetrics,
2210 Operation::StreamMetrics => Self::StreamMetrics,
2211 }
2212 }
2213}
2214
2215impl From<api::Operation> for Option<Operation> {
2216 fn from(value: api::Operation) -> Self {
2217 match value {
2218 api::Operation::Unspecified => None,
2219 api::Operation::ListBasins => Some(Operation::ListBasins),
2220 api::Operation::CreateBasin => Some(Operation::CreateBasin),
2221 api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2222 api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2223 api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2224 api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2225 api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2226 api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2227 api::Operation::ListStreams => Some(Operation::ListStreams),
2228 api::Operation::CreateStream => Some(Operation::CreateStream),
2229 api::Operation::DeleteStream => Some(Operation::DeleteStream),
2230 api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2231 api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2232 api::Operation::CheckTail => Some(Operation::CheckTail),
2233 api::Operation::Append => Some(Operation::Append),
2234 api::Operation::Read => Some(Operation::Read),
2235 api::Operation::Trim => Some(Operation::Trim),
2236 api::Operation::Fence => Some(Operation::Fence),
2237 api::Operation::AccountMetrics => Some(Operation::AccountMetrics),
2238 api::Operation::BasinMetrics => Some(Operation::BasinMetrics),
2239 api::Operation::StreamMetrics => Some(Operation::StreamMetrics),
2240 }
2241 }
2242}
2243
2244#[sync_docs]
2245#[derive(Debug, Clone, Default)]
2246pub struct AccessTokenScope {
2247 pub basins: Option<ResourceSet>,
2248 pub streams: Option<ResourceSet>,
2249 pub access_tokens: Option<ResourceSet>,
2250 pub op_groups: Option<PermittedOperationGroups>,
2251 pub ops: HashSet<Operation>,
2252}
2253
2254impl AccessTokenScope {
2255 pub fn new() -> Self {
2257 Self::default()
2258 }
2259
2260 pub fn with_basins(self, basins: ResourceSet) -> Self {
2262 Self {
2263 basins: Some(basins),
2264 ..self
2265 }
2266 }
2267
2268 pub fn with_streams(self, streams: ResourceSet) -> Self {
2270 Self {
2271 streams: Some(streams),
2272 ..self
2273 }
2274 }
2275
2276 pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2278 Self {
2279 access_tokens: Some(access_tokens),
2280 ..self
2281 }
2282 }
2283
2284 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2286 Self {
2287 op_groups: Some(op_groups),
2288 ..self
2289 }
2290 }
2291
2292 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2294 Self {
2295 ops: ops.into_iter().collect(),
2296 ..self
2297 }
2298 }
2299
2300 pub fn with_op(self, op: Operation) -> Self {
2302 let mut ops = self.ops;
2303 ops.insert(op);
2304 Self { ops, ..self }
2305 }
2306}
2307
2308impl From<AccessTokenScope> for api::AccessTokenScope {
2309 fn from(value: AccessTokenScope) -> Self {
2310 let AccessTokenScope {
2311 basins,
2312 streams,
2313 access_tokens,
2314 op_groups,
2315 ops,
2316 } = value;
2317 Self {
2318 basins: basins.map(Into::into),
2319 streams: streams.map(Into::into),
2320 access_tokens: access_tokens.map(Into::into),
2321 op_groups: op_groups.map(Into::into),
2322 ops: ops
2323 .into_iter()
2324 .map(api::Operation::from)
2325 .map(Into::into)
2326 .collect(),
2327 }
2328 }
2329}
2330
2331impl From<api::AccessTokenScope> for AccessTokenScope {
2332 fn from(value: api::AccessTokenScope) -> Self {
2333 let api::AccessTokenScope {
2334 basins,
2335 streams,
2336 access_tokens,
2337 op_groups,
2338 ops,
2339 } = value;
2340 Self {
2341 basins: basins.and_then(|set| set.matching.map(Into::into)),
2342 streams: streams.and_then(|set| set.matching.map(Into::into)),
2343 access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2344 op_groups: op_groups.map(Into::into),
2345 ops: ops
2346 .into_iter()
2347 .map(api::Operation::try_from)
2348 .flat_map(Result::ok)
2349 .flat_map(<Option<Operation>>::from)
2350 .collect(),
2351 }
2352 }
2353}
2354
2355impl From<ResourceSet> for api::ResourceSet {
2356 fn from(value: ResourceSet) -> Self {
2357 Self {
2358 matching: Some(value.into()),
2359 }
2360 }
2361}
2362
2363#[sync_docs(ResourceSet = "Matching")]
2364#[derive(Debug, Clone)]
2365pub enum ResourceSet {
2366 Exact(String),
2367 Prefix(String),
2368}
2369
2370impl From<ResourceSet> for api::resource_set::Matching {
2371 fn from(value: ResourceSet) -> Self {
2372 match value {
2373 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2374 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2375 }
2376 }
2377}
2378
2379impl From<api::resource_set::Matching> for ResourceSet {
2380 fn from(value: api::resource_set::Matching) -> Self {
2381 match value {
2382 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2383 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2384 }
2385 }
2386}
2387
2388#[sync_docs]
2389#[derive(Debug, Clone, Default)]
2390pub struct PermittedOperationGroups {
2391 pub account: Option<ReadWritePermissions>,
2392 pub basin: Option<ReadWritePermissions>,
2393 pub stream: Option<ReadWritePermissions>,
2394}
2395
2396impl PermittedOperationGroups {
2397 pub fn new() -> Self {
2399 Self::default()
2400 }
2401
2402 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2404 Self {
2405 account: Some(account),
2406 ..self
2407 }
2408 }
2409
2410 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2412 Self {
2413 basin: Some(basin),
2414 ..self
2415 }
2416 }
2417
2418 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2420 Self {
2421 stream: Some(stream),
2422 ..self
2423 }
2424 }
2425}
2426
2427impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2428 fn from(value: PermittedOperationGroups) -> Self {
2429 let PermittedOperationGroups {
2430 account,
2431 basin,
2432 stream,
2433 } = value;
2434 Self {
2435 account: account.map(Into::into),
2436 basin: basin.map(Into::into),
2437 stream: stream.map(Into::into),
2438 }
2439 }
2440}
2441
2442impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2443 fn from(value: api::PermittedOperationGroups) -> Self {
2444 let api::PermittedOperationGroups {
2445 account,
2446 basin,
2447 stream,
2448 } = value;
2449 Self {
2450 account: account.map(Into::into),
2451 basin: basin.map(Into::into),
2452 stream: stream.map(Into::into),
2453 }
2454 }
2455}
2456
2457#[sync_docs]
2458#[derive(Debug, Clone, Default)]
2459pub struct ReadWritePermissions {
2460 pub read: bool,
2461 pub write: bool,
2462}
2463
2464impl ReadWritePermissions {
2465 pub fn new() -> Self {
2467 Self::default()
2468 }
2469
2470 pub fn with_read(self, read: bool) -> Self {
2472 Self { read, ..self }
2473 }
2474
2475 pub fn with_write(self, write: bool) -> Self {
2477 Self { write, ..self }
2478 }
2479}
2480
2481impl From<ReadWritePermissions> for api::ReadWritePermissions {
2482 fn from(value: ReadWritePermissions) -> Self {
2483 let ReadWritePermissions { read, write } = value;
2484 Self { read, write }
2485 }
2486}
2487
2488impl From<api::ReadWritePermissions> for ReadWritePermissions {
2489 fn from(value: api::ReadWritePermissions) -> Self {
2490 let api::ReadWritePermissions { read, write } = value;
2491 Self { read, write }
2492 }
2493}
2494
2495impl From<api::IssueAccessTokenResponse> for String {
2496 fn from(value: api::IssueAccessTokenResponse) -> Self {
2497 value.access_token
2498 }
2499}
2500
2501impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2502 fn from(value: AccessTokenId) -> Self {
2503 Self { id: value.into() }
2504 }
2505}
2506
2507impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2508 type Error = ConvertError;
2509 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2510 let token_info = value.info.ok_or("access token info is missing")?;
2511 token_info.try_into()
2512 }
2513}
2514
2515#[sync_docs]
2516#[derive(Debug, Clone, Default)]
2517pub struct ListAccessTokensRequest {
2518 pub prefix: String,
2519 pub start_after: String,
2520 pub limit: Option<usize>,
2521}
2522
2523impl ListAccessTokensRequest {
2524 pub fn new() -> Self {
2526 Self::default()
2527 }
2528
2529 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2531 Self {
2532 prefix: prefix.into(),
2533 ..self
2534 }
2535 }
2536
2537 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2539 Self {
2540 start_after: start_after.into(),
2541 ..self
2542 }
2543 }
2544
2545 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2547 Self {
2548 limit: limit.into(),
2549 ..self
2550 }
2551 }
2552}
2553
2554impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2555 type Error = ConvertError;
2556 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2557 let ListAccessTokensRequest {
2558 prefix,
2559 start_after,
2560 limit,
2561 } = value;
2562 Ok(Self {
2563 prefix,
2564 start_after,
2565 limit: limit
2566 .map(TryInto::try_into)
2567 .transpose()
2568 .map_err(|_| "request limit does not fit into u64 bounds")?,
2569 })
2570 }
2571}
2572
2573#[sync_docs]
2574#[derive(Debug, Clone)]
2575pub struct ListAccessTokensResponse {
2576 pub access_tokens: Vec<AccessTokenInfo>,
2577 pub has_more: bool,
2578}
2579
2580impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2581 type Error = ConvertError;
2582 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2583 let api::ListAccessTokensResponse {
2584 access_tokens,
2585 has_more,
2586 } = value;
2587 let access_tokens = access_tokens
2588 .into_iter()
2589 .map(TryInto::try_into)
2590 .collect::<Result<Vec<_>, _>>()?;
2591 Ok(Self {
2592 access_tokens,
2593 has_more,
2594 })
2595 }
2596}