1use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::{Rng, distributions::Uniform};
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
/// Number of bytes in one mebibyte (2^20).
pub(crate) const MIB_BYTES: u64 = 1 << 20;
13
14#[derive(Debug, Clone, thiserror::Error)]
16#[error("{0}")]
17pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20 fn from(value: T) -> Self {
21 Self(value.into())
22 }
23}
24
/// Implemented by types that can report their size in metered bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self`, in bytes.
    fn metered_bytes(&self) -> u64;
}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38 fn metered_bytes(&self) -> u64 {
39 self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40 }
41}
42
/// Implements [`MeteredBytes`] for a type that has `headers` and `body` fields.
///
/// The metered size is 8 bytes of fixed overhead, plus 2 bytes per header, plus the
/// combined length of every header name and value, plus the body length.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let header_count = self.headers.len();
                let header_payload: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                (8 + (2 * header_count) + header_payload + self.body.len()) as u64
            }
        }
    };
}
60
61#[sync_docs]
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
63pub enum BasinScope {
64 #[default]
65 Unspecified,
66 AwsUsEast1,
67}
68
69impl From<BasinScope> for api::BasinScope {
70 fn from(value: BasinScope) -> Self {
71 match value {
72 BasinScope::Unspecified => Self::Unspecified,
73 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
74 }
75 }
76}
77
78impl From<api::BasinScope> for BasinScope {
79 fn from(value: api::BasinScope) -> Self {
80 match value {
81 api::BasinScope::Unspecified => Self::Unspecified,
82 api::BasinScope::AwsUsEast1 => Self::AwsUsEast1,
83 }
84 }
85}
86
87impl FromStr for BasinScope {
88 type Err = ConvertError;
89 fn from_str(value: &str) -> Result<Self, Self::Err> {
90 match value {
91 "unspecified" => Ok(Self::Unspecified),
92 "aws:us-east-1" => Ok(Self::AwsUsEast1),
93 _ => Err("invalid basin scope value".into()),
94 }
95 }
96}
97
98impl From<BasinScope> for i32 {
99 fn from(value: BasinScope) -> Self {
100 api::BasinScope::from(value).into()
101 }
102}
103
104impl TryFrom<i32> for BasinScope {
105 type Error = ConvertError;
106 fn try_from(value: i32) -> Result<Self, Self::Error> {
107 api::BasinScope::try_from(value)
108 .map(Into::into)
109 .map_err(|_| "invalid basin scope value".into())
110 }
111}
112
113#[sync_docs]
114#[derive(Debug, Clone)]
115pub struct CreateBasinRequest {
116 pub basin: BasinName,
117 pub config: Option<BasinConfig>,
118 pub scope: BasinScope,
119}
120
121impl CreateBasinRequest {
122 pub fn new(basin: BasinName) -> Self {
124 Self {
125 basin,
126 config: None,
127 scope: BasinScope::Unspecified,
128 }
129 }
130
131 pub fn with_config(self, config: BasinConfig) -> Self {
133 Self {
134 config: Some(config),
135 ..self
136 }
137 }
138
139 pub fn with_scope(self, scope: BasinScope) -> Self {
141 Self { scope, ..self }
142 }
143}
144
145impl From<CreateBasinRequest> for api::CreateBasinRequest {
146 fn from(value: CreateBasinRequest) -> Self {
147 let CreateBasinRequest {
148 basin,
149 config,
150 scope,
151 } = value;
152 Self {
153 basin: basin.0,
154 config: config.map(Into::into),
155 scope: scope.into(),
156 }
157 }
158}
159
160#[sync_docs]
161#[derive(Debug, Clone, Default)]
162pub struct BasinConfig {
163 pub default_stream_config: Option<StreamConfig>,
164 pub create_stream_on_append: bool,
165 pub create_stream_on_read: bool,
166}
167
168impl From<BasinConfig> for api::BasinConfig {
169 fn from(value: BasinConfig) -> Self {
170 let BasinConfig {
171 default_stream_config,
172 create_stream_on_append,
173 create_stream_on_read,
174 } = value;
175 Self {
176 default_stream_config: default_stream_config.map(Into::into),
177 create_stream_on_append,
178 create_stream_on_read,
179 }
180 }
181}
182
183impl TryFrom<api::BasinConfig> for BasinConfig {
184 type Error = ConvertError;
185 fn try_from(value: api::BasinConfig) -> Result<Self, Self::Error> {
186 let api::BasinConfig {
187 default_stream_config,
188 create_stream_on_append,
189 create_stream_on_read,
190 } = value;
191 Ok(Self {
192 default_stream_config: default_stream_config.map(TryInto::try_into).transpose()?,
193 create_stream_on_append,
194 create_stream_on_read,
195 })
196 }
197}
198
199#[sync_docs]
200#[derive(Debug, Clone, Default)]
201pub struct StreamConfig {
202 pub storage_class: StorageClass,
203 pub retention_policy: Option<RetentionPolicy>,
204 pub require_client_timestamps: Option<bool>,
205}
206
207impl StreamConfig {
208 pub fn new() -> Self {
210 Self::default()
211 }
212
213 pub fn with_storage_class(self, storage_class: impl Into<StorageClass>) -> Self {
215 Self {
216 storage_class: storage_class.into(),
217 ..self
218 }
219 }
220
221 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
223 Self {
224 retention_policy: Some(retention_policy),
225 ..self
226 }
227 }
228
229 pub fn with_require_client_timestamps(self, require_client_timestamps: bool) -> Self {
231 Self {
232 require_client_timestamps: Some(require_client_timestamps),
233 ..self
234 }
235 }
236}
237
238impl From<StreamConfig> for api::StreamConfig {
239 fn from(value: StreamConfig) -> Self {
240 let StreamConfig {
241 storage_class,
242 retention_policy,
243 require_client_timestamps,
244 } = value;
245 Self {
246 storage_class: storage_class.into(),
247 retention_policy: retention_policy.map(Into::into),
248 require_client_timestamps,
249 }
250 }
251}
252
253impl TryFrom<api::StreamConfig> for StreamConfig {
254 type Error = ConvertError;
255 fn try_from(value: api::StreamConfig) -> Result<Self, Self::Error> {
256 let api::StreamConfig {
257 storage_class,
258 retention_policy,
259 require_client_timestamps,
260 } = value;
261 Ok(Self {
262 storage_class: storage_class.try_into()?,
263 retention_policy: retention_policy.map(Into::into),
264 require_client_timestamps,
265 })
266 }
267}
268
269#[sync_docs]
270#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
271pub enum StorageClass {
272 #[default]
273 Unspecified,
274 Standard,
275 Express,
276}
277
278impl From<StorageClass> for api::StorageClass {
279 fn from(value: StorageClass) -> Self {
280 match value {
281 StorageClass::Unspecified => Self::Unspecified,
282 StorageClass::Standard => Self::Standard,
283 StorageClass::Express => Self::Express,
284 }
285 }
286}
287
288impl From<api::StorageClass> for StorageClass {
289 fn from(value: api::StorageClass) -> Self {
290 match value {
291 api::StorageClass::Unspecified => Self::Unspecified,
292 api::StorageClass::Standard => Self::Standard,
293 api::StorageClass::Express => Self::Express,
294 }
295 }
296}
297
298impl FromStr for StorageClass {
299 type Err = ConvertError;
300 fn from_str(value: &str) -> Result<Self, Self::Err> {
301 match value {
302 "unspecified" => Ok(Self::Unspecified),
303 "standard" => Ok(Self::Standard),
304 "express" => Ok(Self::Express),
305 _ => Err("invalid storage class value".into()),
306 }
307 }
308}
309
310impl From<StorageClass> for i32 {
311 fn from(value: StorageClass) -> Self {
312 api::StorageClass::from(value).into()
313 }
314}
315
316impl TryFrom<i32> for StorageClass {
317 type Error = ConvertError;
318 fn try_from(value: i32) -> Result<Self, Self::Error> {
319 api::StorageClass::try_from(value)
320 .map(Into::into)
321 .map_err(|_| "invalid storage class value".into())
322 }
323}
324
325#[sync_docs(Age = "Age")]
326#[derive(Debug, Clone)]
327pub enum RetentionPolicy {
328 Age(Duration),
329}
330
331impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
332 fn from(value: RetentionPolicy) -> Self {
333 match value {
334 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
335 }
336 }
337}
338
339impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
340 fn from(value: api::stream_config::RetentionPolicy) -> Self {
341 match value {
342 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
343 }
344 }
345}
346
347#[sync_docs]
348#[derive(Debug, Clone, Copy, PartialEq, Eq)]
349pub enum BasinState {
350 Unspecified,
351 Active,
352 Creating,
353 Deleting,
354}
355
356impl From<BasinState> for api::BasinState {
357 fn from(value: BasinState) -> Self {
358 match value {
359 BasinState::Unspecified => Self::Unspecified,
360 BasinState::Active => Self::Active,
361 BasinState::Creating => Self::Creating,
362 BasinState::Deleting => Self::Deleting,
363 }
364 }
365}
366
367impl From<api::BasinState> for BasinState {
368 fn from(value: api::BasinState) -> Self {
369 match value {
370 api::BasinState::Unspecified => Self::Unspecified,
371 api::BasinState::Active => Self::Active,
372 api::BasinState::Creating => Self::Creating,
373 api::BasinState::Deleting => Self::Deleting,
374 }
375 }
376}
377
378impl From<BasinState> for i32 {
379 fn from(value: BasinState) -> Self {
380 api::BasinState::from(value).into()
381 }
382}
383
384impl TryFrom<i32> for BasinState {
385 type Error = ConvertError;
386 fn try_from(value: i32) -> Result<Self, Self::Error> {
387 api::BasinState::try_from(value)
388 .map(Into::into)
389 .map_err(|_| "invalid basin status value".into())
390 }
391}
392
393impl std::fmt::Display for BasinState {
394 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395 match self {
396 BasinState::Unspecified => write!(f, "unspecified"),
397 BasinState::Active => write!(f, "active"),
398 BasinState::Creating => write!(f, "creating"),
399 BasinState::Deleting => write!(f, "deleting"),
400 }
401 }
402}
403
404#[sync_docs]
405#[derive(Debug, Clone)]
406pub struct BasinInfo {
407 pub name: String,
408 pub scope: BasinScope,
409 pub state: BasinState,
410}
411
412impl From<BasinInfo> for api::BasinInfo {
413 fn from(value: BasinInfo) -> Self {
414 let BasinInfo { name, scope, state } = value;
415 Self {
416 name,
417 scope: scope.into(),
418 state: state.into(),
419 }
420 }
421}
422
423impl TryFrom<api::BasinInfo> for BasinInfo {
424 type Error = ConvertError;
425 fn try_from(value: api::BasinInfo) -> Result<Self, Self::Error> {
426 let api::BasinInfo { name, scope, state } = value;
427 Ok(Self {
428 name,
429 scope: scope.try_into()?,
430 state: state.try_into()?,
431 })
432 }
433}
434
435impl TryFrom<api::CreateBasinResponse> for BasinInfo {
436 type Error = ConvertError;
437 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
438 let api::CreateBasinResponse { info } = value;
439 let info = info.ok_or("missing basin info")?;
440 info.try_into()
441 }
442}
443
444#[sync_docs]
445#[derive(Debug, Clone, Default)]
446pub struct ListStreamsRequest {
447 pub prefix: String,
448 pub start_after: String,
449 pub limit: Option<usize>,
450}
451
452impl ListStreamsRequest {
453 pub fn new() -> Self {
455 Self::default()
456 }
457
458 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
460 Self {
461 prefix: prefix.into(),
462 ..self
463 }
464 }
465
466 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
468 Self {
469 start_after: start_after.into(),
470 ..self
471 }
472 }
473
474 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
476 Self {
477 limit: limit.into(),
478 ..self
479 }
480 }
481}
482
483impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
484 type Error = ConvertError;
485 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
486 let ListStreamsRequest {
487 prefix,
488 start_after,
489 limit,
490 } = value;
491 Ok(Self {
492 prefix,
493 start_after,
494 limit: limit.map(|n| n as u64),
495 })
496 }
497}
498
499#[sync_docs]
500#[derive(Debug, Clone)]
501pub struct StreamInfo {
502 pub name: String,
503 pub created_at: u32,
504 pub deleted_at: Option<u32>,
505}
506
507impl From<api::StreamInfo> for StreamInfo {
508 fn from(value: api::StreamInfo) -> Self {
509 Self {
510 name: value.name,
511 created_at: value.created_at,
512 deleted_at: value.deleted_at,
513 }
514 }
515}
516
517impl TryFrom<api::CreateStreamResponse> for StreamInfo {
518 type Error = ConvertError;
519
520 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
521 let api::CreateStreamResponse { info } = value;
522 let info = info.ok_or("missing stream info")?;
523 Ok(info.into())
524 }
525}
526
527#[sync_docs]
528#[derive(Debug, Clone)]
529pub struct ListStreamsResponse {
530 pub streams: Vec<StreamInfo>,
531 pub has_more: bool,
532}
533
534impl From<api::ListStreamsResponse> for ListStreamsResponse {
535 fn from(value: api::ListStreamsResponse) -> Self {
536 let api::ListStreamsResponse { streams, has_more } = value;
537 let streams = streams.into_iter().map(Into::into).collect();
538 Self { streams, has_more }
539 }
540}
541
542impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
543 type Error = ConvertError;
544 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
545 let api::GetBasinConfigResponse { config } = value;
546 let config = config.ok_or("missing basin config")?;
547 config.try_into()
548 }
549}
550
551impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
552 type Error = ConvertError;
553 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
554 let api::GetStreamConfigResponse { config } = value;
555 let config = config.ok_or("missing stream config")?;
556 config.try_into()
557 }
558}
559
560#[sync_docs]
561#[derive(Debug, Clone)]
562pub struct CreateStreamRequest {
563 pub stream: String,
564 pub config: Option<StreamConfig>,
565}
566
567impl CreateStreamRequest {
568 pub fn new(stream: impl Into<String>) -> Self {
570 Self {
571 stream: stream.into(),
572 config: None,
573 }
574 }
575
576 pub fn with_config(self, config: StreamConfig) -> Self {
578 Self {
579 config: Some(config),
580 ..self
581 }
582 }
583}
584
585impl From<CreateStreamRequest> for api::CreateStreamRequest {
586 fn from(value: CreateStreamRequest) -> Self {
587 let CreateStreamRequest { stream, config } = value;
588 Self {
589 stream,
590 config: config.map(Into::into),
591 }
592 }
593}
594
595#[sync_docs]
596#[derive(Debug, Clone, Default)]
597pub struct ListBasinsRequest {
598 pub prefix: String,
599 pub start_after: String,
600 pub limit: Option<usize>,
601}
602
603impl ListBasinsRequest {
604 pub fn new() -> Self {
606 Self::default()
607 }
608
609 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
611 Self {
612 prefix: prefix.into(),
613 ..self
614 }
615 }
616
617 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
619 Self {
620 start_after: start_after.into(),
621 ..self
622 }
623 }
624
625 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
627 Self {
628 limit: limit.into(),
629 ..self
630 }
631 }
632}
633
634impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
635 type Error = ConvertError;
636 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
637 let ListBasinsRequest {
638 prefix,
639 start_after,
640 limit,
641 } = value;
642 Ok(Self {
643 prefix,
644 start_after,
645 limit: limit
646 .map(TryInto::try_into)
647 .transpose()
648 .map_err(|_| "request limit does not fit into u64 bounds")?,
649 })
650 }
651}
652
653#[sync_docs]
654#[derive(Debug, Clone)]
655pub struct ListBasinsResponse {
656 pub basins: Vec<BasinInfo>,
657 pub has_more: bool,
658}
659
660impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
661 type Error = ConvertError;
662 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
663 let api::ListBasinsResponse { basins, has_more } = value;
664 Ok(Self {
665 basins: basins
666 .into_iter()
667 .map(TryInto::try_into)
668 .collect::<Result<Vec<BasinInfo>, ConvertError>>()?,
669 has_more,
670 })
671 }
672}
673
674#[sync_docs]
675#[derive(Debug, Clone)]
676pub struct DeleteBasinRequest {
677 pub basin: BasinName,
678 pub if_exists: bool,
680}
681
682impl DeleteBasinRequest {
683 pub fn new(basin: BasinName) -> Self {
685 Self {
686 basin,
687 if_exists: false,
688 }
689 }
690
691 pub fn with_if_exists(self, if_exists: bool) -> Self {
693 Self { if_exists, ..self }
694 }
695}
696
697impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
698 fn from(value: DeleteBasinRequest) -> Self {
699 let DeleteBasinRequest { basin, .. } = value;
700 Self { basin: basin.0 }
701 }
702}
703
704#[sync_docs]
705#[derive(Debug, Clone)]
706pub struct DeleteStreamRequest {
707 pub stream: String,
708 pub if_exists: bool,
710}
711
712impl DeleteStreamRequest {
713 pub fn new(stream: impl Into<String>) -> Self {
715 Self {
716 stream: stream.into(),
717 if_exists: false,
718 }
719 }
720
721 pub fn with_if_exists(self, if_exists: bool) -> Self {
723 Self { if_exists, ..self }
724 }
725}
726
727impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
728 fn from(value: DeleteStreamRequest) -> Self {
729 let DeleteStreamRequest { stream, .. } = value;
730 Self { stream }
731 }
732}
733
734#[sync_docs]
735#[derive(Debug, Clone)]
736pub struct ReconfigureBasinRequest {
737 pub basin: BasinName,
738 pub config: Option<BasinConfig>,
739 pub mask: Option<Vec<String>>,
740}
741
742impl ReconfigureBasinRequest {
743 pub fn new(basin: BasinName) -> Self {
745 Self {
746 basin,
747 config: None,
748 mask: None,
749 }
750 }
751
752 pub fn with_config(self, config: BasinConfig) -> Self {
754 Self {
755 config: Some(config),
756 ..self
757 }
758 }
759
760 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
762 Self {
763 mask: Some(mask.into()),
764 ..self
765 }
766 }
767}
768
769impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
770 fn from(value: ReconfigureBasinRequest) -> Self {
771 let ReconfigureBasinRequest {
772 basin,
773 config,
774 mask,
775 } = value;
776 Self {
777 basin: basin.0,
778 config: config.map(Into::into),
779 mask: mask.map(|paths| prost_types::FieldMask { paths }),
780 }
781 }
782}
783
784impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
785 type Error = ConvertError;
786 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
787 let api::ReconfigureBasinResponse { config } = value;
788 let config = config.ok_or("missing basin config")?;
789 config.try_into()
790 }
791}
792
793#[sync_docs]
794#[derive(Debug, Clone)]
795pub struct ReconfigureStreamRequest {
796 pub stream: String,
797 pub config: Option<StreamConfig>,
798 pub mask: Option<Vec<String>>,
799}
800
801impl ReconfigureStreamRequest {
802 pub fn new(stream: impl Into<String>) -> Self {
804 Self {
805 stream: stream.into(),
806 config: None,
807 mask: None,
808 }
809 }
810
811 pub fn with_config(self, config: StreamConfig) -> Self {
813 Self {
814 config: Some(config),
815 ..self
816 }
817 }
818
819 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
821 Self {
822 mask: Some(mask.into()),
823 ..self
824 }
825 }
826}
827
828impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
829 fn from(value: ReconfigureStreamRequest) -> Self {
830 let ReconfigureStreamRequest {
831 stream,
832 config,
833 mask,
834 } = value;
835 Self {
836 stream,
837 config: config.map(Into::into),
838 mask: mask.map(|paths| prost_types::FieldMask { paths }),
839 }
840 }
841}
842
843impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
844 type Error = ConvertError;
845 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
846 let api::ReconfigureStreamResponse { config } = value;
847 let config = config.ok_or("missing stream config")?;
848 config.try_into()
849 }
850}
851
852impl From<api::CheckTailResponse> for u64 {
853 fn from(value: api::CheckTailResponse) -> Self {
854 let api::CheckTailResponse { next_seq_num } = value;
855 next_seq_num
856 }
857}
858
859#[sync_docs]
860#[derive(Debug, Clone, PartialEq, Eq)]
861pub struct Header {
862 pub name: Bytes,
863 pub value: Bytes,
864}
865
866impl Header {
867 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
869 Self {
870 name: name.into(),
871 value: value.into(),
872 }
873 }
874
875 pub fn from_value(value: impl Into<Bytes>) -> Self {
877 Self {
878 name: Bytes::new(),
879 value: value.into(),
880 }
881 }
882}
883
884impl From<Header> for api::Header {
885 fn from(value: Header) -> Self {
886 let Header { name, value } = value;
887 Self { name, value }
888 }
889}
890
891impl From<api::Header> for Header {
892 fn from(value: api::Header) -> Self {
893 let api::Header { name, value } = value;
894 Self { name, value }
895 }
896}
897
898#[derive(Debug, Clone, Default, PartialEq, Eq)]
902pub struct FencingToken(Bytes);
903
904impl FencingToken {
905 const MAX_BYTES: usize = 16;
906
907 pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
909 let bytes = bytes.into();
910 if bytes.len() > Self::MAX_BYTES {
911 Err(format!(
912 "Size of a fencing token cannot exceed {} bytes",
913 Self::MAX_BYTES
914 )
915 .into())
916 } else {
917 Ok(Self(bytes))
918 }
919 }
920
921 pub fn generate(n: usize) -> Result<Self, ConvertError> {
923 Self::new(
924 rand::thread_rng()
925 .sample_iter(&Uniform::new_inclusive(0, u8::MAX))
926 .take(n)
927 .collect::<Bytes>(),
928 )
929 }
930}
931
932impl AsRef<Bytes> for FencingToken {
933 fn as_ref(&self) -> &Bytes {
934 &self.0
935 }
936}
937
938impl AsRef<[u8]> for FencingToken {
939 fn as_ref(&self) -> &[u8] {
940 &self.0
941 }
942}
943
944impl Deref for FencingToken {
945 type Target = [u8];
946
947 fn deref(&self) -> &Self::Target {
948 &self.0
949 }
950}
951
952impl From<FencingToken> for Bytes {
953 fn from(value: FencingToken) -> Self {
954 value.0
955 }
956}
957
958impl From<FencingToken> for Vec<u8> {
959 fn from(value: FencingToken) -> Self {
960 value.0.into()
961 }
962}
963
964impl TryFrom<Bytes> for FencingToken {
965 type Error = ConvertError;
966
967 fn try_from(value: Bytes) -> Result<Self, Self::Error> {
968 Self::new(value)
969 }
970}
971
972impl TryFrom<Vec<u8>> for FencingToken {
973 type Error = ConvertError;
974
975 fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
976 Self::new(value)
977 }
978}
979
980#[derive(Debug, Clone)]
982pub enum Command {
983 Fence {
988 fencing_token: FencingToken,
992 },
993 Trim {
998 seq_num: u64,
1003 },
1004}
1005
1006#[derive(Debug, Clone)]
1012pub struct CommandRecord {
1013 pub command: Command,
1015 pub timestamp: Option<u64>,
1017}
1018
1019impl CommandRecord {
1020 const FENCE: &[u8] = b"fence";
1021 const TRIM: &[u8] = b"trim";
1022
1023 pub fn fence(fencing_token: FencingToken) -> Self {
1025 Self {
1026 command: Command::Fence { fencing_token },
1027 timestamp: None,
1028 }
1029 }
1030
1031 pub fn trim(seq_num: impl Into<u64>) -> Self {
1033 Self {
1034 command: Command::Trim {
1035 seq_num: seq_num.into(),
1036 },
1037 timestamp: None,
1038 }
1039 }
1040
1041 pub fn with_timestamp(self, timestamp: u64) -> Self {
1043 Self {
1044 timestamp: Some(timestamp),
1045 ..self
1046 }
1047 }
1048}
1049
1050#[sync_docs]
1051#[derive(Debug, Clone, PartialEq, Eq)]
1052pub struct AppendRecord {
1053 timestamp: Option<u64>,
1054 headers: Vec<Header>,
1055 body: Bytes,
1056 #[cfg(test)]
1057 max_bytes: u64,
1058}
1059
1060metered_impl!(AppendRecord);
1061
1062impl AppendRecord {
1063 const MAX_BYTES: u64 = MIB_BYTES;
1064
1065 fn validated(self) -> Result<Self, ConvertError> {
1066 #[cfg(test)]
1067 let max_bytes = self.max_bytes;
1068 #[cfg(not(test))]
1069 let max_bytes = Self::MAX_BYTES;
1070
1071 if self.metered_bytes() > max_bytes {
1072 Err("AppendRecord should have metered size less than 1 MiB".into())
1073 } else {
1074 Ok(self)
1075 }
1076 }
1077
1078 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1080 Self {
1081 timestamp: None,
1082 headers: Vec::new(),
1083 body: body.into(),
1084 #[cfg(test)]
1085 max_bytes: Self::MAX_BYTES,
1086 }
1087 .validated()
1088 }
1089
1090 #[cfg(test)]
1091 pub(crate) fn with_max_bytes(
1092 max_bytes: u64,
1093 body: impl Into<Bytes>,
1094 ) -> Result<Self, ConvertError> {
1095 Self {
1096 timestamp: None,
1097 headers: Vec::new(),
1098 body: body.into(),
1099 max_bytes,
1100 }
1101 .validated()
1102 }
1103
1104 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1106 Self {
1107 headers: headers.into(),
1108 ..self
1109 }
1110 .validated()
1111 }
1112
1113 pub fn with_timestamp(self, timestamp: u64) -> Self {
1115 Self {
1116 timestamp: Some(timestamp),
1117 ..self
1118 }
1119 }
1120
1121 pub fn body(&self) -> &[u8] {
1123 &self.body
1124 }
1125
1126 pub fn headers(&self) -> &[Header] {
1128 &self.headers
1129 }
1130
1131 pub fn timestamp(&self) -> Option<u64> {
1133 self.timestamp
1134 }
1135
1136 pub fn into_parts(self) -> AppendRecordParts {
1138 AppendRecordParts {
1139 timestamp: self.timestamp,
1140 headers: self.headers,
1141 body: self.body,
1142 }
1143 }
1144
1145 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1147 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1148 if let Some(timestamp) = parts.timestamp {
1149 Ok(record.with_timestamp(timestamp))
1150 } else {
1151 Ok(record)
1152 }
1153 }
1154}
1155
1156impl From<AppendRecord> for api::AppendRecord {
1157 fn from(value: AppendRecord) -> Self {
1158 Self {
1159 timestamp: value.timestamp,
1160 headers: value.headers.into_iter().map(Into::into).collect(),
1161 body: value.body,
1162 }
1163 }
1164}
1165
1166impl From<CommandRecord> for AppendRecord {
1167 fn from(value: CommandRecord) -> Self {
1168 let (header_value, body) = match value.command {
1169 Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1170 Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1171 };
1172 AppendRecordParts {
1173 timestamp: value.timestamp,
1174 headers: vec![Header::from_value(header_value)],
1175 body: body.into(),
1176 }
1177 .try_into()
1178 .expect("command record is a valid append record")
1179 }
1180}
1181
1182#[sync_docs(AppendRecordParts = "AppendRecord")]
1183#[derive(Debug, Clone)]
1184pub struct AppendRecordParts {
1185 pub timestamp: Option<u64>,
1186 pub headers: Vec<Header>,
1187 pub body: Bytes,
1188}
1189
1190impl From<AppendRecord> for AppendRecordParts {
1191 fn from(value: AppendRecord) -> Self {
1192 value.into_parts()
1193 }
1194}
1195
1196impl TryFrom<AppendRecordParts> for AppendRecord {
1197 type Error = ConvertError;
1198
1199 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1200 Self::try_from_parts(value)
1201 }
1202}
1203
1204#[derive(Debug, Clone)]
1206pub struct AppendRecordBatch {
1207 records: Vec<AppendRecord>,
1208 metered_bytes: u64,
1209 max_capacity: usize,
1210 #[cfg(test)]
1211 max_bytes: u64,
1212}
1213
1214impl PartialEq for AppendRecordBatch {
1215 fn eq(&self, other: &Self) -> bool {
1216 if self.records.eq(&other.records) {
1217 assert_eq!(self.metered_bytes, other.metered_bytes);
1218 true
1219 } else {
1220 false
1221 }
1222 }
1223}
1224
1225impl Eq for AppendRecordBatch {}
1226
1227impl Default for AppendRecordBatch {
1228 fn default() -> Self {
1229 Self::new()
1230 }
1231}
1232
1233impl AppendRecordBatch {
1234 pub const MAX_CAPACITY: usize = 1000;
1238
1239 pub const MAX_BYTES: u64 = MIB_BYTES;
1241
1242 pub fn new() -> Self {
1244 Self::with_max_capacity(Self::MAX_CAPACITY)
1245 }
1246
1247 pub fn with_max_capacity(max_capacity: usize) -> Self {
1251 assert!(
1252 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1253 "Batch capacity must be between 1 and 1000"
1254 );
1255
1256 Self {
1257 records: Vec::with_capacity(max_capacity),
1258 metered_bytes: 0,
1259 max_capacity,
1260 #[cfg(test)]
1261 max_bytes: Self::MAX_BYTES,
1262 }
1263 }
1264
1265 #[cfg(test)]
1266 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1267 #[cfg(test)]
1268 assert!(
1269 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1270 "Batch size must be between 1 byte and 1 MiB"
1271 );
1272
1273 Self {
1274 max_bytes,
1275 ..Self::with_max_capacity(max_capacity)
1276 }
1277 }
1278
1279 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1285 where
1286 R: Into<AppendRecord>,
1287 T: IntoIterator<Item = R>,
1288 {
1289 let mut records = Self::new();
1290 let mut pending = Vec::new();
1291
1292 let mut iter = iter.into_iter();
1293
1294 for record in iter.by_ref() {
1295 if let Err(record) = records.push(record) {
1296 pending.push(record);
1297 break;
1298 }
1299 }
1300
1301 if pending.is_empty() {
1302 Ok(records)
1303 } else {
1304 pending.extend(iter.map(Into::into));
1305 Err((records, pending))
1306 }
1307 }
1308
1309 pub fn is_empty(&self) -> bool {
1311 if self.records.is_empty() {
1312 assert_eq!(self.metered_bytes, 0);
1313 true
1314 } else {
1315 false
1316 }
1317 }
1318
1319 pub fn len(&self) -> usize {
1321 self.records.len()
1322 }
1323
1324 #[cfg(test)]
1325 fn max_bytes(&self) -> u64 {
1326 self.max_bytes
1327 }
1328
1329 #[cfg(not(test))]
1330 fn max_bytes(&self) -> u64 {
1331 Self::MAX_BYTES
1332 }
1333
1334 pub fn is_full(&self) -> bool {
1336 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1337 }
1338
1339 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1341 assert!(self.records.len() <= self.max_capacity);
1342 assert!(self.metered_bytes <= self.max_bytes());
1343
1344 let record = record.into();
1345 let record_size = record.metered_bytes();
1346 if self.records.len() >= self.max_capacity
1347 || self.metered_bytes + record_size > self.max_bytes()
1348 {
1349 Err(record)
1350 } else {
1351 self.records.push(record);
1352 self.metered_bytes += record_size;
1353 Ok(())
1354 }
1355 }
1356}
1357
1358impl MeteredBytes for AppendRecordBatch {
1359 fn metered_bytes(&self) -> u64 {
1360 self.metered_bytes
1361 }
1362}
1363
1364impl IntoIterator for AppendRecordBatch {
1365 type Item = AppendRecord;
1366 type IntoIter = std::vec::IntoIter<Self::Item>;
1367
1368 fn into_iter(self) -> Self::IntoIter {
1369 self.records.into_iter()
1370 }
1371}
1372
1373impl<'a> IntoIterator for &'a AppendRecordBatch {
1374 type Item = &'a AppendRecord;
1375 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1376
1377 fn into_iter(self) -> Self::IntoIter {
1378 self.records.iter()
1379 }
1380}
1381
1382impl AsRef<[AppendRecord]> for AppendRecordBatch {
1383 fn as_ref(&self) -> &[AppendRecord] {
1384 &self.records
1385 }
1386}
1387
1388#[sync_docs]
1389#[derive(Debug, Default, Clone)]
1390pub struct AppendInput {
1391 pub records: AppendRecordBatch,
1392 pub match_seq_num: Option<u64>,
1393 pub fencing_token: Option<FencingToken>,
1394}
1395
1396impl MeteredBytes for AppendInput {
1397 fn metered_bytes(&self) -> u64 {
1398 self.records.metered_bytes()
1399 }
1400}
1401
1402impl AppendInput {
1403 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1405 Self {
1406 records: records.into(),
1407 match_seq_num: None,
1408 fencing_token: None,
1409 }
1410 }
1411
1412 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1414 Self {
1415 match_seq_num: Some(match_seq_num.into()),
1416 ..self
1417 }
1418 }
1419
1420 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1422 Self {
1423 fencing_token: Some(fencing_token),
1424 ..self
1425 }
1426 }
1427
1428 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1429 let Self {
1430 records,
1431 match_seq_num,
1432 fencing_token,
1433 } = self;
1434
1435 api::AppendInput {
1436 stream: stream.into(),
1437 records: records.into_iter().map(Into::into).collect(),
1438 match_seq_num,
1439 fencing_token: fencing_token.map(|f| f.0),
1440 }
1441 }
1442}
1443
1444#[sync_docs]
1445#[derive(Debug, Clone)]
1446pub struct AppendOutput {
1447 pub start_seq_num: u64,
1448 pub end_seq_num: u64,
1449 pub next_seq_num: u64,
1450}
1451
1452impl From<api::AppendOutput> for AppendOutput {
1453 fn from(value: api::AppendOutput) -> Self {
1454 let api::AppendOutput {
1455 start_seq_num,
1456 end_seq_num,
1457 next_seq_num,
1458 } = value;
1459 Self {
1460 start_seq_num,
1461 end_seq_num,
1462 next_seq_num,
1463 }
1464 }
1465}
1466
1467impl TryFrom<api::AppendResponse> for AppendOutput {
1468 type Error = ConvertError;
1469 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1470 let api::AppendResponse { output } = value;
1471 let output = output.ok_or("missing append output")?;
1472 Ok(output.into())
1473 }
1474}
1475
1476impl TryFrom<api::AppendSessionResponse> for AppendOutput {
1477 type Error = ConvertError;
1478 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1479 let api::AppendSessionResponse { output } = value;
1480 let output = output.ok_or("missing append output")?;
1481 Ok(output.into())
1482 }
1483}
1484
// Optional caps on a read. Both fields default to `None`.
// For unary reads, `ReadRequest::try_into_api_type` rejects a `count` above 1000 and a
// `bytes` value above `MIB_BYTES`; session reads forward both values unchecked.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadLimit {
    // Cap on the number of records, if any.
    pub count: Option<u64>,
    // Cap on the number of bytes, if any.
    // NOTE(review): presumably measured in metered bytes — confirm against the API docs.
    pub bytes: Option<u64>,
}
1491
// Parameters of a unary read. The default value has `start_seq_num` 0 and an empty limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadRequest {
    // Sequence number forwarded as `start_seq_num` of `api::ReadRequest`.
    pub start_seq_num: u64,
    // Limits for the read; validated when the request is converted to its API form.
    pub limit: ReadLimit,
}
1498
1499impl ReadRequest {
1500 pub fn new(start_seq_num: u64) -> Self {
1502 Self {
1503 start_seq_num,
1504 ..Default::default()
1505 }
1506 }
1507
1508 pub fn with_limit(self, limit: ReadLimit) -> Self {
1510 Self { limit, ..self }
1511 }
1512}
1513
1514impl ReadRequest {
1515 pub(crate) fn try_into_api_type(
1516 self,
1517 stream: impl Into<String>,
1518 ) -> Result<api::ReadRequest, ConvertError> {
1519 let Self {
1520 start_seq_num,
1521 limit,
1522 } = self;
1523
1524 let limit = if limit.count > Some(1000) {
1525 Err("read limit: count must not exceed 1000 for unary request")
1526 } else if limit.bytes > Some(MIB_BYTES) {
1527 Err("read limit: bytes must not exceed 1MiB for unary request")
1528 } else {
1529 Ok(api::ReadLimit {
1530 count: limit.count,
1531 bytes: limit.bytes,
1532 })
1533 }?;
1534
1535 Ok(api::ReadRequest {
1536 stream: stream.into(),
1537 start_seq_num,
1538 limit: Some(limit),
1539 })
1540 }
1541}
1542
// A record as returned by a read, converted field by field from `api::SequencedRecord`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecord {
    pub seq_num: u64,
    // NOTE(review): unit and epoch of the timestamp are not visible in this file — confirm.
    pub timestamp: u64,
    // Headers of the record; a single header with an empty name marks a command record
    // (see `as_command_record`).
    pub headers: Vec<Header>,
    pub body: Bytes,
}

