1use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13
/// Error returned when a value cannot be converted between representations.
///
/// Wraps a human-readable message; the `Display` output is that message
/// verbatim (see the `#[error("{0}")]` attribute).
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20 fn from(value: T) -> Self {
21 Self(value.into())
22 }
23}
24
/// Types that can report their size in metered bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38 fn metered_bytes(&self) -> u64 {
39 self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40 }
41}
42
/// Implements [`MeteredBytes`] for a type that has a `headers` collection
/// (whose items expose `name` and `value`) and a `body`.
///
/// The metered size is 8 bytes, plus 2 bytes per header, plus the length of
/// every header name and value, plus the length of the body.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let header_count = self.headers.len();
                let header_payload: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                let total = 8 + (2 * header_count) + header_payload + self.body.len();
                total as u64
            }
        }
    };
}
60
// SDK-side counterpart of `api::BasinScope`; the variants map one-to-one.
// NOTE(review): item docs are presumably injected by `#[sync_docs]` — confirm.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BasinScope {
    // Default value; parsed from the string "unspecified".
    #[default]
    Unspecified,
    // Parsed from the string "aws:us-east-1".
    AwsUsEast1,
}
68
69impl From<BasinScope> for api::BasinScope {
70 fn from(value: BasinScope) -> Self {
71 match value {
72 BasinScope::Unspecified => Self::Unspecified,
73 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
74 }
75 }
76}
77
78impl From<api::BasinScope> for BasinScope {
79 fn from(value: api::BasinScope) -> Self {
80 match value {
81 api::BasinScope::Unspecified => Self::Unspecified,
82 api::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
83 }
84 }
85}
86
87impl FromStr for BasinScope {
88 type Err = ConvertError;
89 fn from_str(value: &str) -> Result<Self, Self::Err> {
90 match value {
91 "unspecified" => Ok(Self::Unspecified),
92 "aws:us-east-1" => Ok(Self::AwsUsEast1),
93 _ => Err("invalid basin scope value".into()),
94 }
95 }
96}
97
98impl From<BasinScope> for i32 {
99 fn from(value: BasinScope) -> Self {
100 api::BasinScope::from(value).into()
101 }
102}
103
104impl TryFrom<i32> for BasinScope {
105 type Error = ConvertError;
106 fn try_from(value: i32) -> Result<Self, Self::Error> {
107 api::BasinScope::try_from(value)
108 .map(Into::into)
109 .map_err(|_| "invalid basin scope value".into())
110 }
111}
112
// Parameters for creating a basin; converted into `api::CreateBasinRequest`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateBasinRequest {
    // Name of the basin to create.
    pub basin: BasinName,
    // Optional basin configuration; `None` unless set via `with_config`.
    pub config: Option<BasinConfig>,
    // Basin scope; `BasinScope::Unspecified` unless set via `with_scope`.
    pub scope: BasinScope,
}
120
121impl CreateBasinRequest {
122 pub fn new(basin: BasinName) -> Self {
124 Self {
125 basin,
126 config: None,
127 scope: BasinScope::Unspecified,
128 }
129 }
130
131 pub fn with_config(self, config: BasinConfig) -> Self {
133 Self {
134 config: Some(config),
135 ..self
136 }
137 }
138
139 pub fn with_scope(self, scope: BasinScope) -> Self {
141 Self { scope, ..self }
142 }
143}
144
145impl From<CreateBasinRequest> for api::CreateBasinRequest {
146 fn from(value: CreateBasinRequest) -> Self {
147 let CreateBasinRequest {
148 basin,
149 config,
150 scope,
151 } = value;
152 Self {
153 basin: basin.0,
154 config: config.map(Into::into),
155 scope: scope.into(),
156 }
157 }
158}
159
// Basin configuration; mirrors `api::BasinConfig` field for field.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct BasinConfig {
    // Optional default stream configuration; `None` by default.
    pub default_stream_config: Option<StreamConfig>,
    // Defaults to `false`.
    pub create_stream_on_append: bool,
    // Defaults to `false`.
    pub create_stream_on_read: bool,
}
167
168impl BasinConfig {
169 pub fn new() -> Self {
171 Self::default()
172 }
173
174 pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
176 Self {
177 default_stream_config: Some(default_stream_config),
178 ..self
179 }
180 }
181
182 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
184 Self {
185 create_stream_on_append,
186 ..self
187 }
188 }
189
190 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
192 Self {
193 create_stream_on_read,
194 ..self
195 }
196 }
197}
198
199impl From<BasinConfig> for api::BasinConfig {
200 fn from(value: BasinConfig) -> Self {
201 let BasinConfig {
202 default_stream_config,
203 create_stream_on_append,
204 create_stream_on_read,
205 } = value;
206 Self {
207 default_stream_config: default_stream_config.map(Into::into),
208 create_stream_on_append,
209 create_stream_on_read,
210 }
211 }
212}
213
214impl TryFrom<api::BasinConfig> for BasinConfig {
215 type Error = ConvertError;
216 fn try_from(value: api::BasinConfig) -> Result<Self, Self::Error> {
217 let api::BasinConfig {
218 default_stream_config,
219 create_stream_on_append,
220 create_stream_on_read,
221 } = value;
222 Ok(Self {
223 default_stream_config: default_stream_config.map(TryInto::try_into).transpose()?,
224 create_stream_on_append,
225 create_stream_on_read,
226 })
227 }
228}
229
// Stream configuration; mirrors `api::StreamConfig` field for field.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    // Defaults to `StorageClass::Unspecified`.
    pub storage_class: StorageClass,
    // Optional retention policy; `None` by default.
    pub retention_policy: Option<RetentionPolicy>,
    // Optional flag; `None` by default.
    pub require_client_timestamps: Option<bool>,
}
237
238impl StreamConfig {
239 pub fn new() -> Self {
241 Self::default()
242 }
243
244 pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self {
246 Self {
247 storage_class: storage_class.into(),
248 ..self
249 }
250 }
251
252 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
254 Self {
255 retention_policy: Some(retention_policy),
256 ..self
257 }
258 }
259
260 pub fn with_require_client_timestamps(self, require_client_timestamps: bool) -> Self {
262 Self {
263 require_client_timestamps: Some(require_client_timestamps),
264 ..self
265 }
266 }
267}
268
269impl From<StreamConfig> for api::StreamConfig {
270 fn from(value: StreamConfig) -> Self {
271 let StreamConfig {
272 storage_class,
273 retention_policy,
274 require_client_timestamps,
275 } = value;
276 Self {
277 storage_class: storage_class.into(),
278 retention_policy: retention_policy.map(Into::into),
279 require_client_timestamps,
280 }
281 }
282}
283
284impl TryFrom<api::StreamConfig> for StreamConfig {
285 type Error = ConvertError;
286 fn try_from(value: api::StreamConfig) -> Result<Self, Self::Error> {
287 let api::StreamConfig {
288 storage_class,
289 retention_policy,
290 require_client_timestamps,
291 } = value;
292 Ok(Self {
293 storage_class: storage_class.try_into()?,
294 retention_policy: retention_policy.map(Into::into),
295 require_client_timestamps,
296 })
297 }
298}
299
// SDK-side counterpart of `api::StorageClass`; the variants map one-to-one.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StorageClass {
    // Default value; parsed from the string "unspecified".
    #[default]
    Unspecified,
    // Parsed from the string "standard".
    Standard,
    // Parsed from the string "express".
    Express,
}
308
309impl From<StorageClass> for api::StorageClass {
310 fn from(value: StorageClass) -> Self {
311 match value {
312 StorageClass::Unspecified => Self::Unspecified,
313 StorageClass::Standard => Self::Standard,
314 StorageClass::Express => Self::Express,
315 }
316 }
317}
318
319impl From<api::StorageClass> for StorageClass {
320 fn from(value: api::StorageClass) -> Self {
321 match value {
322 api::StorageClass::Unspecified => Self::Unspecified,
323 api::StorageClass::Standard => Self::Standard,
324 api::StorageClass::Express => Self::Express,
325 }
326 }
327}
328
329impl FromStr for StorageClass {
330 type Err = ConvertError;
331 fn from_str(value: &str) -> Result<Self, Self::Err> {
332 match value {
333 "unspecified" => Ok(Self::Unspecified),
334 "standard" => Ok(Self::Standard),
335 "express" => Ok(Self::Express),
336 _ => Err("invalid storage class value".into()),
337 }
338 }
339}
340
341impl From<StorageClass> for i32 {
342 fn from(value: StorageClass) -> Self {
343 api::StorageClass::from(value).into()
344 }
345}
346
347impl TryFrom<i32> for StorageClass {
348 type Error = ConvertError;
349 fn try_from(value: i32) -> Result<Self, Self::Error> {
350 api::StorageClass::try_from(value)
351 .map(Into::into)
352 .map_err(|_| "invalid storage class value".into())
353 }
354}
355
// Retention policy; counterpart of `api::stream_config::RetentionPolicy`.
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    // Age-based retention. Only whole seconds survive the conversion to the
    // API type (`Duration::as_secs`).
    Age(Duration),
}
361
362impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
363 fn from(value: RetentionPolicy) -> Self {
364 match value {
365 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
366 }
367 }
368}
369
370impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
371 fn from(value: api::stream_config::RetentionPolicy) -> Self {
372 match value {
373 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
374 }
375 }
376}
377
// SDK-side counterpart of `api::BasinState`; the variants map one-to-one.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    // Displayed as "unspecified".
    Unspecified,
    // Displayed as "active".
    Active,
    // Displayed as "creating".
    Creating,
    // Displayed as "deleting".
    Deleting,
}
386
387impl From<BasinState> for api::BasinState {
388 fn from(value: BasinState) -> Self {
389 match value {
390 BasinState::Unspecified => Self::Unspecified,
391 BasinState::Active => Self::Active,
392 BasinState::Creating => Self::Creating,
393 BasinState::Deleting => Self::Deleting,
394 }
395 }
396}
397
398impl From<api::BasinState> for BasinState {
399 fn from(value: api::BasinState) -> Self {
400 match value {
401 api::BasinState::Unspecified => Self::Unspecified,
402 api::BasinState::Active => Self::Active,
403 api::BasinState::Creating => Self::Creating,
404 api::BasinState::Deleting => Self::Deleting,
405 }
406 }
407}
408
409impl From<BasinState> for i32 {
410 fn from(value: BasinState) -> Self {
411 api::BasinState::from(value).into()
412 }
413}
414
415impl TryFrom<i32> for BasinState {
416 type Error = ConvertError;
417 fn try_from(value: i32) -> Result<Self, Self::Error> {
418 api::BasinState::try_from(value)
419 .map(Into::into)
420 .map_err(|_| "invalid basin status value".into())
421 }
422}
423
424impl std::fmt::Display for BasinState {
425 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
426 match self {
427 BasinState::Unspecified => write!(f, "unspecified"),
428 BasinState::Active => write!(f, "active"),
429 BasinState::Creating => write!(f, "creating"),
430 BasinState::Deleting => write!(f, "deleting"),
431 }
432 }
433}
434
// Information about a basin; mirrors `api::BasinInfo`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    // Basin name, as a plain string.
    pub name: String,
    // Basin scope.
    pub scope: BasinScope,
    // Basin state.
    pub state: BasinState,
}
442
443impl From<BasinInfo> for api::BasinInfo {
444 fn from(value: BasinInfo) -> Self {
445 let BasinInfo { name, scope, state } = value;
446 Self {
447 name,
448 scope: scope.into(),
449 state: state.into(),
450 }
451 }
452}
453
454impl TryFrom<api::BasinInfo> for BasinInfo {
455 type Error = ConvertError;
456 fn try_from(value: api::BasinInfo) -> Result<Self, Self::Error> {
457 let api::BasinInfo { name, scope, state } = value;
458 Ok(Self {
459 name,
460 scope: scope.try_into()?,
461 state: state.try_into()?,
462 })
463 }
464}
465
466impl TryFrom<api::CreateBasinResponse> for BasinInfo {
467 type Error = ConvertError;
468 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
469 let api::CreateBasinResponse { info } = value;
470 let info = info.ok_or("missing basin info")?;
471 info.try_into()
472 }
473}
474
// Parameters for listing streams; converted into `api::ListStreamsRequest`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListStreamsRequest {
    // Defaults to the empty string.
    pub prefix: String,
    // Defaults to the empty string.
    pub start_after: String,
    // Optional limit; `None` by default.
    pub limit: Option<usize>,
}
482
483impl ListStreamsRequest {
484 pub fn new() -> Self {
486 Self::default()
487 }
488
489 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
491 Self {
492 prefix: prefix.into(),
493 ..self
494 }
495 }
496
497 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
499 Self {
500 start_after: start_after.into(),
501 ..self
502 }
503 }
504
505 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
507 Self {
508 limit: limit.into(),
509 ..self
510 }
511 }
512}
513
514impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
515 type Error = ConvertError;
516 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
517 let ListStreamsRequest {
518 prefix,
519 start_after,
520 limit,
521 } = value;
522 Ok(Self {
523 prefix,
524 start_after,
525 limit: limit.map(|n| n as u64),
526 })
527 }
528}
529
// Information about a stream; populated from `api::StreamInfo`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    // Stream name.
    pub name: String,
    // Creation time, copied verbatim from the API.
    // NOTE(review): unit is not visible here (presumably epoch seconds) — confirm.
    pub created_at: u32,
    // Deletion time, if any, copied verbatim from the API.
    pub deleted_at: Option<u32>,
}
537
538impl From<api::StreamInfo> for StreamInfo {
539 fn from(value: api::StreamInfo) -> Self {
540 Self {
541 name: value.name,
542 created_at: value.created_at,
543 deleted_at: value.deleted_at,
544 }
545 }
546}
547
548impl TryFrom<api::CreateStreamResponse> for StreamInfo {
549 type Error = ConvertError;
550
551 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
552 let api::CreateStreamResponse { info } = value;
553 let info = info.ok_or("missing stream info")?;
554 Ok(info.into())
555 }
556}
557
// Result of listing streams; populated from `api::ListStreamsResponse`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    // Streams returned by this call.
    pub streams: Vec<StreamInfo>,
    // Copied verbatim from the API response.
    pub has_more: bool,
}
564
565impl From<api::ListStreamsResponse> for ListStreamsResponse {
566 fn from(value: api::ListStreamsResponse) -> Self {
567 let api::ListStreamsResponse { streams, has_more } = value;
568 let streams = streams.into_iter().map(Into::into).collect();
569 Self { streams, has_more }
570 }
571}
572
573impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
574 type Error = ConvertError;
575 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
576 let api::GetBasinConfigResponse { config } = value;
577 let config = config.ok_or("missing basin config")?;
578 config.try_into()
579 }
580}
581
582impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
583 type Error = ConvertError;
584 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
585 let api::GetStreamConfigResponse { config } = value;
586 let config = config.ok_or("missing stream config")?;
587 config.try_into()
588 }
589}
590
// Parameters for creating a stream; converted into `api::CreateStreamRequest`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct CreateStreamRequest {
    // Name of the stream to create.
    pub stream: String,
    // Optional stream configuration; `None` unless set via `with_config`.
    pub config: Option<StreamConfig>,
}
597
598impl CreateStreamRequest {
599 pub fn new(stream: impl Into<String>) -> Self {
601 Self {
602 stream: stream.into(),
603 config: None,
604 }
605 }
606
607 pub fn with_config(self, config: StreamConfig) -> Self {
609 Self {
610 config: Some(config),
611 ..self
612 }
613 }
614}
615
616impl From<CreateStreamRequest> for api::CreateStreamRequest {
617 fn from(value: CreateStreamRequest) -> Self {
618 let CreateStreamRequest { stream, config } = value;
619 Self {
620 stream,
621 config: config.map(Into::into),
622 }
623 }
624}
625
// Parameters for listing basins; converted into `api::ListBasinsRequest`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListBasinsRequest {
    // Defaults to the empty string.
    pub prefix: String,
    // Defaults to the empty string.
    pub start_after: String,
    // Optional limit; `None` by default.
    pub limit: Option<usize>,
}
633
634impl ListBasinsRequest {
635 pub fn new() -> Self {
637 Self::default()
638 }
639
640 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
642 Self {
643 prefix: prefix.into(),
644 ..self
645 }
646 }
647
648 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
650 Self {
651 start_after: start_after.into(),
652 ..self
653 }
654 }
655
656 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
658 Self {
659 limit: limit.into(),
660 ..self
661 }
662 }
663}
664
665impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
666 type Error = ConvertError;
667 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
668 let ListBasinsRequest {
669 prefix,
670 start_after,
671 limit,
672 } = value;
673 Ok(Self {
674 prefix,
675 start_after,
676 limit: limit
677 .map(TryInto::try_into)
678 .transpose()
679 .map_err(|_| "request limit does not fit into u64 bounds")?,
680 })
681 }
682}
683
// Result of listing basins; populated from `api::ListBasinsResponse`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    // Basins returned by this call.
    pub basins: Vec<BasinInfo>,
    // Copied verbatim from the API response.
    pub has_more: bool,
}
690
691impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
692 type Error = ConvertError;
693 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
694 let api::ListBasinsResponse { basins, has_more } = value;
695 Ok(Self {
696 basins: basins
697 .into_iter()
698 .map(TryInto::try_into)
699 .collect::<Result<Vec<BasinInfo>, ConvertError>>()?,
700 has_more,
701 })
702 }
703}
704
// Parameters for deleting a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteBasinRequest {
    // Name of the basin to delete.
    pub basin: BasinName,
    // Defaults to `false`. Not forwarded into `api::DeleteBasinRequest`.
    // NOTE(review): presumably consumed by the client to tolerate a missing
    // basin — confirm.
    pub if_exists: bool,
}
712
713impl DeleteBasinRequest {
714 pub fn new(basin: BasinName) -> Self {
716 Self {
717 basin,
718 if_exists: false,
719 }
720 }
721
722 pub fn with_if_exists(self, if_exists: bool) -> Self {
724 Self { if_exists, ..self }
725 }
726}
727
728impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
729 fn from(value: DeleteBasinRequest) -> Self {
730 let DeleteBasinRequest { basin, .. } = value;
731 Self { basin: basin.0 }
732 }
733}
734
// Parameters for deleting a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct DeleteStreamRequest {
    // Name of the stream to delete.
    pub stream: String,
    // Defaults to `false`. Not forwarded into `api::DeleteStreamRequest`.
    // NOTE(review): presumably consumed by the client to tolerate a missing
    // stream — confirm.
    pub if_exists: bool,
}
742
743impl DeleteStreamRequest {
744 pub fn new(stream: impl Into<String>) -> Self {
746 Self {
747 stream: stream.into(),
748 if_exists: false,
749 }
750 }
751
752 pub fn with_if_exists(self, if_exists: bool) -> Self {
754 Self { if_exists, ..self }
755 }
756}
757
758impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
759 fn from(value: DeleteStreamRequest) -> Self {
760 let DeleteStreamRequest { stream, .. } = value;
761 Self { stream }
762 }
763}
764
// Parameters for reconfiguring a basin.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureBasinRequest {
    // Name of the basin to reconfigure.
    pub basin: BasinName,
    // New configuration; `None` unless set via `with_config`.
    pub config: Option<BasinConfig>,
    // Field-mask paths; sent as a `prost_types::FieldMask` when present.
    pub mask: Option<Vec<String>>,
}
772
773impl ReconfigureBasinRequest {
774 pub fn new(basin: BasinName) -> Self {
776 Self {
777 basin,
778 config: None,
779 mask: None,
780 }
781 }
782
783 pub fn with_config(self, config: BasinConfig) -> Self {
785 Self {
786 config: Some(config),
787 ..self
788 }
789 }
790
791 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
793 Self {
794 mask: Some(mask.into()),
795 ..self
796 }
797 }
798}
799
800impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
801 fn from(value: ReconfigureBasinRequest) -> Self {
802 let ReconfigureBasinRequest {
803 basin,
804 config,
805 mask,
806 } = value;
807 Self {
808 basin: basin.0,
809 config: config.map(Into::into),
810 mask: mask.map(|paths| prost_types::FieldMask { paths }),
811 }
812 }
813}
814
815impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
816 type Error = ConvertError;
817 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
818 let api::ReconfigureBasinResponse { config } = value;
819 let config = config.ok_or("missing basin config")?;
820 config.try_into()
821 }
822}
823
// Parameters for reconfiguring a stream.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ReconfigureStreamRequest {
    // Name of the stream to reconfigure.
    pub stream: String,
    // New configuration; `None` unless set via `with_config`.
    pub config: Option<StreamConfig>,
    // Field-mask paths; sent as a `prost_types::FieldMask` when present.
    pub mask: Option<Vec<String>>,
}
831
832impl ReconfigureStreamRequest {
833 pub fn new(stream: impl Into<String>) -> Self {
835 Self {
836 stream: stream.into(),
837 config: None,
838 mask: None,
839 }
840 }
841
842 pub fn with_config(self, config: StreamConfig) -> Self {
844 Self {
845 config: Some(config),
846 ..self
847 }
848 }
849
850 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
852 Self {
853 mask: Some(mask.into()),
854 ..self
855 }
856 }
857}
858
859impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
860 fn from(value: ReconfigureStreamRequest) -> Self {
861 let ReconfigureStreamRequest {
862 stream,
863 config,
864 mask,
865 } = value;
866 Self {
867 stream,
868 config: config.map(Into::into),
869 mask: mask.map(|paths| prost_types::FieldMask { paths }),
870 }
871 }
872}
873
874impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
875 type Error = ConvertError;
876 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
877 let api::ReconfigureStreamResponse { config } = value;
878 let config = config.ok_or("missing stream config")?;
879 config.try_into()
880 }
881}
882
883impl From<api::CheckTailResponse> for u64 {
884 fn from(value: api::CheckTailResponse) -> Self {
885 let api::CheckTailResponse { next_seq_num } = value;
886 next_seq_num
887 }
888}
889
// A record header: a name/value pair of raw bytes; mirrors `api::Header`.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    // Header name; empty for headers built with `Header::from_value`.
    pub name: Bytes,
    // Header value.
    pub value: Bytes,
}
896
897impl Header {
898 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
900 Self {
901 name: name.into(),
902 value: value.into(),
903 }
904 }
905
906 pub fn from_value(value: impl Into<Bytes>) -> Self {
908 Self {
909 name: Bytes::new(),
910 value: value.into(),
911 }
912 }
913}
914
915impl From<Header> for api::Header {
916 fn from(value: Header) -> Self {
917 let Header { name, value } = value;
918 Self { name, value }
919 }
920}
921
922impl From<api::Header> for Header {
923 fn from(value: api::Header) -> Self {
924 let api::Header { name, value } = value;
925 Self { name, value }
926 }
927}
928
/// Fencing token: an opaque byte string of at most 16 bytes.
///
/// The length limit is enforced by [`FencingToken::new`]; the default value
/// is the empty token.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FencingToken(Bytes);
934
935impl FencingToken {
936 const MAX_BYTES: usize = 16;
937
938 pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
940 let bytes = bytes.into();
941 if bytes.len() > Self::MAX_BYTES {
942 Err(format!(
943 "Size of a fencing token cannot exceed {} bytes",
944 Self::MAX_BYTES
945 )
946 .into())
947 } else {
948 Ok(Self(bytes))
949 }
950 }
951
952 pub fn generate(n: usize) -> Result<Self, ConvertError> {
954 Self::new(
955 rand::rng()
956 .sample_iter(rand::distr::StandardUniform)
957 .take(n)
958 .collect::<Bytes>(),
959 )
960 }
961}
962
963impl AsRef<Bytes> for FencingToken {
964 fn as_ref(&self) -> &Bytes {
965 &self.0
966 }
967}
968
969impl AsRef<[u8]> for FencingToken {
970 fn as_ref(&self) -> &[u8] {
971 &self.0
972 }
973}
974
975impl Deref for FencingToken {
976 type Target = [u8];
977
978 fn deref(&self) -> &Self::Target {
979 &self.0
980 }
981}
982
983impl From<FencingToken> for Bytes {
984 fn from(value: FencingToken) -> Self {
985 value.0
986 }
987}
988
989impl From<FencingToken> for Vec<u8> {
990 fn from(value: FencingToken) -> Self {
991 value.0.into()
992 }
993}
994
995impl TryFrom<Bytes> for FencingToken {
996 type Error = ConvertError;
997
998 fn try_from(value: Bytes) -> Result<Self, Self::Error> {
999 Self::new(value)
1000 }
1001}
1002
1003impl TryFrom<Vec<u8>> for FencingToken {
1004 type Error = ConvertError;
1005
1006 fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
1007 Self::new(value)
1008 }
1009}
1010
/// A command that can be carried by a [`CommandRecord`].
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command. Encoded with the header value `fence` and the token
    /// bytes as the record body.
    Fence {
        /// Fencing token carried by the command.
        fencing_token: FencingToken,
    },
    /// Trim command. Encoded with the header value `trim` and the sequence
    /// number as 8 big-endian bytes in the record body.
    Trim {
        /// Sequence number carried by the command.
        seq_num: u64,
    },
}
1036
/// A command together with an optional timestamp.
///
/// Convertible into an [`AppendRecord`] whose single header has an empty name
/// and a value that identifies the command.
#[derive(Debug, Clone)]
pub struct CommandRecord {
    /// The command to carry.
    pub command: Command,
    /// Optional record timestamp; `None` unless set via `with_timestamp`.
    pub timestamp: Option<u64>,
}
1049
1050impl CommandRecord {
1051 const FENCE: &[u8] = b"fence";
1052 const TRIM: &[u8] = b"trim";
1053
1054 pub fn fence(fencing_token: FencingToken) -> Self {
1056 Self {
1057 command: Command::Fence { fencing_token },
1058 timestamp: None,
1059 }
1060 }
1061
1062 pub fn trim(seq_num: impl Into<u64>) -> Self {
1064 Self {
1065 command: Command::Trim {
1066 seq_num: seq_num.into(),
1067 },
1068 timestamp: None,
1069 }
1070 }
1071
1072 pub fn with_timestamp(self, timestamp: u64) -> Self {
1074 Self {
1075 timestamp: Some(timestamp),
1076 ..self
1077 }
1078 }
1079}
1080
// A record to append. Fields are private so that every instance passes the
// metered-size check in `AppendRecord::validated`.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    // Optional record timestamp.
    timestamp: Option<u64>,
    // Record headers.
    headers: Vec<Header>,
    // Record body.
    body: Bytes,
    // Test-only override of the size limit used during validation.
    #[cfg(test)]
    max_bytes: u64,
}
1090
// Implements `MeteredBytes` for `AppendRecord` from its headers and body.
metered_impl!(AppendRecord);
1092
1093impl AppendRecord {
1094 const MAX_BYTES: u64 = MIB_BYTES;
1095
1096 fn validated(self) -> Result<Self, ConvertError> {
1097 #[cfg(test)]
1098 let max_bytes = self.max_bytes;
1099 #[cfg(not(test))]
1100 let max_bytes = Self::MAX_BYTES;
1101
1102 if self.metered_bytes() > max_bytes {
1103 Err("AppendRecord should have metered size less than 1 MiB".into())
1104 } else {
1105 Ok(self)
1106 }
1107 }
1108
1109 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1111 Self {
1112 timestamp: None,
1113 headers: Vec::new(),
1114 body: body.into(),
1115 #[cfg(test)]
1116 max_bytes: Self::MAX_BYTES,
1117 }
1118 .validated()
1119 }
1120
1121 #[cfg(test)]
1122 pub(crate) fn with_max_bytes(
1123 max_bytes: u64,
1124 body: impl Into<Bytes>,
1125 ) -> Result<Self, ConvertError> {
1126 Self {
1127 timestamp: None,
1128 headers: Vec::new(),
1129 body: body.into(),
1130 max_bytes,
1131 }
1132 .validated()
1133 }
1134
1135 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1137 Self {
1138 headers: headers.into(),
1139 ..self
1140 }
1141 .validated()
1142 }
1143
1144 pub fn with_timestamp(self, timestamp: u64) -> Self {
1146 Self {
1147 timestamp: Some(timestamp),
1148 ..self
1149 }
1150 }
1151
1152 pub fn body(&self) -> &[u8] {
1154 &self.body
1155 }
1156
1157 pub fn headers(&self) -> &[Header] {
1159 &self.headers
1160 }
1161
1162 pub fn timestamp(&self) -> Option<u64> {
1164 self.timestamp
1165 }
1166
1167 pub fn into_parts(self) -> AppendRecordParts {
1169 AppendRecordParts {
1170 timestamp: self.timestamp,
1171 headers: self.headers,
1172 body: self.body,
1173 }
1174 }
1175
1176 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1178 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1179 if let Some(timestamp) = parts.timestamp {
1180 Ok(record.with_timestamp(timestamp))
1181 } else {
1182 Ok(record)
1183 }
1184 }
1185}
1186
1187impl From<AppendRecord> for api::AppendRecord {
1188 fn from(value: AppendRecord) -> Self {
1189 Self {
1190 timestamp: value.timestamp,
1191 headers: value.headers.into_iter().map(Into::into).collect(),
1192 body: value.body,
1193 }
1194 }
1195}
1196
1197impl From<CommandRecord> for AppendRecord {
1198 fn from(value: CommandRecord) -> Self {
1199 let (header_value, body) = match value.command {
1200 Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1201 Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1202 };
1203 AppendRecordParts {
1204 timestamp: value.timestamp,
1205 headers: vec![Header::from_value(header_value)],
1206 body: body.into(),
1207 }
1208 .try_into()
1209 .expect("command record is a valid append record")
1210 }
1211}
1212
// The unvalidated components of an `AppendRecord`, with public fields.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    // Optional record timestamp.
    pub timestamp: Option<u64>,
    // Record headers.
    pub headers: Vec<Header>,
    // Record body.
    pub body: Bytes,
}
1220
1221impl From<AppendRecord> for AppendRecordParts {
1222 fn from(value: AppendRecord) -> Self {
1223 value.into_parts()
1224 }
1225}
1226
1227impl TryFrom<AppendRecordParts> for AppendRecord {
1228 type Error = ConvertError;
1229
1230 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1231 Self::try_from_parts(value)
1232 }
1233}
1234
/// A batch of append records, bounded in both record count and total metered
/// size.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    /// Records currently in the batch.
    records: Vec<AppendRecord>,
    /// Running total of the metered size of `records`.
    metered_bytes: u64,
    /// Maximum number of records this batch accepts.
    max_capacity: usize,
    /// Test-only override of the maximum total metered size.
    #[cfg(test)]
    max_bytes: u64,
}
1244
1245impl PartialEq for AppendRecordBatch {
1246 fn eq(&self, other: &Self) -> bool {
1247 if self.records.eq(&other.records) {
1248 assert_eq!(self.metered_bytes, other.metered_bytes);
1249 true
1250 } else {
1251 false
1252 }
1253 }
1254}
1255
// Marker impl; equality itself is defined by the `PartialEq` impl.
impl Eq for AppendRecordBatch {}
1257
1258impl Default for AppendRecordBatch {
1259 fn default() -> Self {
1260 Self::new()
1261 }
1262}
1263
1264impl AppendRecordBatch {
1265 pub const MAX_CAPACITY: usize = 1000;
1269
1270 pub const MAX_BYTES: u64 = MIB_BYTES;
1272
1273 pub fn new() -> Self {
1275 Self::with_max_capacity(Self::MAX_CAPACITY)
1276 }
1277
1278 pub fn with_max_capacity(max_capacity: usize) -> Self {
1282 assert!(
1283 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1284 "Batch capacity must be between 1 and 1000"
1285 );
1286
1287 Self {
1288 records: Vec::with_capacity(max_capacity),
1289 metered_bytes: 0,
1290 max_capacity,
1291 #[cfg(test)]
1292 max_bytes: Self::MAX_BYTES,
1293 }
1294 }
1295
1296 #[cfg(test)]
1297 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1298 #[cfg(test)]
1299 assert!(
1300 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1301 "Batch size must be between 1 byte and 1 MiB"
1302 );
1303
1304 Self {
1305 max_bytes,
1306 ..Self::with_max_capacity(max_capacity)
1307 }
1308 }
1309
1310 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1316 where
1317 R: Into<AppendRecord>,
1318 T: IntoIterator<Item = R>,
1319 {
1320 let mut records = Self::new();
1321 let mut pending = Vec::new();
1322
1323 let mut iter = iter.into_iter();
1324
1325 for record in iter.by_ref() {
1326 if let Err(record) = records.push(record) {
1327 pending.push(record);
1328 break;
1329 }
1330 }
1331
1332 if pending.is_empty() {
1333 Ok(records)
1334 } else {
1335 pending.extend(iter.map(Into::into));
1336 Err((records, pending))
1337 }
1338 }
1339
1340 pub fn is_empty(&self) -> bool {
1342 if self.records.is_empty() {
1343 assert_eq!(self.metered_bytes, 0);
1344 true
1345 } else {
1346 false
1347 }
1348 }
1349
1350 pub fn len(&self) -> usize {
1352 self.records.len()
1353 }
1354
1355 #[cfg(test)]
1356 fn max_bytes(&self) -> u64 {
1357 self.max_bytes
1358 }
1359
1360 #[cfg(not(test))]
1361 fn max_bytes(&self) -> u64 {
1362 Self::MAX_BYTES
1363 }
1364
1365 pub fn is_full(&self) -> bool {
1367 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1368 }
1369
1370 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1372 assert!(self.records.len() <= self.max_capacity);
1373 assert!(self.metered_bytes <= self.max_bytes());
1374
1375 let record = record.into();
1376 let record_size = record.metered_bytes();
1377 if self.records.len() >= self.max_capacity
1378 || self.metered_bytes + record_size > self.max_bytes()
1379 {
1380 Err(record)
1381 } else {
1382 self.records.push(record);
1383 self.metered_bytes += record_size;
1384 Ok(())
1385 }
1386 }
1387}
1388
1389impl MeteredBytes for AppendRecordBatch {
1390 fn metered_bytes(&self) -> u64 {
1391 self.metered_bytes
1392 }
1393}
1394
1395impl IntoIterator for AppendRecordBatch {
1396 type Item = AppendRecord;
1397 type IntoIter = std::vec::IntoIter<Self::Item>;
1398
1399 fn into_iter(self) -> Self::IntoIter {
1400 self.records.into_iter()
1401 }
1402}
1403
1404impl<'a> IntoIterator for &'a AppendRecordBatch {
1405 type Item = &'a AppendRecord;
1406 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1407
1408 fn into_iter(self) -> Self::IntoIter {
1409 self.records.iter()
1410 }
1411}
1412
1413impl AsRef<[AppendRecord]> for AppendRecordBatch {
1414 fn as_ref(&self) -> &[AppendRecord] {
1415 &self.records
1416 }
1417}
1418
// Input for an append: a batch of records plus optional conditions.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    // Records to append.
    pub records: AppendRecordBatch,
    // Optional value; `None` unless set via `with_match_seq_num`.
    pub match_seq_num: Option<u64>,
    // Optional fencing token; `None` unless set via `with_fencing_token`.
    pub fencing_token: Option<FencingToken>,
}
1426
1427impl MeteredBytes for AppendInput {
1428 fn metered_bytes(&self) -> u64 {
1429 self.records.metered_bytes()
1430 }
1431}
1432
1433impl AppendInput {
1434 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1436 Self {
1437 records: records.into(),
1438 match_seq_num: None,
1439 fencing_token: None,
1440 }
1441 }
1442
1443 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1445 Self {
1446 match_seq_num: Some(match_seq_num.into()),
1447 ..self
1448 }
1449 }
1450
1451 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1453 Self {
1454 fencing_token: Some(fencing_token),
1455 ..self
1456 }
1457 }
1458
1459 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1460 let Self {
1461 records,
1462 match_seq_num,
1463 fencing_token,
1464 } = self;
1465
1466 api::AppendInput {
1467 stream: stream.into(),
1468 records: records.into_iter().map(Into::into).collect(),
1469 match_seq_num,
1470 fencing_token: fencing_token.map(|f| f.0),
1471 }
1472 }
1473}
1474
// Result of an append, copied field-for-field from `api::AppendOutput`.
// NOTE(review): the exact meaning of each sequence number (inclusive/exclusive
// bounds) is defined by the API, not by this file — confirm against the proto docs.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AppendOutput {
    pub start_seq_num: u64,
    pub end_seq_num: u64,
    pub next_seq_num: u64,
}
1482
1483impl From<api::AppendOutput> for AppendOutput {
1484 fn from(value: api::AppendOutput) -> Self {
1485 let api::AppendOutput {
1486 start_seq_num,
1487 end_seq_num,
1488 next_seq_num,
1489 } = value;
1490 Self {
1491 start_seq_num,
1492 end_seq_num,
1493 next_seq_num,
1494 }
1495 }
1496}
1497
1498impl TryFrom<api::AppendResponse> for AppendOutput {
1499 type Error = ConvertError;
1500 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1501 let api::AppendResponse { output } = value;
1502 let output = output.ok_or("missing append output")?;
1503 Ok(output.into())
1504 }
1505}
1506
1507impl TryFrom<api::AppendSessionResponse> for AppendOutput {
1508 type Error = ConvertError;
1509 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1510 let api::AppendSessionResponse { output } = value;
1511 let output = output.ok_or("missing append output")?;
1512 Ok(output.into())
1513 }
1514}
1515
// Optional caps on a read: a record count and a byte total. Both default to
// `None`. Unary reads reject a count above 1000 or bytes above 1 MiB (see
// `ReadRequest::try_into_api_type`); session reads forward the values as-is.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    pub count: Option<u64>,
    pub bytes: Option<u64>,
}
1522
1523impl ReadLimit {
1524 pub fn new() -> Self {
1526 Self::default()
1527 }
1528
1529 pub fn with_count(self, count: u64) -> Self {
1531 Self {
1532 count: Some(count),
1533 ..self
1534 }
1535 }
1536
1537 pub fn with_bytes(self, bytes: u64) -> Self {
1539 Self {
1540 bytes: Some(bytes),
1541 ..self
1542 }
1543 }
1544}
1545
// Parameters for a unary read: the starting sequence number and the limits to
// apply. `Default` yields `start_seq_num == 0` with an empty `ReadLimit`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    pub start_seq_num: u64,
    pub limit: ReadLimit,
}
1552
1553impl ReadRequest {
1554 pub fn new(start_seq_num: u64) -> Self {
1556 Self {
1557 start_seq_num,
1558 ..Default::default()
1559 }
1560 }
1561
1562 pub fn with_limit(self, limit: ReadLimit) -> Self {
1564 Self { limit, ..self }
1565 }
1566}
1567
1568impl ReadRequest {
1569 pub(crate) fn try_into_api_type(
1570 self,
1571 stream: impl Into<String>,
1572 ) -> Result<api::ReadRequest, ConvertError> {
1573 let Self {
1574 start_seq_num,
1575 limit,
1576 } = self;
1577
1578 let limit = if limit.count > Some(1000) {
1579 Err("read limit: count must not exceed 1000 for unary request")
1580 } else if limit.bytes > Some(MIB_BYTES) {
1581 Err("read limit: bytes must not exceed 1MiB for unary request")
1582 } else {
1583 Ok(api::ReadLimit {
1584 count: limit.count,
1585 bytes: limit.bytes,
1586 })
1587 }?;
1588
1589 Ok(api::ReadRequest {
1590 stream: stream.into(),
1591 start_seq_num,
1592 limit: Some(limit),
1593 })
1594 }
1595}
1596
// A record as returned by reads: its sequence number and timestamp together
// with the headers and body.
// NOTE(review): the unit/epoch of `timestamp` is not visible in this file — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    pub seq_num: u64,
    pub timestamp: u64,
    pub headers: Vec<Header>,
    pub body: Bytes,
}

