1use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
/// Number of bytes in one mebibyte (2^20 = 1_048_576).
pub(crate) const MIB_BYTES: u64 = 1 << 20;
/// Metadata key with the literal value `retry-after-ms`.
pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
14
15#[derive(Debug, Clone, thiserror::Error)]
17#[error("{0}")]
18pub struct ConvertError(String);
19
20impl<T: Into<String>> From<T> for ConvertError {
21 fn from(value: T) -> Self {
22 Self(value.into())
23 }
24}
25
/// Implemented by values that can report their size in metered bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}
37
38impl<T: MeteredBytes> MeteredBytes for Vec<T> {
39 fn metered_bytes(&self) -> u64 {
40 self.iter().fold(0, |acc, item| acc + item.metered_bytes())
41 }
42}
43
// Implements `MeteredBytes` for a record-like type exposing `headers`
// (items with `name` / `value`) and `body`, all of which have `len()`.
//
// Metered size = 8 + 2 * (number of headers)
//              + sum of header name and value lengths + body length.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let header_count = self.headers.len();
                let header_payload: usize = self
                    .headers
                    .iter()
                    .map(|h| h.name.len() + h.value.len())
                    .sum();
                (8 + (2 * header_count) + header_payload + self.body.len()) as u64
            }
        }
    };
}
61
62#[sync_docs]
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum BasinScope {
65 AwsUsEast1,
66}
67
68impl From<BasinScope> for api::BasinScope {
69 fn from(value: BasinScope) -> Self {
70 match value {
71 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
72 }
73 }
74}
75
76impl From<api::BasinScope> for Option<BasinScope> {
77 fn from(value: api::BasinScope) -> Self {
78 match value {
79 api::BasinScope::Unspecified => None,
80 api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
81 }
82 }
83}
84
85impl FromStr for BasinScope {
86 type Err = ConvertError;
87 fn from_str(value: &str) -> Result<Self, Self::Err> {
88 match value {
89 "aws:us-east-1" => Ok(Self::AwsUsEast1),
90 _ => Err("invalid basin scope value".into()),
91 }
92 }
93}
94
95#[sync_docs]
96#[derive(Debug, Clone)]
97pub struct CreateBasinRequest {
98 pub basin: BasinName,
99 pub config: Option<BasinConfig>,
100 pub scope: Option<BasinScope>,
101}
102
103impl CreateBasinRequest {
104 pub fn new(basin: BasinName) -> Self {
106 Self {
107 basin,
108 config: None,
109 scope: None,
110 }
111 }
112
113 pub fn with_config(self, config: BasinConfig) -> Self {
115 Self {
116 config: Some(config),
117 ..self
118 }
119 }
120
121 pub fn with_scope(self, scope: BasinScope) -> Self {
123 Self {
124 scope: Some(scope),
125 ..self
126 }
127 }
128}
129
130impl From<CreateBasinRequest> for api::CreateBasinRequest {
131 fn from(value: CreateBasinRequest) -> Self {
132 let CreateBasinRequest {
133 basin,
134 config,
135 scope,
136 } = value;
137 Self {
138 basin: basin.0,
139 config: config.map(Into::into),
140 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
141 }
142 }
143}
144
145#[sync_docs]
146#[derive(Debug, Clone, Default)]
147pub struct BasinConfig {
148 pub default_stream_config: Option<StreamConfig>,
149 pub create_stream_on_append: bool,
150 pub create_stream_on_read: bool,
151}
152
153impl BasinConfig {
154 pub fn new() -> Self {
156 Self::default()
157 }
158
159 pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
161 Self {
162 default_stream_config: Some(default_stream_config),
163 ..self
164 }
165 }
166
167 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
169 Self {
170 create_stream_on_append,
171 ..self
172 }
173 }
174
175 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
177 Self {
178 create_stream_on_read,
179 ..self
180 }
181 }
182}
183
184impl From<BasinConfig> for api::BasinConfig {
185 fn from(value: BasinConfig) -> Self {
186 let BasinConfig {
187 default_stream_config,
188 create_stream_on_append,
189 create_stream_on_read,
190 } = value;
191 Self {
192 default_stream_config: default_stream_config.map(Into::into),
193 create_stream_on_append,
194 create_stream_on_read,
195 }
196 }
197}
198
199impl From<api::BasinConfig> for BasinConfig {
200 fn from(value: api::BasinConfig) -> Self {
201 let api::BasinConfig {
202 default_stream_config,
203 create_stream_on_append,
204 create_stream_on_read,
205 } = value;
206 Self {
207 default_stream_config: default_stream_config.map(Into::into),
208 create_stream_on_append,
209 create_stream_on_read,
210 }
211 }
212}
213
214#[sync_docs]
215#[derive(Debug, Clone, Copy, PartialEq, Eq)]
216pub enum TimestampingMode {
217 ClientPrefer,
218 ClientRequire,
219 Arrival,
220}
221
222impl From<TimestampingMode> for api::TimestampingMode {
223 fn from(value: TimestampingMode) -> Self {
224 match value {
225 TimestampingMode::ClientPrefer => Self::ClientPrefer,
226 TimestampingMode::ClientRequire => Self::ClientRequire,
227 TimestampingMode::Arrival => Self::Arrival,
228 }
229 }
230}
231
232impl From<api::TimestampingMode> for Option<TimestampingMode> {
233 fn from(value: api::TimestampingMode) -> Self {
234 match value {
235 api::TimestampingMode::Unspecified => None,
236 api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
237 api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
238 api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
239 }
240 }
241}
242
243#[sync_docs(TimestampingConfig = "Timestamping")]
244#[derive(Debug, Clone, Default)]
245pub struct TimestampingConfig {
247 pub mode: Option<TimestampingMode>,
248 pub uncapped: Option<bool>,
249}
250
251impl TimestampingConfig {
252 pub fn new() -> Self {
254 Self::default()
255 }
256
257 pub fn with_mode(self, mode: TimestampingMode) -> Self {
259 Self {
260 mode: Some(mode),
261 ..self
262 }
263 }
264
265 pub fn with_uncapped(self, uncapped: bool) -> Self {
267 Self {
268 uncapped: Some(uncapped),
269 ..self
270 }
271 }
272}
273
274impl From<TimestampingConfig> for api::stream_config::Timestamping {
275 fn from(value: TimestampingConfig) -> Self {
276 Self {
277 mode: value
278 .mode
279 .map(api::TimestampingMode::from)
280 .unwrap_or_default()
281 .into(),
282 uncapped: value.uncapped,
283 }
284 }
285}
286
287impl From<api::stream_config::Timestamping> for TimestampingConfig {
288 fn from(value: api::stream_config::Timestamping) -> Self {
289 let mode = value.mode().into();
290 let uncapped = value.uncapped;
291 Self { mode, uncapped }
292 }
293}
294
295#[sync_docs]
296#[derive(Debug, Clone, Default)]
297pub struct StreamConfig {
298 pub storage_class: Option<StorageClass>,
299 pub retention_policy: Option<RetentionPolicy>,
300 pub timestamping: Option<TimestampingConfig>,
301}
302
303impl StreamConfig {
304 pub fn new() -> Self {
306 Self::default()
307 }
308
309 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
311 Self {
312 storage_class: Some(storage_class),
313 ..self
314 }
315 }
316
317 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
319 Self {
320 retention_policy: Some(retention_policy),
321 ..self
322 }
323 }
324
325 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
327 Self {
328 timestamping: Some(timestamping),
329 ..self
330 }
331 }
332}
333
334impl From<StreamConfig> for api::StreamConfig {
335 fn from(value: StreamConfig) -> Self {
336 let StreamConfig {
337 storage_class,
338 retention_policy,
339 timestamping,
340 } = value;
341 Self {
342 storage_class: storage_class
343 .map(api::StorageClass::from)
344 .unwrap_or_default()
345 .into(),
346 retention_policy: retention_policy.map(Into::into),
347 timestamping: timestamping.map(Into::into),
348 }
349 }
350}
351
352impl From<api::StreamConfig> for StreamConfig {
353 fn from(value: api::StreamConfig) -> Self {
354 Self {
355 storage_class: value.storage_class().into(),
356 retention_policy: value.retention_policy.map(Into::into),
357 timestamping: value.timestamping.map(Into::into),
358 }
359 }
360}
361
362#[sync_docs]
363#[derive(Debug, Clone, Copy, PartialEq, Eq)]
364pub enum StorageClass {
365 Standard,
366 Express,
367}
368
369impl From<StorageClass> for api::StorageClass {
370 fn from(value: StorageClass) -> Self {
371 match value {
372 StorageClass::Standard => Self::Standard,
373 StorageClass::Express => Self::Express,
374 }
375 }
376}
377
378impl From<api::StorageClass> for Option<StorageClass> {
379 fn from(value: api::StorageClass) -> Self {
380 match value {
381 api::StorageClass::Unspecified => None,
382 api::StorageClass::Standard => Some(StorageClass::Standard),
383 api::StorageClass::Express => Some(StorageClass::Express),
384 }
385 }
386}
387
388impl FromStr for StorageClass {
389 type Err = ConvertError;
390
391 fn from_str(value: &str) -> Result<Self, Self::Err> {
392 match value {
393 "standard" => Ok(Self::Standard),
394 "express" => Ok(Self::Express),
395 v => Err(format!("unknown storage class: {v}").into()),
396 }
397 }
398}
399
400#[sync_docs(Age = "Age")]
401#[derive(Debug, Clone)]
402pub enum RetentionPolicy {
403 Age(Duration),
404}
405
406impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
407 fn from(value: RetentionPolicy) -> Self {
408 match value {
409 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
410 }
411 }
412}
413
414impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
415 fn from(value: api::stream_config::RetentionPolicy) -> Self {
416 match value {
417 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
418 }
419 }
420}
421
422#[sync_docs]
423#[derive(Debug, Clone, Copy, PartialEq, Eq)]
424pub enum BasinState {
425 Active,
426 Creating,
427 Deleting,
428}
429
430impl From<BasinState> for api::BasinState {
431 fn from(value: BasinState) -> Self {
432 match value {
433 BasinState::Active => Self::Active,
434 BasinState::Creating => Self::Creating,
435 BasinState::Deleting => Self::Deleting,
436 }
437 }
438}
439
440impl From<api::BasinState> for Option<BasinState> {
441 fn from(value: api::BasinState) -> Self {
442 match value {
443 api::BasinState::Unspecified => None,
444 api::BasinState::Active => Some(BasinState::Active),
445 api::BasinState::Creating => Some(BasinState::Creating),
446 api::BasinState::Deleting => Some(BasinState::Deleting),
447 }
448 }
449}
450
451impl std::fmt::Display for BasinState {
452 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453 match self {
454 BasinState::Active => write!(f, "active"),
455 BasinState::Creating => write!(f, "creating"),
456 BasinState::Deleting => write!(f, "deleting"),
457 }
458 }
459}
460
461#[sync_docs]
462#[derive(Debug, Clone)]
463pub struct BasinInfo {
464 pub name: String,
465 pub scope: Option<BasinScope>,
466 pub state: Option<BasinState>,
467}
468
469impl From<BasinInfo> for api::BasinInfo {
470 fn from(value: BasinInfo) -> Self {
471 let BasinInfo { name, scope, state } = value;
472 Self {
473 name,
474 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
475 state: state.map(api::BasinState::from).unwrap_or_default().into(),
476 }
477 }
478}
479
480impl From<api::BasinInfo> for BasinInfo {
481 fn from(value: api::BasinInfo) -> Self {
482 let scope = value.scope().into();
483 let state = value.state().into();
484 let name = value.name;
485 Self { name, scope, state }
486 }
487}
488
489impl TryFrom<api::CreateBasinResponse> for BasinInfo {
490 type Error = ConvertError;
491 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
492 let api::CreateBasinResponse { info } = value;
493 let info = info.ok_or("missing basin info")?;
494 Ok(info.into())
495 }
496}
497
498#[sync_docs]
499#[derive(Debug, Clone, Default)]
500pub struct ListStreamsRequest {
501 pub prefix: String,
502 pub start_after: String,
503 pub limit: Option<usize>,
504}
505
506impl ListStreamsRequest {
507 pub fn new() -> Self {
509 Self::default()
510 }
511
512 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
514 Self {
515 prefix: prefix.into(),
516 ..self
517 }
518 }
519
520 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
522 Self {
523 start_after: start_after.into(),
524 ..self
525 }
526 }
527
528 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
530 Self {
531 limit: limit.into(),
532 ..self
533 }
534 }
535}
536
537impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
538 type Error = ConvertError;
539 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
540 let ListStreamsRequest {
541 prefix,
542 start_after,
543 limit,
544 } = value;
545 Ok(Self {
546 prefix,
547 start_after,
548 limit: limit.map(|n| n as u64),
549 })
550 }
551}
552
553#[sync_docs]
554#[derive(Debug, Clone)]
555pub struct StreamInfo {
556 pub name: String,
557 pub created_at: u32,
558 pub deleted_at: Option<u32>,
559}
560
561impl From<api::StreamInfo> for StreamInfo {
562 fn from(value: api::StreamInfo) -> Self {
563 Self {
564 name: value.name,
565 created_at: value.created_at,
566 deleted_at: value.deleted_at,
567 }
568 }
569}
570
571impl TryFrom<api::CreateStreamResponse> for StreamInfo {
572 type Error = ConvertError;
573
574 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
575 let api::CreateStreamResponse { info } = value;
576 let info = info.ok_or("missing stream info")?;
577 Ok(info.into())
578 }
579}
580
581#[sync_docs]
582#[derive(Debug, Clone)]
583pub struct ListStreamsResponse {
584 pub streams: Vec<StreamInfo>,
585 pub has_more: bool,
586}
587
588impl From<api::ListStreamsResponse> for ListStreamsResponse {
589 fn from(value: api::ListStreamsResponse) -> Self {
590 let api::ListStreamsResponse { streams, has_more } = value;
591 let streams = streams.into_iter().map(Into::into).collect();
592 Self { streams, has_more }
593 }
594}
595
596impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
597 type Error = ConvertError;
598
599 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
600 let api::GetBasinConfigResponse { config } = value;
601 let config = config.ok_or("missing basin config")?;
602 Ok(config.into())
603 }
604}
605
606impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
607 type Error = ConvertError;
608
609 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
610 let api::GetStreamConfigResponse { config } = value;
611 let config = config.ok_or("missing stream config")?;
612 Ok(config.into())
613 }
614}
615
616#[sync_docs]
617#[derive(Debug, Clone)]
618pub struct CreateStreamRequest {
619 pub stream: String,
620 pub config: Option<StreamConfig>,
621}
622
623impl CreateStreamRequest {
624 pub fn new(stream: impl Into<String>) -> Self {
626 Self {
627 stream: stream.into(),
628 config: None,
629 }
630 }
631
632 pub fn with_config(self, config: StreamConfig) -> Self {
634 Self {
635 config: Some(config),
636 ..self
637 }
638 }
639}
640
641impl From<CreateStreamRequest> for api::CreateStreamRequest {
642 fn from(value: CreateStreamRequest) -> Self {
643 let CreateStreamRequest { stream, config } = value;
644 Self {
645 stream,
646 config: config.map(Into::into),
647 }
648 }
649}
650
651#[sync_docs]
652#[derive(Debug, Clone, Default)]
653pub struct ListBasinsRequest {
654 pub prefix: String,
655 pub start_after: String,
656 pub limit: Option<usize>,
657}
658
659impl ListBasinsRequest {
660 pub fn new() -> Self {
662 Self::default()
663 }
664
665 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
667 Self {
668 prefix: prefix.into(),
669 ..self
670 }
671 }
672
673 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
675 Self {
676 start_after: start_after.into(),
677 ..self
678 }
679 }
680
681 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
683 Self {
684 limit: limit.into(),
685 ..self
686 }
687 }
688}
689
690impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
691 type Error = ConvertError;
692 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
693 let ListBasinsRequest {
694 prefix,
695 start_after,
696 limit,
697 } = value;
698 Ok(Self {
699 prefix,
700 start_after,
701 limit: limit
702 .map(TryInto::try_into)
703 .transpose()
704 .map_err(|_| "request limit does not fit into u64 bounds")?,
705 })
706 }
707}
708
709#[sync_docs]
710#[derive(Debug, Clone)]
711pub struct ListBasinsResponse {
712 pub basins: Vec<BasinInfo>,
713 pub has_more: bool,
714}
715
716impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
717 type Error = ConvertError;
718 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
719 let api::ListBasinsResponse { basins, has_more } = value;
720 Ok(Self {
721 basins: basins.into_iter().map(Into::into).collect(),
722 has_more,
723 })
724 }
725}
726
727#[sync_docs]
728#[derive(Debug, Clone)]
729pub struct DeleteBasinRequest {
730 pub basin: BasinName,
731 pub if_exists: bool,
733}
734
735impl DeleteBasinRequest {
736 pub fn new(basin: BasinName) -> Self {
738 Self {
739 basin,
740 if_exists: false,
741 }
742 }
743
744 pub fn with_if_exists(self, if_exists: bool) -> Self {
746 Self { if_exists, ..self }
747 }
748}
749
750impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
751 fn from(value: DeleteBasinRequest) -> Self {
752 let DeleteBasinRequest { basin, .. } = value;
753 Self { basin: basin.0 }
754 }
755}
756
757#[sync_docs]
758#[derive(Debug, Clone)]
759pub struct DeleteStreamRequest {
760 pub stream: String,
761 pub if_exists: bool,
763}
764
765impl DeleteStreamRequest {
766 pub fn new(stream: impl Into<String>) -> Self {
768 Self {
769 stream: stream.into(),
770 if_exists: false,
771 }
772 }
773
774 pub fn with_if_exists(self, if_exists: bool) -> Self {
776 Self { if_exists, ..self }
777 }
778}
779
780impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
781 fn from(value: DeleteStreamRequest) -> Self {
782 let DeleteStreamRequest { stream, .. } = value;
783 Self { stream }
784 }
785}
786
787#[sync_docs]
788#[derive(Debug, Clone)]
789pub struct ReconfigureBasinRequest {
790 pub basin: BasinName,
791 pub config: Option<BasinConfig>,
792 pub mask: Option<Vec<String>>,
793}
794
795impl ReconfigureBasinRequest {
796 pub fn new(basin: BasinName) -> Self {
798 Self {
799 basin,
800 config: None,
801 mask: None,
802 }
803 }
804
805 pub fn with_config(self, config: BasinConfig) -> Self {
807 Self {
808 config: Some(config),
809 ..self
810 }
811 }
812
813 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
815 Self {
816 mask: Some(mask.into()),
817 ..self
818 }
819 }
820}
821
822impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
823 fn from(value: ReconfigureBasinRequest) -> Self {
824 let ReconfigureBasinRequest {
825 basin,
826 config,
827 mask,
828 } = value;
829 Self {
830 basin: basin.0,
831 config: config.map(Into::into),
832 mask: mask.map(|paths| prost_types::FieldMask { paths }),
833 }
834 }
835}
836
837impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
838 type Error = ConvertError;
839 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
840 let api::ReconfigureBasinResponse { config } = value;
841 let config = config.ok_or("missing basin config")?;
842 Ok(config.into())
843 }
844}
845
846#[sync_docs]
847#[derive(Debug, Clone)]
848pub struct ReconfigureStreamRequest {
849 pub stream: String,
850 pub config: Option<StreamConfig>,
851 pub mask: Option<Vec<String>>,
852}
853
854impl ReconfigureStreamRequest {
855 pub fn new(stream: impl Into<String>) -> Self {
857 Self {
858 stream: stream.into(),
859 config: None,
860 mask: None,
861 }
862 }
863
864 pub fn with_config(self, config: StreamConfig) -> Self {
866 Self {
867 config: Some(config),
868 ..self
869 }
870 }
871
872 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
874 Self {
875 mask: Some(mask.into()),
876 ..self
877 }
878 }
879}
880
881impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
882 fn from(value: ReconfigureStreamRequest) -> Self {
883 let ReconfigureStreamRequest {
884 stream,
885 config,
886 mask,
887 } = value;
888 Self {
889 stream,
890 config: config.map(Into::into),
891 mask: mask.map(|paths| prost_types::FieldMask { paths }),
892 }
893 }
894}
895
896impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
897 type Error = ConvertError;
898 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
899 let api::ReconfigureStreamResponse { config } = value;
900 let config = config.ok_or("missing stream config")?;
901 Ok(config.into())
902 }
903}
904
905impl From<api::CheckTailResponse> for StreamPosition {
906 fn from(value: api::CheckTailResponse) -> Self {
907 let api::CheckTailResponse {
908 next_seq_num,
909 last_timestamp,
910 } = value;
911 StreamPosition {
912 seq_num: next_seq_num,
913 timestamp: last_timestamp,
914 }
915 }
916}
917
/// A position in a stream, expressed as a sequence number and a timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamPosition {
    /// Sequence number of the position.
    pub seq_num: u64,
    /// Timestamp of the position (units are not visible here — TODO confirm).
    pub timestamp: u64,
}
927
928#[sync_docs]
929#[derive(Debug, Clone, PartialEq, Eq)]
930pub struct Header {
931 pub name: Bytes,
932 pub value: Bytes,
933}
934
935impl Header {
936 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
938 Self {
939 name: name.into(),
940 value: value.into(),
941 }
942 }
943
944 pub fn from_value(value: impl Into<Bytes>) -> Self {
946 Self {
947 name: Bytes::new(),
948 value: value.into(),
949 }
950 }
951}
952
953impl From<Header> for api::Header {
954 fn from(value: Header) -> Self {
955 let Header { name, value } = value;
956 Self { name, value }
957 }
958}
959
960impl From<api::Header> for Header {
961 fn from(value: api::Header) -> Self {
962 let api::Header { name, value } = value;
963 Self { name, value }
964 }
965}
966
967#[derive(Debug, Clone, Default, PartialEq, Eq)]
971pub struct FencingToken(Bytes);
972
973impl FencingToken {
974 const MAX_BYTES: usize = 16;
975
976 pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
978 let bytes = bytes.into();
979 if bytes.len() > Self::MAX_BYTES {
980 Err(format!(
981 "Size of a fencing token cannot exceed {} bytes",
982 Self::MAX_BYTES
983 )
984 .into())
985 } else {
986 Ok(Self(bytes))
987 }
988 }
989
990 pub fn generate(n: usize) -> Result<Self, ConvertError> {
992 Self::new(
993 rand::rng()
994 .sample_iter(rand::distr::StandardUniform)
995 .take(n)
996 .collect::<Bytes>(),
997 )
998 }
999}
1000
1001impl AsRef<Bytes> for FencingToken {
1002 fn as_ref(&self) -> &Bytes {
1003 &self.0
1004 }
1005}
1006
1007impl AsRef<[u8]> for FencingToken {
1008 fn as_ref(&self) -> &[u8] {
1009 &self.0
1010 }
1011}
1012
1013impl Deref for FencingToken {
1014 type Target = [u8];
1015
1016 fn deref(&self) -> &Self::Target {
1017 &self.0
1018 }
1019}
1020
1021impl From<FencingToken> for Bytes {
1022 fn from(value: FencingToken) -> Self {
1023 value.0
1024 }
1025}
1026
1027impl From<FencingToken> for Vec<u8> {
1028 fn from(value: FencingToken) -> Self {
1029 value.0.into()
1030 }
1031}
1032
1033impl TryFrom<Bytes> for FencingToken {
1034 type Error = ConvertError;
1035
1036 fn try_from(value: Bytes) -> Result<Self, Self::Error> {
1037 Self::new(value)
1038 }
1039}
1040
1041impl TryFrom<Vec<u8>> for FencingToken {
1042 type Error = ConvertError;
1043
1044 fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
1045 Self::new(value)
1046 }
1047}
1048
1049#[derive(Debug, Clone)]
1051pub enum Command {
1052 Fence {
1057 fencing_token: FencingToken,
1061 },
1062 Trim {
1067 seq_num: u64,
1072 },
1073}
1074
1075#[derive(Debug, Clone)]
1081pub struct CommandRecord {
1082 pub command: Command,
1084 pub timestamp: Option<u64>,
1086}
1087
1088impl CommandRecord {
1089 const FENCE: &[u8] = b"fence";
1090 const TRIM: &[u8] = b"trim";
1091
1092 pub fn fence(fencing_token: FencingToken) -> Self {
1094 Self {
1095 command: Command::Fence { fencing_token },
1096 timestamp: None,
1097 }
1098 }
1099
1100 pub fn trim(seq_num: impl Into<u64>) -> Self {
1102 Self {
1103 command: Command::Trim {
1104 seq_num: seq_num.into(),
1105 },
1106 timestamp: None,
1107 }
1108 }
1109
1110 pub fn with_timestamp(self, timestamp: u64) -> Self {
1112 Self {
1113 timestamp: Some(timestamp),
1114 ..self
1115 }
1116 }
1117}
1118
1119#[sync_docs]
1120#[derive(Debug, Clone, PartialEq, Eq)]
1121pub struct AppendRecord {
1122 timestamp: Option<u64>,
1123 headers: Vec<Header>,
1124 body: Bytes,
1125 #[cfg(test)]
1126 max_bytes: u64,
1127}
1128
1129metered_impl!(AppendRecord);
1130
1131impl AppendRecord {
1132 const MAX_BYTES: u64 = MIB_BYTES;
1133
1134 fn validated(self) -> Result<Self, ConvertError> {
1135 #[cfg(test)]
1136 let max_bytes = self.max_bytes;
1137 #[cfg(not(test))]
1138 let max_bytes = Self::MAX_BYTES;
1139
1140 if self.metered_bytes() > max_bytes {
1141 Err("AppendRecord should have metered size less than 1 MiB".into())
1142 } else {
1143 Ok(self)
1144 }
1145 }
1146
1147 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1149 Self {
1150 timestamp: None,
1151 headers: Vec::new(),
1152 body: body.into(),
1153 #[cfg(test)]
1154 max_bytes: Self::MAX_BYTES,
1155 }
1156 .validated()
1157 }
1158
1159 #[cfg(test)]
1160 pub(crate) fn with_max_bytes(
1161 max_bytes: u64,
1162 body: impl Into<Bytes>,
1163 ) -> Result<Self, ConvertError> {
1164 Self {
1165 timestamp: None,
1166 headers: Vec::new(),
1167 body: body.into(),
1168 max_bytes,
1169 }
1170 .validated()
1171 }
1172
1173 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1175 Self {
1176 headers: headers.into(),
1177 ..self
1178 }
1179 .validated()
1180 }
1181
1182 pub fn with_timestamp(self, timestamp: u64) -> Self {
1184 Self {
1185 timestamp: Some(timestamp),
1186 ..self
1187 }
1188 }
1189
1190 pub fn body(&self) -> &[u8] {
1192 &self.body
1193 }
1194
1195 pub fn headers(&self) -> &[Header] {
1197 &self.headers
1198 }
1199
1200 pub fn timestamp(&self) -> Option<u64> {
1202 self.timestamp
1203 }
1204
1205 pub fn into_parts(self) -> AppendRecordParts {
1207 AppendRecordParts {
1208 timestamp: self.timestamp,
1209 headers: self.headers,
1210 body: self.body,
1211 }
1212 }
1213
1214 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1216 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1217 if let Some(timestamp) = parts.timestamp {
1218 Ok(record.with_timestamp(timestamp))
1219 } else {
1220 Ok(record)
1221 }
1222 }
1223}
1224
1225impl From<AppendRecord> for api::AppendRecord {
1226 fn from(value: AppendRecord) -> Self {
1227 Self {
1228 timestamp: value.timestamp,
1229 headers: value.headers.into_iter().map(Into::into).collect(),
1230 body: value.body,
1231 }
1232 }
1233}
1234
1235impl From<CommandRecord> for AppendRecord {
1236 fn from(value: CommandRecord) -> Self {
1237 let (header_value, body) = match value.command {
1238 Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1239 Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1240 };
1241 AppendRecordParts {
1242 timestamp: value.timestamp,
1243 headers: vec![Header::from_value(header_value)],
1244 body: body.into(),
1245 }
1246 .try_into()
1247 .expect("command record is a valid append record")
1248 }
1249}
1250
1251#[sync_docs(AppendRecordParts = "AppendRecord")]
1252#[derive(Debug, Clone)]
1253pub struct AppendRecordParts {
1254 pub timestamp: Option<u64>,
1255 pub headers: Vec<Header>,
1256 pub body: Bytes,
1257}
1258
1259impl From<AppendRecord> for AppendRecordParts {
1260 fn from(value: AppendRecord) -> Self {
1261 value.into_parts()
1262 }
1263}
1264
1265impl TryFrom<AppendRecordParts> for AppendRecord {
1266 type Error = ConvertError;
1267
1268 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1269 Self::try_from_parts(value)
1270 }
1271}
1272
1273#[derive(Debug, Clone)]
1275pub struct AppendRecordBatch {
1276 records: Vec<AppendRecord>,
1277 metered_bytes: u64,
1278 max_capacity: usize,
1279 #[cfg(test)]
1280 max_bytes: u64,
1281}
1282
1283impl PartialEq for AppendRecordBatch {
1284 fn eq(&self, other: &Self) -> bool {
1285 if self.records.eq(&other.records) {
1286 assert_eq!(self.metered_bytes, other.metered_bytes);
1287 true
1288 } else {
1289 false
1290 }
1291 }
1292}
1293
1294impl Eq for AppendRecordBatch {}
1295
1296impl Default for AppendRecordBatch {
1297 fn default() -> Self {
1298 Self::new()
1299 }
1300}
1301
1302impl AppendRecordBatch {
1303 pub const MAX_CAPACITY: usize = 1000;
1307
1308 pub const MAX_BYTES: u64 = MIB_BYTES;
1310
1311 pub fn new() -> Self {
1313 Self::with_max_capacity(Self::MAX_CAPACITY)
1314 }
1315
1316 pub fn with_max_capacity(max_capacity: usize) -> Self {
1320 assert!(
1321 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1322 "Batch capacity must be between 1 and 1000"
1323 );
1324
1325 Self {
1326 records: Vec::with_capacity(max_capacity),
1327 metered_bytes: 0,
1328 max_capacity,
1329 #[cfg(test)]
1330 max_bytes: Self::MAX_BYTES,
1331 }
1332 }
1333
1334 #[cfg(test)]
1335 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1336 #[cfg(test)]
1337 assert!(
1338 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1339 "Batch size must be between 1 byte and 1 MiB"
1340 );
1341
1342 Self {
1343 max_bytes,
1344 ..Self::with_max_capacity(max_capacity)
1345 }
1346 }
1347
1348 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1354 where
1355 R: Into<AppendRecord>,
1356 T: IntoIterator<Item = R>,
1357 {
1358 let mut records = Self::new();
1359 let mut pending = Vec::new();
1360
1361 let mut iter = iter.into_iter();
1362
1363 for record in iter.by_ref() {
1364 if let Err(record) = records.push(record) {
1365 pending.push(record);
1366 break;
1367 }
1368 }
1369
1370 if pending.is_empty() {
1371 Ok(records)
1372 } else {
1373 pending.extend(iter.map(Into::into));
1374 Err((records, pending))
1375 }
1376 }
1377
1378 pub fn is_empty(&self) -> bool {
1380 if self.records.is_empty() {
1381 assert_eq!(self.metered_bytes, 0);
1382 true
1383 } else {
1384 false
1385 }
1386 }
1387
1388 pub fn len(&self) -> usize {
1390 self.records.len()
1391 }
1392
1393 #[cfg(test)]
1394 fn max_bytes(&self) -> u64 {
1395 self.max_bytes
1396 }
1397
1398 #[cfg(not(test))]
1399 fn max_bytes(&self) -> u64 {
1400 Self::MAX_BYTES
1401 }
1402
1403 pub fn is_full(&self) -> bool {
1405 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1406 }
1407
1408 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1410 assert!(self.records.len() <= self.max_capacity);
1411 assert!(self.metered_bytes <= self.max_bytes());
1412
1413 let record = record.into();
1414 let record_size = record.metered_bytes();
1415 if self.records.len() >= self.max_capacity
1416 || self.metered_bytes + record_size > self.max_bytes()
1417 {
1418 Err(record)
1419 } else {
1420 self.records.push(record);
1421 self.metered_bytes += record_size;
1422 Ok(())
1423 }
1424 }
1425}
1426
1427impl MeteredBytes for AppendRecordBatch {
1428 fn metered_bytes(&self) -> u64 {
1429 self.metered_bytes
1430 }
1431}
1432
1433impl IntoIterator for AppendRecordBatch {
1434 type Item = AppendRecord;
1435 type IntoIter = std::vec::IntoIter<Self::Item>;
1436
1437 fn into_iter(self) -> Self::IntoIter {
1438 self.records.into_iter()
1439 }
1440}
1441
1442impl<'a> IntoIterator for &'a AppendRecordBatch {
1443 type Item = &'a AppendRecord;
1444 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1445
1446 fn into_iter(self) -> Self::IntoIter {
1447 self.records.iter()
1448 }
1449}
1450
1451impl AsRef<[AppendRecord]> for AppendRecordBatch {
1452 fn as_ref(&self) -> &[AppendRecord] {
1453 &self.records
1454 }
1455}
1456
1457#[sync_docs]
1458#[derive(Debug, Default, Clone)]
1459pub struct AppendInput {
1460 pub records: AppendRecordBatch,
1461 pub match_seq_num: Option<u64>,
1462 pub fencing_token: Option<FencingToken>,
1463}
1464
1465impl MeteredBytes for AppendInput {
1466 fn metered_bytes(&self) -> u64 {
1467 self.records.metered_bytes()
1468 }
1469}
1470
1471impl AppendInput {
1472 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1474 Self {
1475 records: records.into(),
1476 match_seq_num: None,
1477 fencing_token: None,
1478 }
1479 }
1480
1481 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1483 Self {
1484 match_seq_num: Some(match_seq_num.into()),
1485 ..self
1486 }
1487 }
1488
1489 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1491 Self {
1492 fencing_token: Some(fencing_token),
1493 ..self
1494 }
1495 }
1496
1497 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1498 let Self {
1499 records,
1500 match_seq_num,
1501 fencing_token,
1502 } = self;
1503
1504 api::AppendInput {
1505 stream: stream.into(),
1506 records: records.into_iter().map(Into::into).collect(),
1507 match_seq_num,
1508 fencing_token: fencing_token.map(|f| f.0),
1509 }
1510 }
1511}
1512
1513#[derive(Debug, Clone)]
1515pub struct AppendAck {
1516 pub start: StreamPosition,
1518 pub end: StreamPosition,
1523 pub tail: StreamPosition,
1527}
1528
1529impl From<api::AppendOutput> for AppendAck {
1530 fn from(value: api::AppendOutput) -> Self {
1531 let api::AppendOutput {
1532 start_seq_num,
1533 start_timestamp,
1534 end_seq_num,
1535 end_timestamp,
1536 next_seq_num,
1537 last_timestamp,
1538 } = value;
1539 let start = StreamPosition {
1540 seq_num: start_seq_num,
1541 timestamp: start_timestamp,
1542 };
1543 let end = StreamPosition {
1544 seq_num: end_seq_num,
1545 timestamp: end_timestamp,
1546 };
1547 let tail = StreamPosition {
1548 seq_num: next_seq_num,
1549 timestamp: last_timestamp,
1550 };
1551 Self { start, end, tail }
1552 }
1553}
1554
1555impl TryFrom<api::AppendResponse> for AppendAck {
1556 type Error = ConvertError;
1557 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1558 let api::AppendResponse { output } = value;
1559 let output = output.ok_or("missing append output")?;
1560 Ok(output.into())
1561 }
1562}
1563
1564impl TryFrom<api::AppendSessionResponse> for AppendAck {
1565 type Error = ConvertError;
1566 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1567 let api::AppendSessionResponse { output } = value;
1568 let output = output.ok_or("missing append output")?;
1569 Ok(output.into())
1570 }
1571}
1572
1573#[sync_docs]
1574#[derive(Debug, Clone, Default)]
1575pub struct ReadLimit {
1576 pub count: Option<u64>,
1577 pub bytes: Option<u64>,
1578}
1579
1580impl ReadLimit {
1581 pub fn new() -> Self {
1583 Self::default()
1584 }
1585
1586 pub fn with_count(self, count: u64) -> Self {
1588 Self {
1589 count: Some(count),
1590 ..self
1591 }
1592 }
1593
1594 pub fn with_bytes(self, bytes: u64) -> Self {
1596 Self {
1597 bytes: Some(bytes),
1598 ..self
1599 }
1600 }
1601}
1602
/// Where a read begins.
///
/// Each variant carries a single `u64` and converts to the `Start` variant of the same name
/// on the API request types.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Carries a sequence number.
    SeqNum(u64),
    /// Carries a timestamp.
    Timestamp(u64),
    /// Carries an offset relative to the tail.
    TailOffset(u64),
}

