1use std::{
4 collections::HashSet,
5 ops::{Deref, RangeTo},
6 str::FromStr,
7 sync::OnceLock,
8 time::Duration,
9};
10
11use bytes::Bytes;
12use rand::Rng;
13use regex::Regex;
14use sync_docs::sync_docs;
15
16use crate::api;
17
/// Number of bytes in one mebibyte (2^20 = 1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1 << 20;
/// The metadata key string `"retry-after-ms"`.
// NOTE(review): presumably names a metadata entry carrying a retry delay in
// milliseconds — confirm against the code that reads it.
pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
20
21#[derive(Debug, Clone, thiserror::Error)]
23#[error("{0}")]
24pub struct ConvertError(String);
25
26impl<T: Into<String>> From<T> for ConvertError {
27 fn from(value: T) -> Self {
28 Self(value.into())
29 }
30}
31
/// Types whose size can be reported in metered bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}

/// A vector's metered size is the sum of the metered sizes of its items
/// (zero for an empty vector).
impl<T: MeteredBytes> MeteredBytes for Vec<T> {
    fn metered_bytes(&self) -> u64 {
        self.iter().map(MeteredBytes::metered_bytes).sum::<u64>()
    }
}
49
/// Implements [`MeteredBytes`] for a record-like type that has `headers` and
/// `body` fields.
///
/// The metered size is
/// `8 + 2 * <number of headers> + <sum of header name and value lengths> + <body length>`.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                // Combined length of every header's name and value.
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                let total = 8 + (2 * self.headers.len()) + header_bytes + self.body.len();
                total as u64
            }
        }
    };
}
67
68#[sync_docs]
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum BasinScope {
71 AwsUsEast1,
72}
73
74impl From<BasinScope> for api::BasinScope {
75 fn from(value: BasinScope) -> Self {
76 match value {
77 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
78 }
79 }
80}
81
82impl From<api::BasinScope> for Option<BasinScope> {
83 fn from(value: api::BasinScope) -> Self {
84 match value {
85 api::BasinScope::Unspecified => None,
86 api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
87 }
88 }
89}
90
91impl FromStr for BasinScope {
92 type Err = ConvertError;
93 fn from_str(value: &str) -> Result<Self, Self::Err> {
94 match value {
95 "aws:us-east-1" => Ok(Self::AwsUsEast1),
96 _ => Err("invalid basin scope value".into()),
97 }
98 }
99}
100
101#[sync_docs]
102#[derive(Debug, Clone)]
103pub struct CreateBasinRequest {
104 pub basin: BasinName,
105 pub config: Option<BasinConfig>,
106 pub scope: Option<BasinScope>,
107}
108
109impl CreateBasinRequest {
110 pub fn new(basin: BasinName) -> Self {
112 Self {
113 basin,
114 config: None,
115 scope: None,
116 }
117 }
118
119 pub fn with_config(self, config: BasinConfig) -> Self {
121 Self {
122 config: Some(config),
123 ..self
124 }
125 }
126
127 pub fn with_scope(self, scope: BasinScope) -> Self {
129 Self {
130 scope: Some(scope),
131 ..self
132 }
133 }
134}
135
136impl From<CreateBasinRequest> for api::CreateBasinRequest {
137 fn from(value: CreateBasinRequest) -> Self {
138 let CreateBasinRequest {
139 basin,
140 config,
141 scope,
142 } = value;
143 Self {
144 basin: basin.0,
145 config: config.map(Into::into),
146 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
147 }
148 }
149}
150
151#[sync_docs]
152#[derive(Debug, Clone, Default)]
153pub struct BasinConfig {
154 pub default_stream_config: Option<StreamConfig>,
155 pub create_stream_on_append: bool,
156 pub create_stream_on_read: bool,
157}
158
159impl BasinConfig {
160 pub fn new() -> Self {
162 Self::default()
163 }
164
165 pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
167 Self {
168 default_stream_config: Some(default_stream_config),
169 ..self
170 }
171 }
172
173 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
175 Self {
176 create_stream_on_append,
177 ..self
178 }
179 }
180
181 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
183 Self {
184 create_stream_on_read,
185 ..self
186 }
187 }
188}
189
190impl From<BasinConfig> for api::BasinConfig {
191 fn from(value: BasinConfig) -> Self {
192 let BasinConfig {
193 default_stream_config,
194 create_stream_on_append,
195 create_stream_on_read,
196 } = value;
197 Self {
198 default_stream_config: default_stream_config.map(Into::into),
199 create_stream_on_append,
200 create_stream_on_read,
201 }
202 }
203}
204
205impl From<api::BasinConfig> for BasinConfig {
206 fn from(value: api::BasinConfig) -> Self {
207 let api::BasinConfig {
208 default_stream_config,
209 create_stream_on_append,
210 create_stream_on_read,
211 } = value;
212 Self {
213 default_stream_config: default_stream_config.map(Into::into),
214 create_stream_on_append,
215 create_stream_on_read,
216 }
217 }
218}
219
220#[sync_docs]
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub enum TimestampingMode {
223 ClientPrefer,
224 ClientRequire,
225 Arrival,
226}
227
228impl From<TimestampingMode> for api::TimestampingMode {
229 fn from(value: TimestampingMode) -> Self {
230 match value {
231 TimestampingMode::ClientPrefer => Self::ClientPrefer,
232 TimestampingMode::ClientRequire => Self::ClientRequire,
233 TimestampingMode::Arrival => Self::Arrival,
234 }
235 }
236}
237
238impl From<api::TimestampingMode> for Option<TimestampingMode> {
239 fn from(value: api::TimestampingMode) -> Self {
240 match value {
241 api::TimestampingMode::Unspecified => None,
242 api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
243 api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
244 api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
245 }
246 }
247}
248
249#[sync_docs(TimestampingConfig = "Timestamping")]
250#[derive(Debug, Clone, Default)]
251pub struct TimestampingConfig {
253 pub mode: Option<TimestampingMode>,
254 pub uncapped: Option<bool>,
255}
256
257impl TimestampingConfig {
258 pub fn new() -> Self {
260 Self::default()
261 }
262
263 pub fn with_mode(self, mode: TimestampingMode) -> Self {
265 Self {
266 mode: Some(mode),
267 ..self
268 }
269 }
270
271 pub fn with_uncapped(self, uncapped: bool) -> Self {
273 Self {
274 uncapped: Some(uncapped),
275 ..self
276 }
277 }
278}
279
280impl From<TimestampingConfig> for api::stream_config::Timestamping {
281 fn from(value: TimestampingConfig) -> Self {
282 Self {
283 mode: value
284 .mode
285 .map(api::TimestampingMode::from)
286 .unwrap_or_default()
287 .into(),
288 uncapped: value.uncapped,
289 }
290 }
291}
292
293impl From<api::stream_config::Timestamping> for TimestampingConfig {
294 fn from(value: api::stream_config::Timestamping) -> Self {
295 let mode = value.mode().into();
296 let uncapped = value.uncapped;
297 Self { mode, uncapped }
298 }
299}
300
301#[sync_docs]
302#[derive(Debug, Clone, Default)]
303pub struct StreamConfig {
304 pub storage_class: Option<StorageClass>,
305 pub retention_policy: Option<RetentionPolicy>,
306 pub timestamping: Option<TimestampingConfig>,
307}
308
309impl StreamConfig {
310 pub fn new() -> Self {
312 Self::default()
313 }
314
315 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
317 Self {
318 storage_class: Some(storage_class),
319 ..self
320 }
321 }
322
323 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
325 Self {
326 retention_policy: Some(retention_policy),
327 ..self
328 }
329 }
330
331 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
333 Self {
334 timestamping: Some(timestamping),
335 ..self
336 }
337 }
338}
339
340impl From<StreamConfig> for api::StreamConfig {
341 fn from(value: StreamConfig) -> Self {
342 let StreamConfig {
343 storage_class,
344 retention_policy,
345 timestamping,
346 } = value;
347 Self {
348 storage_class: storage_class
349 .map(api::StorageClass::from)
350 .unwrap_or_default()
351 .into(),
352 retention_policy: retention_policy.map(Into::into),
353 timestamping: timestamping.map(Into::into),
354 }
355 }
356}
357
358impl From<api::StreamConfig> for StreamConfig {
359 fn from(value: api::StreamConfig) -> Self {
360 Self {
361 storage_class: value.storage_class().into(),
362 retention_policy: value.retention_policy.map(Into::into),
363 timestamping: value.timestamping.map(Into::into),
364 }
365 }
366}
367
368#[sync_docs]
369#[derive(Debug, Clone, Copy, PartialEq, Eq)]
370pub enum StorageClass {
371 Standard,
372 Express,
373}
374
375impl From<StorageClass> for api::StorageClass {
376 fn from(value: StorageClass) -> Self {
377 match value {
378 StorageClass::Standard => Self::Standard,
379 StorageClass::Express => Self::Express,
380 }
381 }
382}
383
384impl From<api::StorageClass> for Option<StorageClass> {
385 fn from(value: api::StorageClass) -> Self {
386 match value {
387 api::StorageClass::Unspecified => None,
388 api::StorageClass::Standard => Some(StorageClass::Standard),
389 api::StorageClass::Express => Some(StorageClass::Express),
390 }
391 }
392}
393
394impl FromStr for StorageClass {
395 type Err = ConvertError;
396
397 fn from_str(value: &str) -> Result<Self, Self::Err> {
398 match value {
399 "standard" => Ok(Self::Standard),
400 "express" => Ok(Self::Express),
401 v => Err(format!("unknown storage class: {v}").into()),
402 }
403 }
404}
405
406#[sync_docs(Age = "Age")]
407#[derive(Debug, Clone)]
408pub enum RetentionPolicy {
409 Age(Duration),
410}
411
412impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
413 fn from(value: RetentionPolicy) -> Self {
414 match value {
415 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
416 }
417 }
418}
419
420impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
421 fn from(value: api::stream_config::RetentionPolicy) -> Self {
422 match value {
423 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
424 }
425 }
426}
427
428#[sync_docs]
429#[derive(Debug, Clone, Copy, PartialEq, Eq)]
430pub enum BasinState {
431 Active,
432 Creating,
433 Deleting,
434}
435
436impl From<BasinState> for api::BasinState {
437 fn from(value: BasinState) -> Self {
438 match value {
439 BasinState::Active => Self::Active,
440 BasinState::Creating => Self::Creating,
441 BasinState::Deleting => Self::Deleting,
442 }
443 }
444}
445
446impl From<api::BasinState> for Option<BasinState> {
447 fn from(value: api::BasinState) -> Self {
448 match value {
449 api::BasinState::Unspecified => None,
450 api::BasinState::Active => Some(BasinState::Active),
451 api::BasinState::Creating => Some(BasinState::Creating),
452 api::BasinState::Deleting => Some(BasinState::Deleting),
453 }
454 }
455}
456
457impl std::fmt::Display for BasinState {
458 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459 match self {
460 BasinState::Active => write!(f, "active"),
461 BasinState::Creating => write!(f, "creating"),
462 BasinState::Deleting => write!(f, "deleting"),
463 }
464 }
465}
466
467#[sync_docs]
468#[derive(Debug, Clone)]
469pub struct BasinInfo {
470 pub name: String,
471 pub scope: Option<BasinScope>,
472 pub state: Option<BasinState>,
473}
474
475impl From<BasinInfo> for api::BasinInfo {
476 fn from(value: BasinInfo) -> Self {
477 let BasinInfo { name, scope, state } = value;
478 Self {
479 name,
480 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
481 state: state.map(api::BasinState::from).unwrap_or_default().into(),
482 }
483 }
484}
485
486impl From<api::BasinInfo> for BasinInfo {
487 fn from(value: api::BasinInfo) -> Self {
488 let scope = value.scope().into();
489 let state = value.state().into();
490 let name = value.name;
491 Self { name, scope, state }
492 }
493}
494
495impl TryFrom<api::CreateBasinResponse> for BasinInfo {
496 type Error = ConvertError;
497 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
498 let api::CreateBasinResponse { info } = value;
499 let info = info.ok_or("missing basin info")?;
500 Ok(info.into())
501 }
502}
503
504#[sync_docs]
505#[derive(Debug, Clone, Default)]
506pub struct ListStreamsRequest {
507 pub prefix: String,
508 pub start_after: String,
509 pub limit: Option<usize>,
510}
511
512impl ListStreamsRequest {
513 pub fn new() -> Self {
515 Self::default()
516 }
517
518 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
520 Self {
521 prefix: prefix.into(),
522 ..self
523 }
524 }
525
526 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
528 Self {
529 start_after: start_after.into(),
530 ..self
531 }
532 }
533
534 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
536 Self {
537 limit: limit.into(),
538 ..self
539 }
540 }
541}
542
543impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
544 type Error = ConvertError;
545 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
546 let ListStreamsRequest {
547 prefix,
548 start_after,
549 limit,
550 } = value;
551 Ok(Self {
552 prefix,
553 start_after,
554 limit: limit.map(|n| n as u64),
555 })
556 }
557}
558
559#[sync_docs]
560#[derive(Debug, Clone)]
561pub struct StreamInfo {
562 pub name: String,
563 pub created_at: u32,
564 pub deleted_at: Option<u32>,
565}
566
567impl From<api::StreamInfo> for StreamInfo {
568 fn from(value: api::StreamInfo) -> Self {
569 Self {
570 name: value.name,
571 created_at: value.created_at,
572 deleted_at: value.deleted_at,
573 }
574 }
575}
576
577impl TryFrom<api::CreateStreamResponse> for StreamInfo {
578 type Error = ConvertError;
579
580 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
581 let api::CreateStreamResponse { info } = value;
582 let info = info.ok_or("missing stream info")?;
583 Ok(info.into())
584 }
585}
586
587#[sync_docs]
588#[derive(Debug, Clone)]
589pub struct ListStreamsResponse {
590 pub streams: Vec<StreamInfo>,
591 pub has_more: bool,
592}
593
594impl From<api::ListStreamsResponse> for ListStreamsResponse {
595 fn from(value: api::ListStreamsResponse) -> Self {
596 let api::ListStreamsResponse { streams, has_more } = value;
597 let streams = streams.into_iter().map(Into::into).collect();
598 Self { streams, has_more }
599 }
600}
601
602impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
603 type Error = ConvertError;
604
605 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
606 let api::GetBasinConfigResponse { config } = value;
607 let config = config.ok_or("missing basin config")?;
608 Ok(config.into())
609 }
610}
611
612impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
613 type Error = ConvertError;
614
615 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
616 let api::GetStreamConfigResponse { config } = value;
617 let config = config.ok_or("missing stream config")?;
618 Ok(config.into())
619 }
620}
621
622#[sync_docs]
623#[derive(Debug, Clone)]
624pub struct CreateStreamRequest {
625 pub stream: String,
626 pub config: Option<StreamConfig>,
627}
628
629impl CreateStreamRequest {
630 pub fn new(stream: impl Into<String>) -> Self {
632 Self {
633 stream: stream.into(),
634 config: None,
635 }
636 }
637
638 pub fn with_config(self, config: StreamConfig) -> Self {
640 Self {
641 config: Some(config),
642 ..self
643 }
644 }
645}
646
647impl From<CreateStreamRequest> for api::CreateStreamRequest {
648 fn from(value: CreateStreamRequest) -> Self {
649 let CreateStreamRequest { stream, config } = value;
650 Self {
651 stream,
652 config: config.map(Into::into),
653 }
654 }
655}
656
657#[sync_docs]
658#[derive(Debug, Clone, Default)]
659pub struct ListBasinsRequest {
660 pub prefix: String,
661 pub start_after: String,
662 pub limit: Option<usize>,
663}
664
665impl ListBasinsRequest {
666 pub fn new() -> Self {
668 Self::default()
669 }
670
671 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
673 Self {
674 prefix: prefix.into(),
675 ..self
676 }
677 }
678
679 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
681 Self {
682 start_after: start_after.into(),
683 ..self
684 }
685 }
686
687 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
689 Self {
690 limit: limit.into(),
691 ..self
692 }
693 }
694}
695
696impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
697 type Error = ConvertError;
698 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
699 let ListBasinsRequest {
700 prefix,
701 start_after,
702 limit,
703 } = value;
704 Ok(Self {
705 prefix,
706 start_after,
707 limit: limit
708 .map(TryInto::try_into)
709 .transpose()
710 .map_err(|_| "request limit does not fit into u64 bounds")?,
711 })
712 }
713}
714
715#[sync_docs]
716#[derive(Debug, Clone)]
717pub struct ListBasinsResponse {
718 pub basins: Vec<BasinInfo>,
719 pub has_more: bool,
720}
721
722impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
723 type Error = ConvertError;
724 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
725 let api::ListBasinsResponse { basins, has_more } = value;
726 Ok(Self {
727 basins: basins.into_iter().map(Into::into).collect(),
728 has_more,
729 })
730 }
731}
732
733#[sync_docs]
734#[derive(Debug, Clone)]
735pub struct DeleteBasinRequest {
736 pub basin: BasinName,
737 pub if_exists: bool,
739}
740
741impl DeleteBasinRequest {
742 pub fn new(basin: BasinName) -> Self {
744 Self {
745 basin,
746 if_exists: false,
747 }
748 }
749
750 pub fn with_if_exists(self, if_exists: bool) -> Self {
752 Self { if_exists, ..self }
753 }
754}
755
756impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
757 fn from(value: DeleteBasinRequest) -> Self {
758 let DeleteBasinRequest { basin, .. } = value;
759 Self { basin: basin.0 }
760 }
761}
762
763#[sync_docs]
764#[derive(Debug, Clone)]
765pub struct DeleteStreamRequest {
766 pub stream: String,
767 pub if_exists: bool,
769}
770
771impl DeleteStreamRequest {
772 pub fn new(stream: impl Into<String>) -> Self {
774 Self {
775 stream: stream.into(),
776 if_exists: false,
777 }
778 }
779
780 pub fn with_if_exists(self, if_exists: bool) -> Self {
782 Self { if_exists, ..self }
783 }
784}
785
786impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
787 fn from(value: DeleteStreamRequest) -> Self {
788 let DeleteStreamRequest { stream, .. } = value;
789 Self { stream }
790 }
791}
792
793#[sync_docs]
794#[derive(Debug, Clone)]
795pub struct ReconfigureBasinRequest {
796 pub basin: BasinName,
797 pub config: Option<BasinConfig>,
798 pub mask: Option<Vec<String>>,
799}
800
801impl ReconfigureBasinRequest {
802 pub fn new(basin: BasinName) -> Self {
804 Self {
805 basin,
806 config: None,
807 mask: None,
808 }
809 }
810
811 pub fn with_config(self, config: BasinConfig) -> Self {
813 Self {
814 config: Some(config),
815 ..self
816 }
817 }
818
819 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
821 Self {
822 mask: Some(mask.into()),
823 ..self
824 }
825 }
826}
827
828impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
829 fn from(value: ReconfigureBasinRequest) -> Self {
830 let ReconfigureBasinRequest {
831 basin,
832 config,
833 mask,
834 } = value;
835 Self {
836 basin: basin.0,
837 config: config.map(Into::into),
838 mask: mask.map(|paths| prost_types::FieldMask { paths }),
839 }
840 }
841}
842
843impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
844 type Error = ConvertError;
845 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
846 let api::ReconfigureBasinResponse { config } = value;
847 let config = config.ok_or("missing basin config")?;
848 Ok(config.into())
849 }
850}
851
852#[sync_docs]
853#[derive(Debug, Clone)]
854pub struct ReconfigureStreamRequest {
855 pub stream: String,
856 pub config: Option<StreamConfig>,
857 pub mask: Option<Vec<String>>,
858}
859
860impl ReconfigureStreamRequest {
861 pub fn new(stream: impl Into<String>) -> Self {
863 Self {
864 stream: stream.into(),
865 config: None,
866 mask: None,
867 }
868 }
869
870 pub fn with_config(self, config: StreamConfig) -> Self {
872 Self {
873 config: Some(config),
874 ..self
875 }
876 }
877
878 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
880 Self {
881 mask: Some(mask.into()),
882 ..self
883 }
884 }
885}
886
887impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
888 fn from(value: ReconfigureStreamRequest) -> Self {
889 let ReconfigureStreamRequest {
890 stream,
891 config,
892 mask,
893 } = value;
894 Self {
895 stream,
896 config: config.map(Into::into),
897 mask: mask.map(|paths| prost_types::FieldMask { paths }),
898 }
899 }
900}
901
902impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
903 type Error = ConvertError;
904 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
905 let api::ReconfigureStreamResponse { config } = value;
906 let config = config.ok_or("missing stream config")?;
907 Ok(config.into())
908 }
909}
910
911impl From<api::CheckTailResponse> for StreamPosition {
912 fn from(value: api::CheckTailResponse) -> Self {
913 let api::CheckTailResponse {
914 next_seq_num,
915 last_timestamp,
916 } = value;
917 StreamPosition {
918 seq_num: next_seq_num,
919 timestamp: last_timestamp,
920 }
921 }
922}
923
/// A position in a stream, given as a sequence number paired with a timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamPosition {
    /// Sequence number.
    pub seq_num: u64,
    /// Timestamp.
    // NOTE(review): unit not visible here (presumably milliseconds) — confirm.
    pub timestamp: u64,
}
933
934#[sync_docs]
935#[derive(Debug, Clone, PartialEq, Eq)]
936pub struct Header {
937 pub name: Bytes,
938 pub value: Bytes,
939}
940
941impl Header {
942 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
944 Self {
945 name: name.into(),
946 value: value.into(),
947 }
948 }
949
950 pub fn from_value(value: impl Into<Bytes>) -> Self {
952 Self {
953 name: Bytes::new(),
954 value: value.into(),
955 }
956 }
957}
958
959impl From<Header> for api::Header {
960 fn from(value: Header) -> Self {
961 let Header { name, value } = value;
962 Self { name, value }
963 }
964}
965
966impl From<api::Header> for Header {
967 fn from(value: api::Header) -> Self {
968 let api::Header { name, value } = value;
969 Self { name, value }
970 }
971}
972
973#[derive(Debug, Clone, Default, PartialEq, Eq)]
977pub struct FencingToken(String);
978
979impl FencingToken {
980 const MAX_BYTES: usize = 36;
981
982 pub fn generate(n: usize) -> Result<Self, ConvertError> {
984 rand::rng()
985 .sample_iter(&rand::distr::Alphanumeric)
986 .take(n)
987 .map(char::from)
988 .collect::<String>()
989 .parse()
990 }
991}
992
993impl Deref for FencingToken {
994 type Target = str;
995
996 fn deref(&self) -> &Self::Target {
997 &self.0
998 }
999}
1000
1001impl std::fmt::Display for FencingToken {
1002 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1003 write!(f, "{}", self.0)
1004 }
1005}
1006
1007impl FromStr for FencingToken {
1008 type Err = ConvertError;
1009
1010 fn from_str(value: &str) -> Result<Self, Self::Err> {
1011 value.to_string().try_into()
1012 }
1013}
1014
1015impl TryFrom<String> for FencingToken {
1016 type Error = ConvertError;
1017
1018 fn try_from(value: String) -> Result<Self, Self::Error> {
1019 if value.len() > Self::MAX_BYTES {
1020 Err(format!("Fencing token cannot exceed {} bytes", Self::MAX_BYTES).into())
1021 } else {
1022 Ok(Self(value))
1023 }
1024 }
1025}
1026
1027impl From<FencingToken> for String {
1028 fn from(value: FencingToken) -> Self {
1029 value.0
1030 }
1031}
1032
1033#[derive(Debug, Clone)]
1035pub enum Command {
1036 Fence {
1041 fencing_token: FencingToken,
1045 },
1046 Trim {
1051 seq_num: u64,
1056 },
1057}
1058
1059#[derive(Debug, Clone)]
1065pub struct CommandRecord {
1066 pub command: Command,
1068 pub timestamp: Option<u64>,
1070}
1071
1072impl CommandRecord {
1073 const FENCE: &[u8] = b"fence";
1074 const TRIM: &[u8] = b"trim";
1075
1076 pub fn fence(fencing_token: FencingToken) -> Self {
1078 Self {
1079 command: Command::Fence { fencing_token },
1080 timestamp: None,
1081 }
1082 }
1083
1084 pub fn trim(seq_num: impl Into<u64>) -> Self {
1086 Self {
1087 command: Command::Trim {
1088 seq_num: seq_num.into(),
1089 },
1090 timestamp: None,
1091 }
1092 }
1093
1094 pub fn with_timestamp(self, timestamp: u64) -> Self {
1096 Self {
1097 timestamp: Some(timestamp),
1098 ..self
1099 }
1100 }
1101}
1102
1103#[sync_docs]
1104#[derive(Debug, Clone, PartialEq, Eq)]
1105pub struct AppendRecord {
1106 timestamp: Option<u64>,
1107 headers: Vec<Header>,
1108 body: Bytes,
1109 #[cfg(test)]
1110 max_bytes: u64,
1111}
1112
1113metered_impl!(AppendRecord);
1114
1115impl AppendRecord {
1116 const MAX_BYTES: u64 = MIB_BYTES;
1117
1118 fn validated(self) -> Result<Self, ConvertError> {
1119 #[cfg(test)]
1120 let max_bytes = self.max_bytes;
1121 #[cfg(not(test))]
1122 let max_bytes = Self::MAX_BYTES;
1123
1124 if self.metered_bytes() > max_bytes {
1125 Err("AppendRecord should have metered size less than 1 MiB".into())
1126 } else {
1127 Ok(self)
1128 }
1129 }
1130
1131 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1133 Self {
1134 timestamp: None,
1135 headers: Vec::new(),
1136 body: body.into(),
1137 #[cfg(test)]
1138 max_bytes: Self::MAX_BYTES,
1139 }
1140 .validated()
1141 }
1142
1143 #[cfg(test)]
1144 pub(crate) fn with_max_bytes(
1145 max_bytes: u64,
1146 body: impl Into<Bytes>,
1147 ) -> Result<Self, ConvertError> {
1148 Self {
1149 timestamp: None,
1150 headers: Vec::new(),
1151 body: body.into(),
1152 max_bytes,
1153 }
1154 .validated()
1155 }
1156
1157 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1159 Self {
1160 headers: headers.into(),
1161 ..self
1162 }
1163 .validated()
1164 }
1165
1166 pub fn with_timestamp(self, timestamp: u64) -> Self {
1168 Self {
1169 timestamp: Some(timestamp),
1170 ..self
1171 }
1172 }
1173
1174 pub fn body(&self) -> &[u8] {
1176 &self.body
1177 }
1178
1179 pub fn headers(&self) -> &[Header] {
1181 &self.headers
1182 }
1183
1184 pub fn timestamp(&self) -> Option<u64> {
1186 self.timestamp
1187 }
1188
1189 pub fn into_parts(self) -> AppendRecordParts {
1191 AppendRecordParts {
1192 timestamp: self.timestamp,
1193 headers: self.headers,
1194 body: self.body,
1195 }
1196 }
1197
1198 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1200 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1201 if let Some(timestamp) = parts.timestamp {
1202 Ok(record.with_timestamp(timestamp))
1203 } else {
1204 Ok(record)
1205 }
1206 }
1207}
1208
1209impl From<AppendRecord> for api::AppendRecord {
1210 fn from(value: AppendRecord) -> Self {
1211 Self {
1212 timestamp: value.timestamp,
1213 headers: value.headers.into_iter().map(Into::into).collect(),
1214 body: value.body,
1215 }
1216 }
1217}
1218
1219impl From<CommandRecord> for AppendRecord {
1220 fn from(value: CommandRecord) -> Self {
1221 let (header_value, body) = match value.command {
1222 Command::Fence { fencing_token } => (
1223 CommandRecord::FENCE,
1224 Bytes::copy_from_slice(fencing_token.as_bytes()),
1225 ),
1226 Command::Trim { seq_num } => (
1227 CommandRecord::TRIM,
1228 Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1229 ),
1230 };
1231 AppendRecordParts {
1232 timestamp: value.timestamp,
1233 headers: vec![Header::from_value(header_value)],
1234 body,
1235 }
1236 .try_into()
1237 .expect("command record is a valid append record")
1238 }
1239}
1240
1241#[sync_docs(AppendRecordParts = "AppendRecord")]
1242#[derive(Debug, Clone)]
1243pub struct AppendRecordParts {
1244 pub timestamp: Option<u64>,
1245 pub headers: Vec<Header>,
1246 pub body: Bytes,
1247}
1248
1249impl From<AppendRecord> for AppendRecordParts {
1250 fn from(value: AppendRecord) -> Self {
1251 value.into_parts()
1252 }
1253}
1254
1255impl TryFrom<AppendRecordParts> for AppendRecord {
1256 type Error = ConvertError;
1257
1258 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1259 Self::try_from_parts(value)
1260 }
1261}
1262
1263#[derive(Debug, Clone)]
1265pub struct AppendRecordBatch {
1266 records: Vec<AppendRecord>,
1267 metered_bytes: u64,
1268 max_capacity: usize,
1269 #[cfg(test)]
1270 max_bytes: u64,
1271}
1272
1273impl PartialEq for AppendRecordBatch {
1274 fn eq(&self, other: &Self) -> bool {
1275 if self.records.eq(&other.records) {
1276 assert_eq!(self.metered_bytes, other.metered_bytes);
1277 true
1278 } else {
1279 false
1280 }
1281 }
1282}
1283
1284impl Eq for AppendRecordBatch {}
1285
1286impl Default for AppendRecordBatch {
1287 fn default() -> Self {
1288 Self::new()
1289 }
1290}
1291
1292impl AppendRecordBatch {
1293 pub const MAX_CAPACITY: usize = 1000;
1297
1298 pub const MAX_BYTES: u64 = MIB_BYTES;
1300
1301 pub fn new() -> Self {
1303 Self::with_max_capacity(Self::MAX_CAPACITY)
1304 }
1305
1306 pub fn with_max_capacity(max_capacity: usize) -> Self {
1310 assert!(
1311 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1312 "Batch capacity must be between 1 and 1000"
1313 );
1314
1315 Self {
1316 records: Vec::with_capacity(max_capacity),
1317 metered_bytes: 0,
1318 max_capacity,
1319 #[cfg(test)]
1320 max_bytes: Self::MAX_BYTES,
1321 }
1322 }
1323
1324 #[cfg(test)]
1325 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1326 #[cfg(test)]
1327 assert!(
1328 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1329 "Batch size must be between 1 byte and 1 MiB"
1330 );
1331
1332 Self {
1333 max_bytes,
1334 ..Self::with_max_capacity(max_capacity)
1335 }
1336 }
1337
1338 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1344 where
1345 R: Into<AppendRecord>,
1346 T: IntoIterator<Item = R>,
1347 {
1348 let mut records = Self::new();
1349 let mut pending = Vec::new();
1350
1351 let mut iter = iter.into_iter();
1352
1353 for record in iter.by_ref() {
1354 if let Err(record) = records.push(record) {
1355 pending.push(record);
1356 break;
1357 }
1358 }
1359
1360 if pending.is_empty() {
1361 Ok(records)
1362 } else {
1363 pending.extend(iter.map(Into::into));
1364 Err((records, pending))
1365 }
1366 }
1367
1368 pub fn is_empty(&self) -> bool {
1370 if self.records.is_empty() {
1371 assert_eq!(self.metered_bytes, 0);
1372 true
1373 } else {
1374 false
1375 }
1376 }
1377
1378 pub fn len(&self) -> usize {
1380 self.records.len()
1381 }
1382
1383 #[cfg(test)]
1384 fn max_bytes(&self) -> u64 {
1385 self.max_bytes
1386 }
1387
1388 #[cfg(not(test))]
1389 fn max_bytes(&self) -> u64 {
1390 Self::MAX_BYTES
1391 }
1392
1393 pub fn is_full(&self) -> bool {
1395 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1396 }
1397
1398 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1400 assert!(self.records.len() <= self.max_capacity);
1401 assert!(self.metered_bytes <= self.max_bytes());
1402
1403 let record = record.into();
1404 let record_size = record.metered_bytes();
1405 if self.records.len() >= self.max_capacity
1406 || self.metered_bytes + record_size > self.max_bytes()
1407 {
1408 Err(record)
1409 } else {
1410 self.records.push(record);
1411 self.metered_bytes += record_size;
1412 Ok(())
1413 }
1414 }
1415}
1416
1417impl MeteredBytes for AppendRecordBatch {
1418 fn metered_bytes(&self) -> u64 {
1419 self.metered_bytes
1420 }
1421}
1422
1423impl IntoIterator for AppendRecordBatch {
1424 type Item = AppendRecord;
1425 type IntoIter = std::vec::IntoIter<Self::Item>;
1426
1427 fn into_iter(self) -> Self::IntoIter {
1428 self.records.into_iter()
1429 }
1430}
1431
1432impl<'a> IntoIterator for &'a AppendRecordBatch {
1433 type Item = &'a AppendRecord;
1434 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1435
1436 fn into_iter(self) -> Self::IntoIter {
1437 self.records.iter()
1438 }
1439}
1440
1441impl AsRef<[AppendRecord]> for AppendRecordBatch {
1442 fn as_ref(&self) -> &[AppendRecord] {
1443 &self.records
1444 }
1445}
1446
1447#[sync_docs]
1448#[derive(Debug, Default, Clone)]
1449pub struct AppendInput {
1450 pub records: AppendRecordBatch,
1451 pub match_seq_num: Option<u64>,
1452 pub fencing_token: Option<FencingToken>,
1453}
1454
1455impl MeteredBytes for AppendInput {
1456 fn metered_bytes(&self) -> u64 {
1457 self.records.metered_bytes()
1458 }
1459}
1460
1461impl AppendInput {
1462 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1464 Self {
1465 records: records.into(),
1466 match_seq_num: None,
1467 fencing_token: None,
1468 }
1469 }
1470
1471 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1473 Self {
1474 match_seq_num: Some(match_seq_num.into()),
1475 ..self
1476 }
1477 }
1478
1479 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1481 Self {
1482 fencing_token: Some(fencing_token),
1483 ..self
1484 }
1485 }
1486
1487 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1488 let Self {
1489 records,
1490 match_seq_num,
1491 fencing_token,
1492 } = self;
1493
1494 api::AppendInput {
1495 stream: stream.into(),
1496 records: records.into_iter().map(Into::into).collect(),
1497 match_seq_num,
1498 fencing_token: fencing_token.map(|f| f.0),
1499 }
1500 }
1501}
1502
1503#[derive(Debug, Clone)]
1505pub struct AppendAck {
1506 pub start: StreamPosition,
1508 pub end: StreamPosition,
1513 pub tail: StreamPosition,
1517}
1518
1519impl From<api::AppendOutput> for AppendAck {
1520 fn from(value: api::AppendOutput) -> Self {
1521 let api::AppendOutput {
1522 start_seq_num,
1523 start_timestamp,
1524 end_seq_num,
1525 end_timestamp,
1526 next_seq_num,
1527 last_timestamp,
1528 } = value;
1529 let start = StreamPosition {
1530 seq_num: start_seq_num,
1531 timestamp: start_timestamp,
1532 };
1533 let end = StreamPosition {
1534 seq_num: end_seq_num,
1535 timestamp: end_timestamp,
1536 };
1537 let tail = StreamPosition {
1538 seq_num: next_seq_num,
1539 timestamp: last_timestamp,
1540 };
1541 Self { start, end, tail }
1542 }
1543}
1544
1545impl TryFrom<api::AppendResponse> for AppendAck {
1546 type Error = ConvertError;
1547 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1548 let api::AppendResponse { output } = value;
1549 let output = output.ok_or("missing append output")?;
1550 Ok(output.into())
1551 }
1552}
1553
1554impl TryFrom<api::AppendSessionResponse> for AppendAck {
1555 type Error = ConvertError;
1556 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1557 let api::AppendSessionResponse { output } = value;
1558 let output = output.ok_or("missing append output")?;
1559 Ok(output.into())
1560 }
1561}
1562
1563#[sync_docs]
1564#[derive(Debug, Clone, Default)]
1565pub struct ReadLimit {
1566 pub count: Option<u64>,
1567 pub bytes: Option<u64>,
1568}
1569
1570impl ReadLimit {
1571 pub fn new() -> Self {
1573 Self::default()
1574 }
1575
1576 pub fn with_count(self, count: u64) -> Self {
1578 Self {
1579 count: Some(count),
1580 ..self
1581 }
1582 }
1583
1584 pub fn with_bytes(self, bytes: u64) -> Self {
1586 Self {
1587 bytes: Some(bytes),
1588 ..self
1589 }
1590 }
1591}
1592
/// Where a read should begin.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Start at a sequence number.
    SeqNum(u64),
    /// Start at a timestamp.
    Timestamp(u64),
    /// Start at an offset relative to the tail.
    TailOffset(u64),
}

