1use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
/// Metadata key string `retry-after-ms`.
// NOTE(review): presumably carries a retry delay in milliseconds — confirm at the use site.
pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
14
/// Error returned by the fallible conversions and validations in this file.
///
/// Wraps a human-readable message; `Display` prints that message verbatim.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
19
20impl<T: Into<String>> From<T> for ConvertError {
21 fn from(value: T) -> Self {
22 Self(value.into())
23 }
24}
25
/// Types that can report their metered size in bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}
37
38impl<T: MeteredBytes> MeteredBytes for Vec<T> {
39 fn metered_bytes(&self) -> u64 {
40 self.iter().fold(0, |acc, item| acc + item.metered_bytes())
41 }
42}
43
/// Implements [`MeteredBytes`] for a record-like type exposing `headers`
/// (items with `name` and `value`) and `body` fields.
///
/// Metered size = 8 + 2 per header + total header name/value lengths + body length.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let header_count = self.headers.len();
                let header_payload: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                let total = 8 + (2 * header_count) + header_payload + self.body.len();
                total as u64
            }
        }
    };
}
61
// Scope of a basin.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinScope {
    // Parsed from the string "aws:us-east-1" by the `FromStr` impl.
    AwsUsEast1,
}
67
68impl From<BasinScope> for api::BasinScope {
69 fn from(value: BasinScope) -> Self {
70 match value {
71 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
72 }
73 }
74}
75
76impl From<api::BasinScope> for Option<BasinScope> {
77 fn from(value: api::BasinScope) -> Self {
78 match value {
79 api::BasinScope::Unspecified => None,
80 api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
81 }
82 }
83}
84
85impl FromStr for BasinScope {
86 type Err = ConvertError;
87 fn from_str(value: &str) -> Result<Self, Self::Err> {
88 match value {
89 "aws:us-east-1" => Ok(Self::AwsUsEast1),
90 _ => Err("invalid basin scope value".into()),
91 }
92 }
93}
94
// Request to create a basin.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateBasinRequest {
    // Name of the basin to create.
    pub basin: BasinName,
    // Optional basin configuration; `None` is forwarded as an absent config.
    pub config: Option<BasinConfig>,
    // Optional scope; `None` is sent as the wire enum's default value.
    pub scope: Option<BasinScope>,
}
102
103impl CreateBasinRequest {
104 pub fn new(basin: BasinName) -> Self {
106 Self {
107 basin,
108 config: None,
109 scope: None,
110 }
111 }
112
113 pub fn with_config(self, config: BasinConfig) -> Self {
115 Self {
116 config: Some(config),
117 ..self
118 }
119 }
120
121 pub fn with_scope(self, scope: BasinScope) -> Self {
123 Self {
124 scope: Some(scope),
125 ..self
126 }
127 }
128}
129
130impl From<CreateBasinRequest> for api::CreateBasinRequest {
131 fn from(value: CreateBasinRequest) -> Self {
132 let CreateBasinRequest {
133 basin,
134 config,
135 scope,
136 } = value;
137 Self {
138 basin: basin.0,
139 config: config.map(Into::into),
140 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
141 }
142 }
143}
144
// Basin configuration.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    // Optional default stream configuration for the basin.
    pub default_stream_config: Option<StreamConfig>,
    // Flag forwarded verbatim to the wire type; defaults to `false`.
    pub create_stream_on_append: bool,
    // Flag forwarded verbatim to the wire type; defaults to `false`.
    pub create_stream_on_read: bool,
}
152
153impl BasinConfig {
154 pub fn new() -> Self {
156 Self::default()
157 }
158
159 pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
161 Self {
162 default_stream_config: Some(default_stream_config),
163 ..self
164 }
165 }
166
167 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
169 Self {
170 create_stream_on_append,
171 ..self
172 }
173 }
174
175 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
177 Self {
178 create_stream_on_read,
179 ..self
180 }
181 }
182}
183
184impl From<BasinConfig> for api::BasinConfig {
185 fn from(value: BasinConfig) -> Self {
186 let BasinConfig {
187 default_stream_config,
188 create_stream_on_append,
189 create_stream_on_read,
190 } = value;
191 Self {
192 default_stream_config: default_stream_config.map(Into::into),
193 create_stream_on_append,
194 create_stream_on_read,
195 }
196 }
197}
198
199impl From<api::BasinConfig> for BasinConfig {
200 fn from(value: api::BasinConfig) -> Self {
201 let api::BasinConfig {
202 default_stream_config,
203 create_stream_on_append,
204 create_stream_on_read,
205 } = value;
206 Self {
207 default_stream_config: default_stream_config.map(Into::into),
208 create_stream_on_append,
209 create_stream_on_read,
210 }
211 }
212}
213
// Timestamping mode; each variant maps 1:1 onto the wire enum variant of the same name.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    ClientPrefer,
    ClientRequire,
    Arrival,
}
221
222impl From<TimestampingMode> for api::TimestampingMode {
223 fn from(value: TimestampingMode) -> Self {
224 match value {
225 TimestampingMode::ClientPrefer => Self::ClientPrefer,
226 TimestampingMode::ClientRequire => Self::ClientRequire,
227 TimestampingMode::Arrival => Self::Arrival,
228 }
229 }
230}
231
232impl From<api::TimestampingMode> for Option<TimestampingMode> {
233 fn from(value: api::TimestampingMode) -> Self {
234 match value {
235 api::TimestampingMode::Unspecified => None,
236 api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
237 api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
238 api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
239 }
240 }
241}
242
// Timestamping configuration; corresponds to the wire type `api::stream_config::Timestamping`.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs(TimestampingConfig = "Timestamping")]
#[derive(Debug, Clone, Default)]
pub struct TimestampingConfig {
    // Optional mode; `None` is sent as the wire enum's default value.
    pub mode: Option<TimestampingMode>,
    // Optional flag, forwarded verbatim to the wire type.
    pub uncapped: Option<bool>,
}
250
251impl TimestampingConfig {
252 pub fn new() -> Self {
254 Self::default()
255 }
256
257 pub fn with_mode(self, mode: TimestampingMode) -> Self {
259 Self {
260 mode: Some(mode),
261 ..self
262 }
263 }
264
265 pub fn with_uncapped(self, uncapped: bool) -> Self {
267 Self {
268 uncapped: Some(uncapped),
269 ..self
270 }
271 }
272}
273
274impl From<TimestampingConfig> for api::stream_config::Timestamping {
275 fn from(value: TimestampingConfig) -> Self {
276 Self {
277 mode: value
278 .mode
279 .map(api::TimestampingMode::from)
280 .unwrap_or_default()
281 .into(),
282 uncapped: value.uncapped,
283 }
284 }
285}
286
287impl From<api::stream_config::Timestamping> for TimestampingConfig {
288 fn from(value: api::stream_config::Timestamping) -> Self {
289 let mode = value.mode().into();
290 let uncapped = value.uncapped;
291 Self { mode, uncapped }
292 }
293}
294
// Stream configuration; every field is optional.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    // Optional storage class; `None` is sent as the wire enum's default value.
    pub storage_class: Option<StorageClass>,
    // Optional retention policy.
    pub retention_policy: Option<RetentionPolicy>,
    // Optional timestamping configuration.
    pub timestamping: Option<TimestampingConfig>,
}
302
303impl StreamConfig {
304 pub fn new() -> Self {
306 Self::default()
307 }
308
309 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
311 Self {
312 storage_class: Some(storage_class),
313 ..self
314 }
315 }
316
317 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
319 Self {
320 retention_policy: Some(retention_policy),
321 ..self
322 }
323 }
324
325 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
327 Self {
328 timestamping: Some(timestamping),
329 ..self
330 }
331 }
332}
333
334impl From<StreamConfig> for api::StreamConfig {
335 fn from(value: StreamConfig) -> Self {
336 let StreamConfig {
337 storage_class,
338 retention_policy,
339 timestamping,
340 } = value;
341 Self {
342 storage_class: storage_class
343 .map(api::StorageClass::from)
344 .unwrap_or_default()
345 .into(),
346 retention_policy: retention_policy.map(Into::into),
347 timestamping: timestamping.map(Into::into),
348 }
349 }
350}
351
352impl From<api::StreamConfig> for StreamConfig {
353 fn from(value: api::StreamConfig) -> Self {
354 Self {
355 storage_class: value.storage_class().into(),
356 retention_policy: value.retention_policy.map(Into::into),
357 timestamping: value.timestamping.map(Into::into),
358 }
359 }
360}
361
// Storage class of a stream.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    // Parsed from "standard" by the `FromStr` impl.
    Standard,
    // Parsed from "express" by the `FromStr` impl.
    Express,
}
368
369impl From<StorageClass> for api::StorageClass {
370 fn from(value: StorageClass) -> Self {
371 match value {
372 StorageClass::Standard => Self::Standard,
373 StorageClass::Express => Self::Express,
374 }
375 }
376}
377
378impl From<api::StorageClass> for Option<StorageClass> {
379 fn from(value: api::StorageClass) -> Self {
380 match value {
381 api::StorageClass::Unspecified => None,
382 api::StorageClass::Standard => Some(StorageClass::Standard),
383 api::StorageClass::Express => Some(StorageClass::Express),
384 }
385 }
386}
387
388impl FromStr for StorageClass {
389 type Err = ConvertError;
390
391 fn from_str(value: &str) -> Result<Self, Self::Err> {
392 match value {
393 "standard" => Ok(Self::Standard),
394 "express" => Ok(Self::Express),
395 v => Err(format!("unknown storage class: {v}").into()),
396 }
397 }
398}
399
// Retention policy of a stream.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    // Age-based policy; the duration is sent to the API as whole seconds.
    Age(Duration),
}
405
406impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
407 fn from(value: RetentionPolicy) -> Self {
408 match value {
409 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
410 }
411 }
412}
413
414impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
415 fn from(value: api::stream_config::RetentionPolicy) -> Self {
416 match value {
417 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
418 }
419 }
420}
421
// State of a basin; `Display` prints the lowercase variant name.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    Active,
    Creating,
    Deleting,
}
429
430impl From<BasinState> for api::BasinState {
431 fn from(value: BasinState) -> Self {
432 match value {
433 BasinState::Active => Self::Active,
434 BasinState::Creating => Self::Creating,
435 BasinState::Deleting => Self::Deleting,
436 }
437 }
438}
439
440impl From<api::BasinState> for Option<BasinState> {
441 fn from(value: api::BasinState) -> Self {
442 match value {
443 api::BasinState::Unspecified => None,
444 api::BasinState::Active => Some(BasinState::Active),
445 api::BasinState::Creating => Some(BasinState::Creating),
446 api::BasinState::Deleting => Some(BasinState::Deleting),
447 }
448 }
449}
450
451impl std::fmt::Display for BasinState {
452 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453 match self {
454 BasinState::Active => write!(f, "active"),
455 BasinState::Creating => write!(f, "creating"),
456 BasinState::Deleting => write!(f, "deleting"),
457 }
458 }
459}
460
// Basin information.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    // Basin name.
    pub name: String,
    // Basin scope; `None` when the wire value is `Unspecified`.
    pub scope: Option<BasinScope>,
    // Basin state; `None` when the wire value is `Unspecified`.
    pub state: Option<BasinState>,
}
468
469impl From<BasinInfo> for api::BasinInfo {
470 fn from(value: BasinInfo) -> Self {
471 let BasinInfo { name, scope, state } = value;
472 Self {
473 name,
474 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
475 state: state.map(api::BasinState::from).unwrap_or_default().into(),
476 }
477 }
478}
479
480impl From<api::BasinInfo> for BasinInfo {
481 fn from(value: api::BasinInfo) -> Self {
482 let scope = value.scope().into();
483 let state = value.state().into();
484 let name = value.name;
485 Self { name, scope, state }
486 }
487}
488
489impl TryFrom<api::CreateBasinResponse> for BasinInfo {
490 type Error = ConvertError;
491 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
492 let api::CreateBasinResponse { info } = value;
493 let info = info.ok_or("missing basin info")?;
494 Ok(info.into())
495 }
496}
497
// Request to list streams.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListStreamsRequest {
    // Prefix string, forwarded verbatim; defaults to the empty string.
    pub prefix: String,
    // `start_after` string, forwarded verbatim; defaults to the empty string.
    pub start_after: String,
    // Optional limit, forwarded to the API as `u64`.
    pub limit: Option<usize>,
}
505
506impl ListStreamsRequest {
507 pub fn new() -> Self {
509 Self::default()
510 }
511
512 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
514 Self {
515 prefix: prefix.into(),
516 ..self
517 }
518 }
519
520 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
522 Self {
523 start_after: start_after.into(),
524 ..self
525 }
526 }
527
528 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
530 Self {
531 limit: limit.into(),
532 ..self
533 }
534 }
535}
536
537impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
538 type Error = ConvertError;
539 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
540 let ListStreamsRequest {
541 prefix,
542 start_after,
543 limit,
544 } = value;
545 Ok(Self {
546 prefix,
547 start_after,
548 limit: limit.map(|n| n as u64),
549 })
550 }
551}
552
// Stream information.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    // Stream name.
    pub name: String,
    // Creation time as reported by the API. NOTE(review): unit not visible here — confirm.
    pub created_at: u32,
    // Deletion time, if any, as reported by the API. NOTE(review): unit not visible here — confirm.
    pub deleted_at: Option<u32>,
}
560
561impl From<api::StreamInfo> for StreamInfo {
562 fn from(value: api::StreamInfo) -> Self {
563 Self {
564 name: value.name,
565 created_at: value.created_at,
566 deleted_at: value.deleted_at,
567 }
568 }
569}
570
571impl TryFrom<api::CreateStreamResponse> for StreamInfo {
572 type Error = ConvertError;
573
574 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
575 let api::CreateStreamResponse { info } = value;
576 let info = info.ok_or("missing stream info")?;
577 Ok(info.into())
578 }
579}
580
// Response to a list-streams request.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    // Streams returned by the API, in response order.
    pub streams: Vec<StreamInfo>,
    // `has_more` flag copied from the wire response.
    pub has_more: bool,
}
587
588impl From<api::ListStreamsResponse> for ListStreamsResponse {
589 fn from(value: api::ListStreamsResponse) -> Self {
590 let api::ListStreamsResponse { streams, has_more } = value;
591 let streams = streams.into_iter().map(Into::into).collect();
592 Self { streams, has_more }
593 }
594}
595
596impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
597 type Error = ConvertError;
598
599 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
600 let api::GetBasinConfigResponse { config } = value;
601 let config = config.ok_or("missing basin config")?;
602 Ok(config.into())
603 }
604}
605
606impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
607 type Error = ConvertError;
608
609 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
610 let api::GetStreamConfigResponse { config } = value;
611 let config = config.ok_or("missing stream config")?;
612 Ok(config.into())
613 }
614}
615
// Request to create a stream.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateStreamRequest {
    // Name of the stream to create.
    pub stream: String,
    // Optional stream configuration; `None` is forwarded as an absent config.
    pub config: Option<StreamConfig>,
}
622
623impl CreateStreamRequest {
624 pub fn new(stream: impl Into<String>) -> Self {
626 Self {
627 stream: stream.into(),
628 config: None,
629 }
630 }
631
632 pub fn with_config(self, config: StreamConfig) -> Self {
634 Self {
635 config: Some(config),
636 ..self
637 }
638 }
639}
640
641impl From<CreateStreamRequest> for api::CreateStreamRequest {
642 fn from(value: CreateStreamRequest) -> Self {
643 let CreateStreamRequest { stream, config } = value;
644 Self {
645 stream,
646 config: config.map(Into::into),
647 }
648 }
649}
650
// Request to list basins.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListBasinsRequest {
    // Prefix string, forwarded verbatim; defaults to the empty string.
    pub prefix: String,
    // `start_after` string, forwarded verbatim; defaults to the empty string.
    pub start_after: String,
    // Optional limit; converted with a checked conversion when building the wire request.
    pub limit: Option<usize>,
}
658
659impl ListBasinsRequest {
660 pub fn new() -> Self {
662 Self::default()
663 }
664
665 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
667 Self {
668 prefix: prefix.into(),
669 ..self
670 }
671 }
672
673 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
675 Self {
676 start_after: start_after.into(),
677 ..self
678 }
679 }
680
681 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
683 Self {
684 limit: limit.into(),
685 ..self
686 }
687 }
688}
689
690impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
691 type Error = ConvertError;
692 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
693 let ListBasinsRequest {
694 prefix,
695 start_after,
696 limit,
697 } = value;
698 Ok(Self {
699 prefix,
700 start_after,
701 limit: limit
702 .map(TryInto::try_into)
703 .transpose()
704 .map_err(|_| "request limit does not fit into u64 bounds")?,
705 })
706 }
707}
708
// Response to a list-basins request.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    // Basins returned by the API, in response order.
    pub basins: Vec<BasinInfo>,
    // `has_more` flag copied from the wire response.
    pub has_more: bool,
}
715
716impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
717 type Error = ConvertError;
718 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
719 let api::ListBasinsResponse { basins, has_more } = value;
720 Ok(Self {
721 basins: basins.into_iter().map(Into::into).collect(),
722 has_more,
723 })
724 }
725}
726
// Request to delete a basin.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteBasinRequest {
    // Name of the basin to delete.
    pub basin: BasinName,
    // Not sent on the wire (the `api` conversion drops it).
    // NOTE(review): presumably consumed client-side to tolerate a missing basin — confirm.
    pub if_exists: bool,
}
734
735impl DeleteBasinRequest {
736 pub fn new(basin: BasinName) -> Self {
738 Self {
739 basin,
740 if_exists: false,
741 }
742 }
743
744 pub fn with_if_exists(self, if_exists: bool) -> Self {
746 Self { if_exists, ..self }
747 }
748}
749
750impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
751 fn from(value: DeleteBasinRequest) -> Self {
752 let DeleteBasinRequest { basin, .. } = value;
753 Self { basin: basin.0 }
754 }
755}
756
// Request to delete a stream.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteStreamRequest {
    // Name of the stream to delete.
    pub stream: String,
    // Not sent on the wire (the `api` conversion drops it).
    // NOTE(review): presumably consumed client-side to tolerate a missing stream — confirm.
    pub if_exists: bool,
}
764
765impl DeleteStreamRequest {
766 pub fn new(stream: impl Into<String>) -> Self {
768 Self {
769 stream: stream.into(),
770 if_exists: false,
771 }
772 }
773
774 pub fn with_if_exists(self, if_exists: bool) -> Self {
776 Self { if_exists, ..self }
777 }
778}
779
780impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
781 fn from(value: DeleteStreamRequest) -> Self {
782 let DeleteStreamRequest { stream, .. } = value;
783 Self { stream }
784 }
785}
786
// Request to reconfigure a basin.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureBasinRequest {
    // Name of the basin to reconfigure.
    pub basin: BasinName,
    // Optional new configuration.
    pub config: Option<BasinConfig>,
    // Optional field paths; sent as the `paths` of a `prost_types::FieldMask`.
    pub mask: Option<Vec<String>>,
}
794
795impl ReconfigureBasinRequest {
796 pub fn new(basin: BasinName) -> Self {
798 Self {
799 basin,
800 config: None,
801 mask: None,
802 }
803 }
804
805 pub fn with_config(self, config: BasinConfig) -> Self {
807 Self {
808 config: Some(config),
809 ..self
810 }
811 }
812
813 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
815 Self {
816 mask: Some(mask.into()),
817 ..self
818 }
819 }
820}
821
822impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
823 fn from(value: ReconfigureBasinRequest) -> Self {
824 let ReconfigureBasinRequest {
825 basin,
826 config,
827 mask,
828 } = value;
829 Self {
830 basin: basin.0,
831 config: config.map(Into::into),
832 mask: mask.map(|paths| prost_types::FieldMask { paths }),
833 }
834 }
835}
836
837impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
838 type Error = ConvertError;
839 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
840 let api::ReconfigureBasinResponse { config } = value;
841 let config = config.ok_or("missing basin config")?;
842 Ok(config.into())
843 }
844}
845
// Request to reconfigure a stream.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureStreamRequest {
    // Name of the stream to reconfigure.
    pub stream: String,
    // Optional new configuration.
    pub config: Option<StreamConfig>,
    // Optional field paths; sent as the `paths` of a `prost_types::FieldMask`.
    pub mask: Option<Vec<String>>,
}
853
854impl ReconfigureStreamRequest {
855 pub fn new(stream: impl Into<String>) -> Self {
857 Self {
858 stream: stream.into(),
859 config: None,
860 mask: None,
861 }
862 }
863
864 pub fn with_config(self, config: StreamConfig) -> Self {
866 Self {
867 config: Some(config),
868 ..self
869 }
870 }
871
872 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
874 Self {
875 mask: Some(mask.into()),
876 ..self
877 }
878 }
879}
880
881impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
882 fn from(value: ReconfigureStreamRequest) -> Self {
883 let ReconfigureStreamRequest {
884 stream,
885 config,
886 mask,
887 } = value;
888 Self {
889 stream,
890 config: config.map(Into::into),
891 mask: mask.map(|paths| prost_types::FieldMask { paths }),
892 }
893 }
894}
895
896impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
897 type Error = ConvertError;
898 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
899 let api::ReconfigureStreamResponse { config } = value;
900 let config = config.ok_or("missing stream config")?;
901 Ok(config.into())
902 }
903}
904
905impl From<api::CheckTailResponse> for StreamPosition {
906 fn from(value: api::CheckTailResponse) -> Self {
907 let api::CheckTailResponse {
908 next_seq_num,
909 last_timestamp,
910 } = value;
911 StreamPosition {
912 seq_num: next_seq_num,
913 timestamp: last_timestamp,
914 }
915 }
916}
917
/// A position in a stream, identified by a sequence number and a timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamPosition {
    /// Sequence number of the position.
    pub seq_num: u64,
    /// Timestamp of the position.
    // NOTE(review): unit is not visible in this file — presumably milliseconds; confirm.
    pub timestamp: u64,
}
927
// Record header: a name/value pair of raw bytes.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    // Header name; empty when built via `Header::from_value`.
    pub name: Bytes,
    // Header value.
    pub value: Bytes,
}
934
935impl Header {
936 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
938 Self {
939 name: name.into(),
940 value: value.into(),
941 }
942 }
943
944 pub fn from_value(value: impl Into<Bytes>) -> Self {
946 Self {
947 name: Bytes::new(),
948 value: value.into(),
949 }
950 }
951}
952
953impl From<Header> for api::Header {
954 fn from(value: Header) -> Self {
955 let Header { name, value } = value;
956 Self { name, value }
957 }
958}
959
960impl From<api::Header> for Header {
961 fn from(value: api::Header) -> Self {
962 let api::Header { name, value } = value;
963 Self { name, value }
964 }
965}
966
/// Fencing token: a string validated to be at most `FencingToken::MAX_BYTES` bytes long.
///
/// Construct one by parsing a `&str`, converting a `String` with `TryFrom`, or
/// via [`FencingToken::generate`]. The default value is the empty token.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FencingToken(String);
972
973impl FencingToken {
974 const MAX_BYTES: usize = 36;
975
976 pub fn generate(n: usize) -> Result<Self, ConvertError> {
978 rand::rng()
979 .sample_iter(&rand::distr::Alphanumeric)
980 .take(n)
981 .map(char::from)
982 .collect::<String>()
983 .parse()
984 }
985}
986
987impl Deref for FencingToken {
988 type Target = str;
989
990 fn deref(&self) -> &Self::Target {
991 &self.0
992 }
993}
994
995impl std::fmt::Display for FencingToken {
996 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
997 write!(f, "{}", self.0)
998 }
999}
1000
1001impl FromStr for FencingToken {
1002 type Err = ConvertError;
1003
1004 fn from_str(value: &str) -> Result<Self, Self::Err> {
1005 value.to_string().try_into()
1006 }
1007}
1008
1009impl TryFrom<String> for FencingToken {
1010 type Error = ConvertError;
1011
1012 fn try_from(value: String) -> Result<Self, Self::Error> {
1013 if value.len() > Self::MAX_BYTES {
1014 Err(format!("Fencing token cannot exceed {} bytes", Self::MAX_BYTES).into())
1015 } else {
1016 Ok(Self(value))
1017 }
1018 }
1019}
1020
1021impl From<FencingToken> for String {
1022 fn from(value: FencingToken) -> Self {
1023 value.0
1024 }
1025}
1026
/// Command carried by a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command; encoded as a record whose body is the token's bytes.
    // NOTE(review): presumably sets the stream's fencing token — confirm against the service docs.
    Fence {
        /// Fencing token carried by the command.
        fencing_token: FencingToken,
    },
    /// Trim command; encoded as a record whose body is `seq_num` in big-endian bytes.
    // NOTE(review): presumably trims the stream up to `seq_num` — confirm against the service docs.
    Trim {
        /// Sequence number carried by the command.
        seq_num: u64,
    },
}
1052
/// A command together with an optional timestamp.
///
/// Converts into an [`AppendRecord`] with a single header that has an empty
/// name and the command kind (`fence` or `trim`) as its value.
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// The command to send.
    pub command: Command,
    /// Optional timestamp, copied onto the resulting append record.
    pub timestamp: Option<u64>,
}
1065
1066impl CommandRecord {
1067 const FENCE: &[u8] = b"fence";
1068 const TRIM: &[u8] = b"trim";
1069
1070 pub fn fence(fencing_token: FencingToken) -> Self {
1072 Self {
1073 command: Command::Fence { fencing_token },
1074 timestamp: None,
1075 }
1076 }
1077
1078 pub fn trim(seq_num: impl Into<u64>) -> Self {
1080 Self {
1081 command: Command::Trim {
1082 seq_num: seq_num.into(),
1083 },
1084 timestamp: None,
1085 }
1086 }
1087
1088 pub fn with_timestamp(self, timestamp: u64) -> Self {
1090 Self {
1091 timestamp: Some(timestamp),
1092 ..self
1093 }
1094 }
1095}
1096
// Record to be appended to a stream. Fields are private so that every
// constructor can enforce the metered-size limit via `validated`.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    // Optional timestamp of the record.
    timestamp: Option<u64>,
    // Headers of the record.
    headers: Vec<Header>,
    // Body of the record.
    body: Bytes,
    // Test-only override of the metered-size limit checked by `validated`.
    #[cfg(test)]
    max_bytes: u64,
}
1106
// Implements `MeteredBytes` for `AppendRecord` through the `metered_impl!` macro.
metered_impl!(AppendRecord);
1108
1109impl AppendRecord {
1110 const MAX_BYTES: u64 = MIB_BYTES;
1111
1112 fn validated(self) -> Result<Self, ConvertError> {
1113 #[cfg(test)]
1114 let max_bytes = self.max_bytes;
1115 #[cfg(not(test))]
1116 let max_bytes = Self::MAX_BYTES;
1117
1118 if self.metered_bytes() > max_bytes {
1119 Err("AppendRecord should have metered size less than 1 MiB".into())
1120 } else {
1121 Ok(self)
1122 }
1123 }
1124
1125 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1127 Self {
1128 timestamp: None,
1129 headers: Vec::new(),
1130 body: body.into(),
1131 #[cfg(test)]
1132 max_bytes: Self::MAX_BYTES,
1133 }
1134 .validated()
1135 }
1136
1137 #[cfg(test)]
1138 pub(crate) fn with_max_bytes(
1139 max_bytes: u64,
1140 body: impl Into<Bytes>,
1141 ) -> Result<Self, ConvertError> {
1142 Self {
1143 timestamp: None,
1144 headers: Vec::new(),
1145 body: body.into(),
1146 max_bytes,
1147 }
1148 .validated()
1149 }
1150
1151 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1153 Self {
1154 headers: headers.into(),
1155 ..self
1156 }
1157 .validated()
1158 }
1159
1160 pub fn with_timestamp(self, timestamp: u64) -> Self {
1162 Self {
1163 timestamp: Some(timestamp),
1164 ..self
1165 }
1166 }
1167
1168 pub fn body(&self) -> &[u8] {
1170 &self.body
1171 }
1172
1173 pub fn headers(&self) -> &[Header] {
1175 &self.headers
1176 }
1177
1178 pub fn timestamp(&self) -> Option<u64> {
1180 self.timestamp
1181 }
1182
1183 pub fn into_parts(self) -> AppendRecordParts {
1185 AppendRecordParts {
1186 timestamp: self.timestamp,
1187 headers: self.headers,
1188 body: self.body,
1189 }
1190 }
1191
1192 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1194 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1195 if let Some(timestamp) = parts.timestamp {
1196 Ok(record.with_timestamp(timestamp))
1197 } else {
1198 Ok(record)
1199 }
1200 }
1201}
1202
1203impl From<AppendRecord> for api::AppendRecord {
1204 fn from(value: AppendRecord) -> Self {
1205 Self {
1206 timestamp: value.timestamp,
1207 headers: value.headers.into_iter().map(Into::into).collect(),
1208 body: value.body,
1209 }
1210 }
1211}
1212
1213impl From<CommandRecord> for AppendRecord {
1214 fn from(value: CommandRecord) -> Self {
1215 let (header_value, body) = match value.command {
1216 Command::Fence { fencing_token } => (
1217 CommandRecord::FENCE,
1218 Bytes::copy_from_slice(fencing_token.as_bytes()),
1219 ),
1220 Command::Trim { seq_num } => (
1221 CommandRecord::TRIM,
1222 Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1223 ),
1224 };
1225 AppendRecordParts {
1226 timestamp: value.timestamp,
1227 headers: vec![Header::from_value(header_value)],
1228 body,
1229 }
1230 .try_into()
1231 .expect("command record is a valid append record")
1232 }
1233}
1234
// Unvalidated parts of an `AppendRecord`; convert with `TryFrom` to validate the size.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    // Optional timestamp of the record.
    pub timestamp: Option<u64>,
    // Headers of the record.
    pub headers: Vec<Header>,
    // Body of the record.
    pub body: Bytes,
}
1242
1243impl From<AppendRecord> for AppendRecordParts {
1244 fn from(value: AppendRecord) -> Self {
1245 value.into_parts()
1246 }
1247}
1248
1249impl TryFrom<AppendRecordParts> for AppendRecord {
1250 type Error = ConvertError;
1251
1252 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1253 Self::try_from_parts(value)
1254 }
1255}
1256
/// A batch of append records bounded both by record count and by total metered bytes.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    /// Records in the batch, in insertion order.
    records: Vec<AppendRecord>,
    /// Running total of the metered sizes of `records`.
    metered_bytes: u64,
    /// Maximum number of records this batch accepts.
    max_capacity: usize,
    /// Test-only override of the maximum total metered bytes.
    #[cfg(test)]
    max_bytes: u64,
}
1266
1267impl PartialEq for AppendRecordBatch {
1268 fn eq(&self, other: &Self) -> bool {
1269 if self.records.eq(&other.records) {
1270 assert_eq!(self.metered_bytes, other.metered_bytes);
1271 true
1272 } else {
1273 false
1274 }
1275 }
1276}
1277
// Marker impl: batch equality is the record-wise comparison from `PartialEq`.
impl Eq for AppendRecordBatch {}
1279
1280impl Default for AppendRecordBatch {
1281 fn default() -> Self {
1282 Self::new()
1283 }
1284}
1285
1286impl AppendRecordBatch {
1287 pub const MAX_CAPACITY: usize = 1000;
1291
1292 pub const MAX_BYTES: u64 = MIB_BYTES;
1294
1295 pub fn new() -> Self {
1297 Self::with_max_capacity(Self::MAX_CAPACITY)
1298 }
1299
1300 pub fn with_max_capacity(max_capacity: usize) -> Self {
1304 assert!(
1305 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1306 "Batch capacity must be between 1 and 1000"
1307 );
1308
1309 Self {
1310 records: Vec::with_capacity(max_capacity),
1311 metered_bytes: 0,
1312 max_capacity,
1313 #[cfg(test)]
1314 max_bytes: Self::MAX_BYTES,
1315 }
1316 }
1317
1318 #[cfg(test)]
1319 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1320 #[cfg(test)]
1321 assert!(
1322 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1323 "Batch size must be between 1 byte and 1 MiB"
1324 );
1325
1326 Self {
1327 max_bytes,
1328 ..Self::with_max_capacity(max_capacity)
1329 }
1330 }
1331
1332 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1338 where
1339 R: Into<AppendRecord>,
1340 T: IntoIterator<Item = R>,
1341 {
1342 let mut records = Self::new();
1343 let mut pending = Vec::new();
1344
1345 let mut iter = iter.into_iter();
1346
1347 for record in iter.by_ref() {
1348 if let Err(record) = records.push(record) {
1349 pending.push(record);
1350 break;
1351 }
1352 }
1353
1354 if pending.is_empty() {
1355 Ok(records)
1356 } else {
1357 pending.extend(iter.map(Into::into));
1358 Err((records, pending))
1359 }
1360 }
1361
1362 pub fn is_empty(&self) -> bool {
1364 if self.records.is_empty() {
1365 assert_eq!(self.metered_bytes, 0);
1366 true
1367 } else {
1368 false
1369 }
1370 }
1371
1372 pub fn len(&self) -> usize {
1374 self.records.len()
1375 }
1376
1377 #[cfg(test)]
1378 fn max_bytes(&self) -> u64 {
1379 self.max_bytes
1380 }
1381
1382 #[cfg(not(test))]
1383 fn max_bytes(&self) -> u64 {
1384 Self::MAX_BYTES
1385 }
1386
1387 pub fn is_full(&self) -> bool {
1389 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1390 }
1391
1392 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1394 assert!(self.records.len() <= self.max_capacity);
1395 assert!(self.metered_bytes <= self.max_bytes());
1396
1397 let record = record.into();
1398 let record_size = record.metered_bytes();
1399 if self.records.len() >= self.max_capacity
1400 || self.metered_bytes + record_size > self.max_bytes()
1401 {
1402 Err(record)
1403 } else {
1404 self.records.push(record);
1405 self.metered_bytes += record_size;
1406 Ok(())
1407 }
1408 }
1409}
1410
1411impl MeteredBytes for AppendRecordBatch {
1412 fn metered_bytes(&self) -> u64 {
1413 self.metered_bytes
1414 }
1415}
1416
1417impl IntoIterator for AppendRecordBatch {
1418 type Item = AppendRecord;
1419 type IntoIter = std::vec::IntoIter<Self::Item>;
1420
1421 fn into_iter(self) -> Self::IntoIter {
1422 self.records.into_iter()
1423 }
1424}
1425
1426impl<'a> IntoIterator for &'a AppendRecordBatch {
1427 type Item = &'a AppendRecord;
1428 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1429
1430 fn into_iter(self) -> Self::IntoIter {
1431 self.records.iter()
1432 }
1433}
1434
1435impl AsRef<[AppendRecord]> for AppendRecordBatch {
1436 fn as_ref(&self) -> &[AppendRecord] {
1437 &self.records
1438 }
1439}
1440
// Input for an append request.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]` — confirm before adding `///` docs.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    // Batch of records to append.
    pub records: AppendRecordBatch,
    // Optional value forwarded as the wire request's `match_seq_num`.
    pub match_seq_num: Option<u64>,
    // Optional fencing token; sent on the wire as its underlying string.
    pub fencing_token: Option<FencingToken>,
}
1448
1449impl MeteredBytes for AppendInput {
1450 fn metered_bytes(&self) -> u64 {
1451 self.records.metered_bytes()
1452 }
1453}
1454
1455impl AppendInput {
1456 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1458 Self {
1459 records: records.into(),
1460 match_seq_num: None,
1461 fencing_token: None,
1462 }
1463 }
1464
1465 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1467 Self {
1468 match_seq_num: Some(match_seq_num.into()),
1469 ..self
1470 }
1471 }
1472
1473 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1475 Self {
1476 fencing_token: Some(fencing_token),
1477 ..self
1478 }
1479 }
1480
1481 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1482 let Self {
1483 records,
1484 match_seq_num,
1485 fencing_token,
1486 } = self;
1487
1488 api::AppendInput {
1489 stream: stream.into(),
1490 records: records.into_iter().map(Into::into).collect(),
1491 match_seq_num,
1492 fencing_token: fencing_token.map(|f| f.0),
1493 }
1494 }
1495}
1496
1497#[derive(Debug, Clone)]
1499pub struct AppendAck {
1500 pub start: StreamPosition,
1502 pub end: StreamPosition,
1507 pub tail: StreamPosition,
1511}
1512
1513impl From<api::AppendOutput> for AppendAck {
1514 fn from(value: api::AppendOutput) -> Self {
1515 let api::AppendOutput {
1516 start_seq_num,
1517 start_timestamp,
1518 end_seq_num,
1519 end_timestamp,
1520 next_seq_num,
1521 last_timestamp,
1522 } = value;
1523 let start = StreamPosition {
1524 seq_num: start_seq_num,
1525 timestamp: start_timestamp,
1526 };
1527 let end = StreamPosition {
1528 seq_num: end_seq_num,
1529 timestamp: end_timestamp,
1530 };
1531 let tail = StreamPosition {
1532 seq_num: next_seq_num,
1533 timestamp: last_timestamp,
1534 };
1535 Self { start, end, tail }
1536 }
1537}
1538
1539impl TryFrom<api::AppendResponse> for AppendAck {
1540 type Error = ConvertError;
1541 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1542 let api::AppendResponse { output } = value;
1543 let output = output.ok_or("missing append output")?;
1544 Ok(output.into())
1545 }
1546}
1547
1548impl TryFrom<api::AppendSessionResponse> for AppendAck {
1549 type Error = ConvertError;
1550 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1551 let api::AppendSessionResponse { output } = value;
1552 let output = output.ok_or("missing append output")?;
1553 Ok(output.into())
1554 }
1555}
1556
1557#[sync_docs]
1558#[derive(Debug, Clone, Default)]
1559pub struct ReadLimit {
1560 pub count: Option<u64>,
1561 pub bytes: Option<u64>,
1562}
1563
1564impl ReadLimit {
1565 pub fn new() -> Self {
1567 Self::default()
1568 }
1569
1570 pub fn with_count(self, count: u64) -> Self {
1572 Self {
1573 count: Some(count),
1574 ..self
1575 }
1576 }
1577
1578 pub fn with_bytes(self, bytes: u64) -> Self {
1580 Self {
1581 bytes: Some(bytes),
1582 ..self
1583 }
1584 }
1585}
1586
/// Starting point of a read.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Start at the given sequence number.
    SeqNum(u64),
    /// Start at the given timestamp.
    Timestamp(u64),
    /// Start at the given offset relative to the tail.
    /// NOTE(review): exact offset semantics are defined by the service — confirm.
    TailOffset(u64),
}

