1use std::{
4 collections::HashSet,
5 ops::{Deref, RangeTo},
6 str::FromStr,
7 sync::OnceLock,
8 time::Duration,
9};
10
11use bytes::Bytes;
12use rand::Rng;
13use regex::Regex;
14use sync_docs::sync_docs;
15
16use crate::api;
17
18pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
19pub(crate) const RETRY_AFTER_MS_METADATA_KEY: &str = "retry-after-ms";
20
21#[derive(Debug, Clone, thiserror::Error)]
23#[error("{0}")]
24pub struct ConvertError(String);
25
26impl<T: Into<String>> From<T> for ConvertError {
27 fn from(value: T) -> Self {
28 Self(value.into())
29 }
30}
31
32pub trait MeteredBytes {
40 fn metered_bytes(&self) -> u64;
42}
43
44impl<T: MeteredBytes> MeteredBytes for Vec<T> {
45 fn metered_bytes(&self) -> u64 {
46 self.iter().fold(0, |acc, item| acc + item.metered_bytes())
47 }
48}
49
50macro_rules! metered_impl {
51 ($ty:ty) => {
52 impl MeteredBytes for $ty {
53 fn metered_bytes(&self) -> u64 {
54 let bytes = 8
55 + (2 * self.headers.len())
56 + self
57 .headers
58 .iter()
59 .map(|h| h.name.len() + h.value.len())
60 .sum::<usize>()
61 + self.body.len();
62 bytes as u64
63 }
64 }
65 };
66}
67
68#[sync_docs]
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum BasinScope {
71 AwsUsEast1,
72}
73
74impl From<BasinScope> for api::BasinScope {
75 fn from(value: BasinScope) -> Self {
76 match value {
77 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
78 }
79 }
80}
81
82impl From<api::BasinScope> for Option<BasinScope> {
83 fn from(value: api::BasinScope) -> Self {
84 match value {
85 api::BasinScope::Unspecified => None,
86 api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
87 }
88 }
89}
90
91impl FromStr for BasinScope {
92 type Err = ConvertError;
93 fn from_str(value: &str) -> Result<Self, Self::Err> {
94 match value {
95 "aws:us-east-1" => Ok(Self::AwsUsEast1),
96 _ => Err("invalid basin scope value".into()),
97 }
98 }
99}
100
101#[sync_docs]
102#[derive(Debug, Clone)]
103pub struct CreateBasinRequest {
104 pub basin: BasinName,
105 pub config: Option<BasinConfig>,
106 pub scope: Option<BasinScope>,
107}
108
109impl CreateBasinRequest {
110 pub fn new(basin: BasinName) -> Self {
112 Self {
113 basin,
114 config: None,
115 scope: None,
116 }
117 }
118
119 pub fn with_config(self, config: BasinConfig) -> Self {
121 Self {
122 config: Some(config),
123 ..self
124 }
125 }
126
127 pub fn with_scope(self, scope: BasinScope) -> Self {
129 Self {
130 scope: Some(scope),
131 ..self
132 }
133 }
134}
135
136impl From<CreateBasinRequest> for api::CreateBasinRequest {
137 fn from(value: CreateBasinRequest) -> Self {
138 let CreateBasinRequest {
139 basin,
140 config,
141 scope,
142 } = value;
143 Self {
144 basin: basin.0,
145 config: config.map(Into::into),
146 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
147 }
148 }
149}
150
151#[sync_docs]
152#[derive(Debug, Clone, Default)]
153pub struct BasinConfig {
154 pub default_stream_config: Option<StreamConfig>,
155 pub create_stream_on_append: bool,
156 pub create_stream_on_read: bool,
157}
158
159impl BasinConfig {
160 pub fn new() -> Self {
162 Self::default()
163 }
164
165 pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
167 Self {
168 default_stream_config: Some(default_stream_config),
169 ..self
170 }
171 }
172
173 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
175 Self {
176 create_stream_on_append,
177 ..self
178 }
179 }
180
181 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
183 Self {
184 create_stream_on_read,
185 ..self
186 }
187 }
188}
189
190impl From<BasinConfig> for api::BasinConfig {
191 fn from(value: BasinConfig) -> Self {
192 let BasinConfig {
193 default_stream_config,
194 create_stream_on_append,
195 create_stream_on_read,
196 } = value;
197 Self {
198 default_stream_config: default_stream_config.map(Into::into),
199 create_stream_on_append,
200 create_stream_on_read,
201 }
202 }
203}
204
205impl From<api::BasinConfig> for BasinConfig {
206 fn from(value: api::BasinConfig) -> Self {
207 let api::BasinConfig {
208 default_stream_config,
209 create_stream_on_append,
210 create_stream_on_read,
211 } = value;
212 Self {
213 default_stream_config: default_stream_config.map(Into::into),
214 create_stream_on_append,
215 create_stream_on_read,
216 }
217 }
218}
219
220#[sync_docs]
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub enum TimestampingMode {
223 ClientPrefer,
224 ClientRequire,
225 Arrival,
226}
227
228impl From<TimestampingMode> for api::TimestampingMode {
229 fn from(value: TimestampingMode) -> Self {
230 match value {
231 TimestampingMode::ClientPrefer => Self::ClientPrefer,
232 TimestampingMode::ClientRequire => Self::ClientRequire,
233 TimestampingMode::Arrival => Self::Arrival,
234 }
235 }
236}
237
238impl From<api::TimestampingMode> for Option<TimestampingMode> {
239 fn from(value: api::TimestampingMode) -> Self {
240 match value {
241 api::TimestampingMode::Unspecified => None,
242 api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
243 api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
244 api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
245 }
246 }
247}
248
249#[sync_docs(TimestampingConfig = "Timestamping")]
250#[derive(Debug, Clone, Default)]
251pub struct TimestampingConfig {
253 pub mode: Option<TimestampingMode>,
254 pub uncapped: Option<bool>,
255}
256
257impl TimestampingConfig {
258 pub fn new() -> Self {
260 Self::default()
261 }
262
263 pub fn with_mode(self, mode: TimestampingMode) -> Self {
265 Self {
266 mode: Some(mode),
267 ..self
268 }
269 }
270
271 pub fn with_uncapped(self, uncapped: bool) -> Self {
273 Self {
274 uncapped: Some(uncapped),
275 ..self
276 }
277 }
278}
279
280impl From<TimestampingConfig> for api::stream_config::Timestamping {
281 fn from(value: TimestampingConfig) -> Self {
282 Self {
283 mode: value
284 .mode
285 .map(api::TimestampingMode::from)
286 .unwrap_or_default()
287 .into(),
288 uncapped: value.uncapped,
289 }
290 }
291}
292
293impl From<api::stream_config::Timestamping> for TimestampingConfig {
294 fn from(value: api::stream_config::Timestamping) -> Self {
295 let mode = value.mode().into();
296 let uncapped = value.uncapped;
297 Self { mode, uncapped }
298 }
299}
300
301#[sync_docs]
302#[derive(Debug, Clone, Default)]
303pub struct StreamConfig {
304 pub storage_class: Option<StorageClass>,
305 pub retention_policy: Option<RetentionPolicy>,
306 pub timestamping: Option<TimestampingConfig>,
307}
308
309impl StreamConfig {
310 pub fn new() -> Self {
312 Self::default()
313 }
314
315 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
317 Self {
318 storage_class: Some(storage_class),
319 ..self
320 }
321 }
322
323 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
325 Self {
326 retention_policy: Some(retention_policy),
327 ..self
328 }
329 }
330
331 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
333 Self {
334 timestamping: Some(timestamping),
335 ..self
336 }
337 }
338}
339
340impl From<StreamConfig> for api::StreamConfig {
341 fn from(value: StreamConfig) -> Self {
342 let StreamConfig {
343 storage_class,
344 retention_policy,
345 timestamping,
346 } = value;
347 Self {
348 storage_class: storage_class
349 .map(api::StorageClass::from)
350 .unwrap_or_default()
351 .into(),
352 retention_policy: retention_policy.map(Into::into),
353 timestamping: timestamping.map(Into::into),
354 }
355 }
356}
357
358impl From<api::StreamConfig> for StreamConfig {
359 fn from(value: api::StreamConfig) -> Self {
360 Self {
361 storage_class: value.storage_class().into(),
362 retention_policy: value.retention_policy.map(Into::into),
363 timestamping: value.timestamping.map(Into::into),
364 }
365 }
366}
367
368#[sync_docs]
369#[derive(Debug, Clone, Copy, PartialEq, Eq)]
370pub enum StorageClass {
371 Standard,
372 Express,
373}
374
375impl From<StorageClass> for api::StorageClass {
376 fn from(value: StorageClass) -> Self {
377 match value {
378 StorageClass::Standard => Self::Standard,
379 StorageClass::Express => Self::Express,
380 }
381 }
382}
383
384impl From<api::StorageClass> for Option<StorageClass> {
385 fn from(value: api::StorageClass) -> Self {
386 match value {
387 api::StorageClass::Unspecified => None,
388 api::StorageClass::Standard => Some(StorageClass::Standard),
389 api::StorageClass::Express => Some(StorageClass::Express),
390 }
391 }
392}
393
394impl FromStr for StorageClass {
395 type Err = ConvertError;
396
397 fn from_str(value: &str) -> Result<Self, Self::Err> {
398 match value {
399 "standard" => Ok(Self::Standard),
400 "express" => Ok(Self::Express),
401 v => Err(format!("unknown storage class: {v}").into()),
402 }
403 }
404}
405
406#[sync_docs(Age = "Age")]
407#[derive(Debug, Clone)]
408pub enum RetentionPolicy {
409 Age(Duration),
410}
411
412impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
413 fn from(value: RetentionPolicy) -> Self {
414 match value {
415 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
416 }
417 }
418}
419
420impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
421 fn from(value: api::stream_config::RetentionPolicy) -> Self {
422 match value {
423 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
424 }
425 }
426}
427
428#[sync_docs]
429#[derive(Debug, Clone, Copy, PartialEq, Eq)]
430pub enum BasinState {
431 Active,
432 Creating,
433 Deleting,
434}
435
436impl From<BasinState> for api::BasinState {
437 fn from(value: BasinState) -> Self {
438 match value {
439 BasinState::Active => Self::Active,
440 BasinState::Creating => Self::Creating,
441 BasinState::Deleting => Self::Deleting,
442 }
443 }
444}
445
446impl From<api::BasinState> for Option<BasinState> {
447 fn from(value: api::BasinState) -> Self {
448 match value {
449 api::BasinState::Unspecified => None,
450 api::BasinState::Active => Some(BasinState::Active),
451 api::BasinState::Creating => Some(BasinState::Creating),
452 api::BasinState::Deleting => Some(BasinState::Deleting),
453 }
454 }
455}
456
457impl std::fmt::Display for BasinState {
458 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459 match self {
460 BasinState::Active => write!(f, "active"),
461 BasinState::Creating => write!(f, "creating"),
462 BasinState::Deleting => write!(f, "deleting"),
463 }
464 }
465}
466
467#[sync_docs]
468#[derive(Debug, Clone)]
469pub struct BasinInfo {
470 pub name: String,
471 pub scope: Option<BasinScope>,
472 pub state: Option<BasinState>,
473}
474
475impl From<BasinInfo> for api::BasinInfo {
476 fn from(value: BasinInfo) -> Self {
477 let BasinInfo { name, scope, state } = value;
478 Self {
479 name,
480 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
481 state: state.map(api::BasinState::from).unwrap_or_default().into(),
482 }
483 }
484}
485
486impl From<api::BasinInfo> for BasinInfo {
487 fn from(value: api::BasinInfo) -> Self {
488 let scope = value.scope().into();
489 let state = value.state().into();
490 let name = value.name;
491 Self { name, scope, state }
492 }
493}
494
495impl TryFrom<api::CreateBasinResponse> for BasinInfo {
496 type Error = ConvertError;
497 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
498 let api::CreateBasinResponse { info } = value;
499 let info = info.ok_or("missing basin info")?;
500 Ok(info.into())
501 }
502}
503
504#[sync_docs]
505#[derive(Debug, Clone, Default)]
506pub struct ListStreamsRequest {
507 pub prefix: String,
508 pub start_after: String,
509 pub limit: Option<usize>,
510}
511
512impl ListStreamsRequest {
513 pub fn new() -> Self {
515 Self::default()
516 }
517
518 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
520 Self {
521 prefix: prefix.into(),
522 ..self
523 }
524 }
525
526 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
528 Self {
529 start_after: start_after.into(),
530 ..self
531 }
532 }
533
534 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
536 Self {
537 limit: limit.into(),
538 ..self
539 }
540 }
541}
542
543impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
544 type Error = ConvertError;
545 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
546 let ListStreamsRequest {
547 prefix,
548 start_after,
549 limit,
550 } = value;
551 Ok(Self {
552 prefix,
553 start_after,
554 limit: limit.map(|n| n as u64),
555 })
556 }
557}
558
559#[sync_docs]
560#[derive(Debug, Clone)]
561pub struct StreamInfo {
562 pub name: String,
563 pub created_at: u32,
564 pub deleted_at: Option<u32>,
565}
566
567impl From<api::StreamInfo> for StreamInfo {
568 fn from(value: api::StreamInfo) -> Self {
569 Self {
570 name: value.name,
571 created_at: value.created_at,
572 deleted_at: value.deleted_at,
573 }
574 }
575}
576
577impl TryFrom<api::CreateStreamResponse> for StreamInfo {
578 type Error = ConvertError;
579
580 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
581 let api::CreateStreamResponse { info } = value;
582 let info = info.ok_or("missing stream info")?;
583 Ok(info.into())
584 }
585}
586
587#[sync_docs]
588#[derive(Debug, Clone)]
589pub struct ListStreamsResponse {
590 pub streams: Vec<StreamInfo>,
591 pub has_more: bool,
592}
593
594impl From<api::ListStreamsResponse> for ListStreamsResponse {
595 fn from(value: api::ListStreamsResponse) -> Self {
596 let api::ListStreamsResponse { streams, has_more } = value;
597 let streams = streams.into_iter().map(Into::into).collect();
598 Self { streams, has_more }
599 }
600}
601
602impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
603 type Error = ConvertError;
604
605 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
606 let api::GetBasinConfigResponse { config } = value;
607 let config = config.ok_or("missing basin config")?;
608 Ok(config.into())
609 }
610}
611
612impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
613 type Error = ConvertError;
614
615 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
616 let api::GetStreamConfigResponse { config } = value;
617 let config = config.ok_or("missing stream config")?;
618 Ok(config.into())
619 }
620}
621
622#[sync_docs]
623#[derive(Debug, Clone)]
624pub struct CreateStreamRequest {
625 pub stream: String,
626 pub config: Option<StreamConfig>,
627}
628
629impl CreateStreamRequest {
630 pub fn new(stream: impl Into<String>) -> Self {
632 Self {
633 stream: stream.into(),
634 config: None,
635 }
636 }
637
638 pub fn with_config(self, config: StreamConfig) -> Self {
640 Self {
641 config: Some(config),
642 ..self
643 }
644 }
645}
646
647impl From<CreateStreamRequest> for api::CreateStreamRequest {
648 fn from(value: CreateStreamRequest) -> Self {
649 let CreateStreamRequest { stream, config } = value;
650 Self {
651 stream,
652 config: config.map(Into::into),
653 }
654 }
655}
656
657#[sync_docs]
658#[derive(Debug, Clone, Default)]
659pub struct ListBasinsRequest {
660 pub prefix: String,
661 pub start_after: String,
662 pub limit: Option<usize>,
663}
664
665impl ListBasinsRequest {
666 pub fn new() -> Self {
668 Self::default()
669 }
670
671 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
673 Self {
674 prefix: prefix.into(),
675 ..self
676 }
677 }
678
679 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
681 Self {
682 start_after: start_after.into(),
683 ..self
684 }
685 }
686
687 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
689 Self {
690 limit: limit.into(),
691 ..self
692 }
693 }
694}
695
696impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
697 type Error = ConvertError;
698 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
699 let ListBasinsRequest {
700 prefix,
701 start_after,
702 limit,
703 } = value;
704 Ok(Self {
705 prefix,
706 start_after,
707 limit: limit
708 .map(TryInto::try_into)
709 .transpose()
710 .map_err(|_| "request limit does not fit into u64 bounds")?,
711 })
712 }
713}
714
715#[sync_docs]
716#[derive(Debug, Clone)]
717pub struct ListBasinsResponse {
718 pub basins: Vec<BasinInfo>,
719 pub has_more: bool,
720}
721
722impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
723 type Error = ConvertError;
724 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
725 let api::ListBasinsResponse { basins, has_more } = value;
726 Ok(Self {
727 basins: basins.into_iter().map(Into::into).collect(),
728 has_more,
729 })
730 }
731}
732
733#[sync_docs]
734#[derive(Debug, Clone)]
735pub struct DeleteBasinRequest {
736 pub basin: BasinName,
737 pub if_exists: bool,
739}
740
741impl DeleteBasinRequest {
742 pub fn new(basin: BasinName) -> Self {
744 Self {
745 basin,
746 if_exists: false,
747 }
748 }
749
750 pub fn with_if_exists(self, if_exists: bool) -> Self {
752 Self { if_exists, ..self }
753 }
754}
755
756impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
757 fn from(value: DeleteBasinRequest) -> Self {
758 let DeleteBasinRequest { basin, .. } = value;
759 Self { basin: basin.0 }
760 }
761}
762
763#[sync_docs]
764#[derive(Debug, Clone)]
765pub struct DeleteStreamRequest {
766 pub stream: String,
767 pub if_exists: bool,
769}
770
771impl DeleteStreamRequest {
772 pub fn new(stream: impl Into<String>) -> Self {
774 Self {
775 stream: stream.into(),
776 if_exists: false,
777 }
778 }
779
780 pub fn with_if_exists(self, if_exists: bool) -> Self {
782 Self { if_exists, ..self }
783 }
784}
785
786impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
787 fn from(value: DeleteStreamRequest) -> Self {
788 let DeleteStreamRequest { stream, .. } = value;
789 Self { stream }
790 }
791}
792
793#[sync_docs]
794#[derive(Debug, Clone)]
795pub struct ReconfigureBasinRequest {
796 pub basin: BasinName,
797 pub config: Option<BasinConfig>,
798 pub mask: Option<Vec<String>>,
799}
800
801impl ReconfigureBasinRequest {
802 pub fn new(basin: BasinName) -> Self {
804 Self {
805 basin,
806 config: None,
807 mask: None,
808 }
809 }
810
811 pub fn with_config(self, config: BasinConfig) -> Self {
813 Self {
814 config: Some(config),
815 ..self
816 }
817 }
818
819 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
821 Self {
822 mask: Some(mask.into()),
823 ..self
824 }
825 }
826}
827
828impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
829 fn from(value: ReconfigureBasinRequest) -> Self {
830 let ReconfigureBasinRequest {
831 basin,
832 config,
833 mask,
834 } = value;
835 Self {
836 basin: basin.0,
837 config: config.map(Into::into),
838 mask: mask.map(|paths| prost_types::FieldMask { paths }),
839 }
840 }
841}
842
843impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
844 type Error = ConvertError;
845 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
846 let api::ReconfigureBasinResponse { config } = value;
847 let config = config.ok_or("missing basin config")?;
848 Ok(config.into())
849 }
850}
851
852#[sync_docs]
853#[derive(Debug, Clone)]
854pub struct ReconfigureStreamRequest {
855 pub stream: String,
856 pub config: Option<StreamConfig>,
857 pub mask: Option<Vec<String>>,
858}
859
860impl ReconfigureStreamRequest {
861 pub fn new(stream: impl Into<String>) -> Self {
863 Self {
864 stream: stream.into(),
865 config: None,
866 mask: None,
867 }
868 }
869
870 pub fn with_config(self, config: StreamConfig) -> Self {
872 Self {
873 config: Some(config),
874 ..self
875 }
876 }
877
878 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
880 Self {
881 mask: Some(mask.into()),
882 ..self
883 }
884 }
885}
886
887impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
888 fn from(value: ReconfigureStreamRequest) -> Self {
889 let ReconfigureStreamRequest {
890 stream,
891 config,
892 mask,
893 } = value;
894 Self {
895 stream,
896 config: config.map(Into::into),
897 mask: mask.map(|paths| prost_types::FieldMask { paths }),
898 }
899 }
900}
901
902impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
903 type Error = ConvertError;
904 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
905 let api::ReconfigureStreamResponse { config } = value;
906 let config = config.ok_or("missing stream config")?;
907 Ok(config.into())
908 }
909}
910
911impl From<api::CheckTailResponse> for StreamPosition {
912 fn from(value: api::CheckTailResponse) -> Self {
913 let api::CheckTailResponse {
914 next_seq_num,
915 last_timestamp,
916 } = value;
917 StreamPosition {
918 seq_num: next_seq_num,
919 timestamp: last_timestamp,
920 }
921 }
922}
923
924#[derive(Debug, Clone, PartialEq, Eq)]
926pub struct StreamPosition {
927 pub seq_num: u64,
929 pub timestamp: u64,
932}
933
934#[sync_docs]
935#[derive(Debug, Clone, PartialEq, Eq)]
936pub struct Header {
937 pub name: Bytes,
938 pub value: Bytes,
939}
940
941impl Header {
942 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
944 Self {
945 name: name.into(),
946 value: value.into(),
947 }
948 }
949
950 pub fn from_value(value: impl Into<Bytes>) -> Self {
952 Self {
953 name: Bytes::new(),
954 value: value.into(),
955 }
956 }
957}
958
959impl From<Header> for api::Header {
960 fn from(value: Header) -> Self {
961 let Header { name, value } = value;
962 Self { name, value }
963 }
964}
965
966impl From<api::Header> for Header {
967 fn from(value: api::Header) -> Self {
968 let api::Header { name, value } = value;
969 Self { name, value }
970 }
971}
972
973#[derive(Debug, Clone, Default, PartialEq, Eq)]
977pub struct FencingToken(String);
978
979impl FencingToken {
980 const MAX_BYTES: usize = 36;
981
982 pub fn generate(n: usize) -> Result<Self, ConvertError> {
984 rand::rng()
985 .sample_iter(&rand::distr::Alphanumeric)
986 .take(n)
987 .map(char::from)
988 .collect::<String>()
989 .parse()
990 }
991}
992
993impl Deref for FencingToken {
994 type Target = str;
995
996 fn deref(&self) -> &Self::Target {
997 &self.0
998 }
999}
1000
1001impl std::fmt::Display for FencingToken {
1002 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1003 write!(f, "{}", self.0)
1004 }
1005}
1006
1007impl FromStr for FencingToken {
1008 type Err = ConvertError;
1009
1010 fn from_str(value: &str) -> Result<Self, Self::Err> {
1011 value.to_string().try_into()
1012 }
1013}
1014
1015impl TryFrom<String> for FencingToken {
1016 type Error = ConvertError;
1017
1018 fn try_from(value: String) -> Result<Self, Self::Error> {
1019 if value.len() > Self::MAX_BYTES {
1020 Err(format!("Fencing token cannot exceed {} bytes", Self::MAX_BYTES).into())
1021 } else {
1022 Ok(Self(value))
1023 }
1024 }
1025}
1026
1027impl From<FencingToken> for String {
1028 fn from(value: FencingToken) -> Self {
1029 value.0
1030 }
1031}
1032
1033#[derive(Debug, Clone)]
1035pub enum Command {
1036 Fence {
1041 fencing_token: FencingToken,
1045 },
1046 Trim {
1051 seq_num: u64,
1056 },
1057}
1058
1059#[derive(Debug, Clone)]
1065pub struct CommandRecord {
1066 pub command: Command,
1068 pub timestamp: Option<u64>,
1070}
1071
1072impl CommandRecord {
1073 const FENCE: &[u8] = b"fence";
1074 const TRIM: &[u8] = b"trim";
1075
1076 pub fn fence(fencing_token: FencingToken) -> Self {
1078 Self {
1079 command: Command::Fence { fencing_token },
1080 timestamp: None,
1081 }
1082 }
1083
1084 pub fn trim(seq_num: impl Into<u64>) -> Self {
1086 Self {
1087 command: Command::Trim {
1088 seq_num: seq_num.into(),
1089 },
1090 timestamp: None,
1091 }
1092 }
1093
1094 pub fn with_timestamp(self, timestamp: u64) -> Self {
1096 Self {
1097 timestamp: Some(timestamp),
1098 ..self
1099 }
1100 }
1101}
1102
1103#[sync_docs]
1104#[derive(Debug, Clone, PartialEq, Eq)]
1105pub struct AppendRecord {
1106 timestamp: Option<u64>,
1107 headers: Vec<Header>,
1108 body: Bytes,
1109 #[cfg(test)]
1110 max_bytes: u64,
1111}
1112
1113metered_impl!(AppendRecord);
1114
1115impl AppendRecord {
1116 const MAX_BYTES: u64 = MIB_BYTES;
1117
1118 fn validated(self) -> Result<Self, ConvertError> {
1119 #[cfg(test)]
1120 let max_bytes = self.max_bytes;
1121 #[cfg(not(test))]
1122 let max_bytes = Self::MAX_BYTES;
1123
1124 if self.metered_bytes() > max_bytes {
1125 Err("AppendRecord should have metered size less than 1 MiB".into())
1126 } else {
1127 Ok(self)
1128 }
1129 }
1130
1131 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1133 Self {
1134 timestamp: None,
1135 headers: Vec::new(),
1136 body: body.into(),
1137 #[cfg(test)]
1138 max_bytes: Self::MAX_BYTES,
1139 }
1140 .validated()
1141 }
1142
1143 #[cfg(test)]
1144 pub(crate) fn with_max_bytes(
1145 max_bytes: u64,
1146 body: impl Into<Bytes>,
1147 ) -> Result<Self, ConvertError> {
1148 Self {
1149 timestamp: None,
1150 headers: Vec::new(),
1151 body: body.into(),
1152 max_bytes,
1153 }
1154 .validated()
1155 }
1156
1157 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1159 Self {
1160 headers: headers.into(),
1161 ..self
1162 }
1163 .validated()
1164 }
1165
1166 pub fn with_timestamp(self, timestamp: u64) -> Self {
1168 Self {
1169 timestamp: Some(timestamp),
1170 ..self
1171 }
1172 }
1173
1174 pub fn body(&self) -> &[u8] {
1176 &self.body
1177 }
1178
1179 pub fn headers(&self) -> &[Header] {
1181 &self.headers
1182 }
1183
1184 pub fn timestamp(&self) -> Option<u64> {
1186 self.timestamp
1187 }
1188
1189 pub fn into_parts(self) -> AppendRecordParts {
1191 AppendRecordParts {
1192 timestamp: self.timestamp,
1193 headers: self.headers,
1194 body: self.body,
1195 }
1196 }
1197
1198 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1200 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1201 if let Some(timestamp) = parts.timestamp {
1202 Ok(record.with_timestamp(timestamp))
1203 } else {
1204 Ok(record)
1205 }
1206 }
1207}
1208
1209impl From<AppendRecord> for api::AppendRecord {
1210 fn from(value: AppendRecord) -> Self {
1211 Self {
1212 timestamp: value.timestamp,
1213 headers: value.headers.into_iter().map(Into::into).collect(),
1214 body: value.body,
1215 }
1216 }
1217}
1218
1219impl From<CommandRecord> for AppendRecord {
1220 fn from(value: CommandRecord) -> Self {
1221 let (header_value, body) = match value.command {
1222 Command::Fence { fencing_token } => (
1223 CommandRecord::FENCE,
1224 Bytes::copy_from_slice(fencing_token.as_bytes()),
1225 ),
1226 Command::Trim { seq_num } => (
1227 CommandRecord::TRIM,
1228 Bytes::copy_from_slice(&seq_num.to_be_bytes()),
1229 ),
1230 };
1231 AppendRecordParts {
1232 timestamp: value.timestamp,
1233 headers: vec![Header::from_value(header_value)],
1234 body,
1235 }
1236 .try_into()
1237 .expect("command record is a valid append record")
1238 }
1239}
1240
1241#[sync_docs(AppendRecordParts = "AppendRecord")]
1242#[derive(Debug, Clone)]
1243pub struct AppendRecordParts {
1244 pub timestamp: Option<u64>,
1245 pub headers: Vec<Header>,
1246 pub body: Bytes,
1247}
1248
1249impl From<AppendRecord> for AppendRecordParts {
1250 fn from(value: AppendRecord) -> Self {
1251 value.into_parts()
1252 }
1253}
1254
1255impl TryFrom<AppendRecordParts> for AppendRecord {
1256 type Error = ConvertError;
1257
1258 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1259 Self::try_from_parts(value)
1260 }
1261}
1262
1263#[derive(Debug, Clone)]
1265pub struct AppendRecordBatch {
1266 records: Vec<AppendRecord>,
1267 metered_bytes: u64,
1268 max_capacity: usize,
1269 #[cfg(test)]
1270 max_bytes: u64,
1271}
1272
1273impl PartialEq for AppendRecordBatch {
1274 fn eq(&self, other: &Self) -> bool {
1275 if self.records.eq(&other.records) {
1276 assert_eq!(self.metered_bytes, other.metered_bytes);
1277 true
1278 } else {
1279 false
1280 }
1281 }
1282}
1283
1284impl Eq for AppendRecordBatch {}
1285
1286impl Default for AppendRecordBatch {
1287 fn default() -> Self {
1288 Self::new()
1289 }
1290}
1291
1292impl AppendRecordBatch {
1293 pub const MAX_CAPACITY: usize = 1000;
1297
1298 pub const MAX_BYTES: u64 = MIB_BYTES;
1300
1301 pub fn new() -> Self {
1303 Self::with_max_capacity(Self::MAX_CAPACITY)
1304 }
1305
1306 pub fn with_max_capacity(max_capacity: usize) -> Self {
1310 assert!(
1311 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1312 "Batch capacity must be between 1 and 1000"
1313 );
1314
1315 Self {
1316 records: Vec::with_capacity(max_capacity),
1317 metered_bytes: 0,
1318 max_capacity,
1319 #[cfg(test)]
1320 max_bytes: Self::MAX_BYTES,
1321 }
1322 }
1323
1324 #[cfg(test)]
1325 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1326 #[cfg(test)]
1327 assert!(
1328 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1329 "Batch size must be between 1 byte and 1 MiB"
1330 );
1331
1332 Self {
1333 max_bytes,
1334 ..Self::with_max_capacity(max_capacity)
1335 }
1336 }
1337
1338 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1344 where
1345 R: Into<AppendRecord>,
1346 T: IntoIterator<Item = R>,
1347 {
1348 let mut records = Self::new();
1349 let mut pending = Vec::new();
1350
1351 let mut iter = iter.into_iter();
1352
1353 for record in iter.by_ref() {
1354 if let Err(record) = records.push(record) {
1355 pending.push(record);
1356 break;
1357 }
1358 }
1359
1360 if pending.is_empty() {
1361 Ok(records)
1362 } else {
1363 pending.extend(iter.map(Into::into));
1364 Err((records, pending))
1365 }
1366 }
1367
1368 pub fn is_empty(&self) -> bool {
1370 if self.records.is_empty() {
1371 assert_eq!(self.metered_bytes, 0);
1372 true
1373 } else {
1374 false
1375 }
1376 }
1377
1378 pub fn len(&self) -> usize {
1380 self.records.len()
1381 }
1382
1383 #[cfg(test)]
1384 fn max_bytes(&self) -> u64 {
1385 self.max_bytes
1386 }
1387
1388 #[cfg(not(test))]
1389 fn max_bytes(&self) -> u64 {
1390 Self::MAX_BYTES
1391 }
1392
1393 pub fn is_full(&self) -> bool {
1395 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1396 }
1397
1398 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1400 assert!(self.records.len() <= self.max_capacity);
1401 assert!(self.metered_bytes <= self.max_bytes());
1402
1403 let record = record.into();
1404 let record_size = record.metered_bytes();
1405 if self.records.len() >= self.max_capacity
1406 || self.metered_bytes + record_size > self.max_bytes()
1407 {
1408 Err(record)
1409 } else {
1410 self.records.push(record);
1411 self.metered_bytes += record_size;
1412 Ok(())
1413 }
1414 }
1415}
1416
1417impl MeteredBytes for AppendRecordBatch {
1418 fn metered_bytes(&self) -> u64 {
1419 self.metered_bytes
1420 }
1421}
1422
1423impl IntoIterator for AppendRecordBatch {
1424 type Item = AppendRecord;
1425 type IntoIter = std::vec::IntoIter<Self::Item>;
1426
1427 fn into_iter(self) -> Self::IntoIter {
1428 self.records.into_iter()
1429 }
1430}
1431
1432impl<'a> IntoIterator for &'a AppendRecordBatch {
1433 type Item = &'a AppendRecord;
1434 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1435
1436 fn into_iter(self) -> Self::IntoIter {
1437 self.records.iter()
1438 }
1439}
1440
1441impl AsRef<[AppendRecord]> for AppendRecordBatch {
1442 fn as_ref(&self) -> &[AppendRecord] {
1443 &self.records
1444 }
1445}
1446
1447#[sync_docs]
1448#[derive(Debug, Default, Clone)]
1449pub struct AppendInput {
1450 pub records: AppendRecordBatch,
1451 pub match_seq_num: Option<u64>,
1452 pub fencing_token: Option<FencingToken>,
1453}
1454
1455impl MeteredBytes for AppendInput {
1456 fn metered_bytes(&self) -> u64 {
1457 self.records.metered_bytes()
1458 }
1459}
1460
1461impl AppendInput {
1462 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1464 Self {
1465 records: records.into(),
1466 match_seq_num: None,
1467 fencing_token: None,
1468 }
1469 }
1470
1471 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1473 Self {
1474 match_seq_num: Some(match_seq_num.into()),
1475 ..self
1476 }
1477 }
1478
1479 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1481 Self {
1482 fencing_token: Some(fencing_token),
1483 ..self
1484 }
1485 }
1486
1487 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1488 let Self {
1489 records,
1490 match_seq_num,
1491 fencing_token,
1492 } = self;
1493
1494 api::AppendInput {
1495 stream: stream.into(),
1496 records: records.into_iter().map(Into::into).collect(),
1497 match_seq_num,
1498 fencing_token: fencing_token.map(|f| f.0),
1499 }
1500 }
1501}
1502
1503#[derive(Debug, Clone)]
1505pub struct AppendAck {
1506 pub start: StreamPosition,
1508 pub end: StreamPosition,
1513 pub tail: StreamPosition,
1517}
1518
1519impl From<api::AppendOutput> for AppendAck {
1520 fn from(value: api::AppendOutput) -> Self {
1521 let api::AppendOutput {
1522 start_seq_num,
1523 start_timestamp,
1524 end_seq_num,
1525 end_timestamp,
1526 next_seq_num,
1527 last_timestamp,
1528 } = value;
1529 let start = StreamPosition {
1530 seq_num: start_seq_num,
1531 timestamp: start_timestamp,
1532 };
1533 let end = StreamPosition {
1534 seq_num: end_seq_num,
1535 timestamp: end_timestamp,
1536 };
1537 let tail = StreamPosition {
1538 seq_num: next_seq_num,
1539 timestamp: last_timestamp,
1540 };
1541 Self { start, end, tail }
1542 }
1543}
1544
1545impl TryFrom<api::AppendResponse> for AppendAck {
1546 type Error = ConvertError;
1547 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1548 let api::AppendResponse { output } = value;
1549 let output = output.ok_or("missing append output")?;
1550 Ok(output.into())
1551 }
1552}
1553
1554impl TryFrom<api::AppendSessionResponse> for AppendAck {
1555 type Error = ConvertError;
1556 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1557 let api::AppendSessionResponse { output } = value;
1558 let output = output.ok_or("missing append output")?;
1559 Ok(output.into())
1560 }
1561}
1562
1563#[sync_docs]
1564#[derive(Debug, Clone, Default)]
1565pub struct ReadLimit {
1566 pub count: Option<u64>,
1567 pub bytes: Option<u64>,
1568}
1569
1570impl ReadLimit {
1571 pub fn new() -> Self {
1573 Self::default()
1574 }
1575
1576 pub fn with_count(self, count: u64) -> Self {
1578 Self {
1579 count: Some(count),
1580 ..self
1581 }
1582 }
1583
1584 pub fn with_bytes(self, bytes: u64) -> Self {
1586 Self {
1587 bytes: Some(bytes),
1588 ..self
1589 }
1590 }
1591}
1592
1593#[derive(Debug, Clone)]
1595pub enum ReadStart {
1596 SeqNum(u64),
1598 Timestamp(u64),
1600 TailOffset(u64),
1602}
1603
1604impl Default for ReadStart {
1605 fn default() -> Self {
1606 Self::SeqNum(0)
1607 }
1608}
1609
1610impl From<ReadStart> for api::read_request::Start {
1611 fn from(start: ReadStart) -> Self {
1612 match start {
1613 ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1614 ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1615 ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1616 }
1617 }
1618}
1619
1620impl From<ReadStart> for api::read_session_request::Start {
1621 fn from(start: ReadStart) -> Self {
1622 match start {
1623 ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1624 ReadStart::Timestamp(timestamp) => {
1625 api::read_session_request::Start::Timestamp(timestamp)
1626 }
1627 ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1628 }
1629 }
1630}
1631
1632#[sync_docs]
1633#[derive(Debug, Clone, Default)]
1634pub struct ReadRequest {
1635 pub start: ReadStart,
1636 pub limit: ReadLimit,
1637 pub until: Option<RangeTo<u64>>,
1638 pub clamp: bool,
1639}
1640
1641impl ReadRequest {
1642 pub fn new(start: ReadStart) -> Self {
1644 Self {
1645 start,
1646 ..Default::default()
1647 }
1648 }
1649
1650 pub fn with_limit(self, limit: ReadLimit) -> Self {
1652 Self { limit, ..self }
1653 }
1654
1655 pub fn with_until(self, until: RangeTo<u64>) -> Self {
1657 Self {
1658 until: Some(until),
1659 ..self
1660 }
1661 }
1662
1663 pub fn with_clamp(self, clamp: bool) -> Self {
1665 Self { clamp, ..self }
1666 }
1667}
1668
1669impl ReadRequest {
1670 pub(crate) fn try_into_api_type(
1671 self,
1672 stream: impl Into<String>,
1673 ) -> Result<api::ReadRequest, ConvertError> {
1674 let Self {
1675 start,
1676 limit,
1677 until,
1678 clamp,
1679 } = self;
1680
1681 let limit = if limit.count > Some(1000) {
1682 Err("read limit: count must not exceed 1000 for unary request")
1683 } else if limit.bytes > Some(MIB_BYTES) {
1684 Err("read limit: bytes must not exceed 1MiB for unary request")
1685 } else {
1686 Ok(api::ReadLimit {
1687 count: limit.count,
1688 bytes: limit.bytes,
1689 })
1690 }?;
1691
1692 Ok(api::ReadRequest {
1693 stream: stream.into(),
1694 start: Some(start.into()),
1695 limit: Some(limit),
1696 until: until.map(|range| range.end),
1697 clamp,
1698 })
1699 }
1700}
1701
1702#[sync_docs]
1703#[derive(Debug, Clone)]
1704pub struct SequencedRecord {
1705 pub seq_num: u64,
1706 pub timestamp: u64,
1707 pub headers: Vec<Header>,
1708 pub body: Bytes,
1709}
1710
1711metered_impl!(SequencedRecord);
1712
1713impl From<api::SequencedRecord> for SequencedRecord {
1714 fn from(value: api::SequencedRecord) -> Self {
1715 let api::SequencedRecord {
1716 seq_num,
1717 timestamp,
1718 headers,
1719 body,
1720 } = value;
1721 Self {
1722 seq_num,
1723 timestamp,
1724 headers: headers.into_iter().map(Into::into).collect(),
1725 body,
1726 }
1727 }
1728}
1729
1730impl SequencedRecord {
1731 pub fn as_command_record(&self) -> Option<CommandRecord> {
1733 if self.headers.len() != 1 {
1734 return None;
1735 }
1736
1737 let header = self.headers.first().expect("pre-validated length");
1738
1739 if !header.name.is_empty() {
1740 return None;
1741 }
1742
1743 match header.value.as_ref() {
1744 CommandRecord::FENCE => {
1745 let fencing_token = std::str::from_utf8(&self.body).ok()?.parse().ok()?;
1746 Some(CommandRecord {
1747 command: Command::Fence { fencing_token },
1748 timestamp: Some(self.timestamp),
1749 })
1750 }
1751 CommandRecord::TRIM => {
1752 let body: &[u8] = &self.body;
1753 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1754 Some(CommandRecord {
1755 command: Command::Trim { seq_num },
1756 timestamp: Some(self.timestamp),
1757 })
1758 }
1759 _ => None,
1760 }
1761 }
1762}
1763
1764#[sync_docs]
1765#[derive(Debug, Clone)]
1766pub struct SequencedRecordBatch {
1767 pub records: Vec<SequencedRecord>,
1768}
1769
1770impl MeteredBytes for SequencedRecordBatch {
1771 fn metered_bytes(&self) -> u64 {
1772 self.records.metered_bytes()
1773 }
1774}
1775
1776impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1777 fn from(value: api::SequencedRecordBatch) -> Self {
1778 let api::SequencedRecordBatch { records } = value;
1779 Self {
1780 records: records.into_iter().map(Into::into).collect(),
1781 }
1782 }
1783}
1784
1785#[sync_docs(ReadOutput = "Output")]
1786#[derive(Debug, Clone)]
1787pub enum ReadOutput {
1788 Batch(SequencedRecordBatch),
1789 NextSeqNum(u64),
1790}
1791
1792impl From<api::read_output::Output> for ReadOutput {
1793 fn from(value: api::read_output::Output) -> Self {
1794 match value {
1795 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1796 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1797 }
1798 }
1799}
1800
1801impl TryFrom<api::ReadOutput> for ReadOutput {
1802 type Error = ConvertError;
1803 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1804 let api::ReadOutput { output } = value;
1805 let output = output.ok_or("missing read output")?;
1806 Ok(output.into())
1807 }
1808}
1809
1810impl TryFrom<api::ReadResponse> for ReadOutput {
1811 type Error = ConvertError;
1812 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1813 let api::ReadResponse { output } = value;
1814 let output = output.ok_or("missing output in read response")?;
1815 output.try_into()
1816 }
1817}
1818
1819#[sync_docs]
1820#[derive(Debug, Clone, Default)]
1821pub struct ReadSessionRequest {
1822 pub start: ReadStart,
1823 pub limit: ReadLimit,
1824 pub until: Option<RangeTo<u64>>,
1825 pub clamp: bool,
1826}
1827
1828impl ReadSessionRequest {
1829 pub fn new(start: ReadStart) -> Self {
1831 Self {
1832 start,
1833 ..Default::default()
1834 }
1835 }
1836
1837 pub fn with_limit(self, limit: ReadLimit) -> Self {
1839 Self { limit, ..self }
1840 }
1841
1842 pub fn with_until(self, until: RangeTo<u64>) -> Self {
1844 Self {
1845 until: Some(until),
1846 ..self
1847 }
1848 }
1849
1850 pub fn with_clamp(self, clamp: bool) -> Self {
1852 Self { clamp, ..self }
1853 }
1854
1855 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1856 let Self {
1857 start,
1858 limit,
1859 until,
1860 clamp,
1861 } = self;
1862 api::ReadSessionRequest {
1863 stream: stream.into(),
1864 start: Some(start.into()),
1865 limit: Some(api::ReadLimit {
1866 count: limit.count,
1867 bytes: limit.bytes,
1868 }),
1869 heartbeats: false,
1870 until: until.map(|range| range.end),
1871 clamp,
1872 }
1873 }
1874}
1875
1876impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1877 type Error = ConvertError;
1878 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1879 let api::ReadSessionResponse { output } = value;
1880 let output = output.ok_or("missing output in read session response")?;
1881 output.try_into()
1882 }
1883}
1884
1885#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1890pub struct BasinName(String);
1891
1892impl Deref for BasinName {
1893 type Target = str;
1894 fn deref(&self) -> &Self::Target {
1895 &self.0
1896 }
1897}
1898
1899impl TryFrom<String> for BasinName {
1900 type Error = ConvertError;
1901
1902 fn try_from(name: String) -> Result<Self, Self::Error> {
1903 if name.len() < 8 || name.len() > 48 {
1904 return Err("Basin name must be between 8 and 48 characters in length".into());
1905 }
1906
1907 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1908 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1909 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1910 .expect("Failed to compile basin name regex")
1911 });
1912
1913 if !regex.is_match(&name) {
1914 return Err(
1915 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1916 It cannot begin or end with a hyphen."
1917 .into(),
1918 );
1919 }
1920
1921 Ok(Self(name))
1922 }
1923}
1924
1925impl FromStr for BasinName {
1926 type Err = ConvertError;
1927
1928 fn from_str(s: &str) -> Result<Self, Self::Err> {
1929 s.to_string().try_into()
1930 }
1931}
1932
1933impl std::fmt::Display for BasinName {
1934 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1935 f.write_str(&self.0)
1936 }
1937}
1938
1939impl From<BasinName> for String {
1940 fn from(value: BasinName) -> Self {
1941 value.0
1942 }
1943}
1944
1945#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1948pub struct AccessTokenId(String);
1949
1950impl Deref for AccessTokenId {
1951 type Target = str;
1952
1953 fn deref(&self) -> &Self::Target {
1954 &self.0
1955 }
1956}
1957
1958impl TryFrom<String> for AccessTokenId {
1959 type Error = ConvertError;
1960
1961 fn try_from(name: String) -> Result<Self, Self::Error> {
1962 if name.is_empty() {
1963 return Err("Access token ID must not be empty".into());
1964 }
1965
1966 if name.len() > 96 {
1967 return Err("Access token ID must not exceed 96 characters".into());
1968 }
1969
1970 Ok(Self(name))
1971 }
1972}
1973
1974impl From<AccessTokenId> for String {
1975 fn from(value: AccessTokenId) -> Self {
1976 value.0
1977 }
1978}
1979
1980impl FromStr for AccessTokenId {
1981 type Err = ConvertError;
1982
1983 fn from_str(s: &str) -> Result<Self, Self::Err> {
1984 s.to_string().try_into()
1985 }
1986}
1987
1988impl std::fmt::Display for AccessTokenId {
1989 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1990 f.write_str(&self.0)
1991 }
1992}
1993
1994impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1995 fn from(value: AccessTokenInfo) -> Self {
1996 Self {
1997 info: Some(value.into()),
1998 }
1999 }
2000}
2001
2002#[sync_docs]
2003#[derive(Debug, Clone)]
2004pub struct AccessTokenInfo {
2005 pub id: AccessTokenId,
2006 pub expires_at: Option<u32>,
2007 pub auto_prefix_streams: bool,
2008 pub scope: Option<AccessTokenScope>,
2009}
2010
2011impl AccessTokenInfo {
2012 pub fn new(id: AccessTokenId) -> Self {
2014 Self {
2015 id,
2016 expires_at: None,
2017 auto_prefix_streams: false,
2018 scope: None,
2019 }
2020 }
2021
2022 pub fn with_expires_at(self, expires_at: u32) -> Self {
2024 Self {
2025 expires_at: Some(expires_at),
2026 ..self
2027 }
2028 }
2029
2030 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
2032 Self {
2033 auto_prefix_streams,
2034 ..self
2035 }
2036 }
2037
2038 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2040 Self {
2041 scope: Some(scope),
2042 ..self
2043 }
2044 }
2045}
2046
2047impl From<AccessTokenInfo> for api::AccessTokenInfo {
2048 fn from(value: AccessTokenInfo) -> Self {
2049 let AccessTokenInfo {
2050 id,
2051 expires_at,
2052 auto_prefix_streams,
2053 scope,
2054 } = value;
2055 Self {
2056 id: id.into(),
2057 expires_at,
2058 auto_prefix_streams,
2059 scope: scope.map(Into::into),
2060 }
2061 }
2062}
2063
2064impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2065 type Error = ConvertError;
2066
2067 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2068 let api::AccessTokenInfo {
2069 id,
2070 expires_at,
2071 auto_prefix_streams,
2072 scope,
2073 } = value;
2074 Ok(Self {
2075 id: id.try_into()?,
2076 expires_at,
2077 auto_prefix_streams,
2078 scope: scope.map(Into::into),
2079 })
2080 }
2081}
2082
2083#[sync_docs]
2084#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2085pub enum Operation {
2086 ListBasins,
2087 CreateBasin,
2088 DeleteBasin,
2089 ReconfigureBasin,
2090 GetBasinConfig,
2091 IssueAccessToken,
2092 RevokeAccessToken,
2093 ListAccessTokens,
2094 ListStreams,
2095 CreateStream,
2096 DeleteStream,
2097 GetStreamConfig,
2098 ReconfigureStream,
2099 CheckTail,
2100 Append,
2101 Read,
2102 Trim,
2103 Fence,
2104 AccountMetrics,
2105 BasinMetrics,
2106 StreamMetrics,
2107}
2108
2109impl FromStr for Operation {
2110 type Err = ConvertError;
2111
2112 fn from_str(s: &str) -> Result<Self, Self::Err> {
2113 match s.to_lowercase().as_str() {
2114 "list-basins" => Ok(Self::ListBasins),
2115 "create-basin" => Ok(Self::CreateBasin),
2116 "delete-basin" => Ok(Self::DeleteBasin),
2117 "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2118 "get-basin-config" => Ok(Self::GetBasinConfig),
2119 "issue-access-token" => Ok(Self::IssueAccessToken),
2120 "revoke-access-token" => Ok(Self::RevokeAccessToken),
2121 "list-access-tokens" => Ok(Self::ListAccessTokens),
2122 "list-streams" => Ok(Self::ListStreams),
2123 "create-stream" => Ok(Self::CreateStream),
2124 "delete-stream" => Ok(Self::DeleteStream),
2125 "get-stream-config" => Ok(Self::GetStreamConfig),
2126 "reconfigure-stream" => Ok(Self::ReconfigureStream),
2127 "check-tail" => Ok(Self::CheckTail),
2128 "append" => Ok(Self::Append),
2129 "read" => Ok(Self::Read),
2130 "trim" => Ok(Self::Trim),
2131 "fence" => Ok(Self::Fence),
2132 "account-metrics" => Ok(Self::AccountMetrics),
2133 "basin-metrics" => Ok(Self::BasinMetrics),
2134 "stream-metrics" => Ok(Self::StreamMetrics),
2135 _ => Err("invalid operation".into()),
2136 }
2137 }
2138}
2139
2140impl From<Operation> for api::Operation {
2141 fn from(value: Operation) -> Self {
2142 match value {
2143 Operation::ListBasins => Self::ListBasins,
2144 Operation::CreateBasin => Self::CreateBasin,
2145 Operation::DeleteBasin => Self::DeleteBasin,
2146 Operation::ReconfigureBasin => Self::ReconfigureBasin,
2147 Operation::GetBasinConfig => Self::GetBasinConfig,
2148 Operation::IssueAccessToken => Self::IssueAccessToken,
2149 Operation::RevokeAccessToken => Self::RevokeAccessToken,
2150 Operation::ListAccessTokens => Self::ListAccessTokens,
2151 Operation::ListStreams => Self::ListStreams,
2152 Operation::CreateStream => Self::CreateStream,
2153 Operation::DeleteStream => Self::DeleteStream,
2154 Operation::GetStreamConfig => Self::GetStreamConfig,
2155 Operation::ReconfigureStream => Self::ReconfigureStream,
2156 Operation::CheckTail => Self::CheckTail,
2157 Operation::Append => Self::Append,
2158 Operation::Read => Self::Read,
2159 Operation::Trim => Self::Trim,
2160 Operation::Fence => Self::Fence,
2161 Operation::AccountMetrics => Self::AccountMetrics,
2162 Operation::BasinMetrics => Self::BasinMetrics,
2163 Operation::StreamMetrics => Self::StreamMetrics,
2164 }
2165 }
2166}
2167
2168impl From<api::Operation> for Option<Operation> {
2169 fn from(value: api::Operation) -> Self {
2170 match value {
2171 api::Operation::Unspecified => None,
2172 api::Operation::ListBasins => Some(Operation::ListBasins),
2173 api::Operation::CreateBasin => Some(Operation::CreateBasin),
2174 api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2175 api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2176 api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2177 api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2178 api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2179 api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2180 api::Operation::ListStreams => Some(Operation::ListStreams),
2181 api::Operation::CreateStream => Some(Operation::CreateStream),
2182 api::Operation::DeleteStream => Some(Operation::DeleteStream),
2183 api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2184 api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2185 api::Operation::CheckTail => Some(Operation::CheckTail),
2186 api::Operation::Append => Some(Operation::Append),
2187 api::Operation::Read => Some(Operation::Read),
2188 api::Operation::Trim => Some(Operation::Trim),
2189 api::Operation::Fence => Some(Operation::Fence),
2190 api::Operation::AccountMetrics => Some(Operation::AccountMetrics),
2191 api::Operation::BasinMetrics => Some(Operation::BasinMetrics),
2192 api::Operation::StreamMetrics => Some(Operation::StreamMetrics),
2193 }
2194 }
2195}
2196
2197#[sync_docs]
2198#[derive(Debug, Clone, Default)]
2199pub struct AccessTokenScope {
2200 pub basins: Option<ResourceSet>,
2201 pub streams: Option<ResourceSet>,
2202 pub access_tokens: Option<ResourceSet>,
2203 pub op_groups: Option<PermittedOperationGroups>,
2204 pub ops: HashSet<Operation>,
2205}
2206
2207impl AccessTokenScope {
2208 pub fn new() -> Self {
2210 Self::default()
2211 }
2212
2213 pub fn with_basins(self, basins: ResourceSet) -> Self {
2215 Self {
2216 basins: Some(basins),
2217 ..self
2218 }
2219 }
2220
2221 pub fn with_streams(self, streams: ResourceSet) -> Self {
2223 Self {
2224 streams: Some(streams),
2225 ..self
2226 }
2227 }
2228
2229 pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2231 Self {
2232 access_tokens: Some(access_tokens),
2233 ..self
2234 }
2235 }
2236
2237 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2239 Self {
2240 op_groups: Some(op_groups),
2241 ..self
2242 }
2243 }
2244
2245 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2247 Self {
2248 ops: ops.into_iter().collect(),
2249 ..self
2250 }
2251 }
2252
2253 pub fn with_op(self, op: Operation) -> Self {
2255 let mut ops = self.ops;
2256 ops.insert(op);
2257 Self { ops, ..self }
2258 }
2259}
2260
2261impl From<AccessTokenScope> for api::AccessTokenScope {
2262 fn from(value: AccessTokenScope) -> Self {
2263 let AccessTokenScope {
2264 basins,
2265 streams,
2266 access_tokens,
2267 op_groups,
2268 ops,
2269 } = value;
2270 Self {
2271 basins: basins.map(Into::into),
2272 streams: streams.map(Into::into),
2273 access_tokens: access_tokens.map(Into::into),
2274 op_groups: op_groups.map(Into::into),
2275 ops: ops
2276 .into_iter()
2277 .map(api::Operation::from)
2278 .map(Into::into)
2279 .collect(),
2280 }
2281 }
2282}
2283
2284impl From<api::AccessTokenScope> for AccessTokenScope {
2285 fn from(value: api::AccessTokenScope) -> Self {
2286 let api::AccessTokenScope {
2287 basins,
2288 streams,
2289 access_tokens,
2290 op_groups,
2291 ops,
2292 } = value;
2293 Self {
2294 basins: basins.and_then(|set| set.matching.map(Into::into)),
2295 streams: streams.and_then(|set| set.matching.map(Into::into)),
2296 access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2297 op_groups: op_groups.map(Into::into),
2298 ops: ops
2299 .into_iter()
2300 .map(api::Operation::try_from)
2301 .flat_map(Result::ok)
2302 .flat_map(<Option<Operation>>::from)
2303 .collect(),
2304 }
2305 }
2306}
2307
2308impl From<ResourceSet> for api::ResourceSet {
2309 fn from(value: ResourceSet) -> Self {
2310 Self {
2311 matching: Some(value.into()),
2312 }
2313 }
2314}
2315
2316#[sync_docs(ResourceSet = "Matching")]
2317#[derive(Debug, Clone)]
2318pub enum ResourceSet {
2319 Exact(String),
2320 Prefix(String),
2321}
2322
2323impl From<ResourceSet> for api::resource_set::Matching {
2324 fn from(value: ResourceSet) -> Self {
2325 match value {
2326 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2327 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2328 }
2329 }
2330}
2331
2332impl From<api::resource_set::Matching> for ResourceSet {
2333 fn from(value: api::resource_set::Matching) -> Self {
2334 match value {
2335 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2336 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2337 }
2338 }
2339}
2340
2341#[sync_docs]
2342#[derive(Debug, Clone, Default)]
2343pub struct PermittedOperationGroups {
2344 pub account: Option<ReadWritePermissions>,
2345 pub basin: Option<ReadWritePermissions>,
2346 pub stream: Option<ReadWritePermissions>,
2347}
2348
2349impl PermittedOperationGroups {
2350 pub fn new() -> Self {
2352 Self::default()
2353 }
2354
2355 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2357 Self {
2358 account: Some(account),
2359 ..self
2360 }
2361 }
2362
2363 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2365 Self {
2366 basin: Some(basin),
2367 ..self
2368 }
2369 }
2370
2371 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2373 Self {
2374 stream: Some(stream),
2375 ..self
2376 }
2377 }
2378}
2379
2380impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2381 fn from(value: PermittedOperationGroups) -> Self {
2382 let PermittedOperationGroups {
2383 account,
2384 basin,
2385 stream,
2386 } = value;
2387 Self {
2388 account: account.map(Into::into),
2389 basin: basin.map(Into::into),
2390 stream: stream.map(Into::into),
2391 }
2392 }
2393}
2394
2395impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2396 fn from(value: api::PermittedOperationGroups) -> Self {
2397 let api::PermittedOperationGroups {
2398 account,
2399 basin,
2400 stream,
2401 } = value;
2402 Self {
2403 account: account.map(Into::into),
2404 basin: basin.map(Into::into),
2405 stream: stream.map(Into::into),
2406 }
2407 }
2408}
2409
2410#[sync_docs]
2411#[derive(Debug, Clone, Default)]
2412pub struct ReadWritePermissions {
2413 pub read: bool,
2414 pub write: bool,
2415}
2416
2417impl ReadWritePermissions {
2418 pub fn new() -> Self {
2420 Self::default()
2421 }
2422
2423 pub fn with_read(self, read: bool) -> Self {
2425 Self { read, ..self }
2426 }
2427
2428 pub fn with_write(self, write: bool) -> Self {
2430 Self { write, ..self }
2431 }
2432}
2433
2434impl From<ReadWritePermissions> for api::ReadWritePermissions {
2435 fn from(value: ReadWritePermissions) -> Self {
2436 let ReadWritePermissions { read, write } = value;
2437 Self { read, write }
2438 }
2439}
2440
2441impl From<api::ReadWritePermissions> for ReadWritePermissions {
2442 fn from(value: api::ReadWritePermissions) -> Self {
2443 let api::ReadWritePermissions { read, write } = value;
2444 Self { read, write }
2445 }
2446}
2447
2448impl From<api::IssueAccessTokenResponse> for String {
2449 fn from(value: api::IssueAccessTokenResponse) -> Self {
2450 value.access_token
2451 }
2452}
2453
2454impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2455 fn from(value: AccessTokenId) -> Self {
2456 Self { id: value.into() }
2457 }
2458}
2459
2460impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2461 type Error = ConvertError;
2462 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2463 let token_info = value.info.ok_or("access token info is missing")?;
2464 token_info.try_into()
2465 }
2466}
2467
2468#[sync_docs]
2469#[derive(Debug, Clone, Default)]
2470pub struct ListAccessTokensRequest {
2471 pub prefix: String,
2472 pub start_after: String,
2473 pub limit: Option<usize>,
2474}
2475
2476impl ListAccessTokensRequest {
2477 pub fn new() -> Self {
2479 Self::default()
2480 }
2481
2482 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2484 Self {
2485 prefix: prefix.into(),
2486 ..self
2487 }
2488 }
2489
2490 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2492 Self {
2493 start_after: start_after.into(),
2494 ..self
2495 }
2496 }
2497
2498 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2500 Self {
2501 limit: limit.into(),
2502 ..self
2503 }
2504 }
2505}
2506
2507impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2508 type Error = ConvertError;
2509 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2510 let ListAccessTokensRequest {
2511 prefix,
2512 start_after,
2513 limit,
2514 } = value;
2515 Ok(Self {
2516 prefix,
2517 start_after,
2518 limit: limit
2519 .map(TryInto::try_into)
2520 .transpose()
2521 .map_err(|_| "request limit does not fit into u64 bounds")?,
2522 })
2523 }
2524}
2525
2526#[sync_docs]
2527#[derive(Debug, Clone)]
2528pub struct ListAccessTokensResponse {
2529 pub access_tokens: Vec<AccessTokenInfo>,
2530 pub has_more: bool,
2531}
2532
2533impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2534 type Error = ConvertError;
2535 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2536 let api::ListAccessTokensResponse {
2537 access_tokens,
2538 has_more,
2539 } = value;
2540 let access_tokens = access_tokens
2541 .into_iter()
2542 .map(TryInto::try_into)
2543 .collect::<Result<Vec<_>, _>>()?;
2544 Ok(Self {
2545 access_tokens,
2546 has_more,
2547 })
2548 }
2549}