1use std::{ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::{Rng, distributions::Uniform};
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
/// Number of bytes in one mebibyte (2^20).
pub(crate) const MIB_BYTES: u64 = 1 << 20;
13
14#[derive(Debug, Clone, thiserror::Error)]
16#[error("{0}")]
17pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20 fn from(value: T) -> Self {
21 Self(value.into())
22 }
23}
24
/// Implemented by types that can report their size in metered bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38 fn metered_bytes(&self) -> u64 {
39 self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40 }
41}
42
/// Implements `MeteredBytes` for a record-like type that has a `headers`
/// collection (each item exposing `name` and `value`) and a `body`.
///
/// The metered size is 8, plus 2 per header, plus the length of every header
/// name and value, plus the length of the body.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                // Each header contributes a fixed 2 plus its name/value lengths.
                let header_bytes = self
                    .headers
                    .iter()
                    .map(|header| 2 + header.name.len() + header.value.len())
                    .sum::<usize>();
                (8 + header_bytes + self.body.len()) as u64
            }
        }
    };
}
60
61#[sync_docs]
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
63pub enum BasinScope {
64 #[default]
65 Unspecified,
66 AwsUsEast1,
67}
68
69impl From<BasinScope> for api::BasinScope {
70 fn from(value: BasinScope) -> Self {
71 match value {
72 BasinScope::Unspecified => Self::Unspecified,
73 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
74 }
75 }
76}
77
78impl From<api::BasinScope> for BasinScope {
79 fn from(value: api::BasinScope) -> Self {
80 match value {
81 api::BasinScope::Unspecified => Self::Unspecified,
82 api::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
83 }
84 }
85}
86
87impl FromStr for BasinScope {
88 type Err = ConvertError;
89 fn from_str(value: &str) -> Result<Self, Self::Err> {
90 match value {
91 "unspecified" => Ok(Self::Unspecified),
92 "aws:us-east-1" => Ok(Self::AwsUsEast1),
93 _ => Err("invalid basin scope value".into()),
94 }
95 }
96}
97
98impl From<BasinScope> for i32 {
99 fn from(value: BasinScope) -> Self {
100 api::BasinScope::from(value).into()
101 }
102}
103
104impl TryFrom<i32> for BasinScope {
105 type Error = ConvertError;
106 fn try_from(value: i32) -> Result<Self, Self::Error> {
107 api::BasinScope::try_from(value)
108 .map(Into::into)
109 .map_err(|_| "invalid basin scope value".into())
110 }
111}
112
113#[sync_docs]
114#[derive(Debug, Clone)]
115pub struct CreateBasinRequest {
116 pub basin: BasinName,
117 pub config: Option<BasinConfig>,
118 pub scope: BasinScope,
119}
120
121impl CreateBasinRequest {
122 pub fn new(basin: BasinName) -> Self {
124 Self {
125 basin,
126 config: None,
127 scope: BasinScope::Unspecified,
128 }
129 }
130
131 pub fn with_config(self, config: BasinConfig) -> Self {
133 Self {
134 config: Some(config),
135 ..self
136 }
137 }
138
139 pub fn with_scope(self, scope: BasinScope) -> Self {
141 Self { scope, ..self }
142 }
143}
144
145impl From<CreateBasinRequest> for api::CreateBasinRequest {
146 fn from(value: CreateBasinRequest) -> Self {
147 let CreateBasinRequest {
148 basin,
149 config,
150 scope,
151 } = value;
152 Self {
153 basin: basin.0,
154 config: config.map(Into::into),
155 scope: scope.into(),
156 }
157 }
158}
159
160#[sync_docs]
161#[derive(Debug, Clone, Default)]
162pub struct BasinConfig {
163 pub default_stream_config: Option<StreamConfig>,
164 pub create_stream_on_append: bool,
165}
166
167impl From<BasinConfig> for api::BasinConfig {
168 fn from(value: BasinConfig) -> Self {
169 let BasinConfig {
170 default_stream_config,
171 create_stream_on_append,
172 } = value;
173 Self {
174 default_stream_config: default_stream_config.map(Into::into),
175 create_stream_on_append,
176 }
177 }
178}
179
180impl TryFrom<api::BasinConfig> for BasinConfig {
181 type Error = ConvertError;
182 fn try_from(value: api::BasinConfig) -> Result<Self, Self::Error> {
183 let api::BasinConfig {
184 default_stream_config,
185 create_stream_on_append,
186 } = value;
187 Ok(Self {
188 default_stream_config: default_stream_config.map(TryInto::try_into).transpose()?,
189 create_stream_on_append,
190 })
191 }
192}
193
194#[sync_docs]
195#[derive(Debug, Clone, Default)]
196pub struct StreamConfig {
197 pub storage_class: StorageClass,
198 pub retention_policy: Option<RetentionPolicy>,
199 pub require_client_timestamps: Option<bool>,
200}
201
202impl StreamConfig {
203 pub fn new() -> Self {
205 Self::default()
206 }
207
208 pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self {
210 Self {
211 storage_class: storage_class.into(),
212 ..self
213 }
214 }
215
216 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
218 Self {
219 retention_policy: Some(retention_policy),
220 ..self
221 }
222 }
223
224 pub fn with_require_client_timestamps(self, require_client_timestamps: bool) -> Self {
226 Self {
227 require_client_timestamps: Some(require_client_timestamps),
228 ..self
229 }
230 }
231}
232
233impl From<StreamConfig> for api::StreamConfig {
234 fn from(value: StreamConfig) -> Self {
235 let StreamConfig {
236 storage_class,
237 retention_policy,
238 require_client_timestamps,
239 } = value;
240 Self {
241 storage_class: storage_class.into(),
242 retention_policy: retention_policy.map(Into::into),
243 require_client_timestamps,
244 }
245 }
246}
247
248impl TryFrom<api::StreamConfig> for StreamConfig {
249 type Error = ConvertError;
250 fn try_from(value: api::StreamConfig) -> Result<Self, Self::Error> {
251 let api::StreamConfig {
252 storage_class,
253 retention_policy,
254 require_client_timestamps,
255 } = value;
256 Ok(Self {
257 storage_class: storage_class.try_into()?,
258 retention_policy: retention_policy.map(Into::into),
259 require_client_timestamps,
260 })
261 }
262}
263
264#[sync_docs]
265#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
266pub enum StorageClass {
267 #[default]
268 Unspecified,
269 Standard,
270 Express,
271}
272
273impl From<StorageClass> for api::StorageClass {
274 fn from(value: StorageClass) -> Self {
275 match value {
276 StorageClass::Unspecified => Self::Unspecified,
277 StorageClass::Standard => Self::Standard,
278 StorageClass::Express => Self::Express,
279 }
280 }
281}
282
283impl From<api::StorageClass> for StorageClass {
284 fn from(value: api::StorageClass) -> Self {
285 match value {
286 api::StorageClass::Unspecified => Self::Unspecified,
287 api::StorageClass::Standard => Self::Standard,
288 api::StorageClass::Express => Self::Express,
289 }
290 }
291}
292
293impl FromStr for StorageClass {
294 type Err = ConvertError;
295 fn from_str(value: &str) -> Result<Self, Self::Err> {
296 match value {
297 "unspecified" => Ok(Self::Unspecified),
298 "standard" => Ok(Self::Standard),
299 "express" => Ok(Self::Express),
300 _ => Err("invalid storage class value".into()),
301 }
302 }
303}
304
305impl From<StorageClass> for i32 {
306 fn from(value: StorageClass) -> Self {
307 api::StorageClass::from(value).into()
308 }
309}
310
311impl TryFrom<i32> for StorageClass {
312 type Error = ConvertError;
313 fn try_from(value: i32) -> Result<Self, Self::Error> {
314 api::StorageClass::try_from(value)
315 .map(Into::into)
316 .map_err(|_| "invalid storage class value".into())
317 }
318}
319
320#[sync_docs(Age = "Age")]
321#[derive(Debug, Clone)]
322pub enum RetentionPolicy {
323 Age(Duration),
324}
325
326impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
327 fn from(value: RetentionPolicy) -> Self {
328 match value {
329 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
330 }
331 }
332}
333
334impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
335 fn from(value: api::stream_config::RetentionPolicy) -> Self {
336 match value {
337 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
338 }
339 }
340}
341
342#[sync_docs]
343#[derive(Debug, Clone, Copy, PartialEq, Eq)]
344pub enum BasinState {
345 Unspecified,
346 Active,
347 Creating,
348 Deleting,
349}
350
351impl From<BasinState> for api::BasinState {
352 fn from(value: BasinState) -> Self {
353 match value {
354 BasinState::Unspecified => Self::Unspecified,
355 BasinState::Active => Self::Active,
356 BasinState::Creating => Self::Creating,
357 BasinState::Deleting => Self::Deleting,
358 }
359 }
360}
361
362impl From<api::BasinState> for BasinState {
363 fn from(value: api::BasinState) -> Self {
364 match value {
365 api::BasinState::Unspecified => Self::Unspecified,
366 api::BasinState::Active => Self::Active,
367 api::BasinState::Creating => Self::Creating,
368 api::BasinState::Deleting => Self::Deleting,
369 }
370 }
371}
372
373impl From<BasinState> for i32 {
374 fn from(value: BasinState) -> Self {
375 api::BasinState::from(value).into()
376 }
377}
378
379impl TryFrom<i32> for BasinState {
380 type Error = ConvertError;
381 fn try_from(value: i32) -> Result<Self, Self::Error> {
382 api::BasinState::try_from(value)
383 .map(Into::into)
384 .map_err(|_| "invalid basin status value".into())
385 }
386}
387
388impl std::fmt::Display for BasinState {
389 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390 match self {
391 BasinState::Unspecified => write!(f, "unspecified"),
392 BasinState::Active => write!(f, "active"),
393 BasinState::Creating => write!(f, "creating"),
394 BasinState::Deleting => write!(f, "deleting"),
395 }
396 }
397}
398
399#[sync_docs]
400#[derive(Debug, Clone)]
401pub struct BasinInfo {
402 pub name: String,
403 pub scope: BasinScope,
404 pub state: BasinState,
405}
406
407impl From<BasinInfo> for api::BasinInfo {
408 fn from(value: BasinInfo) -> Self {
409 let BasinInfo { name, scope, state } = value;
410 Self {
411 name,
412 scope: scope.into(),
413 state: state.into(),
414 }
415 }
416}
417
418impl TryFrom<api::BasinInfo> for BasinInfo {
419 type Error = ConvertError;
420 fn try_from(value: api::BasinInfo) -> Result<Self, Self::Error> {
421 let api::BasinInfo { name, scope, state } = value;
422 Ok(Self {
423 name,
424 scope: scope.try_into()?,
425 state: state.try_into()?,
426 })
427 }
428}
429
430impl TryFrom<api::CreateBasinResponse> for BasinInfo {
431 type Error = ConvertError;
432 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
433 let api::CreateBasinResponse { info } = value;
434 let info = info.ok_or("missing basin info")?;
435 info.try_into()
436 }
437}
438
439#[sync_docs]
440#[derive(Debug, Clone, Default)]
441pub struct ListStreamsRequest {
442 pub prefix: String,
443 pub start_after: String,
444 pub limit: Option<usize>,
445}
446
447impl ListStreamsRequest {
448 pub fn new() -> Self {
450 Self::default()
451 }
452
453 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
455 Self {
456 prefix: prefix.into(),
457 ..self
458 }
459 }
460
461 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
463 Self {
464 start_after: start_after.into(),
465 ..self
466 }
467 }
468
469 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
471 Self {
472 limit: limit.into(),
473 ..self
474 }
475 }
476}
477
478impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
479 type Error = ConvertError;
480 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
481 let ListStreamsRequest {
482 prefix,
483 start_after,
484 limit,
485 } = value;
486 Ok(Self {
487 prefix,
488 start_after,
489 limit: limit.map(|n| n as u64),
490 })
491 }
492}
493
494#[sync_docs]
495#[derive(Debug, Clone)]
496pub struct StreamInfo {
497 pub name: String,
498 pub created_at: u32,
499 pub deleted_at: Option<u32>,
500}
501
502impl From<api::StreamInfo> for StreamInfo {
503 fn from(value: api::StreamInfo) -> Self {
504 Self {
505 name: value.name,
506 created_at: value.created_at,
507 deleted_at: value.deleted_at,
508 }
509 }
510}
511
512impl TryFrom<api::CreateStreamResponse> for StreamInfo {
513 type Error = ConvertError;
514
515 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
516 let api::CreateStreamResponse { info } = value;
517 let info = info.ok_or("missing stream info")?;
518 Ok(info.into())
519 }
520}
521
522#[sync_docs]
523#[derive(Debug, Clone)]
524pub struct ListStreamsResponse {
525 pub streams: Vec<StreamInfo>,
526 pub has_more: bool,
527}
528
529impl From<api::ListStreamsResponse> for ListStreamsResponse {
530 fn from(value: api::ListStreamsResponse) -> Self {
531 let api::ListStreamsResponse { streams, has_more } = value;
532 let streams = streams.into_iter().map(Into::into).collect();
533 Self { streams, has_more }
534 }
535}
536
537impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
538 type Error = ConvertError;
539 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
540 let api::GetBasinConfigResponse { config } = value;
541 let config = config.ok_or("missing basin config")?;
542 config.try_into()
543 }
544}
545
546impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
547 type Error = ConvertError;
548 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
549 let api::GetStreamConfigResponse { config } = value;
550 let config = config.ok_or("missing stream config")?;
551 config.try_into()
552 }
553}
554
555#[sync_docs]
556#[derive(Debug, Clone)]
557pub struct CreateStreamRequest {
558 pub stream: String,
559 pub config: Option<StreamConfig>,
560}
561
562impl CreateStreamRequest {
563 pub fn new(stream: impl Into<String>) -> Self {
565 Self {
566 stream: stream.into(),
567 config: None,
568 }
569 }
570
571 pub fn with_config(self, config: StreamConfig) -> Self {
573 Self {
574 config: Some(config),
575 ..self
576 }
577 }
578}
579
580impl From<CreateStreamRequest> for api::CreateStreamRequest {
581 fn from(value: CreateStreamRequest) -> Self {
582 let CreateStreamRequest { stream, config } = value;
583 Self {
584 stream,
585 config: config.map(Into::into),
586 }
587 }
588}
589
590#[sync_docs]
591#[derive(Debug, Clone, Default)]
592pub struct ListBasinsRequest {
593 pub prefix: String,
594 pub start_after: String,
595 pub limit: Option<usize>,
596}
597
598impl ListBasinsRequest {
599 pub fn new() -> Self {
601 Self::default()
602 }
603
604 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
606 Self {
607 prefix: prefix.into(),
608 ..self
609 }
610 }
611
612 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
614 Self {
615 start_after: start_after.into(),
616 ..self
617 }
618 }
619
620 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
622 Self {
623 limit: limit.into(),
624 ..self
625 }
626 }
627}
628
629impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
630 type Error = ConvertError;
631 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
632 let ListBasinsRequest {
633 prefix,
634 start_after,
635 limit,
636 } = value;
637 Ok(Self {
638 prefix,
639 start_after,
640 limit: limit
641 .map(TryInto::try_into)
642 .transpose()
643 .map_err(|_| "request limit does not fit into u64 bounds")?,
644 })
645 }
646}
647
648#[sync_docs]
649#[derive(Debug, Clone)]
650pub struct ListBasinsResponse {
651 pub basins: Vec<BasinInfo>,
652 pub has_more: bool,
653}
654
655impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
656 type Error = ConvertError;
657 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
658 let api::ListBasinsResponse { basins, has_more } = value;
659 Ok(Self {
660 basins: basins
661 .into_iter()
662 .map(TryInto::try_into)
663 .collect::<Result<Vec<BasinInfo>, ConvertError>>()?,
664 has_more,
665 })
666 }
667}
668
669#[sync_docs]
670#[derive(Debug, Clone)]
671pub struct DeleteBasinRequest {
672 pub basin: BasinName,
673 pub if_exists: bool,
675}
676
677impl DeleteBasinRequest {
678 pub fn new(basin: BasinName) -> Self {
680 Self {
681 basin,
682 if_exists: false,
683 }
684 }
685
686 pub fn with_if_exists(self, if_exists: bool) -> Self {
688 Self { if_exists, ..self }
689 }
690}
691
692impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
693 fn from(value: DeleteBasinRequest) -> Self {
694 let DeleteBasinRequest { basin, .. } = value;
695 Self { basin: basin.0 }
696 }
697}
698
699#[sync_docs]
700#[derive(Debug, Clone)]
701pub struct DeleteStreamRequest {
702 pub stream: String,
703 pub if_exists: bool,
705}
706
707impl DeleteStreamRequest {
708 pub fn new(stream: impl Into<String>) -> Self {
710 Self {
711 stream: stream.into(),
712 if_exists: false,
713 }
714 }
715
716 pub fn with_if_exists(self, if_exists: bool) -> Self {
718 Self { if_exists, ..self }
719 }
720}
721
722impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
723 fn from(value: DeleteStreamRequest) -> Self {
724 let DeleteStreamRequest { stream, .. } = value;
725 Self { stream }
726 }
727}
728
729#[sync_docs]
730#[derive(Debug, Clone)]
731pub struct ReconfigureBasinRequest {
732 pub basin: BasinName,
733 pub config: Option<BasinConfig>,
734 pub mask: Option<Vec<String>>,
735}
736
737impl ReconfigureBasinRequest {
738 pub fn new(basin: BasinName) -> Self {
740 Self {
741 basin,
742 config: None,
743 mask: None,
744 }
745 }
746
747 pub fn with_config(self, config: BasinConfig) -> Self {
749 Self {
750 config: Some(config),
751 ..self
752 }
753 }
754
755 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
757 Self {
758 mask: Some(mask.into()),
759 ..self
760 }
761 }
762}
763
764impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
765 fn from(value: ReconfigureBasinRequest) -> Self {
766 let ReconfigureBasinRequest {
767 basin,
768 config,
769 mask,
770 } = value;
771 Self {
772 basin: basin.0,
773 config: config.map(Into::into),
774 mask: mask.map(|paths| prost_types::FieldMask { paths }),
775 }
776 }
777}
778
779impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
780 type Error = ConvertError;
781 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
782 let api::ReconfigureBasinResponse { config } = value;
783 let config = config.ok_or("missing basin config")?;
784 config.try_into()
785 }
786}
787
788#[sync_docs]
789#[derive(Debug, Clone)]
790pub struct ReconfigureStreamRequest {
791 pub stream: String,
792 pub config: Option<StreamConfig>,
793 pub mask: Option<Vec<String>>,
794}
795
796impl ReconfigureStreamRequest {
797 pub fn new(stream: impl Into<String>) -> Self {
799 Self {
800 stream: stream.into(),
801 config: None,
802 mask: None,
803 }
804 }
805
806 pub fn with_config(self, config: StreamConfig) -> Self {
808 Self {
809 config: Some(config),
810 ..self
811 }
812 }
813
814 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
816 Self {
817 mask: Some(mask.into()),
818 ..self
819 }
820 }
821}
822
823impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
824 fn from(value: ReconfigureStreamRequest) -> Self {
825 let ReconfigureStreamRequest {
826 stream,
827 config,
828 mask,
829 } = value;
830 Self {
831 stream,
832 config: config.map(Into::into),
833 mask: mask.map(|paths| prost_types::FieldMask { paths }),
834 }
835 }
836}
837
838impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
839 type Error = ConvertError;
840 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
841 let api::ReconfigureStreamResponse { config } = value;
842 let config = config.ok_or("missing stream config")?;
843 config.try_into()
844 }
845}
846
847impl From<api::CheckTailResponse> for u64 {
848 fn from(value: api::CheckTailResponse) -> Self {
849 let api::CheckTailResponse { next_seq_num } = value;
850 next_seq_num
851 }
852}
853
854#[sync_docs]
855#[derive(Debug, Clone, PartialEq, Eq)]
856pub struct Header {
857 pub name: Bytes,
858 pub value: Bytes,
859}
860
861impl Header {
862 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
864 Self {
865 name: name.into(),
866 value: value.into(),
867 }
868 }
869
870 pub fn from_value(value: impl Into<Bytes>) -> Self {
872 Self {
873 name: Bytes::new(),
874 value: value.into(),
875 }
876 }
877}
878
879impl From<Header> for api::Header {
880 fn from(value: Header) -> Self {
881 let Header { name, value } = value;
882 Self { name, value }
883 }
884}
885
886impl From<api::Header> for Header {
887 fn from(value: api::Header) -> Self {
888 let api::Header { name, value } = value;
889 Self { name, value }
890 }
891}
892
893#[derive(Debug, Clone, Default, PartialEq, Eq)]
897pub struct FencingToken(Bytes);
898
899impl FencingToken {
900 const MAX_BYTES: usize = 16;
901
902 pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
904 let bytes = bytes.into();
905 if bytes.len() > Self::MAX_BYTES {
906 Err(format!(
907 "Size of a fencing token cannot exceed {} bytes",
908 Self::MAX_BYTES
909 )
910 .into())
911 } else {
912 Ok(Self(bytes))
913 }
914 }
915
916 pub fn generate(n: usize) -> Result<Self, ConvertError> {
918 Self::new(
919 rand::thread_rng()
920 .sample_iter(&Uniform::new_inclusive(0, u8::MAX))
921 .take(n)
922 .collect::<Bytes>(),
923 )
924 }
925}
926
927impl AsRef<Bytes> for FencingToken {
928 fn as_ref(&self) -> &Bytes {
929 &self.0
930 }
931}
932
933impl AsRef<[u8]> for FencingToken {
934 fn as_ref(&self) -> &[u8] {
935 &self.0
936 }
937}
938
939impl Deref for FencingToken {
940 type Target = [u8];
941
942 fn deref(&self) -> &Self::Target {
943 &self.0
944 }
945}
946
947impl From<FencingToken> for Bytes {
948 fn from(value: FencingToken) -> Self {
949 value.0
950 }
951}
952
953impl From<FencingToken> for Vec<u8> {
954 fn from(value: FencingToken) -> Self {
955 value.0.into()
956 }
957}
958
959impl TryFrom<Bytes> for FencingToken {
960 type Error = ConvertError;
961
962 fn try_from(value: Bytes) -> Result<Self, Self::Error> {
963 Self::new(value)
964 }
965}
966
967impl TryFrom<Vec<u8>> for FencingToken {
968 type Error = ConvertError;
969
970 fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
971 Self::new(value)
972 }
973}
974
975#[derive(Debug, Clone)]
977pub enum Command {
978 Fence {
983 fencing_token: FencingToken,
987 },
988 Trim {
993 seq_num: u64,
998 },
999}
1000
1001#[derive(Debug, Clone)]
1007pub struct CommandRecord {
1008 pub command: Command,
1010 pub timestamp: Option<u64>,
1012}
1013
1014impl CommandRecord {
1015 const FENCE: &[u8] = b"fence";
1016 const TRIM: &[u8] = b"trim";
1017
1018 pub fn fence(fencing_token: FencingToken) -> Self {
1020 Self {
1021 command: Command::Fence { fencing_token },
1022 timestamp: None,
1023 }
1024 }
1025
1026 pub fn trim(seq_num: impl Into<u64>) -> Self {
1028 Self {
1029 command: Command::Trim {
1030 seq_num: seq_num.into(),
1031 },
1032 timestamp: None,
1033 }
1034 }
1035
1036 pub fn with_timestamp(self, timestamp: u64) -> Self {
1038 Self {
1039 timestamp: Some(timestamp),
1040 ..self
1041 }
1042 }
1043}
1044
1045#[sync_docs]
1046#[derive(Debug, Clone, PartialEq, Eq)]
1047pub struct AppendRecord {
1048 timestamp: Option<u64>,
1049 headers: Vec<Header>,
1050 body: Bytes,
1051 #[cfg(test)]
1052 max_bytes: u64,
1053}
1054
1055metered_impl!(AppendRecord);
1056
1057impl AppendRecord {
1058 const MAX_BYTES: u64 = MIB_BYTES;
1059
1060 fn validated(self) -> Result<Self, ConvertError> {
1061 #[cfg(test)]
1062 let max_bytes = self.max_bytes;
1063 #[cfg(not(test))]
1064 let max_bytes = Self::MAX_BYTES;
1065
1066 if self.metered_bytes() > max_bytes {
1067 Err("AppendRecord should have metered size less than 1 MiB".into())
1068 } else {
1069 Ok(self)
1070 }
1071 }
1072
1073 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1075 Self {
1076 timestamp: None,
1077 headers: Vec::new(),
1078 body: body.into(),
1079 #[cfg(test)]
1080 max_bytes: Self::MAX_BYTES,
1081 }
1082 .validated()
1083 }
1084
1085 #[cfg(test)]
1086 pub(crate) fn with_max_bytes(
1087 max_bytes: u64,
1088 body: impl Into<Bytes>,
1089 ) -> Result<Self, ConvertError> {
1090 Self {
1091 timestamp: None,
1092 headers: Vec::new(),
1093 body: body.into(),
1094 max_bytes,
1095 }
1096 .validated()
1097 }
1098
1099 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1101 Self {
1102 headers: headers.into(),
1103 ..self
1104 }
1105 .validated()
1106 }
1107
1108 pub fn with_timestamp(self, timestamp: u64) -> Self {
1110 Self {
1111 timestamp: Some(timestamp),
1112 ..self
1113 }
1114 }
1115
1116 pub fn body(&self) -> &[u8] {
1118 &self.body
1119 }
1120
1121 pub fn headers(&self) -> &[Header] {
1123 &self.headers
1124 }
1125
1126 pub fn timestamp(&self) -> Option<u64> {
1128 self.timestamp
1129 }
1130
1131 pub fn into_parts(self) -> AppendRecordParts {
1133 AppendRecordParts {
1134 timestamp: self.timestamp,
1135 headers: self.headers,
1136 body: self.body,
1137 }
1138 }
1139
1140 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1142 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1143 if let Some(timestamp) = parts.timestamp {
1144 Ok(record.with_timestamp(timestamp))
1145 } else {
1146 Ok(record)
1147 }
1148 }
1149}
1150
1151impl From<AppendRecord> for api::AppendRecord {
1152 fn from(value: AppendRecord) -> Self {
1153 Self {
1154 timestamp: value.timestamp,
1155 headers: value.headers.into_iter().map(Into::into).collect(),
1156 body: value.body,
1157 }
1158 }
1159}
1160
1161impl From<CommandRecord> for AppendRecord {
1162 fn from(value: CommandRecord) -> Self {
1163 let (header_value, body) = match value.command {
1164 Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1165 Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1166 };
1167 AppendRecordParts {
1168 timestamp: value.timestamp,
1169 headers: vec![Header::from_value(header_value)],
1170 body: body.into(),
1171 }
1172 .try_into()
1173 .expect("command record is a valid append record")
1174 }
1175}
1176
1177#[sync_docs(AppendRecordParts = "AppendRecord")]
1178#[derive(Debug, Clone)]
1179pub struct AppendRecordParts {
1180 pub timestamp: Option<u64>,
1181 pub headers: Vec<Header>,
1182 pub body: Bytes,
1183}
1184
1185impl From<AppendRecord> for AppendRecordParts {
1186 fn from(value: AppendRecord) -> Self {
1187 value.into_parts()
1188 }
1189}
1190
1191impl TryFrom<AppendRecordParts> for AppendRecord {
1192 type Error = ConvertError;
1193
1194 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1195 Self::try_from_parts(value)
1196 }
1197}
1198
1199#[derive(Debug, Clone)]
1201pub struct AppendRecordBatch {
1202 records: Vec<AppendRecord>,
1203 metered_bytes: u64,
1204 max_capacity: usize,
1205 #[cfg(test)]
1206 max_bytes: u64,
1207}
1208
1209impl PartialEq for AppendRecordBatch {
1210 fn eq(&self, other: &Self) -> bool {
1211 if self.records.eq(&other.records) {
1212 assert_eq!(self.metered_bytes, other.metered_bytes);
1213 true
1214 } else {
1215 false
1216 }
1217 }
1218}
1219
1220impl Eq for AppendRecordBatch {}
1221
1222impl Default for AppendRecordBatch {
1223 fn default() -> Self {
1224 Self::new()
1225 }
1226}
1227
1228impl AppendRecordBatch {
1229 pub const MAX_CAPACITY: usize = 1000;
1233
1234 pub const MAX_BYTES: u64 = MIB_BYTES;
1236
1237 pub fn new() -> Self {
1239 Self::with_max_capacity(Self::MAX_CAPACITY)
1240 }
1241
1242 pub fn with_max_capacity(max_capacity: usize) -> Self {
1246 assert!(
1247 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1248 "Batch capacity must be between 1 and 1000"
1249 );
1250
1251 Self {
1252 records: Vec::with_capacity(max_capacity),
1253 metered_bytes: 0,
1254 max_capacity,
1255 #[cfg(test)]
1256 max_bytes: Self::MAX_BYTES,
1257 }
1258 }
1259
1260 #[cfg(test)]
1261 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1262 #[cfg(test)]
1263 assert!(
1264 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1265 "Batch size must be between 1 byte and 1 MiB"
1266 );
1267
1268 Self {
1269 max_bytes,
1270 ..Self::with_max_capacity(max_capacity)
1271 }
1272 }
1273
1274 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1280 where
1281 R: Into<AppendRecord>,
1282 T: IntoIterator<Item = R>,
1283 {
1284 let mut records = Self::new();
1285 let mut pending = Vec::new();
1286
1287 let mut iter = iter.into_iter();
1288
1289 for record in iter.by_ref() {
1290 if let Err(record) = records.push(record) {
1291 pending.push(record);
1292 break;
1293 }
1294 }
1295
1296 if pending.is_empty() {
1297 Ok(records)
1298 } else {
1299 pending.extend(iter.map(Into::into));
1300 Err((records, pending))
1301 }
1302 }
1303
1304 pub fn is_empty(&self) -> bool {
1306 if self.records.is_empty() {
1307 assert_eq!(self.metered_bytes, 0);
1308 true
1309 } else {
1310 false
1311 }
1312 }
1313
1314 pub fn len(&self) -> usize {
1316 self.records.len()
1317 }
1318
1319 #[cfg(test)]
1320 fn max_bytes(&self) -> u64 {
1321 self.max_bytes
1322 }
1323
1324 #[cfg(not(test))]
1325 fn max_bytes(&self) -> u64 {
1326 Self::MAX_BYTES
1327 }
1328
1329 pub fn is_full(&self) -> bool {
1331 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1332 }
1333
1334 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1336 assert!(self.records.len() <= self.max_capacity);
1337 assert!(self.metered_bytes <= self.max_bytes());
1338
1339 let record = record.into();
1340 let record_size = record.metered_bytes();
1341 if self.records.len() >= self.max_capacity
1342 || self.metered_bytes + record_size > self.max_bytes()
1343 {
1344 Err(record)
1345 } else {
1346 self.records.push(record);
1347 self.metered_bytes += record_size;
1348 Ok(())
1349 }
1350 }
1351}
1352
1353impl MeteredBytes for AppendRecordBatch {
1354 fn metered_bytes(&self) -> u64 {
1355 self.metered_bytes
1356 }
1357}
1358
1359impl IntoIterator for AppendRecordBatch {
1360 type Item = AppendRecord;
1361 type IntoIter = std::vec::IntoIter<Self::Item>;
1362
1363 fn into_iter(self) -> Self::IntoIter {
1364 self.records.into_iter()
1365 }
1366}
1367
1368impl<'a> IntoIterator for &'a AppendRecordBatch {
1369 type Item = &'a AppendRecord;
1370 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1371
1372 fn into_iter(self) -> Self::IntoIter {
1373 self.records.iter()
1374 }
1375}
1376
1377impl AsRef<[AppendRecord]> for AppendRecordBatch {
1378 fn as_ref(&self) -> &[AppendRecord] {
1379 &self.records
1380 }
1381}
1382
1383#[sync_docs]
1384#[derive(Debug, Default, Clone)]
1385pub struct AppendInput {
1386 pub records: AppendRecordBatch,
1387 pub match_seq_num: Option<u64>,
1388 pub fencing_token: Option<FencingToken>,
1389}
1390
1391impl MeteredBytes for AppendInput {
1392 fn metered_bytes(&self) -> u64 {
1393 self.records.metered_bytes()
1394 }
1395}
1396
1397impl AppendInput {
1398 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1400 Self {
1401 records: records.into(),
1402 match_seq_num: None,
1403 fencing_token: None,
1404 }
1405 }
1406
1407 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1409 Self {
1410 match_seq_num: Some(match_seq_num.into()),
1411 ..self
1412 }
1413 }
1414
1415 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1417 Self {
1418 fencing_token: Some(fencing_token),
1419 ..self
1420 }
1421 }
1422
1423 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1424 let Self {
1425 records,
1426 match_seq_num,
1427 fencing_token,
1428 } = self;
1429
1430 api::AppendInput {
1431 stream: stream.into(),
1432 records: records.into_iter().map(Into::into).collect(),
1433 match_seq_num,
1434 fencing_token: fencing_token.map(|f| f.0),
1435 }
1436 }
1437}
1438
1439#[sync_docs]
1440#[derive(Debug, Clone)]
1441pub struct AppendOutput {
1442 pub start_seq_num: u64,
1443 pub end_seq_num: u64,
1444 pub next_seq_num: u64,
1445}
1446
1447impl From<api::AppendOutput> for AppendOutput {
1448 fn from(value: api::AppendOutput) -> Self {
1449 let api::AppendOutput {
1450 start_seq_num,
1451 end_seq_num,
1452 next_seq_num,
1453 } = value;
1454 Self {
1455 start_seq_num,
1456 end_seq_num,
1457 next_seq_num,
1458 }
1459 }
1460}
1461
1462impl TryFrom<api::AppendResponse> for AppendOutput {
1463 type Error = ConvertError;
1464 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1465 let api::AppendResponse { output } = value;
1466 let output = output.ok_or("missing append output")?;
1467 Ok(output.into())
1468 }
1469}
1470
1471impl TryFrom<api::AppendSessionResponse> for AppendOutput {
1472 type Error = ConvertError;
1473 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1474 let api::AppendSessionResponse { output } = value;
1475 let output = output.ok_or("missing append output")?;
1476 Ok(output.into())
1477 }
1478}
1479
1480#[sync_docs]
1481#[derive(Debug, Clone, Default)]
1482pub struct ReadLimit {
1483 pub count: Option<u64>,
1484 pub bytes: Option<u64>,
1485}
1486
1487#[sync_docs]
1488#[derive(Debug, Clone, Default)]
1489pub struct ReadRequest {
1490 pub start_seq_num: u64,
1491 pub limit: ReadLimit,
1492}
1493
1494impl ReadRequest {
1495 pub fn new(start_seq_num: u64) -> Self {
1497 Self {
1498 start_seq_num,
1499 ..Default::default()
1500 }
1501 }
1502
1503 pub fn with_limit(self, limit: ReadLimit) -> Self {
1505 Self { limit, ..self }
1506 }
1507}
1508
1509impl ReadRequest {
1510 pub(crate) fn try_into_api_type(
1511 self,
1512 stream: impl Into<String>,
1513 ) -> Result<api::ReadRequest, ConvertError> {
1514 let Self {
1515 start_seq_num,
1516 limit,
1517 } = self;
1518
1519 let limit = if limit.count > Some(1000) {
1520 Err("read limit: count must not exceed 1000 for unary request")
1521 } else if limit.bytes > Some(MIB_BYTES) {
1522 Err("read limit: bytes must not exceed 1MiB for unary request")
1523 } else {
1524 Ok(api::ReadLimit {
1525 count: limit.count,
1526 bytes: limit.bytes,
1527 })
1528 }?;
1529
1530 Ok(api::ReadRequest {
1531 stream: stream.into(),
1532 start_seq_num,
1533 limit: Some(limit),
1534 })
1535 }
1536}
1537
1538#[sync_docs]
1539#[derive(Debug, Clone)]
1540pub struct SequencedRecord {
1541 pub seq_num: u64,
1542 pub timestamp: u64,
1543 pub headers: Vec<Header>,
1544 pub body: Bytes,
1545}
1546
1547metered_impl!(SequencedRecord);
1548
1549impl From<api::SequencedRecord> for SequencedRecord {
1550 fn from(value: api::SequencedRecord) -> Self {
1551 let api::SequencedRecord {
1552 seq_num,
1553 timestamp,
1554 headers,
1555 body,
1556 } = value;
1557 Self {
1558 seq_num,
1559 timestamp,
1560 headers: headers.into_iter().map(Into::into).collect(),
1561 body,
1562 }
1563 }
1564}
1565
1566impl SequencedRecord {
1567 pub fn as_command_record(&self) -> Option<CommandRecord> {
1569 if self.headers.len() != 1 {
1570 return None;
1571 }
1572
1573 let header = self.headers.first().expect("pre-validated length");
1574
1575 if !header.name.is_empty() {
1576 return None;
1577 }
1578
1579 match header.value.as_ref() {
1580 CommandRecord::FENCE => {
1581 let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1582 Some(CommandRecord {
1583 command: Command::Fence { fencing_token },
1584 timestamp: Some(self.timestamp),
1585 })
1586 }
1587 CommandRecord::TRIM => {
1588 let body: &[u8] = &self.body;
1589 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1590 Some(CommandRecord {
1591 command: Command::Trim { seq_num },
1592 timestamp: Some(self.timestamp),
1593 })
1594 }
1595 _ => None,
1596 }
1597 }
1598}
1599
1600#[sync_docs]
1601#[derive(Debug, Clone)]
1602pub struct SequencedRecordBatch {
1603 pub records: Vec<SequencedRecord>,
1604}
1605
1606impl MeteredBytes for SequencedRecordBatch {
1607 fn metered_bytes(&self) -> u64 {
1608 self.records.metered_bytes()
1609 }
1610}
1611
1612impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1613 fn from(value: api::SequencedRecordBatch) -> Self {
1614 let api::SequencedRecordBatch { records } = value;
1615 Self {
1616 records: records.into_iter().map(Into::into).collect(),
1617 }
1618 }
1619}
1620
1621#[sync_docs(ReadOutput = "Output")]
1622#[derive(Debug, Clone)]
1623pub enum ReadOutput {
1624 Batch(SequencedRecordBatch),
1625 FirstSeqNum(u64),
1626 NextSeqNum(u64),
1627}
1628
1629impl From<api::read_output::Output> for ReadOutput {
1630 fn from(value: api::read_output::Output) -> Self {
1631 match value {
1632 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1633 api::read_output::Output::FirstSeqNum(first_seq_num) => {
1634 Self::FirstSeqNum(first_seq_num)
1635 }
1636 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1637 }
1638 }
1639}
1640
1641impl TryFrom<api::ReadOutput> for ReadOutput {
1642 type Error = ConvertError;
1643 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1644 let api::ReadOutput { output } = value;
1645 let output = output.ok_or("missing read output")?;
1646 Ok(output.into())
1647 }
1648}
1649
1650impl TryFrom<api::ReadResponse> for ReadOutput {
1651 type Error = ConvertError;
1652 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1653 let api::ReadResponse { output } = value;
1654 let output = output.ok_or("missing output in read response")?;
1655 output.try_into()
1656 }
1657}
1658
1659#[sync_docs]
1660#[derive(Debug, Clone, Default)]
1661pub struct ReadSessionRequest {
1662 pub start_seq_num: u64,
1663 pub limit: ReadLimit,
1664}
1665
1666impl ReadSessionRequest {
1667 pub fn new(start_seq_num: u64) -> Self {
1669 Self {
1670 start_seq_num,
1671 ..Default::default()
1672 }
1673 }
1674
1675 pub fn with_limit(self, limit: ReadLimit) -> Self {
1677 Self { limit, ..self }
1678 }
1679
1680 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1681 let Self {
1682 start_seq_num,
1683 limit,
1684 } = self;
1685 api::ReadSessionRequest {
1686 stream: stream.into(),
1687 start_seq_num,
1688 limit: Some(api::ReadLimit {
1689 count: limit.count,
1690 bytes: limit.bytes,
1691 }),
1692 heartbeats: false,
1693 }
1694 }
1695}
1696
1697impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1698 type Error = ConvertError;
1699 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1700 let api::ReadSessionResponse { output } = value;
1701 let output = output.ok_or("missing output in read session response")?;
1702 output.try_into()
1703 }
1704}
1705
/// Name of a basin, stored as an owned string.
///
/// The inner field is private; the `TryFrom<String>` and `FromStr` conversions in
/// this file validate the name before wrapping it.
#[derive(Clone, Debug)]
pub struct BasinName(String);
1712
1713impl AsRef<str> for BasinName {
1714 fn as_ref(&self) -> &str {
1715 &self.0
1716 }
1717}
1718
1719impl Deref for BasinName {
1720 type Target = str;
1721 fn deref(&self) -> &Self::Target {
1722 &self.0
1723 }
1724}
1725
1726impl TryFrom<String> for BasinName {
1727 type Error = ConvertError;
1728
1729 fn try_from(name: String) -> Result<Self, Self::Error> {
1730 if name.len() < 8 || name.len() > 48 {
1731 return Err("Basin name must be between 8 and 48 characters in length".into());
1732 }
1733
1734 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1735 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1736 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1737 .expect("Failed to compile basin name regex")
1738 });
1739
1740 if !regex.is_match(&name) {
1741 return Err(
1742 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1743 It cannot begin or end with a hyphen."
1744 .into(),
1745 );
1746 }
1747
1748 Ok(Self(name))
1749 }
1750}
1751
1752impl FromStr for BasinName {
1753 type Err = ConvertError;
1754
1755 fn from_str(s: &str) -> Result<Self, Self::Err> {
1756 s.to_string().try_into()
1757 }
1758}
1759
1760impl std::fmt::Display for BasinName {
1761 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1762 f.write_str(&self.0)
1763 }
1764}