impl Default for ReadStart {
    /// The default start is sequence number zero.
    fn default() -> Self {
        ReadStart::SeqNum(0)
    }
}
1609
1610impl From<ReadStart> for api::read_request::Start {
1611 fn from(start: ReadStart) -> Self {
1612 match start {
1613 ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1614 ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1615 ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1616 }
1617 }
1618}
1619
1620impl From<ReadStart> for api::read_session_request::Start {
1621 fn from(start: ReadStart) -> Self {
1622 match start {
1623 ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1624 ReadStart::Timestamp(timestamp) => {
1625 api::read_session_request::Start::Timestamp(timestamp)
1626 }
1627 ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1628 }
1629 }
1630}
1631
1632#[sync_docs]
1633#[derive(Debug, Clone, Default)]
1634pub struct ReadRequest {
1635 pub start: ReadStart,
1636 pub limit: ReadLimit,
1637 pub until: Option<RangeTo<u64>>,
1638}
1639
1640impl ReadRequest {
1641 pub fn new(start: ReadStart) -> Self {
1643 Self {
1644 start,
1645 ..Default::default()
1646 }
1647 }
1648
1649 pub fn with_limit(self, limit: ReadLimit) -> Self {
1651 Self { limit, ..self }
1652 }
1653
1654 pub fn with_until(self, until: RangeTo<u64>) -> Self {
1656 Self {
1657 until: Some(until),
1658 ..self
1659 }
1660 }
1661}
1662
1663impl ReadRequest {
1664 pub(crate) fn try_into_api_type(
1665 self,
1666 stream: impl Into<String>,
1667 ) -> Result<api::ReadRequest, ConvertError> {
1668 let Self {
1669 start,
1670 limit,
1671 until,
1672 } = self;
1673
1674 let limit = if limit.count > Some(1000) {
1675 Err("read limit: count must not exceed 1000 for unary request")
1676 } else if limit.bytes > Some(MIB_BYTES) {
1677 Err("read limit: bytes must not exceed 1MiB for unary request")
1678 } else {
1679 Ok(api::ReadLimit {
1680 count: limit.count,
1681 bytes: limit.bytes,
1682 })
1683 }?;
1684
1685 Ok(api::ReadRequest {
1686 stream: stream.into(),
1687 start: Some(start.into()),
1688 limit: Some(limit),
1689 until: until.map(|range| range.end),
1690 })
1691 }
1692}
1693
1694#[sync_docs]
1695#[derive(Debug, Clone)]
1696pub struct SequencedRecord {
1697 pub seq_num: u64,
1698 pub timestamp: u64,
1699 pub headers: Vec<Header>,
1700 pub body: Bytes,
1701}
1702
1703metered_impl!(SequencedRecord);
1704
1705impl From<api::SequencedRecord> for SequencedRecord {
1706 fn from(value: api::SequencedRecord) -> Self {
1707 let api::SequencedRecord {
1708 seq_num,
1709 timestamp,
1710 headers,
1711 body,
1712 } = value;
1713 Self {
1714 seq_num,
1715 timestamp,
1716 headers: headers.into_iter().map(Into::into).collect(),
1717 body,
1718 }
1719 }
1720}
1721
1722impl SequencedRecord {
1723 pub fn as_command_record(&self) -> Option<CommandRecord> {
1725 if self.headers.len() != 1 {
1726 return None;
1727 }
1728
1729 let header = self.headers.first().expect("pre-validated length");
1730
1731 if !header.name.is_empty() {
1732 return None;
1733 }
1734
1735 match header.value.as_ref() {
1736 CommandRecord::FENCE => {
1737 let fencing_token = std::str::from_utf8(&self.body).ok()?.parse().ok()?;
1738 Some(CommandRecord {
1739 command: Command::Fence { fencing_token },
1740 timestamp: Some(self.timestamp),
1741 })
1742 }
1743 CommandRecord::TRIM => {
1744 let body: &[u8] = &self.body;
1745 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1746 Some(CommandRecord {
1747 command: Command::Trim { seq_num },
1748 timestamp: Some(self.timestamp),
1749 })
1750 }
1751 _ => None,
1752 }
1753 }
1754}
1755
1756#[sync_docs]
1757#[derive(Debug, Clone)]
1758pub struct SequencedRecordBatch {
1759 pub records: Vec<SequencedRecord>,
1760}
1761
1762impl MeteredBytes for SequencedRecordBatch {
1763 fn metered_bytes(&self) -> u64 {
1764 self.records.metered_bytes()
1765 }
1766}
1767
1768impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1769 fn from(value: api::SequencedRecordBatch) -> Self {
1770 let api::SequencedRecordBatch { records } = value;
1771 Self {
1772 records: records.into_iter().map(Into::into).collect(),
1773 }
1774 }
1775}
1776
1777#[sync_docs(ReadOutput = "Output")]
1778#[derive(Debug, Clone)]
1779pub enum ReadOutput {
1780 Batch(SequencedRecordBatch),
1781 NextSeqNum(u64),
1782}
1783
1784impl From<api::read_output::Output> for ReadOutput {
1785 fn from(value: api::read_output::Output) -> Self {
1786 match value {
1787 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1788 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1789 }
1790 }
1791}
1792
1793impl TryFrom<api::ReadOutput> for ReadOutput {
1794 type Error = ConvertError;
1795 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1796 let api::ReadOutput { output } = value;
1797 let output = output.ok_or("missing read output")?;
1798 Ok(output.into())
1799 }
1800}
1801
1802impl TryFrom<api::ReadResponse> for ReadOutput {
1803 type Error = ConvertError;
1804 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1805 let api::ReadResponse { output } = value;
1806 let output = output.ok_or("missing output in read response")?;
1807 output.try_into()
1808 }
1809}
1810
1811#[sync_docs]
1812#[derive(Debug, Clone, Default)]
1813pub struct ReadSessionRequest {
1814 pub start: ReadStart,
1815 pub limit: ReadLimit,
1816 pub until: Option<RangeTo<u64>>,
1817}
1818
1819impl ReadSessionRequest {
1820 pub fn new(start: ReadStart) -> Self {
1822 Self {
1823 start,
1824 ..Default::default()
1825 }
1826 }
1827
1828 pub fn with_limit(self, limit: ReadLimit) -> Self {
1830 Self { limit, ..self }
1831 }
1832
1833 pub fn with_until(self, until: RangeTo<u64>) -> Self {
1835 Self {
1836 until: Some(until),
1837 ..self
1838 }
1839 }
1840
1841 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1842 let Self {
1843 start,
1844 limit,
1845 until,
1846 } = self;
1847 api::ReadSessionRequest {
1848 stream: stream.into(),
1849 start: Some(start.into()),
1850 limit: Some(api::ReadLimit {
1851 count: limit.count,
1852 bytes: limit.bytes,
1853 }),
1854 heartbeats: false,
1855 until: until.map(|range| range.end),
1856 }
1857 }
1858}
1859
1860impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1861 type Error = ConvertError;
1862 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1863 let api::ReadSessionResponse { output } = value;
1864 let output = output.ok_or("missing output in read session response")?;
1865 output.try_into()
1866 }
1867}
1868
/// Newtype around `String` holding a basin name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BasinName(String);

