1use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
/// Metadata key spelled "retry-after-ms".
/// NOTE(review): presumably carries a retry delay in milliseconds — confirm at the call sites.
pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
14
/// Error returned when a conversion or validation in this module fails.
///
/// Wraps a single message string, which is also what `Display` prints.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
19
20impl<T: Into<String>> From<T> for ConvertError {
21 fn from(value: T) -> Self {
22 Self(value.into())
23 }
24}
25
/// Implemented by types that can report their metered size.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}
37
38impl<T: MeteredBytes> MeteredBytes for Vec<T> {
39 fn metered_bytes(&self) -> u64 {
40 self.iter().fold(0, |acc, item| acc + item.metered_bytes())
41 }
42}
43
/// Implements `MeteredBytes` for a type that has `headers` and `body` fields.
///
/// The metered size is `8 + 2 * header_count + sum(name_len + value_len) + body_len`.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let header_count = self.headers.len();
                let header_payload: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                let total = 8 + (2 * header_count) + header_payload + self.body.len();
                total as u64
            }
        }
    };
}
61
// Scope of a basin. The single variant here parses from the string "aws:us-east-1".
// NOTE(review): rustdoc for this item is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinScope {
    AwsUsEast1,
}
67
68impl From<BasinScope> for api::BasinScope {
69 fn from(value: BasinScope) -> Self {
70 match value {
71 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
72 }
73 }
74}
75
76impl From<api::BasinScope> for Option<BasinScope> {
77 fn from(value: api::BasinScope) -> Self {
78 match value {
79 api::BasinScope::Unspecified => None,
80 api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
81 }
82 }
83}
84
85impl FromStr for BasinScope {
86 type Err = ConvertError;
87 fn from_str(value: &str) -> Result<Self, Self::Err> {
88 match value {
89 "aws:us-east-1" => Ok(Self::AwsUsEast1),
90 _ => Err("invalid basin scope value".into()),
91 }
92 }
93}
94
// Input for creating a basin: the basin name plus an optional config and scope.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateBasinRequest {
    pub basin: BasinName,
    // `None` leaves the config unset in the API request.
    pub config: Option<BasinConfig>,
    // `None` is sent as the API scope enum's default value.
    pub scope: Option<BasinScope>,
}
102
103impl CreateBasinRequest {
104 pub fn new(basin: BasinName) -> Self {
106 Self {
107 basin,
108 config: None,
109 scope: None,
110 }
111 }
112
113 pub fn with_config(self, config: BasinConfig) -> Self {
115 Self {
116 config: Some(config),
117 ..self
118 }
119 }
120
121 pub fn with_scope(self, scope: BasinScope) -> Self {
123 Self {
124 scope: Some(scope),
125 ..self
126 }
127 }
128}
129
130impl From<CreateBasinRequest> for api::CreateBasinRequest {
131 fn from(value: CreateBasinRequest) -> Self {
132 let CreateBasinRequest {
133 basin,
134 config,
135 scope,
136 } = value;
137 Self {
138 basin: basin.0,
139 config: config.map(Into::into),
140 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
141 }
142 }
143}
144
// Basin-level configuration. `Default` yields no default stream config and both flags `false`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    pub default_stream_config: Option<StreamConfig>,
    pub create_stream_on_append: bool,
    pub create_stream_on_read: bool,
}
152
153impl BasinConfig {
154 pub fn new() -> Self {
156 Self::default()
157 }
158
159 pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
161 Self {
162 default_stream_config: Some(default_stream_config),
163 ..self
164 }
165 }
166
167 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
169 Self {
170 create_stream_on_append,
171 ..self
172 }
173 }
174
175 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
177 Self {
178 create_stream_on_read,
179 ..self
180 }
181 }
182}
183
184impl From<BasinConfig> for api::BasinConfig {
185 fn from(value: BasinConfig) -> Self {
186 let BasinConfig {
187 default_stream_config,
188 create_stream_on_append,
189 create_stream_on_read,
190 } = value;
191 Self {
192 default_stream_config: default_stream_config.map(Into::into),
193 create_stream_on_append,
194 create_stream_on_read,
195 }
196 }
197}
198
199impl From<api::BasinConfig> for BasinConfig {
200 fn from(value: api::BasinConfig) -> Self {
201 let api::BasinConfig {
202 default_stream_config,
203 create_stream_on_append,
204 create_stream_on_read,
205 } = value;
206 Self {
207 default_stream_config: default_stream_config.map(Into::into),
208 create_stream_on_append,
209 create_stream_on_read,
210 }
211 }
212}
213
// Timestamping mode. The API-side `Unspecified` value has no variant here; it maps to `None`.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    ClientPrefer,
    ClientRequire,
    Arrival,
}
221
222impl From<TimestampingMode> for api::TimestampingMode {
223 fn from(value: TimestampingMode) -> Self {
224 match value {
225 TimestampingMode::ClientPrefer => Self::ClientPrefer,
226 TimestampingMode::ClientRequire => Self::ClientRequire,
227 TimestampingMode::Arrival => Self::Arrival,
228 }
229 }
230}
231
232impl From<api::TimestampingMode> for Option<TimestampingMode> {
233 fn from(value: api::TimestampingMode) -> Self {
234 match value {
235 api::TimestampingMode::Unspecified => None,
236 api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
237 api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
238 api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
239 }
240 }
241}
242
// Timestamping settings for a stream; both fields are optional and default to `None`.
#[sync_docs(TimestampingConfig = "Timestamping")]
#[derive(Debug, Clone, Default)]
pub struct TimestampingConfig {
    // `None` is sent as the API mode enum's default value.
    pub mode: Option<TimestampingMode>,
    pub uncapped: Option<bool>,
}
250
251impl TimestampingConfig {
252 pub fn new() -> Self {
254 Self::default()
255 }
256
257 pub fn with_mode(self, mode: TimestampingMode) -> Self {
259 Self {
260 mode: Some(mode),
261 ..self
262 }
263 }
264
265 pub fn with_uncapped(self, uncapped: bool) -> Self {
267 Self {
268 uncapped: Some(uncapped),
269 ..self
270 }
271 }
272}
273
274impl From<TimestampingConfig> for api::stream_config::Timestamping {
275 fn from(value: TimestampingConfig) -> Self {
276 Self {
277 mode: value
278 .mode
279 .map(api::TimestampingMode::from)
280 .unwrap_or_default()
281 .into(),
282 uncapped: value.uncapped,
283 }
284 }
285}
286
287impl From<api::stream_config::Timestamping> for TimestampingConfig {
288 fn from(value: api::stream_config::Timestamping) -> Self {
289 let mode = value.mode().into();
290 let uncapped = value.uncapped;
291 Self { mode, uncapped }
292 }
293}
294
// Stream-level configuration; every field is optional and defaults to `None`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    // `None` is sent as the API storage-class enum's default value.
    pub storage_class: Option<StorageClass>,
    pub retention_policy: Option<RetentionPolicy>,
    pub timestamping: Option<TimestampingConfig>,
}
302
303impl StreamConfig {
304 pub fn new() -> Self {
306 Self::default()
307 }
308
309 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
311 Self {
312 storage_class: Some(storage_class),
313 ..self
314 }
315 }
316
317 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
319 Self {
320 retention_policy: Some(retention_policy),
321 ..self
322 }
323 }
324
325 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
327 Self {
328 timestamping: Some(timestamping),
329 ..self
330 }
331 }
332}
333
334impl From<StreamConfig> for api::StreamConfig {
335 fn from(value: StreamConfig) -> Self {
336 let StreamConfig {
337 storage_class,
338 retention_policy,
339 timestamping,
340 } = value;
341 Self {
342 storage_class: storage_class
343 .map(api::StorageClass::from)
344 .unwrap_or_default()
345 .into(),
346 retention_policy: retention_policy.map(Into::into),
347 timestamping: timestamping.map(Into::into),
348 }
349 }
350}
351
352impl From<api::StreamConfig> for StreamConfig {
353 fn from(value: api::StreamConfig) -> Self {
354 Self {
355 storage_class: value.storage_class().into(),
356 retention_policy: value.retention_policy.map(Into::into),
357 timestamping: value.timestamping.map(Into::into),
358 }
359 }
360}
361
// Storage class of a stream. Variants parse from the strings "standard" and "express".
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    Standard,
    Express,
}
368
369impl From<StorageClass> for api::StorageClass {
370 fn from(value: StorageClass) -> Self {
371 match value {
372 StorageClass::Standard => Self::Standard,
373 StorageClass::Express => Self::Express,
374 }
375 }
376}
377
378impl From<api::StorageClass> for Option<StorageClass> {
379 fn from(value: api::StorageClass) -> Self {
380 match value {
381 api::StorageClass::Unspecified => None,
382 api::StorageClass::Standard => Some(StorageClass::Standard),
383 api::StorageClass::Express => Some(StorageClass::Express),
384 }
385 }
386}
387
388impl FromStr for StorageClass {
389 type Err = ConvertError;
390
391 fn from_str(value: &str) -> Result<Self, Self::Err> {
392 match value {
393 "standard" => Ok(Self::Standard),
394 "express" => Ok(Self::Express),
395 v => Err(format!("unknown storage class: {v}").into()),
396 }
397 }
398}
399
// Retention policy of a stream. The duration is sent to the API as whole seconds.
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    Age(Duration),
}
405
406impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
407 fn from(value: RetentionPolicy) -> Self {
408 match value {
409 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
410 }
411 }
412}
413
414impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
415 fn from(value: api::stream_config::RetentionPolicy) -> Self {
416 match value {
417 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
418 }
419 }
420}
421
// State of a basin. `Display` renders the variants as "active", "creating" and "deleting".
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    Active,
    Creating,
    Deleting,
}
429
430impl From<BasinState> for api::BasinState {
431 fn from(value: BasinState) -> Self {
432 match value {
433 BasinState::Active => Self::Active,
434 BasinState::Creating => Self::Creating,
435 BasinState::Deleting => Self::Deleting,
436 }
437 }
438}
439
440impl From<api::BasinState> for Option<BasinState> {
441 fn from(value: api::BasinState) -> Self {
442 match value {
443 api::BasinState::Unspecified => None,
444 api::BasinState::Active => Some(BasinState::Active),
445 api::BasinState::Creating => Some(BasinState::Creating),
446 api::BasinState::Deleting => Some(BasinState::Deleting),
447 }
448 }
449}
450
451impl std::fmt::Display for BasinState {
452 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453 match self {
454 BasinState::Active => write!(f, "active"),
455 BasinState::Creating => write!(f, "creating"),
456 BasinState::Deleting => write!(f, "deleting"),
457 }
458 }
459}
460
// Information about a basin: its name, scope and state.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    pub name: String,
    // `None` when the API reports the scope as `Unspecified`.
    pub scope: Option<BasinScope>,
    // `None` when the API reports the state as `Unspecified`.
    pub state: Option<BasinState>,
}
468
469impl From<BasinInfo> for api::BasinInfo {
470 fn from(value: BasinInfo) -> Self {
471 let BasinInfo { name, scope, state } = value;
472 Self {
473 name,
474 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
475 state: state.map(api::BasinState::from).unwrap_or_default().into(),
476 }
477 }
478}
479
480impl From<api::BasinInfo> for BasinInfo {
481 fn from(value: api::BasinInfo) -> Self {
482 let scope = value.scope().into();
483 let state = value.state().into();
484 let name = value.name;
485 Self { name, scope, state }
486 }
487}
488
489impl TryFrom<api::CreateBasinResponse> for BasinInfo {
490 type Error = ConvertError;
491 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
492 let api::CreateBasinResponse { info } = value;
493 let info = info.ok_or("missing basin info")?;
494 Ok(info.into())
495 }
496}
497
// Input for listing streams. `Default` yields empty strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListStreamsRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
505
506impl ListStreamsRequest {
507 pub fn new() -> Self {
509 Self::default()
510 }
511
512 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
514 Self {
515 prefix: prefix.into(),
516 ..self
517 }
518 }
519
520 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
522 Self {
523 start_after: start_after.into(),
524 ..self
525 }
526 }
527
528 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
530 Self {
531 limit: limit.into(),
532 ..self
533 }
534 }
535}
536
537impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
538 type Error = ConvertError;
539 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
540 let ListStreamsRequest {
541 prefix,
542 start_after,
543 limit,
544 } = value;
545 Ok(Self {
546 prefix,
547 start_after,
548 limit: limit.map(|n| n as u64),
549 })
550 }
551}
552
// Information about a stream. All three fields are copied verbatim from the API type.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    pub name: String,
    pub created_at: u32,
    pub deleted_at: Option<u32>,
}
560
561impl From<api::StreamInfo> for StreamInfo {
562 fn from(value: api::StreamInfo) -> Self {
563 Self {
564 name: value.name,
565 created_at: value.created_at,
566 deleted_at: value.deleted_at,
567 }
568 }
569}
570
571impl TryFrom<api::CreateStreamResponse> for StreamInfo {
572 type Error = ConvertError;
573
574 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
575 let api::CreateStreamResponse { info } = value;
576 let info = info.ok_or("missing stream info")?;
577 Ok(info.into())
578 }
579}
580
// Result of listing streams: the returned stream infos plus the API's `has_more` flag.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    pub streams: Vec<StreamInfo>,
    pub has_more: bool,
}
587
588impl From<api::ListStreamsResponse> for ListStreamsResponse {
589 fn from(value: api::ListStreamsResponse) -> Self {
590 let api::ListStreamsResponse { streams, has_more } = value;
591 let streams = streams.into_iter().map(Into::into).collect();
592 Self { streams, has_more }
593 }
594}
595
596impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
597 type Error = ConvertError;
598
599 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
600 let api::GetBasinConfigResponse { config } = value;
601 let config = config.ok_or("missing basin config")?;
602 Ok(config.into())
603 }
604}
605
606impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
607 type Error = ConvertError;
608
609 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
610 let api::GetStreamConfigResponse { config } = value;
611 let config = config.ok_or("missing stream config")?;
612 Ok(config.into())
613 }
614}
615
// Input for creating a stream: the stream name and an optional config.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateStreamRequest {
    pub stream: String,
    pub config: Option<StreamConfig>,
}
622
623impl CreateStreamRequest {
624 pub fn new(stream: impl Into<String>) -> Self {
626 Self {
627 stream: stream.into(),
628 config: None,
629 }
630 }
631
632 pub fn with_config(self, config: StreamConfig) -> Self {
634 Self {
635 config: Some(config),
636 ..self
637 }
638 }
639}
640
641impl From<CreateStreamRequest> for api::CreateStreamRequest {
642 fn from(value: CreateStreamRequest) -> Self {
643 let CreateStreamRequest { stream, config } = value;
644 Self {
645 stream,
646 config: config.map(Into::into),
647 }
648 }
649}
650
// Input for listing basins. `Default` yields empty strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListBasinsRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
658
659impl ListBasinsRequest {
660 pub fn new() -> Self {
662 Self::default()
663 }
664
665 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
667 Self {
668 prefix: prefix.into(),
669 ..self
670 }
671 }
672
673 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
675 Self {
676 start_after: start_after.into(),
677 ..self
678 }
679 }
680
681 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
683 Self {
684 limit: limit.into(),
685 ..self
686 }
687 }
688}
689
690impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
691 type Error = ConvertError;
692 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
693 let ListBasinsRequest {
694 prefix,
695 start_after,
696 limit,
697 } = value;
698 Ok(Self {
699 prefix,
700 start_after,
701 limit: limit
702 .map(TryInto::try_into)
703 .transpose()
704 .map_err(|_| "request limit does not fit into u64 bounds")?,
705 })
706 }
707}
708
// Result of listing basins: the returned basin infos plus the API's `has_more` flag.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    pub basins: Vec<BasinInfo>,
    pub has_more: bool,
}
715
716impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
717 type Error = ConvertError;
718 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
719 let api::ListBasinsResponse { basins, has_more } = value;
720 Ok(Self {
721 basins: basins.into_iter().map(Into::into).collect(),
722 has_more,
723 })
724 }
725}
726
// Input for deleting a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteBasinRequest {
    pub basin: BasinName,
    // Not forwarded to the API request by the `From` conversion in this file.
    // NOTE(review): presumably consumed client-side to tolerate a missing basin — confirm.
    pub if_exists: bool,
}
734
735impl DeleteBasinRequest {
736 pub fn new(basin: BasinName) -> Self {
738 Self {
739 basin,
740 if_exists: false,
741 }
742 }
743
744 pub fn with_if_exists(self, if_exists: bool) -> Self {
746 Self { if_exists, ..self }
747 }
748}
749
750impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
751 fn from(value: DeleteBasinRequest) -> Self {
752 let DeleteBasinRequest { basin, .. } = value;
753 Self { basin: basin.0 }
754 }
755}
756
// Input for deleting a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteStreamRequest {
    pub stream: String,
    // Not forwarded to the API request by the `From` conversion in this file.
    // NOTE(review): presumably consumed client-side to tolerate a missing stream — confirm.
    pub if_exists: bool,
}
764
765impl DeleteStreamRequest {
766 pub fn new(stream: impl Into<String>) -> Self {
768 Self {
769 stream: stream.into(),
770 if_exists: false,
771 }
772 }
773
774 pub fn with_if_exists(self, if_exists: bool) -> Self {
776 Self { if_exists, ..self }
777 }
778}
779
780impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
781 fn from(value: DeleteStreamRequest) -> Self {
782 let DeleteStreamRequest { stream, .. } = value;
783 Self { stream }
784 }
785}
786
// Input for reconfiguring a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureBasinRequest {
    pub basin: BasinName,
    pub config: Option<BasinConfig>,
    // Field paths; sent as the `paths` of a protobuf `FieldMask` when present.
    pub mask: Option<Vec<String>>,
}
794
795impl ReconfigureBasinRequest {
796 pub fn new(basin: BasinName) -> Self {
798 Self {
799 basin,
800 config: None,
801 mask: None,
802 }
803 }
804
805 pub fn with_config(self, config: BasinConfig) -> Self {
807 Self {
808 config: Some(config),
809 ..self
810 }
811 }
812
813 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
815 Self {
816 mask: Some(mask.into()),
817 ..self
818 }
819 }
820}
821
822impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
823 fn from(value: ReconfigureBasinRequest) -> Self {
824 let ReconfigureBasinRequest {
825 basin,
826 config,
827 mask,
828 } = value;
829 Self {
830 basin: basin.0,
831 config: config.map(Into::into),
832 mask: mask.map(|paths| prost_types::FieldMask { paths }),
833 }
834 }
835}
836
837impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
838 type Error = ConvertError;
839 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
840 let api::ReconfigureBasinResponse { config } = value;
841 let config = config.ok_or("missing basin config")?;
842 Ok(config.into())
843 }
844}
845
// Input for reconfiguring a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureStreamRequest {
    pub stream: String,
    pub config: Option<StreamConfig>,
    // Field paths; sent as the `paths` of a protobuf `FieldMask` when present.
    pub mask: Option<Vec<String>>,
}
853
854impl ReconfigureStreamRequest {
855 pub fn new(stream: impl Into<String>) -> Self {
857 Self {
858 stream: stream.into(),
859 config: None,
860 mask: None,
861 }
862 }
863
864 pub fn with_config(self, config: StreamConfig) -> Self {
866 Self {
867 config: Some(config),
868 ..self
869 }
870 }
871
872 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
874 Self {
875 mask: Some(mask.into()),
876 ..self
877 }
878 }
879}
880
881impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
882 fn from(value: ReconfigureStreamRequest) -> Self {
883 let ReconfigureStreamRequest {
884 stream,
885 config,
886 mask,
887 } = value;
888 Self {
889 stream,
890 config: config.map(Into::into),
891 mask: mask.map(|paths| prost_types::FieldMask { paths }),
892 }
893 }
894}
895
896impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
897 type Error = ConvertError;
898 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
899 let api::ReconfigureStreamResponse { config } = value;
900 let config = config.ok_or("missing stream config")?;
901 Ok(config.into())
902 }
903}
904
905impl From<api::CheckTailResponse> for StreamPosition {
906 fn from(value: api::CheckTailResponse) -> Self {
907 let api::CheckTailResponse {
908 next_seq_num,
909 last_timestamp,
910 } = value;
911 StreamPosition {
912 seq_num: next_seq_num,
913 timestamp: last_timestamp,
914 }
915 }
916}
917
/// A position in a stream, given as a sequence number and a timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamPosition {
    /// Sequence number of the position.
    pub seq_num: u64,
    /// Timestamp of the position.
    /// NOTE(review): the unit is not visible in this file — confirm before documenting it.
    pub timestamp: u64,
}
927
// A record header: a name/value pair of raw bytes.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    pub name: Bytes,
    pub value: Bytes,
}
934
935impl Header {
936 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
938 Self {
939 name: name.into(),
940 value: value.into(),
941 }
942 }
943
944 pub fn from_value(value: impl Into<Bytes>) -> Self {
946 Self {
947 name: Bytes::new(),
948 value: value.into(),
949 }
950 }
951}
952
953impl From<Header> for api::Header {
954 fn from(value: Header) -> Self {
955 let Header { name, value } = value;
956 Self { name, value }
957 }
958}
959
960impl From<api::Header> for Header {
961 fn from(value: api::Header) -> Self {
962 let api::Header { name, value } = value;
963 Self { name, value }
964 }
965}
966
/// A fencing token: a string limited to 36 bytes when built through `FencingToken::new`.
///
/// The `Default` value is the empty token.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FencingToken(String);
972
973impl FencingToken {
974 const MAX_BYTES: usize = 36;
975
976 pub fn new(s: impl Into<String>) -> Result<Self, ConvertError> {
978 let s = s.into();
979 if s.len() > Self::MAX_BYTES {
980 Err(format!(
981 "Size of a fencing token cannot exceed {} bytes",
982 Self::MAX_BYTES
983 )
984 .into())
985 } else {
986 Ok(Self(s))
987 }
988 }
989
990 pub fn generate(n: usize) -> Result<Self, ConvertError> {
992 Self::new(
993 rand::rng()
994 .sample_iter(&rand::distr::Alphanumeric)
995 .take(n)
996 .map(char::from)
997 .collect::<String>(),
998 )
999 }
1000}
1001
1002impl Deref for FencingToken {
1003 type Target = str;
1004
1005 fn deref(&self) -> &Self::Target {
1006 &self.0
1007 }
1008}
1009
1010impl TryFrom<&str> for FencingToken {
1011 type Error = ConvertError;
1012
1013 fn try_from(value: &str) -> Result<Self, Self::Error> {
1014 Self::new(value)
1015 }
1016}
1017
/// A command that can be carried by a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command.
    Fence {
        /// Token whose bytes become the record body when converted to an `AppendRecord`.
        fencing_token: FencingToken,
    },
    /// Trim command.
    Trim {
        /// Sequence number; encoded as 8 big-endian bytes in the record body.
        seq_num: u64,
    },
}
1043
/// A record that carries a [`Command`], with an optional timestamp.
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// The command to encode.
    pub command: Command,
    /// Optional timestamp, copied to the resulting append record.
    pub timestamp: Option<u64>,
}
1056
1057impl CommandRecord {
1058 const FENCE: &[u8] = b"fence";
1059 const TRIM: &[u8] = b"trim";
1060
1061 pub fn fence(fencing_token: FencingToken) -> Self {
1063 Self {
1064 command: Command::Fence { fencing_token },
1065 timestamp: None,
1066 }
1067 }
1068
1069 pub fn trim(seq_num: impl Into<u64>) -> Self {
1071 Self {
1072 command: Command::Trim {
1073 seq_num: seq_num.into(),
1074 },
1075 timestamp: None,
1076 }
1077 }
1078
1079 pub fn with_timestamp(self, timestamp: u64) -> Self {
1081 Self {
1082 timestamp: Some(timestamp),
1083 ..self
1084 }
1085 }
1086}
1087
// A record to append. Fields are private; the constructors in this file run a
// metered-size check before returning a value.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    timestamp: Option<u64>,
    headers: Vec<Header>,
    body: Bytes,
    // Test builds carry a per-record size limit so tests can use limits other than the constant.
    #[cfg(test)]
    max_bytes: u64,
}