/// The default start is sequence number `0`.
impl Default for ReadStart {
    fn default() -> Self {
        ReadStart::SeqNum(0)
    }
}
1619
1620impl From<ReadStart> for api::read_request::Start {
1621 fn from(start: ReadStart) -> Self {
1622 match start {
1623 ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1624 ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1625 ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1626 }
1627 }
1628}
1629
1630impl From<ReadStart> for api::read_session_request::Start {
1631 fn from(start: ReadStart) -> Self {
1632 match start {
1633 ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1634 ReadStart::Timestamp(timestamp) => {
1635 api::read_session_request::Start::Timestamp(timestamp)
1636 }
1637 ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1638 }
1639 }
1640}
1641
1642#[sync_docs]
1643#[derive(Debug, Clone, Default)]
1644pub struct ReadRequest {
1645 pub start: ReadStart,
1646 pub limit: ReadLimit,
1647}
1648
1649impl ReadRequest {
1650 pub fn new(start: ReadStart) -> Self {
1652 Self {
1653 start,
1654 ..Default::default()
1655 }
1656 }
1657
1658 pub fn with_limit(self, limit: ReadLimit) -> Self {
1660 Self { limit, ..self }
1661 }
1662}
1663
1664impl ReadRequest {
1665 pub(crate) fn try_into_api_type(
1666 self,
1667 stream: impl Into<String>,
1668 ) -> Result<api::ReadRequest, ConvertError> {
1669 let Self { start, limit } = self;
1670
1671 let limit = if limit.count > Some(1000) {
1672 Err("read limit: count must not exceed 1000 for unary request")
1673 } else if limit.bytes > Some(MIB_BYTES) {
1674 Err("read limit: bytes must not exceed 1MiB for unary request")
1675 } else {
1676 Ok(api::ReadLimit {
1677 count: limit.count,
1678 bytes: limit.bytes,
1679 })
1680 }?;
1681
1682 Ok(api::ReadRequest {
1683 stream: stream.into(),
1684 start: Some(start.into()),
1685 limit: Some(limit),
1686 })
1687 }
1688}
1689
1690#[sync_docs]
1691#[derive(Debug, Clone)]
1692pub struct SequencedRecord {
1693 pub seq_num: u64,
1694 pub timestamp: u64,
1695 pub headers: Vec<Header>,
1696 pub body: Bytes,
1697}
1698
1699metered_impl!(SequencedRecord);
1700
1701impl From<api::SequencedRecord> for SequencedRecord {
1702 fn from(value: api::SequencedRecord) -> Self {
1703 let api::SequencedRecord {
1704 seq_num,
1705 timestamp,
1706 headers,
1707 body,
1708 } = value;
1709 Self {
1710 seq_num,
1711 timestamp,
1712 headers: headers.into_iter().map(Into::into).collect(),
1713 body,
1714 }
1715 }
1716}
1717
1718impl SequencedRecord {
1719 pub fn as_command_record(&self) -> Option<CommandRecord> {
1721 if self.headers.len() != 1 {
1722 return None;
1723 }
1724
1725 let header = self.headers.first().expect("pre-validated length");
1726
1727 if !header.name.is_empty() {
1728 return None;
1729 }
1730
1731 match header.value.as_ref() {
1732 CommandRecord::FENCE => {
1733 let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1734 Some(CommandRecord {
1735 command: Command::Fence { fencing_token },
1736 timestamp: Some(self.timestamp),
1737 })
1738 }
1739 CommandRecord::TRIM => {
1740 let body: &[u8] = &self.body;
1741 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1742 Some(CommandRecord {
1743 command: Command::Trim { seq_num },
1744 timestamp: Some(self.timestamp),
1745 })
1746 }
1747 _ => None,
1748 }
1749 }
1750}
1751
1752#[sync_docs]
1753#[derive(Debug, Clone)]
1754pub struct SequencedRecordBatch {
1755 pub records: Vec<SequencedRecord>,
1756}
1757
1758impl MeteredBytes for SequencedRecordBatch {
1759 fn metered_bytes(&self) -> u64 {
1760 self.records.metered_bytes()
1761 }
1762}
1763
1764impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1765 fn from(value: api::SequencedRecordBatch) -> Self {
1766 let api::SequencedRecordBatch { records } = value;
1767 Self {
1768 records: records.into_iter().map(Into::into).collect(),
1769 }
1770 }
1771}
1772
1773#[sync_docs(ReadOutput = "Output")]
1774#[derive(Debug, Clone)]
1775pub enum ReadOutput {
1776 Batch(SequencedRecordBatch),
1777 NextSeqNum(u64),
1778}
1779
1780impl From<api::read_output::Output> for ReadOutput {
1781 fn from(value: api::read_output::Output) -> Self {
1782 match value {
1783 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1784 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1785 }
1786 }
1787}
1788
1789impl TryFrom<api::ReadOutput> for ReadOutput {
1790 type Error = ConvertError;
1791 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1792 let api::ReadOutput { output } = value;
1793 let output = output.ok_or("missing read output")?;
1794 Ok(output.into())
1795 }
1796}
1797
1798impl TryFrom<api::ReadResponse> for ReadOutput {
1799 type Error = ConvertError;
1800 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1801 let api::ReadResponse { output } = value;
1802 let output = output.ok_or("missing output in read response")?;
1803 output.try_into()
1804 }
1805}
1806
1807#[sync_docs]
1808#[derive(Debug, Clone, Default)]
1809pub struct ReadSessionRequest {
1810 pub start: ReadStart,
1811 pub limit: ReadLimit,
1812}
1813
1814impl ReadSessionRequest {
1815 pub fn new(start: ReadStart) -> Self {
1817 Self {
1818 start,
1819 ..Default::default()
1820 }
1821 }
1822
1823 pub fn with_limit(self, limit: ReadLimit) -> Self {
1825 Self { limit, ..self }
1826 }
1827
1828 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1829 let Self { start, limit } = self;
1830 api::ReadSessionRequest {
1831 stream: stream.into(),
1832 start: Some(start.into()),
1833 limit: Some(api::ReadLimit {
1834 count: limit.count,
1835 bytes: limit.bytes,
1836 }),
1837 heartbeats: false,
1838 }
1839 }
1840}
1841
1842impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1843 type Error = ConvertError;
1844 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1845 let api::ReadSessionResponse { output } = value;
1846 let output = output.ok_or("missing output in read session response")?;
1847 output.try_into()
1848 }
1849}
1850
/// A basin name, stored as an owned string.
///
/// The inner field is private; values are obtained through the `TryFrom<String>` /
/// `FromStr` conversions, which validate the name.
#[derive(Debug, Clone)]
pub struct BasinName(String);