impl Default for ReadStart {
    /// The default starting point is sequence number 0.
    fn default() -> Self {
        ReadStart::SeqNum(0)
    }
}
1603
1604impl From<ReadStart> for api::read_request::Start {
1605 fn from(start: ReadStart) -> Self {
1606 match start {
1607 ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1608 ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1609 ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1610 }
1611 }
1612}
1613
1614impl From<ReadStart> for api::read_session_request::Start {
1615 fn from(start: ReadStart) -> Self {
1616 match start {
1617 ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1618 ReadStart::Timestamp(timestamp) => {
1619 api::read_session_request::Start::Timestamp(timestamp)
1620 }
1621 ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1622 }
1623 }
1624}
1625
1626#[sync_docs]
1627#[derive(Debug, Clone, Default)]
1628pub struct ReadRequest {
1629 pub start: ReadStart,
1630 pub limit: ReadLimit,
1631}
1632
1633impl ReadRequest {
1634 pub fn new(start: ReadStart) -> Self {
1636 Self {
1637 start,
1638 ..Default::default()
1639 }
1640 }
1641
1642 pub fn with_limit(self, limit: ReadLimit) -> Self {
1644 Self { limit, ..self }
1645 }
1646}
1647
1648impl ReadRequest {
1649 pub(crate) fn try_into_api_type(
1650 self,
1651 stream: impl Into<String>,
1652 ) -> Result<api::ReadRequest, ConvertError> {
1653 let Self { start, limit } = self;
1654
1655 let limit = if limit.count > Some(1000) {
1656 Err("read limit: count must not exceed 1000 for unary request")
1657 } else if limit.bytes > Some(MIB_BYTES) {
1658 Err("read limit: bytes must not exceed 1MiB for unary request")
1659 } else {
1660 Ok(api::ReadLimit {
1661 count: limit.count,
1662 bytes: limit.bytes,
1663 })
1664 }?;
1665
1666 Ok(api::ReadRequest {
1667 stream: stream.into(),
1668 start: Some(start.into()),
1669 limit: Some(limit),
1670 })
1671 }
1672}
1673
1674#[sync_docs]
1675#[derive(Debug, Clone)]
1676pub struct SequencedRecord {
1677 pub seq_num: u64,
1678 pub timestamp: u64,
1679 pub headers: Vec<Header>,
1680 pub body: Bytes,
1681}
1682
1683metered_impl!(SequencedRecord);
1684
1685impl From<api::SequencedRecord> for SequencedRecord {
1686 fn from(value: api::SequencedRecord) -> Self {
1687 let api::SequencedRecord {
1688 seq_num,
1689 timestamp,
1690 headers,
1691 body,
1692 } = value;
1693 Self {
1694 seq_num,
1695 timestamp,
1696 headers: headers.into_iter().map(Into::into).collect(),
1697 body,
1698 }
1699 }
1700}
1701
1702impl SequencedRecord {
1703 pub fn as_command_record(&self) -> Option<CommandRecord> {
1705 if self.headers.len() != 1 {
1706 return None;
1707 }
1708
1709 let header = self.headers.first().expect("pre-validated length");
1710
1711 if !header.name.is_empty() {
1712 return None;
1713 }
1714
1715 match header.value.as_ref() {
1716 CommandRecord::FENCE => {
1717 let fencing_token = std::str::from_utf8(&self.body).ok()?.parse().ok()?;
1718 Some(CommandRecord {
1719 command: Command::Fence { fencing_token },
1720 timestamp: Some(self.timestamp),
1721 })
1722 }
1723 CommandRecord::TRIM => {
1724 let body: &[u8] = &self.body;
1725 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1726 Some(CommandRecord {
1727 command: Command::Trim { seq_num },
1728 timestamp: Some(self.timestamp),
1729 })
1730 }
1731 _ => None,
1732 }
1733 }
1734}
1735
1736#[sync_docs]
1737#[derive(Debug, Clone)]
1738pub struct SequencedRecordBatch {
1739 pub records: Vec<SequencedRecord>,
1740}
1741
1742impl MeteredBytes for SequencedRecordBatch {
1743 fn metered_bytes(&self) -> u64 {
1744 self.records.metered_bytes()
1745 }
1746}
1747
1748impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1749 fn from(value: api::SequencedRecordBatch) -> Self {
1750 let api::SequencedRecordBatch { records } = value;
1751 Self {
1752 records: records.into_iter().map(Into::into).collect(),
1753 }
1754 }
1755}
1756
1757#[sync_docs(ReadOutput = "Output")]
1758#[derive(Debug, Clone)]
1759pub enum ReadOutput {
1760 Batch(SequencedRecordBatch),
1761 NextSeqNum(u64),
1762}
1763
1764impl From<api::read_output::Output> for ReadOutput {
1765 fn from(value: api::read_output::Output) -> Self {
1766 match value {
1767 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1768 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1769 }
1770 }
1771}
1772
1773impl TryFrom<api::ReadOutput> for ReadOutput {
1774 type Error = ConvertError;
1775 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1776 let api::ReadOutput { output } = value;
1777 let output = output.ok_or("missing read output")?;
1778 Ok(output.into())
1779 }
1780}
1781
1782impl TryFrom<api::ReadResponse> for ReadOutput {
1783 type Error = ConvertError;
1784 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1785 let api::ReadResponse { output } = value;
1786 let output = output.ok_or("missing output in read response")?;
1787 output.try_into()
1788 }
1789}
1790
1791#[sync_docs]
1792#[derive(Debug, Clone, Default)]
1793pub struct ReadSessionRequest {
1794 pub start: ReadStart,
1795 pub limit: ReadLimit,
1796}
1797
1798impl ReadSessionRequest {
1799 pub fn new(start: ReadStart) -> Self {
1801 Self {
1802 start,
1803 ..Default::default()
1804 }
1805 }
1806
1807 pub fn with_limit(self, limit: ReadLimit) -> Self {
1809 Self { limit, ..self }
1810 }
1811
1812 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1813 let Self { start, limit } = self;
1814 api::ReadSessionRequest {
1815 stream: stream.into(),
1816 start: Some(start.into()),
1817 limit: Some(api::ReadLimit {
1818 count: limit.count,
1819 bytes: limit.bytes,
1820 }),
1821 heartbeats: false,
1822 }
1823 }
1824}
1825
1826impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1827 type Error = ConvertError;
1828 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1829 let api::ReadSessionResponse { output } = value;
1830 let output = output.ok_or("missing output in read session response")?;
1831 output.try_into()
1832 }
1833}
1834
/// Name of a basin.
///
/// Wraps a `String`; the `TryFrom<String>` and `FromStr` conversions validate the name
/// before constructing it.
#[derive(Debug, Clone)]
pub struct BasinName(String);