impl Deref for BasinName {
    type Target = str;

    /// Borrow the wrapped name as a string slice.
    fn deref(&self) -> &str {
        self.0.as_str()
    }
}
1882
1883impl TryFrom<String> for BasinName {
1884 type Error = ConvertError;
1885
1886 fn try_from(name: String) -> Result<Self, Self::Error> {
1887 if name.len() < 8 || name.len() > 48 {
1888 return Err("Basin name must be between 8 and 48 characters in length".into());
1889 }
1890
1891 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1892 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1893 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1894 .expect("Failed to compile basin name regex")
1895 });
1896
1897 if !regex.is_match(&name) {
1898 return Err(
1899 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1900 It cannot begin or end with a hyphen."
1901 .into(),
1902 );
1903 }
1904
1905 Ok(Self(name))
1906 }
1907}
1908
1909impl FromStr for BasinName {
1910 type Err = ConvertError;
1911
1912 fn from_str(s: &str) -> Result<Self, Self::Err> {
1913 s.to_string().try_into()
1914 }
1915}
1916
1917impl std::fmt::Display for BasinName {
1918 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1919 f.write_str(&self.0)
1920 }
1921}
1922
1923impl From<BasinName> for String {
1924 fn from(value: BasinName) -> Self {
1925 value.0
1926 }
1927}
1928
/// Newtype around `String` holding an access token ID.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AccessTokenId(String);