/// Borrows the name as `&str`.
impl AsRef<str> for BasinName {
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}

/// Lets a `BasinName` be used wherever a `&str` is expected.
impl Deref for BasinName {
    type Target = str;

    fn deref(&self) -> &str {
        self.0.as_str()
    }
}
1870
1871impl TryFrom<String> for BasinName {
1872 type Error = ConvertError;
1873
1874 fn try_from(name: String) -> Result<Self, Self::Error> {
1875 if name.len() < 8 || name.len() > 48 {
1876 return Err("Basin name must be between 8 and 48 characters in length".into());
1877 }
1878
1879 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1880 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1881 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1882 .expect("Failed to compile basin name regex")
1883 });
1884
1885 if !regex.is_match(&name) {
1886 return Err(
1887 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1888 It cannot begin or end with a hyphen."
1889 .into(),
1890 );
1891 }
1892
1893 Ok(Self(name))
1894 }
1895}
1896
1897impl FromStr for BasinName {
1898 type Err = ConvertError;
1899
1900 fn from_str(s: &str) -> Result<Self, Self::Err> {
1901 s.to_string().try_into()
1902 }
1903}
1904
1905impl std::fmt::Display for BasinName {
1906 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1907 f.write_str(&self.0)
1908 }
1909}
1910
/// An access token ID, stored as an owned string.
///
/// The inner field is private; values are obtained through the `TryFrom<String>` /
/// `FromStr` conversions, which validate the ID.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);