impl Deref for BasinName {
    type Target = str;

    /// Borrow the name as a plain string slice.
    fn deref(&self) -> &Self::Target {
        self.0.as_str()
    }
}
1848
1849impl TryFrom<String> for BasinName {
1850 type Error = ConvertError;
1851
1852 fn try_from(name: String) -> Result<Self, Self::Error> {
1853 if name.len() < 8 || name.len() > 48 {
1854 return Err("Basin name must be between 8 and 48 characters in length".into());
1855 }
1856
1857 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1858 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1859 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1860 .expect("Failed to compile basin name regex")
1861 });
1862
1863 if !regex.is_match(&name) {
1864 return Err(
1865 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1866 It cannot begin or end with a hyphen."
1867 .into(),
1868 );
1869 }
1870
1871 Ok(Self(name))
1872 }
1873}
1874
1875impl FromStr for BasinName {
1876 type Err = ConvertError;
1877
1878 fn from_str(s: &str) -> Result<Self, Self::Err> {
1879 s.to_string().try_into()
1880 }
1881}
1882
1883impl std::fmt::Display for BasinName {
1884 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1885 f.write_str(&self.0)
1886 }
1887}
1888
1889impl From<BasinName> for String {
1890 fn from(value: BasinName) -> Self {
1891 value.0
1892 }
1893}
1894
/// Identifier of an access token.
///
/// Wraps a `String`; the `TryFrom<String>` and `FromStr` conversions validate the ID
/// before constructing it.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);