// Implements `MeteredBytes`: 8 bytes, plus 2 per header, plus the lengths of every header
// name and value, plus the body length.
metered_impl!(SequencedRecord);
1553
1554impl From<api::SequencedRecord> for SequencedRecord {
1555 fn from(value: api::SequencedRecord) -> Self {
1556 let api::SequencedRecord {
1557 seq_num,
1558 timestamp,
1559 headers,
1560 body,
1561 } = value;
1562 Self {
1563 seq_num,
1564 timestamp,
1565 headers: headers.into_iter().map(Into::into).collect(),
1566 body,
1567 }
1568 }
1569}
1570
1571impl SequencedRecord {
1572 pub fn as_command_record(&self) -> Option<CommandRecord> {
1574 if self.headers.len() != 1 {
1575 return None;
1576 }
1577
1578 let header = self.headers.first().expect("pre-validated length");
1579
1580 if !header.name.is_empty() {
1581 return None;
1582 }
1583
1584 match header.value.as_ref() {
1585 CommandRecord::FENCE => {
1586 let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1587 Some(CommandRecord {
1588 command: Command::Fence { fencing_token },
1589 timestamp: Some(self.timestamp),
1590 })
1591 }
1592 CommandRecord::TRIM => {
1593 let body: &[u8] = &self.body;
1594 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1595 Some(CommandRecord {
1596 command: Command::Trim { seq_num },
1597 timestamp: Some(self.timestamp),
1598 })
1599 }
1600 _ => None,
1601 }
1602 }
1603}
1604
// A batch of sequenced records, converted element by element from
// `api::SequencedRecordBatch`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
    // Records of the batch, in the order the API returned them.
    pub records: Vec<SequencedRecord>,
}
1610
1611impl MeteredBytes for SequencedRecordBatch {
1612 fn metered_bytes(&self) -> u64 {
1613 self.records.metered_bytes()
1614 }
1615}
1616
1617impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1618 fn from(value: api::SequencedRecordBatch) -> Self {
1619 let api::SequencedRecordBatch { records } = value;
1620 Self {
1621 records: records.into_iter().map(Into::into).collect(),
1622 }
1623 }
1624}
1625
// Outcome of a read; mirrors the variants of `api::read_output::Output` one to one.
#[sync_docs(ReadOutput = "Output")]
#[derive(Debug, Clone)]
pub enum ReadOutput {
    // A batch of records.
    Batch(SequencedRecordBatch),
    // A sequence number instead of records.
    // NOTE(review): presumably the first available sequence number — confirm in the API docs.
    FirstSeqNum(u64),
    // A sequence number instead of records.
    // NOTE(review): presumably the next sequence number of the stream — confirm in the API docs.
    NextSeqNum(u64),
}
1633
1634impl From<api::read_output::Output> for ReadOutput {
1635 fn from(value: api::read_output::Output) -> Self {
1636 match value {
1637 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1638 api::read_output::Output::FirstSeqNum(first_seq_num) => {
1639 Self::FirstSeqNum(first_seq_num)
1640 }
1641 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1642 }
1643 }
1644}
1645
1646impl TryFrom<api::ReadOutput> for ReadOutput {
1647 type Error = ConvertError;
1648 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1649 let api::ReadOutput { output } = value;
1650 let output = output.ok_or("missing read output")?;
1651 Ok(output.into())
1652 }
1653}
1654
1655impl TryFrom<api::ReadResponse> for ReadOutput {
1656 type Error = ConvertError;
1657 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1658 let api::ReadResponse { output } = value;
1659 let output = output.ok_or("missing output in read response")?;
1660 output.try_into()
1661 }
1662}
1663
// Parameters of a read session. The default value has `start_seq_num` 0 and an empty limit.
// Unlike the unary `ReadRequest`, the limit is forwarded to the API without validation
// (see `into_api_type`).
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadSessionRequest {
    // Sequence number forwarded as `start_seq_num` of `api::ReadSessionRequest`.
    pub start_seq_num: u64,
    // Limits for the session; both values are passed through unchanged.
    pub limit: ReadLimit,
}
1670
1671impl ReadSessionRequest {
1672 pub fn new(start_seq_num: u64) -> Self {
1674 Self {
1675 start_seq_num,
1676 ..Default::default()
1677 }
1678 }
1679
1680 pub fn with_limit(self, limit: ReadLimit) -> Self {
1682 Self { limit, ..self }
1683 }
1684
1685 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1686 let Self {
1687 start_seq_num,
1688 limit,
1689 } = self;
1690 api::ReadSessionRequest {
1691 stream: stream.into(),
1692 start_seq_num,
1693 limit: Some(api::ReadLimit {
1694 count: limit.count,
1695 bytes: limit.bytes,
1696 }),
1697 heartbeats: false,
1698 }
1699 }
1700}
1701
1702impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1703 type Error = ConvertError;
1704 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1705 let api::ReadSessionResponse { output } = value;
1706 let output = output.ok_or("missing output in read session response")?;
1707 output.try_into()
1708 }
1709}
1710
/// Name of a basin.
///
/// The `TryFrom<String>` conversion (also used by `FromStr`) only accepts names that are
/// 8 to 48 bytes long, consist of lowercase ASCII letters, digits and hyphens, and neither
/// begin nor end with a hyphen.
#[derive(Debug, Clone)]
pub struct BasinName(String);
1717
1718impl AsRef<str> for BasinName {
1719 fn as_ref(&self) -> &str {
1720 &self.0
1721 }
1722}
1723
1724impl Deref for BasinName {
1725 type Target = str;
1726 fn deref(&self) -> &Self::Target {
1727 &self.0
1728 }
1729}
1730
1731impl TryFrom<String> for BasinName {
1732 type Error = ConvertError;
1733
1734 fn try_from(name: String) -> Result<Self, Self::Error> {
1735 if name.len() < 8 || name.len() > 48 {
1736 return Err("Basin name must be between 8 and 48 characters in length".into());
1737 }
1738
1739 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1740 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1741 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1742 .expect("Failed to compile basin name regex")
1743 });
1744
1745 if !regex.is_match(&name) {
1746 return Err(
1747 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1748 It cannot begin or end with a hyphen."
1749 .into(),
1750 );
1751 }
1752
1753 Ok(Self(name))
1754 }
1755}
1756
1757impl FromStr for BasinName {
1758 type Err = ConvertError;
1759
1760 fn from_str(s: &str) -> Result<Self, Self::Err> {
1761 s.to_string().try_into()
1762 }
1763}
1764
1765impl std::fmt::Display for BasinName {
1766 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1767 f.write_str(&self.0)
1768 }
1769}
1770
/// Identifier of an access token.
///
/// The `TryFrom<String>` conversion (also used by `FromStr`) only accepts identifiers that
/// are non-empty and at most 50 bytes long.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);
1775
1776impl AsRef<str> for AccessTokenId {
1777 fn as_ref(&self) -> &str {
1778 &self.0
1779 }
1780}
1781
1782impl Deref for AccessTokenId {
1783 type Target = str;
1784
1785 fn deref(&self) -> &Self::Target {
1786 &self.0
1787 }
1788}
1789
1790impl TryFrom<String> for AccessTokenId {
1791 type Error = ConvertError;
1792
1793 fn try_from(name: String) -> Result<Self, Self::Error> {
1794 if name.is_empty() {
1795 return Err("Access token ID must not be empty".into());
1796 }
1797
1798 if name.len() > 50 {
1799 return Err("Access token ID must not exceed 50 characters".into());
1800 }
1801
1802 Ok(Self(name))
1803 }
1804}
1805
1806impl From<AccessTokenId> for String {
1807 fn from(value: AccessTokenId) -> Self {
1808 value.0
1809 }
1810}
1811
1812impl FromStr for AccessTokenId {
1813 type Err = ConvertError;
1814
1815 fn from_str(s: &str) -> Result<Self, Self::Err> {
1816 s.to_string().try_into()
1817 }
1818}
1819
1820impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1821 fn from(value: AccessTokenInfo) -> Self {
1822 Self {
1823 info: Some(value.into()),
1824 }
1825 }
1826}
1827
// Description of an access token; converts to and from `api::AccessTokenInfo`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct AccessTokenInfo {
    // Validated identifier of the token.
    pub id: AccessTokenId,
    // Optional expiry, forwarded unchanged to the API.
    // NOTE(review): presumably a Unix timestamp in seconds — confirm against the API docs.
    pub expires_at: Option<u32>,
    // Forwarded unchanged to the API; `false` when built with `AccessTokenInfo::new`.
    pub auto_prefix_streams: bool,
    // Optional scope of the token; `None` when built with `AccessTokenInfo::new`.
    pub scope: Option<AccessTokenScope>,
}
1836
1837impl AccessTokenInfo {
1838 pub fn new(id: AccessTokenId) -> Self {
1840 Self {
1841 id,
1842 expires_at: None,
1843 auto_prefix_streams: false,
1844 scope: None,
1845 }
1846 }
1847
1848 pub fn with_expires_at(self, expires_at: u32) -> Self {
1850 Self {
1851 expires_at: Some(expires_at),
1852 ..self
1853 }
1854 }
1855
1856 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1858 Self {
1859 auto_prefix_streams,
1860 ..self
1861 }
1862 }
1863
1864 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
1866 Self {
1867 scope: Some(scope),
1868 ..self
1869 }
1870 }
1871}
1872
1873impl From<AccessTokenInfo> for api::AccessTokenInfo {
1874 fn from(value: AccessTokenInfo) -> Self {
1875 let AccessTokenInfo {
1876 id,
1877 expires_at,
1878 auto_prefix_streams,
1879 scope,
1880 } = value;
1881 Self {
1882 id: id.into(),
1883 expires_at,
1884 auto_prefix_streams,
1885 scope: scope.map(Into::into),
1886 }
1887 }
1888}
1889
1890impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
1891 type Error = ConvertError;
1892 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
1893 let api::AccessTokenInfo {
1894 id,
1895 expires_at,
1896 auto_prefix_streams,
1897 scope,
1898 } = value;
1899 Ok(Self {
1900 id: id.try_into()?,
1901 expires_at,
1902 auto_prefix_streams,
1903 scope: scope.map(TryInto::try_into).transpose()?,
1904 })
1905 }
1906}
1907
// Operations that an access token can be scoped to.
// The variants mirror `api::Operation` one to one (see the `From` impls in both directions),
// and each can be parsed from its kebab-case or concatenated name via `FromStr`.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Operation {
    Unspecified,
    ListBasins,
    CreateBasin,
    DeleteBasin,
    ReconfigureBasin,
    GetBasinConfig,
    IssueAccessToken,
    RevokeAccessToken,
    ListAccessTokens,
    ListStreams,
    CreateStream,
    DeleteStream,
    GetStreamConfig,
    ReconfigureStream,
    CheckTail,
    Append,
    Read,
    Trim,
    Fence,
}
1931
1932impl FromStr for Operation {
1933 type Err = ConvertError;
1934
1935 fn from_str(s: &str) -> Result<Self, Self::Err> {
1936 match s.to_lowercase().as_str() {
1937 "unspecified" => Ok(Self::Unspecified),
1938 "list-basins" | "listbasins" => Ok(Self::ListBasins),
1939 "create-basin" | "createbasin" => Ok(Self::CreateBasin),
1940 "delete-basin" | "deletebasin" => Ok(Self::DeleteBasin),
1941 "reconfigure-basin" | "reconfigurebasin" => Ok(Self::ReconfigureBasin),
1942 "get-basin-config" | "getbasinconfig" => Ok(Self::GetBasinConfig),
1943 "issue-access-token" | "issueaccesstoken" => Ok(Self::IssueAccessToken),
1944 "revoke-access-token" | "revokeaccesstoken" => Ok(Self::RevokeAccessToken),
1945 "list-access-tokens" | "listaccesstokens" => Ok(Self::ListAccessTokens),
1946 "list-streams" | "liststreams" => Ok(Self::ListStreams),
1947 "create-stream" | "createstream" => Ok(Self::CreateStream),
1948 "delete-stream" | "deletestream" => Ok(Self::DeleteStream),
1949 "get-stream-config" | "getstreamconfig" => Ok(Self::GetStreamConfig),
1950 "reconfigure-stream" | "reconfigurestream" => Ok(Self::ReconfigureStream),
1951 "check-tail" | "checktail" => Ok(Self::CheckTail),
1952 "append" => Ok(Self::Append),
1953 "read" => Ok(Self::Read),
1954 "trim" => Ok(Self::Trim),
1955 "fence" => Ok(Self::Fence),
1956 _ => Err("invalid operation".into()),
1957 }
1958 }
1959}
1960
1961impl From<Operation> for i32 {
1962 fn from(value: Operation) -> Self {
1963 api::Operation::from(value).into()
1964 }
1965}
1966
1967impl TryFrom<i32> for Operation {
1968 type Error = ConvertError;
1969 fn try_from(value: i32) -> Result<Self, Self::Error> {
1970 api::Operation::try_from(value)
1971 .map_err(|_| "invalid operation".into())
1972 .map(Into::into)
1973 }
1974}
1975
1976impl From<Operation> for api::Operation {
1977 fn from(value: Operation) -> Self {
1978 match value {
1979 Operation::Unspecified => Self::Unspecified,
1980 Operation::ListBasins => Self::ListBasins,
1981 Operation::CreateBasin => Self::CreateBasin,
1982 Operation::DeleteBasin => Self::DeleteBasin,
1983 Operation::ReconfigureBasin => Self::ReconfigureBasin,
1984 Operation::GetBasinConfig => Self::GetBasinConfig,
1985 Operation::IssueAccessToken => Self::IssueAccessToken,
1986 Operation::RevokeAccessToken => Self::RevokeAccessToken,
1987 Operation::ListAccessTokens => Self::ListAccessTokens,
1988 Operation::ListStreams => Self::ListStreams,
1989 Operation::CreateStream => Self::CreateStream,
1990 Operation::DeleteStream => Self::DeleteStream,
1991 Operation::GetStreamConfig => Self::GetStreamConfig,
1992 Operation::ReconfigureStream => Self::ReconfigureStream,
1993 Operation::CheckTail => Self::CheckTail,
1994 Operation::Append => Self::Append,
1995 Operation::Read => Self::Read,
1996 Operation::Trim => Self::Trim,
1997 Operation::Fence => Self::Fence,
1998 }
1999 }
2000}
2001
2002impl From<api::Operation> for Operation {
2003 fn from(value: api::Operation) -> Self {
2004 match value {
2005 api::Operation::Unspecified => Self::Unspecified,
2006 api::Operation::ListBasins => Self::ListBasins,
2007 api::Operation::CreateBasin => Self::CreateBasin,
2008 api::Operation::DeleteBasin => Self::DeleteBasin,
2009 api::Operation::ReconfigureBasin => Self::ReconfigureBasin,
2010 api::Operation::GetBasinConfig => Self::GetBasinConfig,
2011 api::Operation::IssueAccessToken => Self::IssueAccessToken,
2012 api::Operation::RevokeAccessToken => Self::RevokeAccessToken,
2013 api::Operation::ListAccessTokens => Self::ListAccessTokens,
2014 api::Operation::ListStreams => Self::ListStreams,
2015 api::Operation::CreateStream => Self::CreateStream,
2016 api::Operation::DeleteStream => Self::DeleteStream,
2017 api::Operation::GetStreamConfig => Self::GetStreamConfig,
2018 api::Operation::ReconfigureStream => Self::ReconfigureStream,
2019 api::Operation::CheckTail => Self::CheckTail,
2020 api::Operation::Append => Self::Append,
2021 api::Operation::Read => Self::Read,
2022 api::Operation::Trim => Self::Trim,
2023 api::Operation::Fence => Self::Fence,
2024 }
2025 }
2026}
2027
// Scope of an access token. The default value has every resource set and `op_groups` unset
// and an empty `ops` set.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct AccessTokenScope {
    // Optional resource set for basins.
    pub basins: Option<ResourceSet>,
    // Optional resource set for streams.
    pub streams: Option<ResourceSet>,
    // Optional resource set for access tokens.
    pub tokens: Option<ResourceSet>,
    // Optional read/write permissions per operation group.
    pub op_groups: Option<PermittedOperationGroups>,
    // Individual operations; stored as a set, so duplicates collapse.
    pub ops: HashSet<Operation>,
}
2037
2038impl AccessTokenScope {
2039 pub fn new() -> Self {
2041 Self::default()
2042 }
2043
2044 pub fn with_basins(self, basins: ResourceSet) -> Self {
2046 Self {
2047 basins: Some(basins),
2048 ..self
2049 }
2050 }
2051
2052 pub fn with_streams(self, streams: ResourceSet) -> Self {
2054 Self {
2055 streams: Some(streams),
2056 ..self
2057 }
2058 }
2059
2060 pub fn with_tokens(self, tokens: ResourceSet) -> Self {
2062 Self {
2063 tokens: Some(tokens),
2064 ..self
2065 }
2066 }
2067
2068 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2070 Self {
2071 op_groups: Some(op_groups),
2072 ..self
2073 }
2074 }
2075
2076 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2078 Self {
2079 ops: ops.into_iter().collect(),
2080 ..self
2081 }
2082 }
2083
2084 pub fn with_op(self, op: Operation) -> Self {
2086 let mut ops = self.ops;
2087 ops.insert(op);
2088 Self { ops, ..self }
2089 }
2090}
2091
2092impl From<AccessTokenScope> for api::AccessTokenScope {
2093 fn from(value: AccessTokenScope) -> Self {
2094 let AccessTokenScope {
2095 basins,
2096 streams,
2097 tokens,
2098 op_groups,
2099 ops,
2100 } = value;
2101 Self {
2102 basins: basins.map(Into::into),
2103 streams: streams.map(Into::into),
2104 tokens: tokens.map(Into::into),
2105 op_groups: op_groups.map(Into::into),
2106 ops: ops.into_iter().map(Into::into).collect(),
2107 }
2108 }
2109}
2110
2111impl TryFrom<api::AccessTokenScope> for AccessTokenScope {
2112 type Error = ConvertError;
2113 fn try_from(value: api::AccessTokenScope) -> Result<Self, Self::Error> {
2114 let api::AccessTokenScope {
2115 basins,
2116 streams,
2117 tokens,
2118 op_groups,
2119 ops,
2120 } = value;
2121 Ok(Self {
2122 basins: basins.and_then(|set| set.matching.map(Into::into)),
2123 streams: streams.and_then(|set| set.matching.map(Into::into)),
2124 tokens: tokens.and_then(|set| set.matching.map(Into::into)),
2125 op_groups: op_groups.map(Into::into),
2126 ops: ops
2127 .into_iter()
2128 .map(Operation::try_from)
2129 .collect::<Result<HashSet<_>, _>>()?,
2130 })
2131 }
2132}
2133
2134impl From<ResourceSet> for api::ResourceSet {
2135 fn from(value: ResourceSet) -> Self {
2136 Self {
2137 matching: Some(value.into()),
2138 }
2139 }
2140}
2141
// Selects resources by name; mirrors the variants of `api::resource_set::Matching`.
#[sync_docs(ResourceSet = "Matching")]
#[derive(Debug, Clone)]
pub enum ResourceSet {
    // Carries a single string, converted to `Matching::Exact`.
    // NOTE(review): presumably matches only the resource with exactly this name — confirm.
    Exact(String),
    // Carries a single string, converted to `Matching::Prefix`.
    // NOTE(review): presumably matches every resource whose name starts with it — confirm.
    Prefix(String),
}
2148
2149impl From<ResourceSet> for api::resource_set::Matching {
2150 fn from(value: ResourceSet) -> Self {
2151 match value {
2152 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2153 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2154 }
2155 }
2156}
2157
2158impl From<api::resource_set::Matching> for ResourceSet {
2159 fn from(value: api::resource_set::Matching) -> Self {
2160 match value {
2161 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2162 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2163 }
2164 }
2165}
2166
// Optional read/write permissions for three operation groups. All are `None` by default.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct PermittedOperationGroups {
    // Permissions for the account group, if any.
    pub account: Option<ReadWritePermissions>,
    // Permissions for the basin group, if any.
    pub basin: Option<ReadWritePermissions>,
    // Permissions for the stream group, if any.
    pub stream: Option<ReadWritePermissions>,
}
2174
2175impl PermittedOperationGroups {
2176 pub fn new() -> Self {
2178 Self::default()
2179 }
2180
2181 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2183 Self {
2184 account: Some(account),
2185 ..self
2186 }
2187 }
2188
2189 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2191 Self {
2192 basin: Some(basin),
2193 ..self
2194 }
2195 }
2196
2197 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2199 Self {
2200 stream: Some(stream),
2201 ..self
2202 }
2203 }
2204}
2205
2206impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2207 fn from(value: PermittedOperationGroups) -> Self {
2208 let PermittedOperationGroups {
2209 account,
2210 basin,
2211 stream,
2212 } = value;
2213 Self {
2214 account: account.map(Into::into),
2215 basin: basin.map(Into::into),
2216 stream: stream.map(Into::into),
2217 }
2218 }
2219}
2220
2221impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2222 fn from(value: api::PermittedOperationGroups) -> Self {
2223 let api::PermittedOperationGroups {
2224 account,
2225 basin,
2226 stream,
2227 } = value;
2228 Self {
2229 account: account.map(Into::into),
2230 basin: basin.map(Into::into),
2231 stream: stream.map(Into::into),
2232 }
2233 }
2234}
2235
// A pair of read/write flags. Both are `false` by default.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ReadWritePermissions {
    // Whether read access is granted.
    pub read: bool,
    // Whether write access is granted.
    pub write: bool,
}
2242
2243impl ReadWritePermissions {
2244 pub fn new() -> Self {
2246 Self::default()
2247 }
2248
2249 pub fn with_read(self, read: bool) -> Self {
2251 Self { read, ..self }
2252 }
2253
2254 pub fn with_write(self, write: bool) -> Self {
2256 Self { write, ..self }
2257 }
2258}
2259
2260impl From<ReadWritePermissions> for api::ReadWritePermissions {
2261 fn from(value: ReadWritePermissions) -> Self {
2262 let ReadWritePermissions { read, write } = value;
2263 Self { read, write }
2264 }
2265}
2266
2267impl From<api::ReadWritePermissions> for ReadWritePermissions {
2268 fn from(value: api::ReadWritePermissions) -> Self {
2269 let api::ReadWritePermissions { read, write } = value;
2270 Self { read, write }
2271 }
2272}
2273
2274impl From<api::IssueAccessTokenResponse> for String {
2275 fn from(value: api::IssueAccessTokenResponse) -> Self {
2276 value.token
2277 }
2278}
2279
2280impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2281 fn from(value: AccessTokenId) -> Self {
2282 Self { id: value.into() }
2283 }
2284}
2285
2286impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2287 type Error = ConvertError;
2288 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2289 let token_info = value.info.ok_or("access token info is missing")?;
2290 token_info.try_into()
2291 }
2292}
2293
// Parameters for listing access tokens. The default value has empty strings and no limit.
#[sync_docs]
#[derive(Debug, Clone, Default)]
pub struct ListAccessTokensRequest {
    // Forwarded unchanged to the API.
    // NOTE(review): presumably restricts the listing to IDs starting with it — confirm.
    pub prefix: String,
    // Forwarded unchanged to the API.
    // NOTE(review): presumably an exclusive lower bound on the returned IDs — confirm.
    pub start_after: String,
    // Optional cap on the number of results; converted to the API's integer type on send.
    pub limit: Option<usize>,
}
2301
2302impl ListAccessTokensRequest {
2303 pub fn new() -> Self {
2305 Self::default()
2306 }
2307
2308 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2310 Self {
2311 prefix: prefix.into(),
2312 ..self
2313 }
2314 }
2315
2316 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2318 Self {
2319 start_after: start_after.into(),
2320 ..self
2321 }
2322 }
2323
2324 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2326 Self {
2327 limit: limit.into(),
2328 ..self
2329 }
2330 }
2331}
2332
2333impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2334 type Error = ConvertError;
2335 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2336 let ListAccessTokensRequest {
2337 prefix,
2338 start_after,
2339 limit,
2340 } = value;
2341 Ok(Self {
2342 prefix,
2343 start_after,
2344 limit: limit
2345 .map(TryInto::try_into)
2346 .transpose()
2347 .map_err(|_| "request limit does not fit into u64 bounds")?,
2348 })
2349 }
2350}
2351
// Result of listing access tokens, converted from `api::ListAccessTokensResponse`.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListAccessTokensResponse {
    // Converted token infos, in the order the API returned them.
    pub tokens: Vec<AccessTokenInfo>,
    // Copied unchanged from the API response.
    // NOTE(review): presumably signals that further tokens can be listed — confirm.
    pub has_more: bool,
}
2358
2359impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2360 type Error = ConvertError;
2361 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2362 let api::ListAccessTokensResponse { tokens, has_more } = value;
2363 let tokens = tokens
2364 .into_iter()
2365 .map(TryInto::try_into)
2366 .collect::<Result<Vec<_>, _>>()?;
2367 Ok(Self { tokens, has_more })
2368 }
2369}