/// Borrows the ID as `&str`.
impl AsRef<str> for AccessTokenId {
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}

/// Lets an `AccessTokenId` be used wherever a `&str` is expected.
impl Deref for AccessTokenId {
    type Target = str;

    fn deref(&self) -> &str {
        self.0.as_str()
    }
}
1929
1930impl TryFrom<String> for AccessTokenId {
1931 type Error = ConvertError;
1932
1933 fn try_from(name: String) -> Result<Self, Self::Error> {
1934 if name.is_empty() {
1935 return Err("Access token ID must not be empty".into());
1936 }
1937
1938 if name.len() > 96 {
1939 return Err("Access token ID must not exceed 96 characters".into());
1940 }
1941
1942 Ok(Self(name))
1943 }
1944}
1945
1946impl From<AccessTokenId> for String {
1947 fn from(value: AccessTokenId) -> Self {
1948 value.0
1949 }
1950}
1951
1952impl FromStr for AccessTokenId {
1953 type Err = ConvertError;
1954
1955 fn from_str(s: &str) -> Result<Self, Self::Err> {
1956 s.to_string().try_into()
1957 }
1958}
1959
1960impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1961 fn from(value: AccessTokenInfo) -> Self {
1962 Self {
1963 info: Some(value.into()),
1964 }
1965 }
1966}
1967
1968#[sync_docs]
1969#[derive(Debug, Clone)]
1970pub struct AccessTokenInfo {
1971 pub id: AccessTokenId,
1972 pub expires_at: Option<u32>,
1973 pub auto_prefix_streams: bool,
1974 pub scope: Option<AccessTokenScope>,
1975}
1976
1977impl AccessTokenInfo {
1978 pub fn new(id: AccessTokenId) -> Self {
1980 Self {
1981 id,
1982 expires_at: None,
1983 auto_prefix_streams: false,
1984 scope: None,
1985 }
1986 }
1987
1988 pub fn with_expires_at(self, expires_at: u32) -> Self {
1990 Self {
1991 expires_at: Some(expires_at),
1992 ..self
1993 }
1994 }
1995
1996 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1998 Self {
1999 auto_prefix_streams,
2000 ..self
2001 }
2002 }
2003
2004 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2006 Self {
2007 scope: Some(scope),
2008 ..self
2009 }
2010 }
2011}
2012
2013impl From<AccessTokenInfo> for api::AccessTokenInfo {
2014 fn from(value: AccessTokenInfo) -> Self {
2015 let AccessTokenInfo {
2016 id,
2017 expires_at,
2018 auto_prefix_streams,
2019 scope,
2020 } = value;
2021 Self {
2022 id: id.into(),
2023 expires_at,
2024 auto_prefix_streams,
2025 scope: scope.map(Into::into),
2026 }
2027 }
2028}
2029
2030impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2031 type Error = ConvertError;
2032
2033 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2034 let api::AccessTokenInfo {
2035 id,
2036 expires_at,
2037 auto_prefix_streams,
2038 scope,
2039 } = value;
2040 Ok(Self {
2041 id: id.try_into()?,
2042 expires_at,
2043 auto_prefix_streams,
2044 scope: scope.map(Into::into),
2045 })
2046 }
2047}
2048
2049#[sync_docs]
2050#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2051pub enum Operation {
2052 ListBasins,
2053 CreateBasin,
2054 DeleteBasin,
2055 ReconfigureBasin,
2056 GetBasinConfig,
2057 IssueAccessToken,
2058 RevokeAccessToken,
2059 ListAccessTokens,
2060 ListStreams,
2061 CreateStream,
2062 DeleteStream,
2063 GetStreamConfig,
2064 ReconfigureStream,
2065 CheckTail,
2066 Append,
2067 Read,
2068 Trim,
2069 Fence,
2070}
2071
2072impl FromStr for Operation {
2073 type Err = ConvertError;
2074
2075 fn from_str(s: &str) -> Result<Self, Self::Err> {
2076 match s.to_lowercase().as_str() {
2077 "list-basins" => Ok(Self::ListBasins),
2078 "create-basin" => Ok(Self::CreateBasin),
2079 "delete-basin" => Ok(Self::DeleteBasin),
2080 "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2081 "get-basin-config" => Ok(Self::GetBasinConfig),
2082 "issue-access-token" => Ok(Self::IssueAccessToken),
2083 "revoke-access-token" => Ok(Self::RevokeAccessToken),
2084 "list-access-tokens" => Ok(Self::ListAccessTokens),
2085 "list-streams" => Ok(Self::ListStreams),
2086 "create-stream" => Ok(Self::CreateStream),
2087 "delete-stream" => Ok(Self::DeleteStream),
2088 "get-stream-config" => Ok(Self::GetStreamConfig),
2089 "reconfigure-stream" => Ok(Self::ReconfigureStream),
2090 "check-tail" => Ok(Self::CheckTail),
2091 "append" => Ok(Self::Append),
2092 "read" => Ok(Self::Read),
2093 "trim" => Ok(Self::Trim),
2094 "fence" => Ok(Self::Fence),
2095 _ => Err("invalid operation".into()),
2096 }
2097 }
2098}
2099
2100impl From<Operation> for api::Operation {
2101 fn from(value: Operation) -> Self {
2102 match value {
2103 Operation::ListBasins => Self::ListBasins,
2104 Operation::CreateBasin => Self::CreateBasin,
2105 Operation::DeleteBasin => Self::DeleteBasin,
2106 Operation::ReconfigureBasin => Self::ReconfigureBasin,
2107 Operation::GetBasinConfig => Self::GetBasinConfig,
2108 Operation::IssueAccessToken => Self::IssueAccessToken,
2109 Operation::RevokeAccessToken => Self::RevokeAccessToken,
2110 Operation::ListAccessTokens => Self::ListAccessTokens,
2111 Operation::ListStreams => Self::ListStreams,
2112 Operation::CreateStream => Self::CreateStream,
2113 Operation::DeleteStream => Self::DeleteStream,
2114 Operation::GetStreamConfig => Self::GetStreamConfig,
2115 Operation::ReconfigureStream => Self::ReconfigureStream,
2116 Operation::CheckTail => Self::CheckTail,
2117 Operation::Append => Self::Append,
2118 Operation::Read => Self::Read,
2119 Operation::Trim => Self::Trim,
2120 Operation::Fence => Self::Fence,
2121 }
2122 }
2123}
2124
2125impl From<api::Operation> for Option<Operation> {
2126 fn from(value: api::Operation) -> Self {
2127 match value {
2128 api::Operation::Unspecified => None,
2129 api::Operation::ListBasins => Some(Operation::ListBasins),
2130 api::Operation::CreateBasin => Some(Operation::CreateBasin),
2131 api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2132 api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2133 api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2134 api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2135 api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2136 api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2137 api::Operation::ListStreams => Some(Operation::ListStreams),
2138 api::Operation::CreateStream => Some(Operation::CreateStream),
2139 api::Operation::DeleteStream => Some(Operation::DeleteStream),
2140 api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2141 api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2142 api::Operation::CheckTail => Some(Operation::CheckTail),
2143 api::Operation::Append => Some(Operation::Append),
2144 api::Operation::Read => Some(Operation::Read),
2145 api::Operation::Trim => Some(Operation::Trim),
2146 api::Operation::Fence => Some(Operation::Fence),
2147 }
2148 }
2149}
2150
2151#[sync_docs]
2152#[derive(Debug, Clone, Default)]
2153pub struct AccessTokenScope {
2154 pub basins: Option<ResourceSet>,
2155 pub streams: Option<ResourceSet>,
2156 pub access_tokens: Option<ResourceSet>,
2157 pub op_groups: Option<PermittedOperationGroups>,
2158 pub ops: HashSet<Operation>,
2159}
2160
2161impl AccessTokenScope {
2162 pub fn new() -> Self {
2164 Self::default()
2165 }
2166
2167 pub fn with_basins(self, basins: ResourceSet) -> Self {
2169 Self {
2170 basins: Some(basins),
2171 ..self
2172 }
2173 }
2174
2175 pub fn with_streams(self, streams: ResourceSet) -> Self {
2177 Self {
2178 streams: Some(streams),
2179 ..self
2180 }
2181 }
2182
2183 pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2185 Self {
2186 access_tokens: Some(access_tokens),
2187 ..self
2188 }
2189 }
2190
2191 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2193 Self {
2194 op_groups: Some(op_groups),
2195 ..self
2196 }
2197 }
2198
2199 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2201 Self {
2202 ops: ops.into_iter().collect(),
2203 ..self
2204 }
2205 }
2206
2207 pub fn with_op(self, op: Operation) -> Self {
2209 let mut ops = self.ops;
2210 ops.insert(op);
2211 Self { ops, ..self }
2212 }
2213}
2214
2215impl From<AccessTokenScope> for api::AccessTokenScope {
2216 fn from(value: AccessTokenScope) -> Self {
2217 let AccessTokenScope {
2218 basins,
2219 streams,
2220 access_tokens,
2221 op_groups,
2222 ops,
2223 } = value;
2224 Self {
2225 basins: basins.map(Into::into),
2226 streams: streams.map(Into::into),
2227 access_tokens: access_tokens.map(Into::into),
2228 op_groups: op_groups.map(Into::into),
2229 ops: ops
2230 .into_iter()
2231 .map(api::Operation::from)
2232 .map(Into::into)
2233 .collect(),
2234 }
2235 }
2236}
2237
2238impl From<api::AccessTokenScope> for AccessTokenScope {
2239 fn from(value: api::AccessTokenScope) -> Self {
2240 let api::AccessTokenScope {
2241 basins,
2242 streams,
2243 access_tokens,
2244 op_groups,
2245 ops,
2246 } = value;
2247 Self {
2248 basins: basins.and_then(|set| set.matching.map(Into::into)),
2249 streams: streams.and_then(|set| set.matching.map(Into::into)),
2250 access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2251 op_groups: op_groups.map(Into::into),
2252 ops: ops
2253 .into_iter()
2254 .map(api::Operation::try_from)
2255 .flat_map(Result::ok)
2256 .flat_map(<Option<Operation>>::from)
2257 .collect(),
2258 }
2259 }
2260}
2261
2262impl From<ResourceSet> for api::ResourceSet {
2263 fn from(value: ResourceSet) -> Self {
2264 Self {
2265 matching: Some(value.into()),
2266 }
2267 }
2268}
2269
2270#[sync_docs(ResourceSet = "Matching")]
2271#[derive(Debug, Clone)]
2272pub enum ResourceSet {
2273 Exact(String),
2274 Prefix(String),
2275}
2276
2277impl From<ResourceSet> for api::resource_set::Matching {
2278 fn from(value: ResourceSet) -> Self {
2279 match value {
2280 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2281 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2282 }
2283 }
2284}
2285
2286impl From<api::resource_set::Matching> for ResourceSet {
2287 fn from(value: api::resource_set::Matching) -> Self {
2288 match value {
2289 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2290 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2291 }
2292 }
2293}
2294
2295#[sync_docs]
2296#[derive(Debug, Clone, Default)]
2297pub struct PermittedOperationGroups {
2298 pub account: Option<ReadWritePermissions>,
2299 pub basin: Option<ReadWritePermissions>,
2300 pub stream: Option<ReadWritePermissions>,
2301}
2302
2303impl PermittedOperationGroups {
2304 pub fn new() -> Self {
2306 Self::default()
2307 }
2308
2309 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2311 Self {
2312 account: Some(account),
2313 ..self
2314 }
2315 }
2316
2317 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2319 Self {
2320 basin: Some(basin),
2321 ..self
2322 }
2323 }
2324
2325 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2327 Self {
2328 stream: Some(stream),
2329 ..self
2330 }
2331 }
2332}
2333
2334impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2335 fn from(value: PermittedOperationGroups) -> Self {
2336 let PermittedOperationGroups {
2337 account,
2338 basin,
2339 stream,
2340 } = value;
2341 Self {
2342 account: account.map(Into::into),
2343 basin: basin.map(Into::into),
2344 stream: stream.map(Into::into),
2345 }
2346 }
2347}
2348
2349impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2350 fn from(value: api::PermittedOperationGroups) -> Self {
2351 let api::PermittedOperationGroups {
2352 account,
2353 basin,
2354 stream,
2355 } = value;
2356 Self {
2357 account: account.map(Into::into),
2358 basin: basin.map(Into::into),
2359 stream: stream.map(Into::into),
2360 }
2361 }
2362}
2363
2364#[sync_docs]
2365#[derive(Debug, Clone, Default)]
2366pub struct ReadWritePermissions {
2367 pub read: bool,
2368 pub write: bool,
2369}
2370
2371impl ReadWritePermissions {
2372 pub fn new() -> Self {
2374 Self::default()
2375 }
2376
2377 pub fn with_read(self, read: bool) -> Self {
2379 Self { read, ..self }
2380 }
2381
2382 pub fn with_write(self, write: bool) -> Self {
2384 Self { write, ..self }
2385 }
2386}
2387
2388impl From<ReadWritePermissions> for api::ReadWritePermissions {
2389 fn from(value: ReadWritePermissions) -> Self {
2390 let ReadWritePermissions { read, write } = value;
2391 Self { read, write }
2392 }
2393}
2394
2395impl From<api::ReadWritePermissions> for ReadWritePermissions {
2396 fn from(value: api::ReadWritePermissions) -> Self {
2397 let api::ReadWritePermissions { read, write } = value;
2398 Self { read, write }
2399 }
2400}
2401
2402impl From<api::IssueAccessTokenResponse> for String {
2403 fn from(value: api::IssueAccessTokenResponse) -> Self {
2404 value.access_token
2405 }
2406}
2407
2408impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2409 fn from(value: AccessTokenId) -> Self {
2410 Self { id: value.into() }
2411 }
2412}
2413
2414impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2415 type Error = ConvertError;
2416 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2417 let token_info = value.info.ok_or("access token info is missing")?;
2418 token_info.try_into()
2419 }
2420}
2421
2422#[sync_docs]
2423#[derive(Debug, Clone, Default)]
2424pub struct ListAccessTokensRequest {
2425 pub prefix: String,
2426 pub start_after: String,
2427 pub limit: Option<usize>,
2428}
2429
2430impl ListAccessTokensRequest {
2431 pub fn new() -> Self {
2433 Self::default()
2434 }
2435
2436 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2438 Self {
2439 prefix: prefix.into(),
2440 ..self
2441 }
2442 }
2443
2444 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2446 Self {
2447 start_after: start_after.into(),
2448 ..self
2449 }
2450 }
2451
2452 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2454 Self {
2455 limit: limit.into(),
2456 ..self
2457 }
2458 }
2459}
2460
2461impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2462 type Error = ConvertError;
2463 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2464 let ListAccessTokensRequest {
2465 prefix,
2466 start_after,
2467 limit,
2468 } = value;
2469 Ok(Self {
2470 prefix,
2471 start_after,
2472 limit: limit
2473 .map(TryInto::try_into)
2474 .transpose()
2475 .map_err(|_| "request limit does not fit into u64 bounds")?,
2476 })
2477 }
2478}
2479
2480#[sync_docs]
2481#[derive(Debug, Clone)]
2482pub struct ListAccessTokensResponse {
2483 pub access_tokens: Vec<AccessTokenInfo>,
2484 pub has_more: bool,
2485}
2486
2487impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2488 type Error = ConvertError;
2489 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2490 let api::ListAccessTokensResponse {
2491 access_tokens,
2492 has_more,
2493 } = value;
2494 let access_tokens = access_tokens
2495 .into_iter()
2496 .map(TryInto::try_into)
2497 .collect::<Result<Vec<_>, _>>()?;
2498 Ok(Self {
2499 access_tokens,
2500 has_more,
2501 })
2502 }
2503}