impl Deref for AccessTokenId {
    type Target = str;

    /// Borrow the ID as a plain string slice.
    fn deref(&self) -> &Self::Target {
        self.0.as_str()
    }
}
1907
1908impl TryFrom<String> for AccessTokenId {
1909 type Error = ConvertError;
1910
1911 fn try_from(name: String) -> Result<Self, Self::Error> {
1912 if name.is_empty() {
1913 return Err("Access token ID must not be empty".into());
1914 }
1915
1916 if name.len() > 96 {
1917 return Err("Access token ID must not exceed 96 characters".into());
1918 }
1919
1920 Ok(Self(name))
1921 }
1922}
1923
1924impl From<AccessTokenId> for String {
1925 fn from(value: AccessTokenId) -> Self {
1926 value.0
1927 }
1928}
1929
1930impl FromStr for AccessTokenId {
1931 type Err = ConvertError;
1932
1933 fn from_str(s: &str) -> Result<Self, Self::Err> {
1934 s.to_string().try_into()
1935 }
1936}
1937
1938impl std::fmt::Display for AccessTokenId {
1939 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1940 f.write_str(&self.0)
1941 }
1942}
1943
1944impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1945 fn from(value: AccessTokenInfo) -> Self {
1946 Self {
1947 info: Some(value.into()),
1948 }
1949 }
1950}
1951
1952#[sync_docs]
1953#[derive(Debug, Clone)]
1954pub struct AccessTokenInfo {
1955 pub id: AccessTokenId,
1956 pub expires_at: Option<u32>,
1957 pub auto_prefix_streams: bool,
1958 pub scope: Option<AccessTokenScope>,
1959}
1960
1961impl AccessTokenInfo {
1962 pub fn new(id: AccessTokenId) -> Self {
1964 Self {
1965 id,
1966 expires_at: None,
1967 auto_prefix_streams: false,
1968 scope: None,
1969 }
1970 }
1971
1972 pub fn with_expires_at(self, expires_at: u32) -> Self {
1974 Self {
1975 expires_at: Some(expires_at),
1976 ..self
1977 }
1978 }
1979
1980 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1982 Self {
1983 auto_prefix_streams,
1984 ..self
1985 }
1986 }
1987
1988 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
1990 Self {
1991 scope: Some(scope),
1992 ..self
1993 }
1994 }
1995}
1996
1997impl From<AccessTokenInfo> for api::AccessTokenInfo {
1998 fn from(value: AccessTokenInfo) -> Self {
1999 let AccessTokenInfo {
2000 id,
2001 expires_at,
2002 auto_prefix_streams,
2003 scope,
2004 } = value;
2005 Self {
2006 id: id.into(),
2007 expires_at,
2008 auto_prefix_streams,
2009 scope: scope.map(Into::into),
2010 }
2011 }
2012}
2013
2014impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2015 type Error = ConvertError;
2016
2017 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2018 let api::AccessTokenInfo {
2019 id,
2020 expires_at,
2021 auto_prefix_streams,
2022 scope,
2023 } = value;
2024 Ok(Self {
2025 id: id.try_into()?,
2026 expires_at,
2027 auto_prefix_streams,
2028 scope: scope.map(Into::into),
2029 })
2030 }
2031}
2032
2033#[sync_docs]
2034#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2035pub enum Operation {
2036 ListBasins,
2037 CreateBasin,
2038 DeleteBasin,
2039 ReconfigureBasin,
2040 GetBasinConfig,
2041 IssueAccessToken,
2042 RevokeAccessToken,
2043 ListAccessTokens,
2044 ListStreams,
2045 CreateStream,
2046 DeleteStream,
2047 GetStreamConfig,
2048 ReconfigureStream,
2049 CheckTail,
2050 Append,
2051 Read,
2052 Trim,
2053 Fence,
2054}
2055
2056impl FromStr for Operation {
2057 type Err = ConvertError;
2058
2059 fn from_str(s: &str) -> Result<Self, Self::Err> {
2060 match s.to_lowercase().as_str() {
2061 "list-basins" => Ok(Self::ListBasins),
2062 "create-basin" => Ok(Self::CreateBasin),
2063 "delete-basin" => Ok(Self::DeleteBasin),
2064 "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2065 "get-basin-config" => Ok(Self::GetBasinConfig),
2066 "issue-access-token" => Ok(Self::IssueAccessToken),
2067 "revoke-access-token" => Ok(Self::RevokeAccessToken),
2068 "list-access-tokens" => Ok(Self::ListAccessTokens),
2069 "list-streams" => Ok(Self::ListStreams),
2070 "create-stream" => Ok(Self::CreateStream),
2071 "delete-stream" => Ok(Self::DeleteStream),
2072 "get-stream-config" => Ok(Self::GetStreamConfig),
2073 "reconfigure-stream" => Ok(Self::ReconfigureStream),
2074 "check-tail" => Ok(Self::CheckTail),
2075 "append" => Ok(Self::Append),
2076 "read" => Ok(Self::Read),
2077 "trim" => Ok(Self::Trim),
2078 "fence" => Ok(Self::Fence),
2079 _ => Err("invalid operation".into()),
2080 }
2081 }
2082}
2083
2084impl From<Operation> for api::Operation {
2085 fn from(value: Operation) -> Self {
2086 match value {
2087 Operation::ListBasins => Self::ListBasins,
2088 Operation::CreateBasin => Self::CreateBasin,
2089 Operation::DeleteBasin => Self::DeleteBasin,
2090 Operation::ReconfigureBasin => Self::ReconfigureBasin,
2091 Operation::GetBasinConfig => Self::GetBasinConfig,
2092 Operation::IssueAccessToken => Self::IssueAccessToken,
2093 Operation::RevokeAccessToken => Self::RevokeAccessToken,
2094 Operation::ListAccessTokens => Self::ListAccessTokens,
2095 Operation::ListStreams => Self::ListStreams,
2096 Operation::CreateStream => Self::CreateStream,
2097 Operation::DeleteStream => Self::DeleteStream,
2098 Operation::GetStreamConfig => Self::GetStreamConfig,
2099 Operation::ReconfigureStream => Self::ReconfigureStream,
2100 Operation::CheckTail => Self::CheckTail,
2101 Operation::Append => Self::Append,
2102 Operation::Read => Self::Read,
2103 Operation::Trim => Self::Trim,
2104 Operation::Fence => Self::Fence,
2105 }
2106 }
2107}
2108
2109impl From<api::Operation> for Option<Operation> {
2110 fn from(value: api::Operation) -> Self {
2111 match value {
2112 api::Operation::Unspecified => None,
2113 api::Operation::ListBasins => Some(Operation::ListBasins),
2114 api::Operation::CreateBasin => Some(Operation::CreateBasin),
2115 api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2116 api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2117 api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2118 api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2119 api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2120 api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2121 api::Operation::ListStreams => Some(Operation::ListStreams),
2122 api::Operation::CreateStream => Some(Operation::CreateStream),
2123 api::Operation::DeleteStream => Some(Operation::DeleteStream),
2124 api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2125 api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2126 api::Operation::CheckTail => Some(Operation::CheckTail),
2127 api::Operation::Append => Some(Operation::Append),
2128 api::Operation::Read => Some(Operation::Read),
2129 api::Operation::Trim => Some(Operation::Trim),
2130 api::Operation::Fence => Some(Operation::Fence),
2131 }
2132 }
2133}
2134
2135#[sync_docs]
2136#[derive(Debug, Clone, Default)]
2137pub struct AccessTokenScope {
2138 pub basins: Option<ResourceSet>,
2139 pub streams: Option<ResourceSet>,
2140 pub access_tokens: Option<ResourceSet>,
2141 pub op_groups: Option<PermittedOperationGroups>,
2142 pub ops: HashSet<Operation>,
2143}
2144
2145impl AccessTokenScope {
2146 pub fn new() -> Self {
2148 Self::default()
2149 }
2150
2151 pub fn with_basins(self, basins: ResourceSet) -> Self {
2153 Self {
2154 basins: Some(basins),
2155 ..self
2156 }
2157 }
2158
2159 pub fn with_streams(self, streams: ResourceSet) -> Self {
2161 Self {
2162 streams: Some(streams),
2163 ..self
2164 }
2165 }
2166
2167 pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2169 Self {
2170 access_tokens: Some(access_tokens),
2171 ..self
2172 }
2173 }
2174
2175 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2177 Self {
2178 op_groups: Some(op_groups),
2179 ..self
2180 }
2181 }
2182
2183 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2185 Self {
2186 ops: ops.into_iter().collect(),
2187 ..self
2188 }
2189 }
2190
2191 pub fn with_op(self, op: Operation) -> Self {
2193 let mut ops = self.ops;
2194 ops.insert(op);
2195 Self { ops, ..self }
2196 }
2197}
2198
2199impl From<AccessTokenScope> for api::AccessTokenScope {
2200 fn from(value: AccessTokenScope) -> Self {
2201 let AccessTokenScope {
2202 basins,
2203 streams,
2204 access_tokens,
2205 op_groups,
2206 ops,
2207 } = value;
2208 Self {
2209 basins: basins.map(Into::into),
2210 streams: streams.map(Into::into),
2211 access_tokens: access_tokens.map(Into::into),
2212 op_groups: op_groups.map(Into::into),
2213 ops: ops
2214 .into_iter()
2215 .map(api::Operation::from)
2216 .map(Into::into)
2217 .collect(),
2218 }
2219 }
2220}
2221
2222impl From<api::AccessTokenScope> for AccessTokenScope {
2223 fn from(value: api::AccessTokenScope) -> Self {
2224 let api::AccessTokenScope {
2225 basins,
2226 streams,
2227 access_tokens,
2228 op_groups,
2229 ops,
2230 } = value;
2231 Self {
2232 basins: basins.and_then(|set| set.matching.map(Into::into)),
2233 streams: streams.and_then(|set| set.matching.map(Into::into)),
2234 access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2235 op_groups: op_groups.map(Into::into),
2236 ops: ops
2237 .into_iter()
2238 .map(api::Operation::try_from)
2239 .flat_map(Result::ok)
2240 .flat_map(<Option<Operation>>::from)
2241 .collect(),
2242 }
2243 }
2244}
2245
2246impl From<ResourceSet> for api::ResourceSet {
2247 fn from(value: ResourceSet) -> Self {
2248 Self {
2249 matching: Some(value.into()),
2250 }
2251 }
2252}
2253
2254#[sync_docs(ResourceSet = "Matching")]
2255#[derive(Debug, Clone)]
2256pub enum ResourceSet {
2257 Exact(String),
2258 Prefix(String),
2259}
2260
2261impl From<ResourceSet> for api::resource_set::Matching {
2262 fn from(value: ResourceSet) -> Self {
2263 match value {
2264 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2265 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2266 }
2267 }
2268}
2269
2270impl From<api::resource_set::Matching> for ResourceSet {
2271 fn from(value: api::resource_set::Matching) -> Self {
2272 match value {
2273 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2274 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2275 }
2276 }
2277}
2278
2279#[sync_docs]
2280#[derive(Debug, Clone, Default)]
2281pub struct PermittedOperationGroups {
2282 pub account: Option<ReadWritePermissions>,
2283 pub basin: Option<ReadWritePermissions>,
2284 pub stream: Option<ReadWritePermissions>,
2285}
2286
2287impl PermittedOperationGroups {
2288 pub fn new() -> Self {
2290 Self::default()
2291 }
2292
2293 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2295 Self {
2296 account: Some(account),
2297 ..self
2298 }
2299 }
2300
2301 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2303 Self {
2304 basin: Some(basin),
2305 ..self
2306 }
2307 }
2308
2309 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2311 Self {
2312 stream: Some(stream),
2313 ..self
2314 }
2315 }
2316}
2317
2318impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2319 fn from(value: PermittedOperationGroups) -> Self {
2320 let PermittedOperationGroups {
2321 account,
2322 basin,
2323 stream,
2324 } = value;
2325 Self {
2326 account: account.map(Into::into),
2327 basin: basin.map(Into::into),
2328 stream: stream.map(Into::into),
2329 }
2330 }
2331}
2332
2333impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2334 fn from(value: api::PermittedOperationGroups) -> Self {
2335 let api::PermittedOperationGroups {
2336 account,
2337 basin,
2338 stream,
2339 } = value;
2340 Self {
2341 account: account.map(Into::into),
2342 basin: basin.map(Into::into),
2343 stream: stream.map(Into::into),
2344 }
2345 }
2346}
2347
2348#[sync_docs]
2349#[derive(Debug, Clone, Default)]
2350pub struct ReadWritePermissions {
2351 pub read: bool,
2352 pub write: bool,
2353}
2354
2355impl ReadWritePermissions {
2356 pub fn new() -> Self {
2358 Self::default()
2359 }
2360
2361 pub fn with_read(self, read: bool) -> Self {
2363 Self { read, ..self }
2364 }
2365
2366 pub fn with_write(self, write: bool) -> Self {
2368 Self { write, ..self }
2369 }
2370}
2371
2372impl From<ReadWritePermissions> for api::ReadWritePermissions {
2373 fn from(value: ReadWritePermissions) -> Self {
2374 let ReadWritePermissions { read, write } = value;
2375 Self { read, write }
2376 }
2377}
2378
2379impl From<api::ReadWritePermissions> for ReadWritePermissions {
2380 fn from(value: api::ReadWritePermissions) -> Self {
2381 let api::ReadWritePermissions { read, write } = value;
2382 Self { read, write }
2383 }
2384}
2385
2386impl From<api::IssueAccessTokenResponse> for String {
2387 fn from(value: api::IssueAccessTokenResponse) -> Self {
2388 value.access_token
2389 }
2390}
2391
2392impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2393 fn from(value: AccessTokenId) -> Self {
2394 Self { id: value.into() }
2395 }
2396}
2397
2398impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2399 type Error = ConvertError;
2400 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2401 let token_info = value.info.ok_or("access token info is missing")?;
2402 token_info.try_into()
2403 }
2404}
2405
2406#[sync_docs]
2407#[derive(Debug, Clone, Default)]
2408pub struct ListAccessTokensRequest {
2409 pub prefix: String,
2410 pub start_after: String,
2411 pub limit: Option<usize>,
2412}
2413
2414impl ListAccessTokensRequest {
2415 pub fn new() -> Self {
2417 Self::default()
2418 }
2419
2420 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2422 Self {
2423 prefix: prefix.into(),
2424 ..self
2425 }
2426 }
2427
2428 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2430 Self {
2431 start_after: start_after.into(),
2432 ..self
2433 }
2434 }
2435
2436 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2438 Self {
2439 limit: limit.into(),
2440 ..self
2441 }
2442 }
2443}
2444
2445impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2446 type Error = ConvertError;
2447 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2448 let ListAccessTokensRequest {
2449 prefix,
2450 start_after,
2451 limit,
2452 } = value;
2453 Ok(Self {
2454 prefix,
2455 start_after,
2456 limit: limit
2457 .map(TryInto::try_into)
2458 .transpose()
2459 .map_err(|_| "request limit does not fit into u64 bounds")?,
2460 })
2461 }
2462}
2463
2464#[sync_docs]
2465#[derive(Debug, Clone)]
2466pub struct ListAccessTokensResponse {
2467 pub access_tokens: Vec<AccessTokenInfo>,
2468 pub has_more: bool,
2469}
2470
2471impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2472 type Error = ConvertError;
2473 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2474 let api::ListAccessTokensResponse {
2475 access_tokens,
2476 has_more,
2477 } = value;
2478 let access_tokens = access_tokens
2479 .into_iter()
2480 .map(TryInto::try_into)
2481 .collect::<Result<Vec<_>, _>>()?;
2482 Ok(Self {
2483 access_tokens,
2484 has_more,
2485 })
2486 }
2487}