// Implements `MeteredBytes` via the shared macro:
// 8 + 2 * (number of headers) + sum of header name/value lengths + body length.
metered_impl!(SequencedRecord);
1607
1608impl From<api::SequencedRecord> for SequencedRecord {
1609 fn from(value: api::SequencedRecord) -> Self {
1610 let api::SequencedRecord {
1611 seq_num,
1612 timestamp,
1613 headers,
1614 body,
1615 } = value;
1616 Self {
1617 seq_num,
1618 timestamp,
1619 headers: headers.into_iter().map(Into::into).collect(),
1620 body,
1621 }
1622 }
1623}
1624
1625impl SequencedRecord {
1626 pub fn as_command_record(&self) -> Option<CommandRecord> {
1628 if self.headers.len() != 1 {
1629 return None;
1630 }
1631
1632 let header = self.headers.first().expect("pre-validated length");
1633
1634 if !header.name.is_empty() {
1635 return None;
1636 }
1637
1638 match header.value.as_ref() {
1639 CommandRecord::FENCE => {
1640 let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1641 Some(CommandRecord {
1642 command: Command::Fence { fencing_token },
1643 timestamp: Some(self.timestamp),
1644 })
1645 }
1646 CommandRecord::TRIM => {
1647 let body: &[u8] = &self.body;
1648 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1649 Some(CommandRecord {
1650 command: Command::Trim { seq_num },
1651 timestamp: Some(self.timestamp),
1652 })
1653 }
1654 _ => None,
1655 }
1656 }
1657}
1658
// A batch of sequenced records, in the order received from the API.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    pub records: Vec<SequencedRecord>,
}
1664
1665impl MeteredBytes for SequencedRecordBatch {
1666 fn metered_bytes(&self) -> u64 {
1667 self.records.metered_bytes()
1668 }
1669}
1670
1671impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1672 fn from(value: api::SequencedRecordBatch) -> Self {
1673 let api::SequencedRecordBatch { records } = value;
1674 Self {
1675 records: records.into_iter().map(Into::into).collect(),
1676 }
1677 }
1678}
1679
// Outcome of a read, mirroring `api::read_output::Output`: either a batch of
// records or a bare next sequence number.
// NOTE(review): when the server sends `NextSeqNum` instead of a batch is defined
// by the API — confirm against the proto docs.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    Batch(SequencedRecordBatch),
    NextSeqNum(u64),
}
1686
1687impl From<api::read_output::Output> for ReadOutput {
1688 fn from(value: api::read_output::Output) -> Self {
1689 match value {
1690 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1691 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1692 }
1693 }
1694}
1695
1696impl TryFrom<api::ReadOutput> for ReadOutput {
1697 type Error = ConvertError;
1698 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1699 let api::ReadOutput { output } = value;
1700 let output = output.ok_or("missing read output")?;
1701 Ok(output.into())
1702 }
1703}
1704
1705impl TryFrom<api::ReadResponse> for ReadOutput {
1706 type Error = ConvertError;
1707 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1708 let api::ReadResponse { output } = value;
1709 let output = output.ok_or("missing output in read response")?;
1710 output.try_into()
1711 }
1712}
1713
// Parameters for a read session: the starting sequence number and the limits.
// Unlike the unary `ReadRequest`, `into_api_type` performs no limit validation.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    pub start_seq_num: u64,
    pub limit: ReadLimit,
}
1720
1721impl ReadSessionRequest {
1722 pub fn new(start_seq_num: u64) -> Self {
1724 Self {
1725 start_seq_num,
1726 ..Default::default()
1727 }
1728 }
1729
1730 pub fn with_limit(self, limit: ReadLimit) -> Self {
1732 Self { limit, ..self }
1733 }
1734
1735 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1736 let Self {
1737 start_seq_num,
1738 limit,
1739 } = self;
1740 api::ReadSessionRequest {
1741 stream: stream.into(),
1742 start_seq_num,
1743 limit: Some(api::ReadLimit {
1744 count: limit.count,
1745 bytes: limit.bytes,
1746 }),
1747 heartbeats: false,
1748 }
1749 }
1750}
1751
1752impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1753 type Error = ConvertError;
1754 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1755 let api::ReadSessionResponse { output } = value;
1756 let output = output.ok_or("missing output in read session response")?;
1757 output.try_into()
1758 }
1759}
1760
/// Validated basin name.
///
/// The conversions in this file (`TryFrom<String>`, `FromStr`) only produce a
/// value when the name is 8 to 48 bytes long, consists of lowercase ASCII
/// letters, digits and hyphens, and neither starts nor ends with a hyphen.
#[derive(Debug, Clone)]
pub struct BasinName(String);
1767
1768impl AsRef<str> for BasinName {
1769 fn as_ref(&self) -> &str {
1770 &self.0
1771 }
1772}
1773
1774impl Deref for BasinName {
1775 type Target = str;
1776 fn deref(&self) -> &Self::Target {
1777 &self.0
1778 }
1779}
1780
1781impl TryFrom<String> for BasinName {
1782 type Error = ConvertError;
1783
1784 fn try_from(name: String) -> Result<Self, Self::Error> {
1785 if name.len() < 8 || name.len() > 48 {
1786 return Err("Basin name must be between 8 and 48 characters in length".into());
1787 }
1788
1789 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1790 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1791 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1792 .expect("Failed to compile basin name regex")
1793 });
1794
1795 if !regex.is_match(&name) {
1796 return Err(
1797 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1798 It cannot begin or end with a hyphen."
1799 .into(),
1800 );
1801 }
1802
1803 Ok(Self(name))
1804 }
1805}
1806
1807impl FromStr for BasinName {
1808 type Err = ConvertError;
1809
1810 fn from_str(s: &str) -> Result<Self, Self::Err> {
1811 s.to_string().try_into()
1812 }
1813}
1814
1815impl std::fmt::Display for BasinName {
1816 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1817 f.write_str(&self.0)
1818 }
1819}
1820
/// Validated access token ID.
///
/// The conversions in this file (`TryFrom<String>`, `FromStr`) only produce a
/// value when the ID is non-empty and at most 50 bytes long.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);
1825
1826impl AsRef<str> for AccessTokenId {
1827 fn as_ref(&self) -> &str {
1828 &self.0
1829 }
1830}
1831
1832impl Deref for AccessTokenId {
1833 type Target = str;
1834
1835 fn deref(&self) -> &Self::Target {
1836 &self.0
1837 }
1838}
1839
1840impl TryFrom<String> for AccessTokenId {
1841 type Error = ConvertError;
1842
1843 fn try_from(name: String) -> Result<Self, Self::Error> {
1844 if name.is_empty() {
1845 return Err("Access token ID must not be empty".into());
1846 }
1847
1848 if name.len() > 50 {
1849 return Err("Access token ID must not exceed 50 characters".into());
1850 }
1851
1852 Ok(Self(name))
1853 }
1854}
1855
1856impl From<AccessTokenId> for String {
1857 fn from(value: AccessTokenId) -> Self {
1858 value.0
1859 }
1860}
1861
1862impl FromStr for AccessTokenId {
1863 type Err = ConvertError;
1864
1865 fn from_str(s: &str) -> Result<Self, Self::Err> {
1866 s.to_string().try_into()
1867 }
1868}
1869
1870impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1871 fn from(value: AccessTokenInfo) -> Self {
1872 Self {
1873 info: Some(value.into()),
1874 }
1875 }
1876}
1877
// Description of an access token: its validated ID plus optional expiry, the
// auto-prefix flag, and an optional scope. Converted to and from
// `api::AccessTokenInfo` elsewhere in this file.
// NOTE(review): the unit of `expires_at` (presumably seconds since the Unix
// epoch) is not established by this file — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AccessTokenInfo {
    pub id: AccessTokenId,
    pub expires_at: Option<u32>,
    pub auto_prefix_streams: bool,
    pub scope: Option<AccessTokenScope>,
}
1886
1887impl AccessTokenInfo {
1888 pub fn new(id: AccessTokenId) -> Self {
1890 Self {
1891 id,
1892 expires_at: None,
1893 auto_prefix_streams: false,
1894 scope: None,
1895 }
1896 }
1897
1898 pub fn with_expires_at(self, expires_at: u32) -> Self {
1900 Self {
1901 expires_at: Some(expires_at),
1902 ..self
1903 }
1904 }
1905
1906 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1908 Self {
1909 auto_prefix_streams,
1910 ..self
1911 }
1912 }
1913
1914 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
1916 Self {
1917 scope: Some(scope),
1918 ..self
1919 }
1920 }
1921}
1922
1923impl From<AccessTokenInfo> for api::AccessTokenInfo {
1924 fn from(value: AccessTokenInfo) -> Self {
1925 let AccessTokenInfo {
1926 id,
1927 expires_at,
1928 auto_prefix_streams,
1929 scope,
1930 } = value;
1931 Self {
1932 id: id.into(),
1933 expires_at,
1934 auto_prefix_streams,
1935 scope: scope.map(Into::into),
1936 }
1937 }
1938}
1939
1940impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
1941 type Error = ConvertError;
1942 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
1943 let api::AccessTokenInfo {
1944 id,
1945 expires_at,
1946 auto_prefix_streams,
1947 scope,
1948 } = value;
1949 Ok(Self {
1950 id: id.try_into()?,
1951 expires_at,
1952 auto_prefix_streams,
1953 scope: scope.map(TryInto::try_into).transpose()?,
1954 })
1955 }
1956}
1957
// Individual API operations, mirroring `api::Operation` one-to-one. Used as the
// element type of `AccessTokenScope::ops`. `FromStr` accepts the kebab-case
// spelling of each variant (for example "list-basins"), case-insensitively.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    Unspecified,
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
1981
1982impl FromStr for Operation {
1983 type Err = ConvertError;
1984
1985 fn from_str(s: &str) -> Result<Self, Self::Err> {
1986 match s.to_lowercase().as_str() {
1987 "unspecified" => Ok(Self::Unspecified),
1988 "list-basins" => Ok(Self::ListBasins),
1989 "create-basin" => Ok(Self::CreateBasin),
1990 "delete-basin" => Ok(Self::DeleteBasin),
1991 "reconfigure-basin" => Ok(Self::ReconfigureBasin),
1992 "get-basin-config" => Ok(Self::GetBasinConfig),
1993 "issue-access-token" => Ok(Self::IssueAccessToken),
1994 "revoke-access-token" => Ok(Self::RevokeAccessToken),
1995 "list-access-tokens" => Ok(Self::ListAccessTokens),
1996 "list-streams" => Ok(Self::ListStreams),
1997 "create-stream" => Ok(Self::CreateStream),
1998 "delete-stream" => Ok(Self::DeleteStream),
1999 "get-stream-config" => Ok(Self::GetStreamConfig),
2000 "reconfigure-stream" => Ok(Self::ReconfigureStream),
2001 "check-tail" => Ok(Self::CheckTail),
2002 "append" => Ok(Self::Append),
2003 "read" => Ok(Self::Read),
2004 "trim" => Ok(Self::Trim),
2005 "fence" => Ok(Self::Fence),
2006 _ => Err("invalid operation".into()),
2007 }
2008 }
2009}
2010
2011impl From<Operation> for i32 {
2012 fn from(value: Operation) -> Self {
2013 api::Operation::from(value).into()
2014 }
2015}
2016
2017impl TryFrom<i32> for Operation {
2018 type Error = ConvertError;
2019 fn try_from(value: i32) -> Result<Self, Self::Error> {
2020 api::Operation::try_from(value)
2021 .map_err(|_| "invalid operation".into())
2022 .map(Into::into)
2023 }
2024}
2025
2026impl From<Operation> for api::Operation {
2027 fn from(value: Operation) -> Self {
2028 match value {
2029 Operation::Unspecified => Self::Unspecified,
2030 Operation::ListBasins => Self::ListBasins,
2031 Operation::CreateBasin => Self::CreateBasin,
2032 Operation::DeleteBasin => Self::DeleteBasin,
2033 Operation::ReconfigureBasin => Self::ReconfigureBasin,
2034 Operation::GetBasinConfig => Self::GetBasinConfig,
2035 Operation::IssueAccessToken => Self::IssueAccessToken,
2036 Operation::RevokeAccessToken => Self::RevokeAccessToken,
2037 Operation::ListAccessTokens => Self::ListAccessTokens,
2038 Operation::ListStreams => Self::ListStreams,
2039 Operation::CreateStream => Self::CreateStream,
2040 Operation::DeleteStream => Self::DeleteStream,
2041 Operation::GetStreamConfig => Self::GetStreamConfig,
2042 Operation::ReconfigureStream => Self::ReconfigureStream,
2043 Operation::CheckTail => Self::CheckTail,
2044 Operation::Append => Self::Append,
2045 Operation::Read => Self::Read,
2046 Operation::Trim => Self::Trim,
2047 Operation::Fence => Self::Fence,
2048 }
2049 }
2050}
2051
2052impl From<api::Operation> for Operation {
2053 fn from(value: api::Operation) -> Self {
2054 match value {
2055 api::Operation::Unspecified => Self::Unspecified,
2056 api::Operation::ListBasins => Self::ListBasins,
2057 api::Operation::CreateBasin => Self::CreateBasin,
2058 api::Operation::DeleteBasin => Self::DeleteBasin,
2059 api::Operation::ReconfigureBasin => Self::ReconfigureBasin,
2060 api::Operation::GetBasinConfig => Self::GetBasinConfig,
2061 api::Operation::IssueAccessToken => Self::IssueAccessToken,
2062 api::Operation::RevokeAccessToken => Self::RevokeAccessToken,
2063 api::Operation::ListAccessTokens => Self::ListAccessTokens,
2064 api::Operation::ListStreams => Self::ListStreams,
2065 api::Operation::CreateStream => Self::CreateStream,
2066 api::Operation::DeleteStream => Self::DeleteStream,
2067 api::Operation::GetStreamConfig => Self::GetStreamConfig,
2068 api::Operation::ReconfigureStream => Self::ReconfigureStream,
2069 api::Operation::CheckTail => Self::CheckTail,
2070 api::Operation::Append => Self::Append,
2071 api::Operation::Read => Self::Read,
2072 api::Operation::Trim => Self::Trim,
2073 api::Operation::Fence => Self::Fence,
2074 }
2075 }
2076}
2077
// Scope of an access token: optional resource sets for basins, streams and
// tokens, optional per-group read/write permissions, and a set of individual
// operations. `Default` leaves every option unset and `ops` empty.
// NOTE(review): how the server combines `op_groups` with `ops` is not visible
// in this file — confirm against the API docs.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct AccessTokenScope {
    pub basins: Option<ResourceSet>,
    pub streams: Option<ResourceSet>,
    pub tokens: Option<ResourceSet>,
    pub op_groups: Option<PermittedOperationGroups>,
    pub ops: HashSet<Operation>,
}
2087
2088impl AccessTokenScope {
2089 pub fn new() -> Self {
2091 Self::default()
2092 }
2093
2094 pub fn with_basins(self, basins: ResourceSet) -> Self {
2096 Self {
2097 basins: Some(basins),
2098 ..self
2099 }
2100 }
2101
2102 pub fn with_streams(self, streams: ResourceSet) -> Self {
2104 Self {
2105 streams: Some(streams),
2106 ..self
2107 }
2108 }
2109
2110 pub fn with_tokens(self, tokens: ResourceSet) -> Self {
2112 Self {
2113 tokens: Some(tokens),
2114 ..self
2115 }
2116 }
2117
2118 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2120 Self {
2121 op_groups: Some(op_groups),
2122 ..self
2123 }
2124 }
2125
2126 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2128 Self {
2129 ops: ops.into_iter().collect(),
2130 ..self
2131 }
2132 }
2133
2134 pub fn with_op(self, op: Operation) -> Self {
2136 let mut ops = self.ops;
2137 ops.insert(op);
2138 Self { ops, ..self }
2139 }
2140}
2141
2142impl From<AccessTokenScope> for api::AccessTokenScope {
2143 fn from(value: AccessTokenScope) -> Self {
2144 let AccessTokenScope {
2145 basins,
2146 streams,
2147 tokens,
2148 op_groups,
2149 ops,
2150 } = value;
2151 Self {
2152 basins: basins.map(Into::into),
2153 streams: streams.map(Into::into),
2154 tokens: tokens.map(Into::into),
2155 op_groups: op_groups.map(Into::into),
2156 ops: ops.into_iter().map(Into::into).collect(),
2157 }
2158 }
2159}
2160
2161impl TryFrom<api::AccessTokenScope> for AccessTokenScope {
2162 type Error = ConvertError;
2163 fn try_from(value: api::AccessTokenScope) -> Result<Self, Self::Error> {
2164 let api::AccessTokenScope {
2165 basins,
2166 streams,
2167 tokens,
2168 op_groups,
2169 ops,
2170 } = value;
2171 Ok(Self {
2172 basins: basins.and_then(|set| set.matching.map(Into::into)),
2173 streams: streams.and_then(|set| set.matching.map(Into::into)),
2174 tokens: tokens.and_then(|set| set.matching.map(Into::into)),
2175 op_groups: op_groups.map(Into::into),
2176 ops: ops
2177 .into_iter()
2178 .map(Operation::try_from)
2179 .collect::<Result<HashSet<_>, _>>()?,
2180 })
2181 }
2182}
2183
2184impl From<ResourceSet> for api::ResourceSet {
2185 fn from(value: ResourceSet) -> Self {
2186 Self {
2187 matching: Some(value.into()),
2188 }
2189 }
2190}
2191
// A set of resources selected by name, mirroring `api::resource_set::Matching`:
// either an exact name or a name prefix. The string payload is passed through
// unvalidated by the conversions in this file.
#[sync_docs(ResourceSet = "Matching")]
#[derive(Debug, Clone)]
pub enum ResourceSet {
    Exact(String),
    Prefix(String),
}
2198
2199impl From<ResourceSet> for api::resource_set::Matching {
2200 fn from(value: ResourceSet) -> Self {
2201 match value {
2202 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2203 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2204 }
2205 }
2206}
2207
2208impl From<api::resource_set::Matching> for ResourceSet {
2209 fn from(value: api::resource_set::Matching) -> Self {
2210 match value {
2211 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2212 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2213 }
2214 }
2215}
2216
// Optional read/write permissions for three groups: account, basin and stream.
// `Default` leaves all three unset.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct PermittedOperationGroups {
    pub account: Option<ReadWritePermissions>,
    pub basin: Option<ReadWritePermissions>,
    pub stream: Option<ReadWritePermissions>,
}
2224
2225impl PermittedOperationGroups {
2226 pub fn new() -> Self {
2228 Self::default()
2229 }
2230
2231 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2233 Self {
2234 account: Some(account),
2235 ..self
2236 }
2237 }
2238
2239 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2241 Self {
2242 basin: Some(basin),
2243 ..self
2244 }
2245 }
2246
2247 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2249 Self {
2250 stream: Some(stream),
2251 ..self
2252 }
2253 }
2254}
2255
2256impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2257 fn from(value: PermittedOperationGroups) -> Self {
2258 let PermittedOperationGroups {
2259 account,
2260 basin,
2261 stream,
2262 } = value;
2263 Self {
2264 account: account.map(Into::into),
2265 basin: basin.map(Into::into),
2266 stream: stream.map(Into::into),
2267 }
2268 }
2269}
2270
2271impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2272 fn from(value: api::PermittedOperationGroups) -> Self {
2273 let api::PermittedOperationGroups {
2274 account,
2275 basin,
2276 stream,
2277 } = value;
2278 Self {
2279 account: account.map(Into::into),
2280 basin: basin.map(Into::into),
2281 stream: stream.map(Into::into),
2282 }
2283 }
2284}
2285
// A pair of independent read and write flags. `Default` sets both to `false`.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadWritePermissions {
    pub read: bool,
    pub write: bool,
}
2292
2293impl ReadWritePermissions {
2294 pub fn new() -> Self {
2296 Self::default()
2297 }
2298
2299 pub fn with_read(self, read: bool) -> Self {
2301 Self { read, ..self }
2302 }
2303
2304 pub fn with_write(self, write: bool) -> Self {
2306 Self { write, ..self }
2307 }
2308}
2309
2310impl From<ReadWritePermissions> for api::ReadWritePermissions {
2311 fn from(value: ReadWritePermissions) -> Self {
2312 let ReadWritePermissions { read, write } = value;
2313 Self { read, write }
2314 }
2315}
2316
2317impl From<api::ReadWritePermissions> for ReadWritePermissions {
2318 fn from(value: api::ReadWritePermissions) -> Self {
2319 let api::ReadWritePermissions { read, write } = value;
2320 Self { read, write }
2321 }
2322}
2323
2324impl From<api::IssueAccessTokenResponse> for String {
2325 fn from(value: api::IssueAccessTokenResponse) -> Self {
2326 value.token
2327 }
2328}
2329
2330impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2331 fn from(value: AccessTokenId) -> Self {
2332 Self { id: value.into() }
2333 }
2334}
2335
2336impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2337 type Error = ConvertError;
2338 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2339 let token_info = value.info.ok_or("access token info is missing")?;
2340 token_info.try_into()
2341 }
2342}
2343
// Parameters for listing access tokens. `Default` gives an empty `prefix`, an
// empty `start_after`, and no `limit`.
// NOTE(review): presumably `prefix` filters by ID prefix and `start_after` is an
// exclusive pagination cursor — confirm against the API docs.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListAccessTokensRequest {
    pub prefix: String,
    pub start_after: String,
    pub limit: Option<usize>,
}
2351
2352impl ListAccessTokensRequest {
2353 pub fn new() -> Self {
2355 Self::default()
2356 }
2357
2358 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2360 Self {
2361 prefix: prefix.into(),
2362 ..self
2363 }
2364 }
2365
2366 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2368 Self {
2369 start_after: start_after.into(),
2370 ..self
2371 }
2372 }
2373
2374 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2376 Self {
2377 limit: limit.into(),
2378 ..self
2379 }
2380 }
2381}
2382
2383impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2384 type Error = ConvertError;
2385 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2386 let ListAccessTokensRequest {
2387 prefix,
2388 start_after,
2389 limit,
2390 } = value;
2391 Ok(Self {
2392 prefix,
2393 start_after,
2394 limit: limit
2395 .map(TryInto::try_into)
2396 .transpose()
2397 .map_err(|_| "request limit does not fit into u64 bounds")?,
2398 })
2399 }
2400}
2401
// One page of a token listing: the converted token infos plus the `has_more`
// flag copied from the API response.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListAccessTokensResponse {
    pub tokens: Vec<AccessTokenInfo>,
    pub has_more: bool,
}
2408
2409impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2410 type Error = ConvertError;
2411 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2412 let api::ListAccessTokensResponse { tokens, has_more } = value;
2413 let tokens = tokens
2414 .into_iter()
2415 .map(TryInto::try_into)
2416 .collect::<Result<Vec<_>, _>>()?;
2417 Ok(Self { tokens, has_more })
2418 }
2419}