1use std::{ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::{Rng, distributions::Uniform};
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13
14#[derive(Debug, Clone, thiserror::Error)]
16#[error("{0}")]
17pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20 fn from(value: T) -> Self {
21 Self(value.into())
22 }
23}
24
/// Types whose size can be expressed in metered bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}

/// A vector's metered size is the sum of the metered sizes of its elements
/// (zero for an empty vector).
impl<T: MeteredBytes> MeteredBytes for Vec<T> {
    fn metered_bytes(&self) -> u64 {
        self.iter().map(T::metered_bytes).sum()
    }
}
42
// Implements `MeteredBytes` for a type that has a `headers` collection (whose
// items expose `name` and `value`) and a `body`, all of which provide `len()`.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                // Every header costs 2 bytes on top of its name and value lengths.
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| 2 + header.name.len() + header.value.len())
                    .sum();
                // Every record additionally carries a fixed cost of 8 bytes.
                (8 + header_bytes + self.body.len()) as u64
            }
        }
    };
}
60
61#[sync_docs]
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
63pub enum BasinScope {
64 #[default]
65 Unspecified,
66 AwsUsEast1,
67}
68
69impl From<BasinScope> for api::BasinScope {
70 fn from(value: BasinScope) -> Self {
71 match value {
72 BasinScope::Unspecified => Self::Unspecified,
73 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
74 }
75 }
76}
77
78impl From<api::BasinScope> for BasinScope {
79 fn from(value: api::BasinScope) -> Self {
80 match value {
81 api::BasinScope::Unspecified => Self::Unspecified,
82 api::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
83 }
84 }
85}
86
87impl FromStr for BasinScope {
88 type Err = ConvertError;
89 fn from_str(value: &str) -> Result<Self, Self::Err> {
90 match value {
91 "unspecified" => Ok(Self::Unspecified),
92 "aws:us-east-1" => Ok(Self::AwsUsEast1),
93 _ => Err("invalid basin scope value".into()),
94 }
95 }
96}
97
98impl From<BasinScope> for i32 {
99 fn from(value: BasinScope) -> Self {
100 api::BasinScope::from(value).into()
101 }
102}
103
104impl TryFrom<i32> for BasinScope {
105 type Error = ConvertError;
106 fn try_from(value: i32) -> Result<Self, Self::Error> {
107 api::BasinScope::try_from(value)
108 .map(Into::into)
109 .map_err(|_| "invalid basin scope value".into())
110 }
111}
112
113#[sync_docs]
114#[derive(Debug, Clone)]
115pub struct CreateBasinRequest {
116 pub basin: BasinName,
117 pub config: Option<BasinConfig>,
118 pub scope: BasinScope,
119}
120
121impl CreateBasinRequest {
122 pub fn new(basin: BasinName) -> Self {
124 Self {
125 basin,
126 config: None,
127 scope: BasinScope::Unspecified,
128 }
129 }
130
131 pub fn with_config(self, config: BasinConfig) -> Self {
133 Self {
134 config: Some(config),
135 ..self
136 }
137 }
138
139 pub fn with_scope(self, scope: BasinScope) -> Self {
141 Self { scope, ..self }
142 }
143}
144
145impl From<CreateBasinRequest> for api::CreateBasinRequest {
146 fn from(value: CreateBasinRequest) -> Self {
147 let CreateBasinRequest {
148 basin,
149 config,
150 scope,
151 } = value;
152 Self {
153 basin: basin.0,
154 config: config.map(Into::into),
155 scope: scope.into(),
156 }
157 }
158}
159
160#[sync_docs]
161#[derive(Debug, Clone, Default)]
162pub struct BasinConfig {
163 pub default_stream_config: Option<StreamConfig>,
164 pub create_stream_on_append: bool,
165}
166
167impl From<BasinConfig> for api::BasinConfig {
168 fn from(value: BasinConfig) -> Self {
169 let BasinConfig {
170 default_stream_config,
171 create_stream_on_append,
172 } = value;
173 Self {
174 default_stream_config: default_stream_config.map(Into::into),
175 create_stream_on_append,
176 }
177 }
178}
179
180impl TryFrom<api::BasinConfig> for BasinConfig {
181 type Error = ConvertError;
182 fn try_from(value: api::BasinConfig) -> Result<Self, Self::Error> {
183 let api::BasinConfig {
184 default_stream_config,
185 create_stream_on_append,
186 } = value;
187 Ok(Self {
188 default_stream_config: default_stream_config.map(TryInto::try_into).transpose()?,
189 create_stream_on_append,
190 })
191 }
192}
193
194#[sync_docs]
195#[derive(Debug, Clone, Default)]
196pub struct StreamConfig {
197 pub storage_class: StorageClass,
198 pub retention_policy: Option<RetentionPolicy>,
199}
200
201impl StreamConfig {
202 pub fn new() -> Self {
204 Self::default()
205 }
206
207 pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self {
209 Self {
210 storage_class: storage_class.into(),
211 ..self
212 }
213 }
214
215 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
217 Self {
218 retention_policy: Some(retention_policy),
219 ..self
220 }
221 }
222}
223
224impl From<StreamConfig> for api::StreamConfig {
225 fn from(value: StreamConfig) -> Self {
226 let StreamConfig {
227 storage_class,
228 retention_policy,
229 } = value;
230 Self {
231 storage_class: storage_class.into(),
232 retention_policy: retention_policy.map(Into::into),
233 }
234 }
235}
236
237impl TryFrom<api::StreamConfig> for StreamConfig {
238 type Error = ConvertError;
239 fn try_from(value: api::StreamConfig) -> Result<Self, Self::Error> {
240 let api::StreamConfig {
241 storage_class,
242 retention_policy,
243 } = value;
244 Ok(Self {
245 storage_class: storage_class.try_into()?,
246 retention_policy: retention_policy.map(Into::into),
247 })
248 }
249}
250
251#[sync_docs]
252#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
253pub enum StorageClass {
254 #[default]
255 Unspecified,
256 Standard,
257 Express,
258}
259
260impl From<StorageClass> for api::StorageClass {
261 fn from(value: StorageClass) -> Self {
262 match value {
263 StorageClass::Unspecified => Self::Unspecified,
264 StorageClass::Standard => Self::Standard,
265 StorageClass::Express => Self::Express,
266 }
267 }
268}
269
270impl From<api::StorageClass> for StorageClass {
271 fn from(value: api::StorageClass) -> Self {
272 match value {
273 api::StorageClass::Unspecified => Self::Unspecified,
274 api::StorageClass::Standard => Self::Standard,
275 api::StorageClass::Express => Self::Express,
276 }
277 }
278}
279
280impl FromStr for StorageClass {
281 type Err = ConvertError;
282 fn from_str(value: &str) -> Result<Self, Self::Err> {
283 match value {
284 "unspecified" => Ok(Self::Unspecified),
285 "standard" => Ok(Self::Standard),
286 "express" => Ok(Self::Express),
287 _ => Err("invalid storage class value".into()),
288 }
289 }
290}
291
292impl From<StorageClass> for i32 {
293 fn from(value: StorageClass) -> Self {
294 api::StorageClass::from(value).into()
295 }
296}
297
298impl TryFrom<i32> for StorageClass {
299 type Error = ConvertError;
300 fn try_from(value: i32) -> Result<Self, Self::Error> {
301 api::StorageClass::try_from(value)
302 .map(Into::into)
303 .map_err(|_| "invalid storage class value".into())
304 }
305}
306
307#[sync_docs(Age = "Age")]
308#[derive(Debug, Clone)]
309pub enum RetentionPolicy {
310 Age(Duration),
311}
312
313impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
314 fn from(value: RetentionPolicy) -> Self {
315 match value {
316 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
317 }
318 }
319}
320
321impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
322 fn from(value: api::stream_config::RetentionPolicy) -> Self {
323 match value {
324 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
325 }
326 }
327}
328
329#[sync_docs]
330#[derive(Debug, Clone, Copy, PartialEq, Eq)]
331pub enum BasinState {
332 Unspecified,
333 Active,
334 Creating,
335 Deleting,
336}
337
338impl From<BasinState> for api::BasinState {
339 fn from(value: BasinState) -> Self {
340 match value {
341 BasinState::Unspecified => Self::Unspecified,
342 BasinState::Active => Self::Active,
343 BasinState::Creating => Self::Creating,
344 BasinState::Deleting => Self::Deleting,
345 }
346 }
347}
348
349impl From<api::BasinState> for BasinState {
350 fn from(value: api::BasinState) -> Self {
351 match value {
352 api::BasinState::Unspecified => Self::Unspecified,
353 api::BasinState::Active => Self::Active,
354 api::BasinState::Creating => Self::Creating,
355 api::BasinState::Deleting => Self::Deleting,
356 }
357 }
358}
359
360impl From<BasinState> for i32 {
361 fn from(value: BasinState) -> Self {
362 api::BasinState::from(value).into()
363 }
364}
365
366impl TryFrom<i32> for BasinState {
367 type Error = ConvertError;
368 fn try_from(value: i32) -> Result<Self, Self::Error> {
369 api::BasinState::try_from(value)
370 .map(Into::into)
371 .map_err(|_| "invalid basin status value".into())
372 }
373}
374
375impl std::fmt::Display for BasinState {
376 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
377 match self {
378 BasinState::Unspecified => write!(f, "unspecified"),
379 BasinState::Active => write!(f, "active"),
380 BasinState::Creating => write!(f, "creating"),
381 BasinState::Deleting => write!(f, "deleting"),
382 }
383 }
384}
385
386#[sync_docs]
387#[derive(Debug, Clone)]
388pub struct BasinInfo {
389 pub name: String,
390 pub scope: BasinScope,
391 pub state: BasinState,
392}
393
394impl From<BasinInfo> for api::BasinInfo {
395 fn from(value: BasinInfo) -> Self {
396 let BasinInfo { name, scope, state } = value;
397 Self {
398 name,
399 scope: scope.into(),
400 state: state.into(),
401 }
402 }
403}
404
405impl TryFrom<api::BasinInfo> for BasinInfo {
406 type Error = ConvertError;
407 fn try_from(value: api::BasinInfo) -> Result<Self, Self::Error> {
408 let api::BasinInfo { name, scope, state } = value;
409 Ok(Self {
410 name,
411 scope: scope.try_into()?,
412 state: state.try_into()?,
413 })
414 }
415}
416
417impl TryFrom<api::CreateBasinResponse> for BasinInfo {
418 type Error = ConvertError;
419 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
420 let api::CreateBasinResponse { info } = value;
421 let info = info.ok_or("missing basin info")?;
422 info.try_into()
423 }
424}
425
426#[sync_docs]
427#[derive(Debug, Clone, Default)]
428pub struct ListStreamsRequest {
429 pub prefix: String,
430 pub start_after: String,
431 pub limit: Option<usize>,
432}
433
434impl ListStreamsRequest {
435 pub fn new() -> Self {
437 Self::default()
438 }
439
440 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
442 Self {
443 prefix: prefix.into(),
444 ..self
445 }
446 }
447
448 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
450 Self {
451 start_after: start_after.into(),
452 ..self
453 }
454 }
455
456 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
458 Self {
459 limit: limit.into(),
460 ..self
461 }
462 }
463}
464
465impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
466 type Error = ConvertError;
467 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
468 let ListStreamsRequest {
469 prefix,
470 start_after,
471 limit,
472 } = value;
473 Ok(Self {
474 prefix,
475 start_after,
476 limit: limit.map(|n| n as u64),
477 })
478 }
479}
480
481#[sync_docs]
482#[derive(Debug, Clone)]
483pub struct StreamInfo {
484 pub name: String,
485 pub created_at: u32,
486 pub deleted_at: Option<u32>,
487}
488
489impl From<api::StreamInfo> for StreamInfo {
490 fn from(value: api::StreamInfo) -> Self {
491 Self {
492 name: value.name,
493 created_at: value.created_at,
494 deleted_at: value.deleted_at,
495 }
496 }
497}
498
499impl TryFrom<api::CreateStreamResponse> for StreamInfo {
500 type Error = ConvertError;
501
502 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
503 let api::CreateStreamResponse { info } = value;
504 let info = info.ok_or("missing stream info")?;
505 Ok(info.into())
506 }
507}
508
509#[sync_docs]
510#[derive(Debug, Clone)]
511pub struct ListStreamsResponse {
512 pub streams: Vec<StreamInfo>,
513 pub has_more: bool,
514}
515
516impl From<api::ListStreamsResponse> for ListStreamsResponse {
517 fn from(value: api::ListStreamsResponse) -> Self {
518 let api::ListStreamsResponse { streams, has_more } = value;
519 let streams = streams.into_iter().map(Into::into).collect();
520 Self { streams, has_more }
521 }
522}
523
524impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
525 type Error = ConvertError;
526 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
527 let api::GetBasinConfigResponse { config } = value;
528 let config = config.ok_or("missing basin config")?;
529 config.try_into()
530 }
531}
532
533impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
534 type Error = ConvertError;
535 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
536 let api::GetStreamConfigResponse { config } = value;
537 let config = config.ok_or("missing stream config")?;
538 config.try_into()
539 }
540}
541
542#[sync_docs]
543#[derive(Debug, Clone)]
544pub struct CreateStreamRequest {
545 pub stream: String,
546 pub config: Option<StreamConfig>,
547}
548
549impl CreateStreamRequest {
550 pub fn new(stream: impl Into<String>) -> Self {
552 Self {
553 stream: stream.into(),
554 config: None,
555 }
556 }
557
558 pub fn with_config(self, config: StreamConfig) -> Self {
560 Self {
561 config: Some(config),
562 ..self
563 }
564 }
565}
566
567impl From<CreateStreamRequest> for api::CreateStreamRequest {
568 fn from(value: CreateStreamRequest) -> Self {
569 let CreateStreamRequest { stream, config } = value;
570 Self {
571 stream,
572 config: config.map(Into::into),
573 }
574 }
575}
576
577#[sync_docs]
578#[derive(Debug, Clone, Default)]
579pub struct ListBasinsRequest {
580 pub prefix: String,
581 pub start_after: String,
582 pub limit: Option<usize>,
583}
584
585impl ListBasinsRequest {
586 pub fn new() -> Self {
588 Self::default()
589 }
590
591 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
593 Self {
594 prefix: prefix.into(),
595 ..self
596 }
597 }
598
599 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
601 Self {
602 start_after: start_after.into(),
603 ..self
604 }
605 }
606
607 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
609 Self {
610 limit: limit.into(),
611 ..self
612 }
613 }
614}
615
616impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
617 type Error = ConvertError;
618 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
619 let ListBasinsRequest {
620 prefix,
621 start_after,
622 limit,
623 } = value;
624 Ok(Self {
625 prefix,
626 start_after,
627 limit: limit
628 .map(TryInto::try_into)
629 .transpose()
630 .map_err(|_| "request limit does not fit into u64 bounds")?,
631 })
632 }
633}
634
635#[sync_docs]
636#[derive(Debug, Clone)]
637pub struct ListBasinsResponse {
638 pub basins: Vec<BasinInfo>,
639 pub has_more: bool,
640}
641
642impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
643 type Error = ConvertError;
644 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
645 let api::ListBasinsResponse { basins, has_more } = value;
646 Ok(Self {
647 basins: basins
648 .into_iter()
649 .map(TryInto::try_into)
650 .collect::<Result<Vec<BasinInfo>, ConvertError>>()?,
651 has_more,
652 })
653 }
654}
655
656#[sync_docs]
657#[derive(Debug, Clone)]
658pub struct DeleteBasinRequest {
659 pub basin: BasinName,
660 pub if_exists: bool,
662}
663
664impl DeleteBasinRequest {
665 pub fn new(basin: BasinName) -> Self {
667 Self {
668 basin,
669 if_exists: false,
670 }
671 }
672
673 pub fn with_if_exists(self, if_exists: bool) -> Self {
675 Self { if_exists, ..self }
676 }
677}
678
679impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
680 fn from(value: DeleteBasinRequest) -> Self {
681 let DeleteBasinRequest { basin, .. } = value;
682 Self { basin: basin.0 }
683 }
684}
685
686#[sync_docs]
687#[derive(Debug, Clone)]
688pub struct DeleteStreamRequest {
689 pub stream: String,
690 pub if_exists: bool,
692}
693
694impl DeleteStreamRequest {
695 pub fn new(stream: impl Into<String>) -> Self {
697 Self {
698 stream: stream.into(),
699 if_exists: false,
700 }
701 }
702
703 pub fn with_if_exists(self, if_exists: bool) -> Self {
705 Self { if_exists, ..self }
706 }
707}
708
709impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
710 fn from(value: DeleteStreamRequest) -> Self {
711 let DeleteStreamRequest { stream, .. } = value;
712 Self { stream }
713 }
714}
715
716#[sync_docs]
717#[derive(Debug, Clone)]
718pub struct ReconfigureBasinRequest {
719 pub basin: BasinName,
720 pub config: Option<BasinConfig>,
721 pub mask: Option<Vec<String>>,
722}
723
724impl ReconfigureBasinRequest {
725 pub fn new(basin: BasinName) -> Self {
727 Self {
728 basin,
729 config: None,
730 mask: None,
731 }
732 }
733
734 pub fn with_config(self, config: BasinConfig) -> Self {
736 Self {
737 config: Some(config),
738 ..self
739 }
740 }
741
742 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
744 Self {
745 mask: Some(mask.into()),
746 ..self
747 }
748 }
749}
750
751impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
752 fn from(value: ReconfigureBasinRequest) -> Self {
753 let ReconfigureBasinRequest {
754 basin,
755 config,
756 mask,
757 } = value;
758 Self {
759 basin: basin.0,
760 config: config.map(Into::into),
761 mask: mask.map(|paths| prost_types::FieldMask { paths }),
762 }
763 }
764}
765
766impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
767 type Error = ConvertError;
768 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
769 let api::ReconfigureBasinResponse { config } = value;
770 let config = config.ok_or("missing basin config")?;
771 config.try_into()
772 }
773}
774
775#[sync_docs]
776#[derive(Debug, Clone)]
777pub struct ReconfigureStreamRequest {
778 pub stream: String,
779 pub config: Option<StreamConfig>,
780 pub mask: Option<Vec<String>>,
781}
782
783impl ReconfigureStreamRequest {
784 pub fn new(stream: impl Into<String>) -> Self {
786 Self {
787 stream: stream.into(),
788 config: None,
789 mask: None,
790 }
791 }
792
793 pub fn with_config(self, config: StreamConfig) -> Self {
795 Self {
796 config: Some(config),
797 ..self
798 }
799 }
800
801 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
803 Self {
804 mask: Some(mask.into()),
805 ..self
806 }
807 }
808}
809
810impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
811 fn from(value: ReconfigureStreamRequest) -> Self {
812 let ReconfigureStreamRequest {
813 stream,
814 config,
815 mask,
816 } = value;
817 Self {
818 stream,
819 config: config.map(Into::into),
820 mask: mask.map(|paths| prost_types::FieldMask { paths }),
821 }
822 }
823}
824
825impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
826 type Error = ConvertError;
827 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
828 let api::ReconfigureStreamResponse { config } = value;
829 let config = config.ok_or("missing stream config")?;
830 config.try_into()
831 }
832}
833
834impl From<api::CheckTailResponse> for u64 {
835 fn from(value: api::CheckTailResponse) -> Self {
836 let api::CheckTailResponse { next_seq_num } = value;
837 next_seq_num
838 }
839}
840
841#[sync_docs]
842#[derive(Debug, Clone, PartialEq, Eq)]
843pub struct Header {
844 pub name: Bytes,
845 pub value: Bytes,
846}
847
848impl Header {
849 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
851 Self {
852 name: name.into(),
853 value: value.into(),
854 }
855 }
856
857 pub fn from_value(value: impl Into<Bytes>) -> Self {
859 Self {
860 name: Bytes::new(),
861 value: value.into(),
862 }
863 }
864}
865
866impl From<Header> for api::Header {
867 fn from(value: Header) -> Self {
868 let Header { name, value } = value;
869 Self { name, value }
870 }
871}
872
873impl From<api::Header> for Header {
874 fn from(value: api::Header) -> Self {
875 let api::Header { name, value } = value;
876 Self { name, value }
877 }
878}
879
880#[derive(Debug, Clone, Default, PartialEq, Eq)]
884pub struct FencingToken(Bytes);
885
886impl FencingToken {
887 const MAX_BYTES: usize = 16;
888
889 pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
891 let bytes = bytes.into();
892 if bytes.len() > Self::MAX_BYTES {
893 Err(format!(
894 "Size of a fencing token cannot exceed {} bytes",
895 Self::MAX_BYTES
896 )
897 .into())
898 } else {
899 Ok(Self(bytes))
900 }
901 }
902
903 pub fn generate(n: usize) -> Result<Self, ConvertError> {
905 Self::new(
906 rand::thread_rng()
907 .sample_iter(&Uniform::new_inclusive(0, u8::MAX))
908 .take(n)
909 .collect::<Bytes>(),
910 )
911 }
912}
913
914impl AsRef<Bytes> for FencingToken {
915 fn as_ref(&self) -> &Bytes {
916 &self.0
917 }
918}
919
920impl AsRef<[u8]> for FencingToken {
921 fn as_ref(&self) -> &[u8] {
922 &self.0
923 }
924}
925
926impl Deref for FencingToken {
927 type Target = [u8];
928
929 fn deref(&self) -> &Self::Target {
930 &self.0
931 }
932}
933
934impl From<FencingToken> for Bytes {
935 fn from(value: FencingToken) -> Self {
936 value.0
937 }
938}
939
940impl From<FencingToken> for Vec<u8> {
941 fn from(value: FencingToken) -> Self {
942 value.0.into()
943 }
944}
945
946impl TryFrom<Bytes> for FencingToken {
947 type Error = ConvertError;
948
949 fn try_from(value: Bytes) -> Result<Self, Self::Error> {
950 Self::new(value)
951 }
952}
953
954impl TryFrom<Vec<u8>> for FencingToken {
955 type Error = ConvertError;
956
957 fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
958 Self::new(value)
959 }
960}
961
962#[derive(Debug, Clone)]
968pub enum CommandRecord {
969 Fence {
974 fencing_token: FencingToken,
978 },
979 Trim {
984 seq_num: u64,
989 },
990}
991
992impl CommandRecord {
993 const FENCE: &[u8] = b"fence";
994 const TRIM: &[u8] = b"trim";
995
996 pub fn fence(fencing_token: FencingToken) -> Self {
998 Self::Fence { fencing_token }
999 }
1000
1001 pub fn trim(seq_num: impl Into<u64>) -> Self {
1003 Self::Trim {
1004 seq_num: seq_num.into(),
1005 }
1006 }
1007}
1008
1009#[sync_docs]
1010#[derive(Debug, Clone, PartialEq, Eq)]
1011pub struct AppendRecord {
1012 headers: Vec<Header>,
1013 body: Bytes,
1014 #[cfg(test)]
1015 max_bytes: u64,
1016}
1017
1018metered_impl!(AppendRecord);
1019
1020impl AppendRecord {
1021 const MAX_BYTES: u64 = MIB_BYTES;
1022
1023 fn validated(self) -> Result<Self, ConvertError> {
1024 #[cfg(test)]
1025 let max_bytes = self.max_bytes;
1026 #[cfg(not(test))]
1027 let max_bytes = Self::MAX_BYTES;
1028
1029 if self.metered_bytes() > max_bytes {
1030 Err("AppendRecord should have metered size less than 1 MiB".into())
1031 } else {
1032 Ok(self)
1033 }
1034 }
1035
1036 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1038 Self {
1039 headers: Vec::new(),
1040 body: body.into(),
1041 #[cfg(test)]
1042 max_bytes: Self::MAX_BYTES,
1043 }
1044 .validated()
1045 }
1046
1047 #[cfg(test)]
1048 pub(crate) fn with_max_bytes(
1049 max_bytes: u64,
1050 body: impl Into<Bytes>,
1051 ) -> Result<Self, ConvertError> {
1052 Self {
1053 headers: Vec::new(),
1054 body: body.into(),
1055 max_bytes,
1056 }
1057 .validated()
1058 }
1059
1060 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1062 Self {
1063 headers: headers.into(),
1064 ..self
1065 }
1066 .validated()
1067 }
1068
1069 pub fn body(&self) -> &[u8] {
1071 &self.body
1072 }
1073
1074 pub fn headers(&self) -> &[Header] {
1076 &self.headers
1077 }
1078
1079 pub fn into_parts(self) -> AppendRecordParts {
1081 AppendRecordParts {
1082 headers: self.headers,
1083 body: self.body,
1084 }
1085 }
1086
1087 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1089 Self::new(parts.body)?.with_headers(parts.headers)
1090 }
1091}
1092
1093impl From<AppendRecord> for api::AppendRecord {
1094 fn from(value: AppendRecord) -> Self {
1095 Self {
1096 headers: value.headers.into_iter().map(Into::into).collect(),
1097 body: value.body,
1098 }
1099 }
1100}
1101
1102impl From<CommandRecord> for AppendRecord {
1103 fn from(value: CommandRecord) -> Self {
1104 let (header_value, body) = match value {
1105 CommandRecord::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1106 CommandRecord::Trim { seq_num } => {
1107 (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec())
1108 }
1109 };
1110 AppendRecordParts {
1111 headers: vec![Header::from_value(header_value)],
1112 body: body.into(),
1113 }
1114 .try_into()
1115 .expect("command record is a valid append record")
1116 }
1117}
1118
1119#[sync_docs(AppendRecordParts = "AppendRecord")]
1120#[derive(Debug, Clone)]
1121pub struct AppendRecordParts {
1122 pub headers: Vec<Header>,
1123 pub body: Bytes,
1124}
1125
1126impl From<AppendRecord> for AppendRecordParts {
1127 fn from(value: AppendRecord) -> Self {
1128 value.into_parts()
1129 }
1130}
1131
1132impl TryFrom<AppendRecordParts> for AppendRecord {
1133 type Error = ConvertError;
1134
1135 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1136 Self::try_from_parts(value)
1137 }
1138}
1139
1140#[derive(Debug, Clone)]
1142pub struct AppendRecordBatch {
1143 records: Vec<AppendRecord>,
1144 metered_bytes: u64,
1145 max_capacity: usize,
1146 #[cfg(test)]
1147 max_bytes: u64,
1148}
1149
1150impl PartialEq for AppendRecordBatch {
1151 fn eq(&self, other: &Self) -> bool {
1152 if self.records.eq(&other.records) {
1153 assert_eq!(self.metered_bytes, other.metered_bytes);
1154 true
1155 } else {
1156 false
1157 }
1158 }
1159}
1160
1161impl Eq for AppendRecordBatch {}
1162
1163impl Default for AppendRecordBatch {
1164 fn default() -> Self {
1165 Self::new()
1166 }
1167}
1168
1169impl AppendRecordBatch {
1170 pub const MAX_CAPACITY: usize = 1000;
1174
1175 pub const MAX_BYTES: u64 = MIB_BYTES;
1177
1178 pub fn new() -> Self {
1180 Self::with_max_capacity(Self::MAX_CAPACITY)
1181 }
1182
1183 pub fn with_max_capacity(max_capacity: usize) -> Self {
1187 assert!(
1188 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1189 "Batch capacity must be between 1 and 1000"
1190 );
1191
1192 Self {
1193 records: Vec::with_capacity(max_capacity),
1194 metered_bytes: 0,
1195 max_capacity,
1196 #[cfg(test)]
1197 max_bytes: Self::MAX_BYTES,
1198 }
1199 }
1200
1201 #[cfg(test)]
1202 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1203 #[cfg(test)]
1204 assert!(
1205 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1206 "Batch size must be between 1 byte and 1 MiB"
1207 );
1208
1209 Self {
1210 max_bytes,
1211 ..Self::with_max_capacity(max_capacity)
1212 }
1213 }
1214
1215 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1221 where
1222 R: Into<AppendRecord>,
1223 T: IntoIterator<Item = R>,
1224 {
1225 let mut records = Self::new();
1226 let mut pending = Vec::new();
1227
1228 let mut iter = iter.into_iter();
1229
1230 for record in iter.by_ref() {
1231 if let Err(record) = records.push(record) {
1232 pending.push(record);
1233 break;
1234 }
1235 }
1236
1237 if pending.is_empty() {
1238 Ok(records)
1239 } else {
1240 pending.extend(iter.map(Into::into));
1241 Err((records, pending))
1242 }
1243 }
1244
1245 pub fn is_empty(&self) -> bool {
1247 if self.records.is_empty() {
1248 assert_eq!(self.metered_bytes, 0);
1249 true
1250 } else {
1251 false
1252 }
1253 }
1254
1255 pub fn len(&self) -> usize {
1257 self.records.len()
1258 }
1259
1260 #[cfg(test)]
1261 fn max_bytes(&self) -> u64 {
1262 self.max_bytes
1263 }
1264
1265 #[cfg(not(test))]
1266 fn max_bytes(&self) -> u64 {
1267 Self::MAX_BYTES
1268 }
1269
1270 pub fn is_full(&self) -> bool {
1272 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1273 }
1274
1275 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1277 assert!(self.records.len() <= self.max_capacity);
1278 assert!(self.metered_bytes <= self.max_bytes());
1279
1280 let record = record.into();
1281 let record_size = record.metered_bytes();
1282 if self.records.len() >= self.max_capacity
1283 || self.metered_bytes + record_size > self.max_bytes()
1284 {
1285 Err(record)
1286 } else {
1287 self.records.push(record);
1288 self.metered_bytes += record_size;
1289 Ok(())
1290 }
1291 }
1292}
1293
1294impl MeteredBytes for AppendRecordBatch {
1295 fn metered_bytes(&self) -> u64 {
1296 self.metered_bytes
1297 }
1298}
1299
1300impl IntoIterator for AppendRecordBatch {
1301 type Item = AppendRecord;
1302 type IntoIter = std::vec::IntoIter<Self::Item>;
1303
1304 fn into_iter(self) -> Self::IntoIter {
1305 self.records.into_iter()
1306 }
1307}
1308
1309impl<'a> IntoIterator for &'a AppendRecordBatch {
1310 type Item = &'a AppendRecord;
1311 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1312
1313 fn into_iter(self) -> Self::IntoIter {
1314 self.records.iter()
1315 }
1316}
1317
1318impl AsRef<[AppendRecord]> for AppendRecordBatch {
1319 fn as_ref(&self) -> &[AppendRecord] {
1320 &self.records
1321 }
1322}
1323
1324#[sync_docs]
1325#[derive(Debug, Default, Clone)]
1326pub struct AppendInput {
1327 pub records: AppendRecordBatch,
1328 pub match_seq_num: Option<u64>,
1329 pub fencing_token: Option<FencingToken>,
1330}
1331
1332impl MeteredBytes for AppendInput {
1333 fn metered_bytes(&self) -> u64 {
1334 self.records.metered_bytes()
1335 }
1336}
1337
1338impl AppendInput {
1339 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1341 Self {
1342 records: records.into(),
1343 match_seq_num: None,
1344 fencing_token: None,
1345 }
1346 }
1347
1348 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1350 Self {
1351 match_seq_num: Some(match_seq_num.into()),
1352 ..self
1353 }
1354 }
1355
1356 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1358 Self {
1359 fencing_token: Some(fencing_token),
1360 ..self
1361 }
1362 }
1363
1364 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1365 let Self {
1366 records,
1367 match_seq_num,
1368 fencing_token,
1369 } = self;
1370
1371 api::AppendInput {
1372 stream: stream.into(),
1373 records: records.into_iter().map(Into::into).collect(),
1374 match_seq_num,
1375 fencing_token: fencing_token.map(|f| f.0),
1376 }
1377 }
1378}
1379
1380#[sync_docs]
1381#[derive(Debug, Clone)]
1382pub struct AppendOutput {
1383 pub start_seq_num: u64,
1384 pub end_seq_num: u64,
1385 pub next_seq_num: u64,
1386}
1387
1388impl From<api::AppendOutput> for AppendOutput {
1389 fn from(value: api::AppendOutput) -> Self {
1390 let api::AppendOutput {
1391 start_seq_num,
1392 end_seq_num,
1393 next_seq_num,
1394 } = value;
1395 Self {
1396 start_seq_num,
1397 end_seq_num,
1398 next_seq_num,
1399 }
1400 }
1401}
1402
1403impl TryFrom<api::AppendResponse> for AppendOutput {
1404 type Error = ConvertError;
1405 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1406 let api::AppendResponse { output } = value;
1407 let output = output.ok_or("missing append output")?;
1408 Ok(output.into())
1409 }
1410}
1411
1412impl TryFrom<api::AppendSessionResponse> for AppendOutput {
1413 type Error = ConvertError;
1414 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1415 let api::AppendSessionResponse { output } = value;
1416 let output = output.ok_or("missing append output")?;
1417 Ok(output.into())
1418 }
1419}
1420
1421#[sync_docs]
1422#[derive(Debug, Clone, Default)]
1423pub struct ReadLimit {
1424 pub count: Option<u64>,
1425 pub bytes: Option<u64>,
1426}
1427
1428#[sync_docs]
1429#[derive(Debug, Clone, Default)]
1430pub struct ReadRequest {
1431 pub start_seq_num: u64,
1432 pub limit: ReadLimit,
1433}
1434
1435impl ReadRequest {
1436 pub fn new(start_seq_num: u64) -> Self {
1438 Self {
1439 start_seq_num,
1440 ..Default::default()
1441 }
1442 }
1443
1444 pub fn with_limit(self, limit: ReadLimit) -> Self {
1446 Self { limit, ..self }
1447 }
1448}
1449
1450impl ReadRequest {
1451 pub(crate) fn try_into_api_type(
1452 self,
1453 stream: impl Into<String>,
1454 ) -> Result<api::ReadRequest, ConvertError> {
1455 let Self {
1456 start_seq_num,
1457 limit,
1458 } = self;
1459
1460 let limit = if limit.count > Some(1000) {
1461 Err("read limit: count must not exceed 1000 for unary request")
1462 } else if limit.bytes > Some(MIB_BYTES) {
1463 Err("read limit: bytes must not exceed 1MiB for unary request")
1464 } else {
1465 Ok(api::ReadLimit {
1466 count: limit.count,
1467 bytes: limit.bytes,
1468 })
1469 }?;
1470
1471 Ok(api::ReadRequest {
1472 stream: stream.into(),
1473 start_seq_num,
1474 limit: Some(limit),
1475 })
1476 }
1477}
1478
// A record together with the sequence number it carries; built from `api::SequencedRecord`
// by the `From` impl in this file.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]`, hence plain `//` comments — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    // sequence number carried by the record
    pub seq_num: u64,
    // name/value headers; a single header with an empty name is how
    // `as_command_record` recognizes a command record
    pub headers: Vec<Header>,
    // record payload
    pub body: Bytes,
}