// Generates the `MeteredBytes` impl from the record's `headers` and `body`.
metered_impl!(AppendRecord);
1099
1100impl AppendRecord {
1101 const MAX_BYTES: u64 = MIB_BYTES;
1102
1103 fn validated(self) -> Result<Self, ConvertError> {
1104 #[cfg(test)]
1105 let max_bytes = self.max_bytes;
1106 #[cfg(not(test))]
1107 let max_bytes = Self::MAX_BYTES;
1108
1109 if self.metered_bytes() > max_bytes {
1110 Err("AppendRecord should have metered size less than 1 MiB".into())
1111 } else {
1112 Ok(self)
1113 }
1114 }
1115
1116 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1118 Self {
1119 timestamp: None,
1120 headers: Vec::new(),
1121 body: body.into(),
1122 #[cfg(test)]
1123 max_bytes: Self::MAX_BYTES,
1124 }
1125 .validated()
1126 }
1127
1128 #[cfg(test)]
1129 pub(crate) fn with_max_bytes(
1130 max_bytes: u64,
1131 body: impl Into<Bytes>,
1132 ) -> Result<Self, ConvertError> {
1133 Self {
1134 timestamp: None,
1135 headers: Vec::new(),
1136 body: body.into(),
1137 max_bytes,
1138 }
1139 .validated()
1140 }
1141
1142 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1144 Self {
1145 headers: headers.into(),
1146 ..self
1147 }
1148 .validated()
1149 }
1150
1151 pub fn with_timestamp(self, timestamp: u64) -> Self {
1153 Self {
1154 timestamp: Some(timestamp),
1155 ..self
1156 }
1157 }
1158
1159 pub fn body(&self) -> &[u8] {
1161 &self.body
1162 }
1163
1164 pub fn headers(&self) -> &[Header] {
1166 &self.headers
1167 }
1168
1169 pub fn timestamp(&self) -> Option<u64> {
1171 self.timestamp
1172 }
1173
1174 pub fn into_parts(self) -> AppendRecordParts {
1176 AppendRecordParts {
1177 timestamp: self.timestamp,
1178 headers: self.headers,
1179 body: self.body,
1180 }
1181 }
1182
1183 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1185 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1186 if let Some(timestamp) = parts.timestamp {
1187 Ok(record.with_timestamp(timestamp))
1188 } else {
1189 Ok(record)
1190 }
1191 }
1192}
1193
1194impl From<AppendRecord> for api::AppendRecord {
1195 fn from(value: AppendRecord) -> Self {
1196 Self {
1197 timestamp: value.timestamp,
1198 headers: value.headers.into_iter().map(Into::into).collect(),
1199 body: value.body,
1200 }
1201 }
1202}
1203
1204impl From<CommandRecord> for AppendRecord {
1205 fn from(value: CommandRecord) -> Self {
1206 let (header_value, body) = match value.command {
1207 Command::Fence { fencing_token } => (
1208 CommandRecord::FENCE,
1209 Bytes::copy_from_slice(fencing_token.as_bytes()),
1210 ),
1211 Command::Trim { seq_num } => (
1212 CommandRecord::TRIM,
1213 Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1214 ),
1215 };
1216 AppendRecordParts {
1217 timestamp: value.timestamp,
1218 headers: vec![Header::from_value(header_value)],
1219 body,
1220 }
1221 .try_into()
1222 .expect("command record is a valid append record")
1223 }
1224}
1225
// The public, unvalidated pieces of an `AppendRecord`: timestamp, headers and body.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    pub timestamp: Option<u64>,
    pub headers: Vec<Header>,
    pub body: Bytes,
}
1233
1234impl From<AppendRecord> for AppendRecordParts {
1235 fn from(value: AppendRecord) -> Self {
1236 value.into_parts()
1237 }
1238}
1239
1240impl TryFrom<AppendRecordParts> for AppendRecord {
1241 type Error = ConvertError;
1242
1243 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1244 Self::try_from_parts(value)
1245 }
1246}
1247
/// A batch of append records bounded by both a record count and a total metered size.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    /// Records accepted so far.
    records: Vec<AppendRecord>,
    /// Running sum of the metered sizes of `records`.
    metered_bytes: u64,
    /// Maximum number of records this batch accepts.
    max_capacity: usize,
    /// Test-only override of the total metered-size limit.
    #[cfg(test)]
    max_bytes: u64,
}
1257
1258impl PartialEq for AppendRecordBatch {
1259 fn eq(&self, other: &Self) -> bool {
1260 if self.records.eq(&other.records) {
1261 assert_eq!(self.metered_bytes, other.metered_bytes);
1262 true
1263 } else {
1264 false
1265 }
1266 }
1267}
1268
1269impl Eq for AppendRecordBatch {}
1270
1271impl Default for AppendRecordBatch {
1272 fn default() -> Self {
1273 Self::new()
1274 }
1275}
1276
1277impl AppendRecordBatch {
1278 pub const MAX_CAPACITY: usize = 1000;
1282
1283 pub const MAX_BYTES: u64 = MIB_BYTES;
1285
1286 pub fn new() -> Self {
1288 Self::with_max_capacity(Self::MAX_CAPACITY)
1289 }
1290
1291 pub fn with_max_capacity(max_capacity: usize) -> Self {
1295 assert!(
1296 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1297 "Batch capacity must be between 1 and 1000"
1298 );
1299
1300 Self {
1301 records: Vec::with_capacity(max_capacity),
1302 metered_bytes: 0,
1303 max_capacity,
1304 #[cfg(test)]
1305 max_bytes: Self::MAX_BYTES,
1306 }
1307 }
1308
1309 #[cfg(test)]
1310 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1311 #[cfg(test)]
1312 assert!(
1313 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1314 "Batch size must be between 1 byte and 1 MiB"
1315 );
1316
1317 Self {
1318 max_bytes,
1319 ..Self::with_max_capacity(max_capacity)
1320 }
1321 }
1322
1323 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1329 where
1330 R: Into<AppendRecord>,
1331 T: IntoIterator<Item = R>,
1332 {
1333 let mut records = Self::new();
1334 let mut pending = Vec::new();
1335
1336 let mut iter = iter.into_iter();
1337
1338 for record in iter.by_ref() {
1339 if let Err(record) = records.push(record) {
1340 pending.push(record);
1341 break;
1342 }
1343 }
1344
1345 if pending.is_empty() {
1346 Ok(records)
1347 } else {
1348 pending.extend(iter.map(Into::into));
1349 Err((records, pending))
1350 }
1351 }
1352
1353 pub fn is_empty(&self) -> bool {
1355 if self.records.is_empty() {
1356 assert_eq!(self.metered_bytes, 0);
1357 true
1358 } else {
1359 false
1360 }
1361 }
1362
1363 pub fn len(&self) -> usize {
1365 self.records.len()
1366 }
1367
1368 #[cfg(test)]
1369 fn max_bytes(&self) -> u64 {
1370 self.max_bytes
1371 }
1372
1373 #[cfg(not(test))]
1374 fn max_bytes(&self) -> u64 {
1375 Self::MAX_BYTES
1376 }
1377
1378 pub fn is_full(&self) -> bool {
1380 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1381 }
1382
1383 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1385 assert!(self.records.len() <= self.max_capacity);
1386 assert!(self.metered_bytes <= self.max_bytes());
1387
1388 let record = record.into();
1389 let record_size = record.metered_bytes();
1390 if self.records.len() >= self.max_capacity
1391 || self.metered_bytes + record_size > self.max_bytes()
1392 {
1393 Err(record)
1394 } else {
1395 self.records.push(record);
1396 self.metered_bytes += record_size;
1397 Ok(())
1398 }
1399 }
1400}
1401
1402impl MeteredBytes for AppendRecordBatch {
1403 fn metered_bytes(&self) -> u64 {
1404 self.metered_bytes
1405 }
1406}
1407
1408impl IntoIterator for AppendRecordBatch {
1409 type Item = AppendRecord;
1410 type IntoIter = std::vec::IntoIter<Self::Item>;
1411
1412 fn into_iter(self) -> Self::IntoIter {
1413 self.records.into_iter()
1414 }
1415}
1416
1417impl<'a> IntoIterator for &'a AppendRecordBatch {
1418 type Item = &'a AppendRecord;
1419 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1420
1421 fn into_iter(self) -> Self::IntoIter {
1422 self.records.iter()
1423 }
1424}
1425
1426impl AsRef<[AppendRecord]> for AppendRecordBatch {
1427 fn as_ref(&self) -> &[AppendRecord] {
1428 &self.records
1429 }
1430}
1431
// Input for an append: a batch of records plus two optional fields.
// `Default` yields an empty batch with both optional fields unset.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    pub records: AppendRecordBatch,
    pub match_seq_num: Option<u64>,
    pub fencing_token: Option<FencingToken>,
}
1439
1440impl MeteredBytes for AppendInput {
1441 fn metered_bytes(&self) -> u64 {
1442 self.records.metered_bytes()
1443 }
1444}
1445
1446impl AppendInput {
1447 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1449 Self {
1450 records: records.into(),
1451 match_seq_num: None,
1452 fencing_token: None,
1453 }
1454 }
1455
1456 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1458 Self {
1459 match_seq_num: Some(match_seq_num.into()),
1460 ..self
1461 }
1462 }
1463
1464 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1466 Self {
1467 fencing_token: Some(fencing_token),
1468 ..self
1469 }
1470 }
1471
1472 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1473 let Self {
1474 records,
1475 match_seq_num,
1476 fencing_token,
1477 } = self;
1478
1479 api::AppendInput {
1480 stream: stream.into(),
1481 records: records.into_iter().map(Into::into).collect(),
1482 match_seq_num,
1483 fencing_token: fencing_token.map(|f| f.0),
1484 }
1485 }
1486}
1487
/// Acknowledgement of an append, converted from `api::AppendOutput`.
#[derive(Debug, Clone)]
pub struct AppendAck {
    /// Position built from the `start_seq_num` and `start_timestamp` of the append output.
    pub start: StreamPosition,
    /// Position built from the `end_seq_num` and `end_timestamp` of the append output.
    pub end: StreamPosition,
    /// Position built from the `next_seq_num` and `last_timestamp` of the append output.
    pub tail: StreamPosition,
}
1503
1504impl From<api::AppendOutput> for AppendAck {
1505 fn from(value: api::AppendOutput) -> Self {
1506 let api::AppendOutput {
1507 start_seq_num,
1508 start_timestamp,
1509 end_seq_num,
1510 end_timestamp,
1511 next_seq_num,
1512 last_timestamp,
1513 } = value;
1514 let start = StreamPosition {
1515 seq_num: start_seq_num,
1516 timestamp: start_timestamp,
1517 };
1518 let end = StreamPosition {
1519 seq_num: end_seq_num,
1520 timestamp: end_timestamp,
1521 };
1522 let tail = StreamPosition {
1523 seq_num: next_seq_num,
1524 timestamp: last_timestamp,
1525 };
1526 Self { start, end, tail }
1527 }
1528}
1529
1530impl TryFrom<api::AppendResponse> for AppendAck {
1531 type Error = ConvertError;
1532 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1533 let api::AppendResponse { output } = value;
1534 let output = output.ok_or("missing append output")?;
1535 Ok(output.into())
1536 }
1537}
1538
1539impl TryFrom<api::AppendSessionResponse> for AppendAck {
1540 type Error = ConvertError;
1541 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1542 let api::AppendSessionResponse { output } = value;
1543 let output = output.ok_or("missing append output")?;
1544 Ok(output.into())
1545 }
1546}
1547
// Optional record-count and byte limits for a read; both default to `None`.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    pub count: Option<u64>,
    pub bytes: Option<u64>,
}
1554
1555impl ReadLimit {
1556 pub fn new() -> Self {
1558 Self::default()
1559 }
1560
1561 pub fn with_count(self, count: u64) -> Self {
1563 Self {
1564 count: Some(count),
1565 ..self
1566 }
1567 }
1568
1569 pub fn with_bytes(self, bytes: u64) -> Self {
1571 Self {
1572 bytes: Some(bytes),
1573 ..self
1574 }
1575 }
1576}
1577
/// Starting point of a read.
///
/// Each variant is converted into the `Start` variant of the same name for both unary and
/// session read requests. The `Default` implementation yields `SeqNum(0)`.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Forwarded as `Start::SeqNum` (presumably a sequence number to start from).
    SeqNum(u64),
    /// Forwarded as `Start::Timestamp` (presumably a timestamp to start from).
    Timestamp(u64),
    /// Forwarded as `Start::TailOffset` (presumably an offset relative to the tail).
    TailOffset(u64),
}
1588
1589impl Default for ReadStart {
1590 fn default() -> Self {
1591 Self::SeqNum(0)
1592 }
1593}
1594
1595impl From<ReadStart> for api::read_request::Start {
1596 fn from(start: ReadStart) -> Self {
1597 match start {
1598 ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1599 ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1600 ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1601 }
1602 }
1603}
1604
1605impl From<ReadStart> for api::read_session_request::Start {
1606 fn from(start: ReadStart) -> Self {
1607 match start {
1608 ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1609 ReadStart::Timestamp(timestamp) => {
1610 api::read_session_request::Start::Timestamp(timestamp)
1611 }
1612 ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1613 }
1614 }
1615}
1616
// Unary read request: where to start and how much to read. `try_into_api_type` rejects
// limits above 1000 records or 1 MiB.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    pub start: ReadStart,
    pub limit: ReadLimit,
}
1623
1624impl ReadRequest {
1625 pub fn new(start: ReadStart) -> Self {
1627 Self {
1628 start,
1629 ..Default::default()
1630 }
1631 }
1632
1633 pub fn with_limit(self, limit: ReadLimit) -> Self {
1635 Self { limit, ..self }
1636 }
1637}
1638
1639impl ReadRequest {
1640 pub(crate) fn try_into_api_type(
1641 self,
1642 stream: impl Into<String>,
1643 ) -> Result<api::ReadRequest, ConvertError> {
1644 let Self { start, limit } = self;
1645
1646 let limit = if limit.count > Some(1000) {
1647 Err("read limit: count must not exceed 1000 for unary request")
1648 } else if limit.bytes > Some(MIB_BYTES) {
1649 Err("read limit: bytes must not exceed 1MiB for unary request")
1650 } else {
1651 Ok(api::ReadLimit {
1652 count: limit.count,
1653 bytes: limit.bytes,
1654 })
1655 }?;
1656
1657 Ok(api::ReadRequest {
1658 stream: stream.into(),
1659 start: Some(start.into()),
1660 limit: Some(limit),
1661 })
1662 }
1663}
1664
// A record carrying a sequence number and timestamp alongside its headers and body;
// converted from `api::SequencedRecord`.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    pub seq_num: u64,
    pub timestamp: u64,
    pub headers: Vec<Header>,
    pub body: Bytes,
}
1673
// Implements `MeteredBytes` for `SequencedRecord`: 8 bytes, plus 2 bytes per header, plus
// the lengths of every header name and value, plus the body length.
metered_impl!(SequencedRecord);
1675
1676impl From<api::SequencedRecord> for SequencedRecord {
1677 fn from(value: api::SequencedRecord) -> Self {
1678 let api::SequencedRecord {
1679 seq_num,
1680 timestamp,
1681 headers,
1682 body,
1683 } = value;
1684 Self {
1685 seq_num,
1686 timestamp,
1687 headers: headers.into_iter().map(Into::into).collect(),
1688 body,
1689 }
1690 }
1691}
1692
1693impl SequencedRecord {
1694 pub fn as_command_record(&self) -> Option<CommandRecord> {
1696 if self.headers.len() != 1 {
1697 return None;
1698 }
1699
1700 let header = self.headers.first().expect("pre-validated length");
1701
1702 if !header.name.is_empty() {
1703 return None;
1704 }
1705
1706 match header.value.as_ref() {
1707 CommandRecord::FENCE => {
1708 let fencing_token =
1709 FencingToken::new(std::str::from_utf8(&self.body).ok()?).ok()?;
1710 Some(CommandRecord {
1711 command: Command::Fence { fencing_token },
1712 timestamp: Some(self.timestamp),
1713 })
1714 }
1715 CommandRecord::TRIM => {
1716 let body: &[u8] = &self.body;
1717 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1718 Some(CommandRecord {
1719 command: Command::Trim { seq_num },
1720 timestamp: Some(self.timestamp),
1721 })
1722 }
1723 _ => None,
1724 }
1725 }
1726}
1727
// A batch of sequenced records, converted from `api::SequencedRecordBatch`.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    pub records: Vec<SequencedRecord>,
}
1733
1734impl MeteredBytes for SequencedRecordBatch {
1735 fn metered_bytes(&self) -> u64 {
1736 self.records.metered_bytes()
1737 }
1738}
1739
1740impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1741 fn from(value: api::SequencedRecordBatch) -> Self {
1742 let api::SequencedRecordBatch { records } = value;
1743 Self {
1744 records: records.into_iter().map(Into::into).collect(),
1745 }
1746 }
1747}
1748
// Result of a read: either a batch of records or a next sequence number, mirroring
// `api::read_output::Output`.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    Batch(SequencedRecordBatch),
    NextSeqNum(u64),
}
1755
1756impl From<api::read_output::Output> for ReadOutput {
1757 fn from(value: api::read_output::Output) -> Self {
1758 match value {
1759 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1760 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1761 }
1762 }
1763}
1764
1765impl TryFrom<api::ReadOutput> for ReadOutput {
1766 type Error = ConvertError;
1767 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1768 let api::ReadOutput { output } = value;
1769 let output = output.ok_or("missing read output")?;
1770 Ok(output.into())
1771 }
1772}
1773
1774impl TryFrom<api::ReadResponse> for ReadOutput {
1775 type Error = ConvertError;
1776 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1777 let api::ReadResponse { output } = value;
1778 let output = output.ok_or("missing output in read response")?;
1779 output.try_into()
1780 }
1781}
1782
// Read session request: where to start and how much to read. Unlike the unary request,
// `into_api_type` forwards the limit without validating it.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    pub start: ReadStart,
    pub limit: ReadLimit,
}
1789
1790impl ReadSessionRequest {
1791 pub fn new(start: ReadStart) -> Self {
1793 Self {
1794 start,
1795 ..Default::default()
1796 }
1797 }
1798
1799 pub fn with_limit(self, limit: ReadLimit) -> Self {
1801 Self { limit, ..self }
1802 }
1803
1804 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1805 let Self { start, limit } = self;
1806 api::ReadSessionRequest {
1807 stream: stream.into(),
1808 start: Some(start.into()),
1809 limit: Some(api::ReadLimit {
1810 count: limit.count,
1811 bytes: limit.bytes,
1812 }),
1813 heartbeats: false,
1814 }
1815 }
1816}
1817
1818impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1819 type Error = ConvertError;
1820 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1821 let api::ReadSessionResponse { output } = value;
1822 let output = output.ok_or("missing output in read session response")?;
1823 output.try_into()
1824 }
1825}
1826
/// Name of a basin.
///
/// Values created through `TryFrom<String>` or `FromStr` are 8 to 48 bytes long, consist of
/// lowercase letters, digits and hyphens, and neither begin nor end with a hyphen.
#[derive(Debug, Clone)]
pub struct BasinName(String);
1833
1834impl AsRef<str> for BasinName {
1835 fn as_ref(&self) -> &str {
1836 &self.0
1837 }
1838}
1839
1840impl Deref for BasinName {
1841 type Target = str;
1842 fn deref(&self) -> &Self::Target {
1843 &self.0
1844 }
1845}
1846
1847impl TryFrom<String> for BasinName {
1848 type Error = ConvertError;
1849
1850 fn try_from(name: String) -> Result<Self, Self::Error> {
1851 if name.len() < 8 || name.len() > 48 {
1852 return Err("Basin name must be between 8 and 48 characters in length".into());
1853 }
1854
1855 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1856 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1857 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1858 .expect("Failed to compile basin name regex")
1859 });
1860
1861 if !regex.is_match(&name) {
1862 return Err(
1863 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1864 It cannot begin or end with a hyphen."
1865 .into(),
1866 );
1867 }
1868
1869 Ok(Self(name))
1870 }
1871}
1872
1873impl FromStr for BasinName {
1874 type Err = ConvertError;
1875
1876 fn from_str(s: &str) -> Result<Self, Self::Err> {
1877 s.to_string().try_into()
1878 }
1879}
1880
1881impl std::fmt::Display for BasinName {
1882 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1883 f.write_str(&self.0)
1884 }
1885}
1886
/// Identifier of an access token.
///
/// Values created through `TryFrom<String>` or `FromStr` are non-empty and at most 96 bytes
/// long.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);
1891
1892impl AsRef<str> for AccessTokenId {
1893 fn as_ref(&self) -> &str {
1894 &self.0
1895 }
1896}
1897
1898impl Deref for AccessTokenId {
1899 type Target = str;
1900
1901 fn deref(&self) -> &Self::Target {
1902 &self.0
1903 }
1904}
1905
1906impl TryFrom<String> for AccessTokenId {
1907 type Error = ConvertError;
1908
1909 fn try_from(name: String) -> Result<Self, Self::Error> {
1910 if name.is_empty() {
1911 return Err("Access token ID must not be empty".into());
1912 }
1913
1914 if name.len() > 96 {
1915 return Err("Access token ID must not exceed 96 characters".into());
1916 }
1917
1918 Ok(Self(name))
1919 }
1920}
1921
1922impl From<AccessTokenId> for String {
1923 fn from(value: AccessTokenId) -> Self {
1924 value.0
1925 }
1926}
1927
1928impl FromStr for AccessTokenId {
1929 type Err = ConvertError;
1930
1931 fn from_str(s: &str) -> Result<Self, Self::Err> {
1932 s.to_string().try_into()
1933 }
1934}
1935
1936impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1937 fn from(value: AccessTokenInfo) -> Self {
1938 Self {
1939 info: Some(value.into()),
1940 }
1941 }
1942}
1943
// Access token description: its ID plus optional expiry, the stream auto-prefix flag and an
// optional scope. Converts to and from `api::AccessTokenInfo`.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AccessTokenInfo {
    pub id: AccessTokenId,
    pub expires_at: Option<u32>,
    pub auto_prefix_streams: bool,
    pub scope: Option<AccessTokenScope>,
}
1952
1953impl AccessTokenInfo {
1954 pub fn new(id: AccessTokenId) -> Self {
1956 Self {
1957 id,
1958 expires_at: None,
1959 auto_prefix_streams: false,
1960 scope: None,
1961 }
1962 }
1963
1964 pub fn with_expires_at(self, expires_at: u32) -> Self {
1966 Self {
1967 expires_at: Some(expires_at),
1968 ..self
1969 }
1970 }
1971
1972 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1974 Self {
1975 auto_prefix_streams,
1976 ..self
1977 }
1978 }
1979
1980 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
1982 Self {
1983 scope: Some(scope),
1984 ..self
1985 }
1986 }
1987}
1988
1989impl From<AccessTokenInfo> for api::AccessTokenInfo {
1990 fn from(value: AccessTokenInfo) -> Self {
1991 let AccessTokenInfo {
1992 id,
1993 expires_at,
1994 auto_prefix_streams,
1995 scope,
1996 } = value;
1997 Self {
1998 id: id.into(),
1999 expires_at,
2000 auto_prefix_streams,
2001 scope: scope.map(Into::into),
2002 }
2003 }
2004}
2005
2006impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2007 type Error = ConvertError;
2008
2009 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2010 let api::AccessTokenInfo {
2011 id,
2012 expires_at,
2013 auto_prefix_streams,
2014 scope,
2015 } = value;
2016 Ok(Self {
2017 id: id.try_into()?,
2018 expires_at,
2019 auto_prefix_streams,
2020 scope: scope.map(Into::into),
2021 })
2022 }
2023}
2024
// Operations that can be listed in an access token scope. Each variant maps onto the
// `api::Operation` variant of the same name and parses from its kebab-case spelling.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
2047
2048impl FromStr for Operation {
2049 type Err = ConvertError;
2050
2051 fn from_str(s: &str) -> Result<Self, Self::Err> {
2052 match s.to_lowercase().as_str() {
2053 "list-basins" => Ok(Self::ListBasins),
2054 "create-basin" => Ok(Self::CreateBasin),
2055 "delete-basin" => Ok(Self::DeleteBasin),
2056 "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2057 "get-basin-config" => Ok(Self::GetBasinConfig),
2058 "issue-access-token" => Ok(Self::IssueAccessToken),
2059 "revoke-access-token" => Ok(Self::RevokeAccessToken),
2060 "list-access-tokens" => Ok(Self::ListAccessTokens),
2061 "list-streams" => Ok(Self::ListStreams),
2062 "create-stream" => Ok(Self::CreateStream),
2063 "delete-stream" => Ok(Self::DeleteStream),
2064 "get-stream-config" => Ok(Self::GetStreamConfig),
2065 "reconfigure-stream" => Ok(Self::ReconfigureStream),
2066 "check-tail" => Ok(Self::CheckTail),
2067 "append" => Ok(Self::Append),
2068 "read" => Ok(Self::Read),
2069 "trim" => Ok(Self::Trim),
2070 "fence" => Ok(Self::Fence),
2071 _ => Err("invalid operation".into()),
2072 }
2073 }
2074}
2075
2076impl From<Operation> for api::Operation {
2077 fn from(value: Operation) -> Self {
2078 match value {
2079 Operation::ListBasins => Self::ListBasins,
2080 Operation::CreateBasin => Self::CreateBasin,
2081 Operation::DeleteBasin => Self::DeleteBasin,
2082 Operation::ReconfigureBasin => Self::ReconfigureBasin,
2083 Operation::GetBasinConfig => Self::GetBasinConfig,
2084 Operation::IssueAccessToken => Self::IssueAccessToken,
2085 Operation::RevokeAccessToken => Self::RevokeAccessToken,
2086 Operation::ListAccessTokens => Self::ListAccessTokens,
2087 Operation::ListStreams => Self::ListStreams,
2088 Operation::CreateStream => Self::CreateStream,
2089 Operation::DeleteStream => Self::DeleteStream,
2090 Operation::GetStreamConfig => Self::GetStreamConfig,
2091 Operation::ReconfigureStream => Self::ReconfigureStream,
2092 Operation::CheckTail => Self::CheckTail,
2093 Operation::Append => Self::Append,
2094 Operation::Read => Self::Read,
2095 Operation::Trim => Self::Trim,
2096 Operation::Fence => Self::Fence,
2097 }
2098 }
2099}
2100
2101impl From<api::Operation> for Option<Operation> {
2102 fn from(value: api::Operation) -> Self {
2103 match value {
2104 api::Operation::Unspecified => None,
2105 api::Operation::ListBasins => Some(Operation::ListBasins),
2106 api::Operation::CreateBasin => Some(Operation::CreateBasin),
2107 api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2108 api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2109 api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2110 api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2111 api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2112 api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2113 api::Operation::ListStreams => Some(Operation::ListStreams),
2114 api::Operation::CreateStream => Some(Operation::CreateStream),
2115 api::Operation::DeleteStream => Some(Operation::DeleteStream),
2116 api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2117 api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2118 api::Operation::CheckTail => Some(Operation::CheckTail),
2119 api::Operation::Append => Some(Operation::Append),
2120 api::Operation::Read => Some(Operation::Read),
2121 api::Operation::Trim => Some(Operation::Trim),
2122 api::Operation::Fence => Some(Operation::Fence),
2123 }
2124 }
2125}
2126
// Scope of an access token: optional resource sets for basins, streams and access tokens,
// optional permitted operation groups, and a set of individual operations.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct AccessTokenScope {
    pub basins: Option<ResourceSet>,
    pub streams: Option<ResourceSet>,
    pub access_tokens: Option<ResourceSet>,
    pub op_groups: Option<PermittedOperationGroups>,
    pub ops: HashSet<Operation>,
}
2136
2137impl AccessTokenScope {
2138 pub fn new() -> Self {
2140 Self::default()
2141 }
2142
2143 pub fn with_basins(self, basins: ResourceSet) -> Self {
2145 Self {
2146 basins: Some(basins),
2147 ..self
2148 }
2149 }
2150
2151 pub fn with_streams(self, streams: ResourceSet) -> Self {
2153 Self {
2154 streams: Some(streams),
2155 ..self
2156 }
2157 }
2158
2159 pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2161 Self {
2162 access_tokens: Some(access_tokens),
2163 ..self
2164 }
2165 }
2166
2167 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2169 Self {
2170 op_groups: Some(op_groups),
2171 ..self
2172 }
2173 }
2174
2175 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2177 Self {
2178 ops: ops.into_iter().collect(),
2179 ..self
2180 }
2181 }
2182
2183 pub fn with_op(self, op: Operation) -> Self {
2185 let mut ops = self.ops;
2186 ops.insert(op);
2187 Self { ops, ..self }
2188 }
2189}
2190
2191impl From<AccessTokenScope> for api::AccessTokenScope {
2192 fn from(value: AccessTokenScope) -> Self {
2193 let AccessTokenScope {
2194 basins,
2195 streams,
2196 access_tokens,
2197 op_groups,
2198 ops,
2199 } = value;
2200 Self {
2201 basins: basins.map(Into::into),
2202 streams: streams.map(Into::into),
2203 access_tokens: access_tokens.map(Into::into),
2204 op_groups: op_groups.map(Into::into),
2205 ops: ops
2206 .into_iter()
2207 .map(api::Operation::from)
2208 .map(Into::into)
2209 .collect(),
2210 }
2211 }
2212}
2213
2214impl From<api::AccessTokenScope> for AccessTokenScope {
2215 fn from(value: api::AccessTokenScope) -> Self {
2216 let api::AccessTokenScope {
2217 basins,
2218 streams,
2219 access_tokens,
2220 op_groups,
2221 ops,
2222 } = value;
2223 Self {
2224 basins: basins.and_then(|set| set.matching.map(Into::into)),
2225 streams: streams.and_then(|set| set.matching.map(Into::into)),
2226 access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2227 op_groups: op_groups.map(Into::into),
2228 ops: ops
2229 .into_iter()
2230 .map(api::Operation::try_from)
2231 .flat_map(Result::ok)
2232 .flat_map(<Option<Operation>>::from)
2233 .collect(),
2234 }
2235 }
2236}
2237
2238impl From<ResourceSet> for api::ResourceSet {
2239 fn from(value: ResourceSet) -> Self {
2240 Self {
2241 matching: Some(value.into()),
2242 }
2243 }
2244}
2245
// Matching rule for a set of resources, mirroring `api::resource_set::Matching`.
// NOTE(review): `Exact` / `Prefix` presumably match a full name or a name prefix — confirm;
// rustdoc for this type is presumably supplied by `#[sync_docs]`.
#[sync_docs(ResourceSet = "Matching")]
#[derive(Debug, Clone)]
pub enum ResourceSet {
    Exact(String),
    Prefix(String),
}
2252
2253impl From<ResourceSet> for api::resource_set::Matching {
2254 fn from(value: ResourceSet) -> Self {
2255 match value {
2256 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2257 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2258 }
2259 }
2260}
2261
2262impl From<api::resource_set::Matching> for ResourceSet {
2263 fn from(value: api::resource_set::Matching) -> Self {
2264 match value {
2265 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2266 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2267 }
2268 }
2269}
2270
// Optional read/write permissions for the account, basin and stream operation groups.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct PermittedOperationGroups {
    pub account: Option<ReadWritePermissions>,
    pub basin: Option<ReadWritePermissions>,
    pub stream: Option<ReadWritePermissions>,
}
2278
2279impl PermittedOperationGroups {
2280 pub fn new() -> Self {
2282 Self::default()
2283 }
2284
2285 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2287 Self {
2288 account: Some(account),
2289 ..self
2290 }
2291 }
2292
2293 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2295 Self {
2296 basin: Some(basin),
2297 ..self
2298 }
2299 }
2300
2301 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2303 Self {
2304 stream: Some(stream),
2305 ..self
2306 }
2307 }
2308}
2309
2310impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2311 fn from(value: PermittedOperationGroups) -> Self {
2312 let PermittedOperationGroups {
2313 account,
2314 basin,
2315 stream,
2316 } = value;
2317 Self {
2318 account: account.map(Into::into),
2319 basin: basin.map(Into::into),
2320 stream: stream.map(Into::into),
2321 }
2322 }
2323}
2324
2325impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2326 fn from(value: api::PermittedOperationGroups) -> Self {
2327 let api::PermittedOperationGroups {
2328 account,
2329 basin,
2330 stream,
2331 } = value;
2332 Self {
2333 account: account.map(Into::into),
2334 basin: basin.map(Into::into),
2335 stream: stream.map(Into::into),
2336 }
2337 }
2338}
2339
// Pair of read and write permission flags; both default to `false`.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadWritePermissions {
    pub read: bool,
    pub write: bool,
}
2346
2347impl ReadWritePermissions {
2348 pub fn new() -> Self {
2350 Self::default()
2351 }
2352
2353 pub fn with_read(self, read: bool) -> Self {
2355 Self { read, ..self }
2356 }
2357
2358 pub fn with_write(self, write: bool) -> Self {
2360 Self { write, ..self }
2361 }
2362}
2363
2364impl From<ReadWritePermissions> for api::ReadWritePermissions {
2365 fn from(value: ReadWritePermissions) -> Self {
2366 let ReadWritePermissions { read, write } = value;
2367 Self { read, write }
2368 }
2369}
2370
2371impl From<api::ReadWritePermissions> for ReadWritePermissions {
2372 fn from(value: api::ReadWritePermissions) -> Self {
2373 let api::ReadWritePermissions { read, write } = value;
2374 Self { read, write }
2375 }
2376}
2377
2378impl From<api::IssueAccessTokenResponse> for String {
2379 fn from(value: api::IssueAccessTokenResponse) -> Self {
2380 value.access_token
2381 }
2382}
2383
2384impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2385 fn from(value: AccessTokenId) -> Self {
2386 Self { id: value.into() }
2387 }
2388}
2389
2390impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2391 type Error = ConvertError;
2392 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2393 let token_info = value.info.ok_or("access token info is missing")?;
2394 token_info.try_into()
2395 }
2396}
2397
// Request for listing access tokens: a prefix, a start-after marker and an optional limit.
// The strings default to empty and the limit to `None`.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListAccessTokensRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
2405
2406impl ListAccessTokensRequest {
2407 pub fn new() -> Self {
2409 Self::default()
2410 }
2411
2412 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2414 Self {
2415 prefix: prefix.into(),
2416 ..self
2417 }
2418 }
2419
2420 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2422 Self {
2423 start_after: start_after.into(),
2424 ..self
2425 }
2426 }
2427
2428 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2430 Self {
2431 limit: limit.into(),
2432 ..self
2433 }
2434 }
2435}
2436
2437impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2438 type Error = ConvertError;
2439 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2440 let ListAccessTokensRequest {
2441 prefix,
2442 start_after,
2443 limit,
2444 } = value;
2445 Ok(Self {
2446 prefix,
2447 start_after,
2448 limit: limit
2449 .map(TryInto::try_into)
2450 .transpose()
2451 .map_err(|_| "request limit does not fit into u64 bounds")?,
2452 })
2453 }
2454}
2455
// Response to a list-access-tokens request: the returned token infos plus the `has_more`
// flag copied from the API response.
// NOTE(review): rustdoc for this type is presumably supplied by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListAccessTokensResponse {
    pub access_tokens: Vec<AccessTokenInfo>,
    pub has_more: bool,
}
2462
2463impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2464 type Error = ConvertError;
2465 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2466 let api::ListAccessTokensResponse {
2467 access_tokens,
2468 has_more,
2469 } = value;
2470 let access_tokens = access_tokens
2471 .into_iter()
2472 .map(TryInto::try_into)
2473 .collect::<Result<Vec<_>, _>>()?;
2474 Ok(Self {
2475 access_tokens,
2476 has_more,
2477 })
2478 }
2479}