impl Deref for AccessTokenId {
    type Target = str;

    /// Borrow the wrapped ID as a string slice.
    fn deref(&self) -> &str {
        self.0.as_str()
    }
}
1941
1942impl TryFrom<String> for AccessTokenId {
1943 type Error = ConvertError;
1944
1945 fn try_from(name: String) -> Result<Self, Self::Error> {
1946 if name.is_empty() {
1947 return Err("Access token ID must not be empty".into());
1948 }
1949
1950 if name.len() > 96 {
1951 return Err("Access token ID must not exceed 96 characters".into());
1952 }
1953
1954 Ok(Self(name))
1955 }
1956}
1957
1958impl From<AccessTokenId> for String {
1959 fn from(value: AccessTokenId) -> Self {
1960 value.0
1961 }
1962}
1963
1964impl FromStr for AccessTokenId {
1965 type Err = ConvertError;
1966
1967 fn from_str(s: &str) -> Result<Self, Self::Err> {
1968 s.to_string().try_into()
1969 }
1970}
1971
1972impl std::fmt::Display for AccessTokenId {
1973 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1974 f.write_str(&self.0)
1975 }
1976}
1977
1978impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1979 fn from(value: AccessTokenInfo) -> Self {
1980 Self {
1981 info: Some(value.into()),
1982 }
1983 }
1984}
1985
1986#[sync_docs]
1987#[derive(Debug, Clone)]
1988pub struct AccessTokenInfo {
1989 pub id: AccessTokenId,
1990 pub expires_at: Option<u32>,
1991 pub auto_prefix_streams: bool,
1992 pub scope: Option<AccessTokenScope>,
1993}
1994
1995impl AccessTokenInfo {
1996 pub fn new(id: AccessTokenId) -> Self {
1998 Self {
1999 id,
2000 expires_at: None,
2001 auto_prefix_streams: false,
2002 scope: None,
2003 }
2004 }
2005
2006 pub fn with_expires_at(self, expires_at: u32) -> Self {
2008 Self {
2009 expires_at: Some(expires_at),
2010 ..self
2011 }
2012 }
2013
2014 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2016 Self {
2017 auto_prefix_streams,
2018 ..self
2019 }
2020 }
2021
2022 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2024 Self {
2025 scope: Some(scope),
2026 ..self
2027 }
2028 }
2029}
2030
2031impl From<AccessTokenInfo> for api::AccessTokenInfo {
2032 fn from(value: AccessTokenInfo) -> Self {
2033 let AccessTokenInfo {
2034 id,
2035 expires_at,
2036 auto_prefix_streams,
2037 scope,
2038 } = value;
2039 Self {
2040 id: id.into(),
2041 expires_at,
2042 auto_prefix_streams,
2043 scope: scope.map(Into::into),
2044 }
2045 }
2046}
2047
2048impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2049 type Error = ConvertError;
2050
2051 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2052 let api::AccessTokenInfo {
2053 id,
2054 expires_at,
2055 auto_prefix_streams,
2056 scope,
2057 } = value;
2058 Ok(Self {
2059 id: id.try_into()?,
2060 expires_at,
2061 auto_prefix_streams,
2062 scope: scope.map(Into::into),
2063 })
2064 }
2065}
2066
2067#[sync_docs]
2068#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2069pub enum Operation {
2070 ListBasins,
2071 CreateBasin,
2072 DeleteBasin,
2073 ReconfigureBasin,
2074 GetBasinConfig,
2075 IssueAccessToken,
2076 RevokeAccessToken,
2077 ListAccessTokens,
2078 ListStreams,
2079 CreateStream,
2080 DeleteStream,
2081 GetStreamConfig,
2082 ReconfigureStream,
2083 CheckTail,
2084 Append,
2085 Read,
2086 Trim,
2087 Fence,
2088 AccountMetrics,
2089 BasinMetrics,
2090 StreamMetrics,
2091}
2092
2093impl FromStr for Operation {
2094 type Err = ConvertError;
2095
2096 fn from_str(s: &str) -> Result<Self, Self::Err> {
2097 match s.to_lowercase().as_str() {
2098 "list-basins" => Ok(Self::ListBasins),
2099 "create-basin" => Ok(Self::CreateBasin),
2100 "delete-basin" => Ok(Self::DeleteBasin),
2101 "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2102 "get-basin-config" => Ok(Self::GetBasinConfig),
2103 "issue-access-token" => Ok(Self::IssueAccessToken),
2104 "revoke-access-token" => Ok(Self::RevokeAccessToken),
2105 "list-access-tokens" => Ok(Self::ListAccessTokens),
2106 "list-streams" => Ok(Self::ListStreams),
2107 "create-stream" => Ok(Self::CreateStream),
2108 "delete-stream" => Ok(Self::DeleteStream),
2109 "get-stream-config" => Ok(Self::GetStreamConfig),
2110 "reconfigure-stream" => Ok(Self::ReconfigureStream),
2111 "check-tail" => Ok(Self::CheckTail),
2112 "append" => Ok(Self::Append),
2113 "read" => Ok(Self::Read),
2114 "trim" => Ok(Self::Trim),
2115 "fence" => Ok(Self::Fence),
2116 "account-metrics" => Ok(Self::AccountMetrics),
2117 "basin-metrics" => Ok(Self::BasinMetrics),
2118 "stream-metrics" => Ok(Self::StreamMetrics),
2119 _ => Err("invalid operation".into()),
2120 }
2121 }
2122}
2123
2124impl From<Operation> for api::Operation {
2125 fn from(value: Operation) -> Self {
2126 match value {
2127 Operation::ListBasins => Self::ListBasins,
2128 Operation::CreateBasin => Self::CreateBasin,
2129 Operation::DeleteBasin => Self::DeleteBasin,
2130 Operation::ReconfigureBasin => Self::ReconfigureBasin,
2131 Operation::GetBasinConfig => Self::GetBasinConfig,
2132 Operation::IssueAccessToken => Self::IssueAccessToken,
2133 Operation::RevokeAccessToken => Self::RevokeAccessToken,
2134 Operation::ListAccessTokens => Self::ListAccessTokens,
2135 Operation::ListStreams => Self::ListStreams,
2136 Operation::CreateStream => Self::CreateStream,
2137 Operation::DeleteStream => Self::DeleteStream,
2138 Operation::GetStreamConfig => Self::GetStreamConfig,
2139 Operation::ReconfigureStream => Self::ReconfigureStream,
2140 Operation::CheckTail => Self::CheckTail,
2141 Operation::Append => Self::Append,
2142 Operation::Read => Self::Read,
2143 Operation::Trim => Self::Trim,
2144 Operation::Fence => Self::Fence,
2145 Operation::AccountMetrics => Self::AccountMetrics,
2146 Operation::BasinMetrics => Self::BasinMetrics,
2147 Operation::StreamMetrics => Self::StreamMetrics,
2148 }
2149 }
2150}
2151
2152impl From<api::Operation> for Option<Operation> {
2153 fn from(value: api::Operation) -> Self {
2154 match value {
2155 api::Operation::Unspecified => None,
2156 api::Operation::ListBasins => Some(Operation::ListBasins),
2157 api::Operation::CreateBasin => Some(Operation::CreateBasin),
2158 api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2159 api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2160 api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2161 api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2162 api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2163 api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2164 api::Operation::ListStreams => Some(Operation::ListStreams),
2165 api::Operation::CreateStream => Some(Operation::CreateStream),
2166 api::Operation::DeleteStream => Some(Operation::DeleteStream),
2167 api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2168 api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2169 api::Operation::CheckTail => Some(Operation::CheckTail),
2170 api::Operation::Append => Some(Operation::Append),
2171 api::Operation::Read => Some(Operation::Read),
2172 api::Operation::Trim => Some(Operation::Trim),
2173 api::Operation::Fence => Some(Operation::Fence),
2174 api::Operation::AccountMetrics => Some(Operation::AccountMetrics),
2175 api::Operation::BasinMetrics => Some(Operation::BasinMetrics),
2176 api::Operation::StreamMetrics => Some(Operation::StreamMetrics),
2177 }
2178 }
2179}
2180
2181#[sync_docs]
2182#[derive(Debug, Clone, Default)]
2183pub struct AccessTokenScope {
2184 pub basins: Option<ResourceSet>,
2185 pub streams: Option<ResourceSet>,
2186 pub access_tokens: Option<ResourceSet>,
2187 pub op_groups: Option<PermittedOperationGroups>,
2188 pub ops: HashSet<Operation>,
2189}
2190
2191impl AccessTokenScope {
2192 pub fn new() -> Self {
2194 Self::default()
2195 }
2196
2197 pub fn with_basins(self, basins: ResourceSet) -> Self {
2199 Self {
2200 basins: Some(basins),
2201 ..self
2202 }
2203 }
2204
2205 pub fn with_streams(self, streams: ResourceSet) -> Self {
2207 Self {
2208 streams: Some(streams),
2209 ..self
2210 }
2211 }
2212
2213 pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2215 Self {
2216 access_tokens: Some(access_tokens),
2217 ..self
2218 }
2219 }
2220
2221 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2223 Self {
2224 op_groups: Some(op_groups),
2225 ..self
2226 }
2227 }
2228
2229 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2231 Self {
2232 ops: ops.into_iter().collect(),
2233 ..self
2234 }
2235 }
2236
2237 pub fn with_op(self, op: Operation) -> Self {
2239 let mut ops = self.ops;
2240 ops.insert(op);
2241 Self { ops, ..self }
2242 }
2243}
2244
2245impl From<AccessTokenScope> for api::AccessTokenScope {
2246 fn from(value: AccessTokenScope) -> Self {
2247 let AccessTokenScope {
2248 basins,
2249 streams,
2250 access_tokens,
2251 op_groups,
2252 ops,
2253 } = value;
2254 Self {
2255 basins: basins.map(Into::into),
2256 streams: streams.map(Into::into),
2257 access_tokens: access_tokens.map(Into::into),
2258 op_groups: op_groups.map(Into::into),
2259 ops: ops
2260 .into_iter()
2261 .map(api::Operation::from)
2262 .map(Into::into)
2263 .collect(),
2264 }
2265 }
2266}
2267
2268impl From<api::AccessTokenScope> for AccessTokenScope {
2269 fn from(value: api::AccessTokenScope) -> Self {
2270 let api::AccessTokenScope {
2271 basins,
2272 streams,
2273 access_tokens,
2274 op_groups,
2275 ops,
2276 } = value;
2277 Self {
2278 basins: basins.and_then(|set| set.matching.map(Into::into)),
2279 streams: streams.and_then(|set| set.matching.map(Into::into)),
2280 access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2281 op_groups: op_groups.map(Into::into),
2282 ops: ops
2283 .into_iter()
2284 .map(api::Operation::try_from)
2285 .flat_map(Result::ok)
2286 .flat_map(<Option<Operation>>::from)
2287 .collect(),
2288 }
2289 }
2290}
2291
2292impl From<ResourceSet> for api::ResourceSet {
2293 fn from(value: ResourceSet) -> Self {
2294 Self {
2295 matching: Some(value.into()),
2296 }
2297 }
2298}
2299
2300#[sync_docs(ResourceSet = "Matching")]
2301#[derive(Debug, Clone)]
2302pub enum ResourceSet {
2303 Exact(String),
2304 Prefix(String),
2305}
2306
2307impl From<ResourceSet> for api::resource_set::Matching {
2308 fn from(value: ResourceSet) -> Self {
2309 match value {
2310 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2311 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2312 }
2313 }
2314}
2315
2316impl From<api::resource_set::Matching> for ResourceSet {
2317 fn from(value: api::resource_set::Matching) -> Self {
2318 match value {
2319 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2320 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2321 }
2322 }
2323}
2324
2325#[sync_docs]
2326#[derive(Debug, Clone, Default)]
2327pub struct PermittedOperationGroups {
2328 pub account: Option<ReadWritePermissions>,
2329 pub basin: Option<ReadWritePermissions>,
2330 pub stream: Option<ReadWritePermissions>,
2331}
2332
2333impl PermittedOperationGroups {
2334 pub fn new() -> Self {
2336 Self::default()
2337 }
2338
2339 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2341 Self {
2342 account: Some(account),
2343 ..self
2344 }
2345 }
2346
2347 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2349 Self {
2350 basin: Some(basin),
2351 ..self
2352 }
2353 }
2354
2355 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2357 Self {
2358 stream: Some(stream),
2359 ..self
2360 }
2361 }
2362}
2363
2364impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2365 fn from(value: PermittedOperationGroups) -> Self {
2366 let PermittedOperationGroups {
2367 account,
2368 basin,
2369 stream,
2370 } = value;
2371 Self {
2372 account: account.map(Into::into),
2373 basin: basin.map(Into::into),
2374 stream: stream.map(Into::into),
2375 }
2376 }
2377}
2378
2379impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2380 fn from(value: api::PermittedOperationGroups) -> Self {
2381 let api::PermittedOperationGroups {
2382 account,
2383 basin,
2384 stream,
2385 } = value;
2386 Self {
2387 account: account.map(Into::into),
2388 basin: basin.map(Into::into),
2389 stream: stream.map(Into::into),
2390 }
2391 }
2392}
2393
2394#[sync_docs]
2395#[derive(Debug, Clone, Default)]
2396pub struct ReadWritePermissions {
2397 pub read: bool,
2398 pub write: bool,
2399}
2400
2401impl ReadWritePermissions {
2402 pub fn new() -> Self {
2404 Self::default()
2405 }
2406
2407 pub fn with_read(self, read: bool) -> Self {
2409 Self { read, ..self }
2410 }
2411
2412 pub fn with_write(self, write: bool) -> Self {
2414 Self { write, ..self }
2415 }
2416}
2417
2418impl From<ReadWritePermissions> for api::ReadWritePermissions {
2419 fn from(value: ReadWritePermissions) -> Self {
2420 let ReadWritePermissions { read, write } = value;
2421 Self { read, write }
2422 }
2423}
2424
2425impl From<api::ReadWritePermissions> for ReadWritePermissions {
2426 fn from(value: api::ReadWritePermissions) -> Self {
2427 let api::ReadWritePermissions { read, write } = value;
2428 Self { read, write }
2429 }
2430}
2431
2432impl From<api::IssueAccessTokenResponse> for String {
2433 fn from(value: api::IssueAccessTokenResponse) -> Self {
2434 value.access_token
2435 }
2436}
2437
2438impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2439 fn from(value: AccessTokenId) -> Self {
2440 Self { id: value.into() }
2441 }
2442}
2443
2444impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2445 type Error = ConvertError;
2446 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2447 let token_info = value.info.ok_or("access token info is missing")?;
2448 token_info.try_into()
2449 }
2450}
2451
2452#[sync_docs]
2453#[derive(Debug, Clone, Default)]
2454pub struct ListAccessTokensRequest {
2455 pub prefix: String,
2456 pub start_after: String,
2457 pub limit: Option<usize>,
2458}
2459
2460impl ListAccessTokensRequest {
2461 pub fn new() -> Self {
2463 Self::default()
2464 }
2465
2466 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2468 Self {
2469 prefix: prefix.into(),
2470 ..self
2471 }
2472 }
2473
2474 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2476 Self {
2477 start_after: start_after.into(),
2478 ..self
2479 }
2480 }
2481
2482 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2484 Self {
2485 limit: limit.into(),
2486 ..self
2487 }
2488 }
2489}
2490
2491impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2492 type Error = ConvertError;
2493 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2494 let ListAccessTokensRequest {
2495 prefix,
2496 start_after,
2497 limit,
2498 } = value;
2499 Ok(Self {
2500 prefix,
2501 start_after,
2502 limit: limit
2503 .map(TryInto::try_into)
2504 .transpose()
2505 .map_err(|_| "request limit does not fit into u64 bounds")?,
2506 })
2507 }
2508}
2509
2510#[sync_docs]
2511#[derive(Debug, Clone)]
2512pub struct ListAccessTokensResponse {
2513 pub access_tokens: Vec<AccessTokenInfo>,
2514 pub has_more: bool,
2515}
2516
2517impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2518 type Error = ConvertError;
2519 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2520 let api::ListAccessTokensResponse {
2521 access_tokens,
2522 has_more,
2523 } = value;
2524 let access_tokens = access_tokens
2525 .into_iter()
2526 .map(TryInto::try_into)
2527 .collect::<Result<Vec<_>, _>>()?;
2528 Ok(Self {
2529 access_tokens,
2530 has_more,
2531 })
2532 }
2533}