// Generates `impl MeteredBytes for SequencedRecord`:
// 8 + 2 per header + the header name/value lengths + the body length.
metered_impl!(SequencedRecord);
1488
1489impl From<api::SequencedRecord> for SequencedRecord {
1490 fn from(value: api::SequencedRecord) -> Self {
1491 let api::SequencedRecord {
1492 seq_num,
1493 headers,
1494 body,
1495 } = value;
1496 Self {
1497 seq_num,
1498 headers: headers.into_iter().map(Into::into).collect(),
1499 body,
1500 }
1501 }
1502}
1503
1504impl SequencedRecord {
1505 pub fn as_command_record(&self) -> Option<CommandRecord> {
1507 if self.headers.len() != 1 {
1508 return None;
1509 }
1510
1511 let header = self.headers.first().expect("pre-validated length");
1512
1513 if !header.name.is_empty() {
1514 return None;
1515 }
1516
1517 match header.value.as_ref() {
1518 CommandRecord::FENCE => {
1519 let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1520 Some(CommandRecord::Fence { fencing_token })
1521 }
1522 CommandRecord::TRIM => {
1523 let body: &[u8] = &self.body;
1524 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1525 Some(CommandRecord::Trim { seq_num })
1526 }
1527 _ => None,
1528 }
1529 }
1530}
1531
// A list of sequenced records; built from `api::SequencedRecordBatch` by the `From` impl
// in this file, and metered as the sum of its records.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]`, hence plain `//` comments — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    // the records, in the order they were received from the API type
    pub records: Vec<SequencedRecord>,
}
1537
1538impl MeteredBytes for SequencedRecordBatch {
1539 fn metered_bytes(&self) -> u64 {
1540 self.records.metered_bytes()
1541 }
1542}
1543
1544impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1545 fn from(value: api::SequencedRecordBatch) -> Self {
1546 let api::SequencedRecordBatch { records } = value;
1547 Self {
1548 records: records.into_iter().map(Into::into).collect(),
1549 }
1550 }
1551}
1552
// Outcome of a read: either a batch of records or a bare sequence number.
// Mirrors `api::read_output::Output` variant for variant (see the `From` impl in this file).
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]`, with the `ReadOutput = "Output"`
// argument apparently pointing it at the API type's name — confirm.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    // records returned by the read
    Batch(SequencedRecordBatch),
    // presumably returned when the requested start lies before the first available record — confirm
    FirstSeqNum(u64),
    // presumably returned when the requested start lies beyond the current tail — confirm
    NextSeqNum(u64),
}
1560
1561impl From<api::read_output::Output> for ReadOutput {
1562 fn from(value: api::read_output::Output) -> Self {
1563 match value {
1564 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1565 api::read_output::Output::FirstSeqNum(first_seq_num) => {
1566 Self::FirstSeqNum(first_seq_num)
1567 }
1568 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1569 }
1570 }
1571}
1572
1573impl TryFrom<api::ReadOutput> for ReadOutput {
1574 type Error = ConvertError;
1575 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1576 let api::ReadOutput { output } = value;
1577 let output = output.ok_or("missing read output")?;
1578 Ok(output.into())
1579 }
1580}
1581
1582impl TryFrom<api::ReadResponse> for ReadOutput {
1583 type Error = ConvertError;
1584 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1585 let api::ReadResponse { output } = value;
1586 let output = output.ok_or("missing output in read response")?;
1587 output.try_into()
1588 }
1589}
1590
// Parameters of a read session; converted to `api::ReadSessionRequest` by `into_api_type`,
// which forwards `limit` without the bounds enforced on unary `ReadRequest`s.
// NOTE(review): rustdoc is presumably injected by `#[sync_docs]`, hence plain `//` comments — confirm.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    // sequence number to start reading from; defaults to 0 via `#[derive(Default)]`
    pub start_seq_num: u64,
    // optional record-count / byte caps; defaults to no caps
    pub limit: ReadLimit,
}
1597
1598impl ReadSessionRequest {
1599 pub fn new(start_seq_num: u64) -> Self {
1601 Self {
1602 start_seq_num,
1603 ..Default::default()
1604 }
1605 }
1606
1607 pub fn with_limit(self, limit: ReadLimit) -> Self {
1609 Self { limit, ..self }
1610 }
1611
1612 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1613 let Self {
1614 start_seq_num,
1615 limit,
1616 } = self;
1617 api::ReadSessionRequest {
1618 stream: stream.into(),
1619 start_seq_num,
1620 limit: Some(api::ReadLimit {
1621 count: limit.count,
1622 bytes: limit.bytes,
1623 }),
1624 heartbeats: false,
1625 }
1626 }
1627}
1628
1629impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1630 type Error = ConvertError;
1631 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1632 let api::ReadSessionResponse { output } = value;
1633 let output = output.ok_or("missing output in read session response")?;
1634 output.try_into()
1635 }
1636}
1637
/// Name of a basin, stored as a `String`.
///
/// The `TryFrom<String>` and `FromStr` conversions in this file only accept names that
/// are 8 to 48 bytes long, consist of lowercase ASCII letters, digits and hyphens, and
/// neither begin nor end with a hyphen.
#[derive(Debug, Clone)]
pub struct BasinName(String);
1644
1645impl AsRef<str> for BasinName {
1646 fn as_ref(&self) -> &str {
1647 &self.0
1648 }
1649}
1650
1651impl Deref for BasinName {
1652 type Target = str;
1653 fn deref(&self) -> &Self::Target {
1654 &self.0
1655 }
1656}
1657
1658impl TryFrom<String> for BasinName {
1659 type Error = ConvertError;
1660
1661 fn try_from(name: String) -> Result<Self, Self::Error> {
1662 if name.len() < 8 || name.len() > 48 {
1663 return Err("Basin name must be between 8 and 48 characters in length".into());
1664 }
1665
1666 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1667 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1668 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1669 .expect("Failed to compile basin name regex")
1670 });
1671
1672 if !regex.is_match(&name) {
1673 return Err(
1674 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1675 It cannot begin or end with a hyphen."
1676 .into(),
1677 );
1678 }
1679
1680 Ok(Self(name))
1681 }
1682}
1683
1684impl FromStr for BasinName {
1685 type Err = ConvertError;
1686
1687 fn from_str(s: &str) -> Result<Self, Self::Err> {
1688 s.to_string().try_into()
1689 }
1690}
1691
1692impl std::fmt::Display for BasinName {
1693 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1694 f.write_str(&self.0)
1695 }
1696}