1use std::{collections::HashSet, ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
4
5use bytes::Bytes;
6use rand::Rng;
7use regex::Regex;
8use sync_docs::sync_docs;
9
10use crate::api;
11
/// Number of bytes in one mebibyte (1024 * 1024).
pub(crate) const MIB_BYTES: u64 = 1024 * 1024;
13
/// Conversion error wrapping a message string; its `Display` output is that message.
#[derive(Debug, Clone, thiserror::Error)]
#[error("{0}")]
pub struct ConvertError(String);
18
19impl<T: Into<String>> From<T> for ConvertError {
20 fn from(value: T) -> Self {
21 Self(value.into())
22 }
23}
24
/// Implemented by types that can report a metered size in bytes.
pub trait MeteredBytes {
    /// Returns the metered size of `self` in bytes.
    fn metered_bytes(&self) -> u64;
}
36
37impl<T: MeteredBytes> MeteredBytes for Vec<T> {
38 fn metered_bytes(&self) -> u64 {
39 self.iter().fold(0, |acc, item| acc + item.metered_bytes())
40 }
41}
42
// Implements `MeteredBytes` for a type that has `headers` (items with `name` and `value`)
// and `body` fields. The metered size is:
//   8 + 2 * (number of headers) + sum of header name/value lengths + body length.
macro_rules! metered_impl {
    ($ty:ty) => {
        impl MeteredBytes for $ty {
            fn metered_bytes(&self) -> u64 {
                let header_count = self.headers.len();
                let header_bytes: usize = self
                    .headers
                    .iter()
                    .map(|header| header.name.len() + header.value.len())
                    .sum();
                let total = 8 + (2 * header_count) + header_bytes + self.body.len();
                total as u64
            }
        }
    };
}
60
// Basin scope. `AwsUsEast1` is the only variant; the `FromStr` impl in this file parses it
// from the string "aws:us-east-1".
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinScope {
    AwsUsEast1,
}
66
67impl From<BasinScope> for api::BasinScope {
68 fn from(value: BasinScope) -> Self {
69 match value {
70 BasinScope::AwsUsEast1 => Self::AwsUsEast1,
71 }
72 }
73}
74
75impl From<api::BasinScope> for Option<BasinScope> {
76 fn from(value: api::BasinScope) -> Self {
77 match value {
78 api::BasinScope::Unspecified => None,
79 api::BasinScope::AwsUsEast1 => Some(BasinScope::AwsUsEast1),
80 }
81 }
82}
83
84impl FromStr for BasinScope {
85 type Err = ConvertError;
86 fn from_str(value: &str) -> Result<Self, Self::Err> {
87 match value {
88 "aws:us-east-1" => Ok(Self::AwsUsEast1),
89 _ => Err("invalid basin scope value".into()),
90 }
91 }
92}
93
94#[sync_docs]
95#[derive(Debug, Clone)]
96pub struct CreateBasinRequest {
97 pub basin: BasinName,
98 pub config: Option<BasinConfig>,
99 pub scope: Option<BasinScope>,
100}
101
102impl CreateBasinRequest {
103 pub fn new(basin: BasinName) -> Self {
105 Self {
106 basin,
107 config: None,
108 scope: None,
109 }
110 }
111
112 pub fn with_config(self, config: BasinConfig) -> Self {
114 Self {
115 config: Some(config),
116 ..self
117 }
118 }
119
120 pub fn with_scope(self, scope: BasinScope) -> Self {
122 Self {
123 scope: Some(scope),
124 ..self
125 }
126 }
127}
128
129impl From<CreateBasinRequest> for api::CreateBasinRequest {
130 fn from(value: CreateBasinRequest) -> Self {
131 let CreateBasinRequest {
132 basin,
133 config,
134 scope,
135 } = value;
136 Self {
137 basin: basin.0,
138 config: config.map(Into::into),
139 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
140 }
141 }
142}
143
144#[sync_docs]
145#[derive(Debug, Clone, Default)]
146pub struct BasinConfig {
147 pub default_stream_config: Option<StreamConfig>,
148 pub create_stream_on_append: bool,
149 pub create_stream_on_read: bool,
150}
151
152impl BasinConfig {
153 pub fn new() -> Self {
155 Self::default()
156 }
157
158 pub fn with_default_stream_config(self, default_stream_config: StreamConfig) -> Self {
160 Self {
161 default_stream_config: Some(default_stream_config),
162 ..self
163 }
164 }
165
166 pub fn with_create_stream_on_append(self, create_stream_on_append: bool) -> Self {
168 Self {
169 create_stream_on_append,
170 ..self
171 }
172 }
173
174 pub fn with_create_stream_on_read(self, create_stream_on_read: bool) -> Self {
176 Self {
177 create_stream_on_read,
178 ..self
179 }
180 }
181}
182
183impl From<BasinConfig> for api::BasinConfig {
184 fn from(value: BasinConfig) -> Self {
185 let BasinConfig {
186 default_stream_config,
187 create_stream_on_append,
188 create_stream_on_read,
189 } = value;
190 Self {
191 default_stream_config: default_stream_config.map(Into::into),
192 create_stream_on_append,
193 create_stream_on_read,
194 }
195 }
196}
197
198impl From<api::BasinConfig> for BasinConfig {
199 fn from(value: api::BasinConfig) -> Self {
200 let api::BasinConfig {
201 default_stream_config,
202 create_stream_on_append,
203 create_stream_on_read,
204 } = value;
205 Self {
206 default_stream_config: default_stream_config.map(Into::into),
207 create_stream_on_append,
208 create_stream_on_read,
209 }
210 }
211}
212
// Timestamping mode. Each variant maps one-to-one onto the `api::TimestampingMode` variant
// of the same name.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampingMode {
    ClientPrefer,
    ClientRequire,
    Arrival,
}
220
221impl From<TimestampingMode> for api::TimestampingMode {
222 fn from(value: TimestampingMode) -> Self {
223 match value {
224 TimestampingMode::ClientPrefer => Self::ClientPrefer,
225 TimestampingMode::ClientRequire => Self::ClientRequire,
226 TimestampingMode::Arrival => Self::Arrival,
227 }
228 }
229}
230
231impl From<api::TimestampingMode> for Option<TimestampingMode> {
232 fn from(value: api::TimestampingMode) -> Self {
233 match value {
234 api::TimestampingMode::Unspecified => None,
235 api::TimestampingMode::ClientPrefer => Some(TimestampingMode::ClientPrefer),
236 api::TimestampingMode::ClientRequire => Some(TimestampingMode::ClientRequire),
237 api::TimestampingMode::Arrival => Some(TimestampingMode::Arrival),
238 }
239 }
240}
241
242#[sync_docs(TimestampingConfig = "Timestamping")]
243#[derive(Debug, Clone, Default)]
244pub struct TimestampingConfig {
246 pub mode: Option<TimestampingMode>,
247 pub uncapped: Option<bool>,
248}
249
250impl TimestampingConfig {
251 pub fn new() -> Self {
253 Self::default()
254 }
255
256 pub fn with_mode(self, mode: TimestampingMode) -> Self {
258 Self {
259 mode: Some(mode),
260 ..self
261 }
262 }
263
264 pub fn with_uncapped(self, uncapped: bool) -> Self {
266 Self {
267 uncapped: Some(uncapped),
268 ..self
269 }
270 }
271}
272
273impl From<TimestampingConfig> for api::stream_config::Timestamping {
274 fn from(value: TimestampingConfig) -> Self {
275 Self {
276 mode: value
277 .mode
278 .map(api::TimestampingMode::from)
279 .unwrap_or_default()
280 .into(),
281 uncapped: value.uncapped,
282 }
283 }
284}
285
286impl From<api::stream_config::Timestamping> for TimestampingConfig {
287 fn from(value: api::stream_config::Timestamping) -> Self {
288 let mode = value.mode().into();
289 let uncapped = value.uncapped;
290 Self { mode, uncapped }
291 }
292}
293
294#[sync_docs]
295#[derive(Debug, Clone, Default)]
296pub struct StreamConfig {
297 pub storage_class: Option<StorageClass>,
298 pub retention_policy: Option<RetentionPolicy>,
299 pub timestamping: Option<TimestampingConfig>,
300}
301
302impl StreamConfig {
303 pub fn new() -> Self {
305 Self::default()
306 }
307
308 pub fn with_storage_class(self, storage_class: StorageClass) -> Self {
310 Self {
311 storage_class: Some(storage_class),
312 ..self
313 }
314 }
315
316 pub fn with_retention_policy(self, retention_policy: RetentionPolicy) -> Self {
318 Self {
319 retention_policy: Some(retention_policy),
320 ..self
321 }
322 }
323
324 pub fn with_timestamping(self, timestamping: TimestampingConfig) -> Self {
326 Self {
327 timestamping: Some(timestamping),
328 ..self
329 }
330 }
331}
332
333impl From<StreamConfig> for api::StreamConfig {
334 fn from(value: StreamConfig) -> Self {
335 let StreamConfig {
336 storage_class,
337 retention_policy,
338 timestamping,
339 } = value;
340 Self {
341 storage_class: storage_class
342 .map(api::StorageClass::from)
343 .unwrap_or_default()
344 .into(),
345 retention_policy: retention_policy.map(Into::into),
346 timestamping: timestamping.map(Into::into),
347 }
348 }
349}
350
351impl From<api::StreamConfig> for StreamConfig {
352 fn from(value: api::StreamConfig) -> Self {
353 Self {
354 storage_class: value.storage_class().into(),
355 retention_policy: value.retention_policy.map(Into::into),
356 timestamping: value.timestamping.map(Into::into),
357 }
358 }
359}
360
// Storage class. The `FromStr` impl in this file parses "standard" and "express" into the
// corresponding variants.
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageClass {
    Standard,
    Express,
}
367
368impl From<StorageClass> for api::StorageClass {
369 fn from(value: StorageClass) -> Self {
370 match value {
371 StorageClass::Standard => Self::Standard,
372 StorageClass::Express => Self::Express,
373 }
374 }
375}
376
377impl From<api::StorageClass> for Option<StorageClass> {
378 fn from(value: api::StorageClass) -> Self {
379 match value {
380 api::StorageClass::Unspecified => None,
381 api::StorageClass::Standard => Some(StorageClass::Standard),
382 api::StorageClass::Express => Some(StorageClass::Express),
383 }
384 }
385}
386
387impl FromStr for StorageClass {
388 type Err = ConvertError;
389
390 fn from_str(value: &str) -> Result<Self, Self::Err> {
391 match value {
392 "standard" => Ok(Self::Standard),
393 "express" => Ok(Self::Express),
394 v => Err(format!("unknown storage class: {v}").into()),
395 }
396 }
397}
398
// Retention policy. `Age` carries a `Duration`; the conversions in this file exchange it
// with the `api` type as whole seconds.
#[sync_docs(Age = "Age")]
#[derive(Debug, Clone)]
pub enum RetentionPolicy {
    Age(Duration),
}
404
405impl From<RetentionPolicy> for api::stream_config::RetentionPolicy {
406 fn from(value: RetentionPolicy) -> Self {
407 match value {
408 RetentionPolicy::Age(duration) => Self::Age(duration.as_secs()),
409 }
410 }
411}
412
413impl From<api::stream_config::RetentionPolicy> for RetentionPolicy {
414 fn from(value: api::stream_config::RetentionPolicy) -> Self {
415 match value {
416 api::stream_config::RetentionPolicy::Age(secs) => Self::Age(Duration::from_secs(secs)),
417 }
418 }
419}
420
// Basin state. The `Display` impl in this file renders the variants as "active",
// "creating" and "deleting".
#[sync_docs]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinState {
    Active,
    Creating,
    Deleting,
}
428
429impl From<BasinState> for api::BasinState {
430 fn from(value: BasinState) -> Self {
431 match value {
432 BasinState::Active => Self::Active,
433 BasinState::Creating => Self::Creating,
434 BasinState::Deleting => Self::Deleting,
435 }
436 }
437}
438
439impl From<api::BasinState> for Option<BasinState> {
440 fn from(value: api::BasinState) -> Self {
441 match value {
442 api::BasinState::Unspecified => None,
443 api::BasinState::Active => Some(BasinState::Active),
444 api::BasinState::Creating => Some(BasinState::Creating),
445 api::BasinState::Deleting => Some(BasinState::Deleting),
446 }
447 }
448}
449
450impl std::fmt::Display for BasinState {
451 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452 match self {
453 BasinState::Active => write!(f, "active"),
454 BasinState::Creating => write!(f, "creating"),
455 BasinState::Deleting => write!(f, "deleting"),
456 }
457 }
458}
459
// Basin information: its name plus an optional scope and state. Scope and state are `None`
// when the corresponding `api` value is `Unspecified` (see the conversions in this file).
#[sync_docs]
#[derive(Debug, Clone)]
pub struct BasinInfo {
    pub name: String,
    pub scope: Option<BasinScope>,
    pub state: Option<BasinState>,
}
467
468impl From<BasinInfo> for api::BasinInfo {
469 fn from(value: BasinInfo) -> Self {
470 let BasinInfo { name, scope, state } = value;
471 Self {
472 name,
473 scope: scope.map(api::BasinScope::from).unwrap_or_default().into(),
474 state: state.map(api::BasinState::from).unwrap_or_default().into(),
475 }
476 }
477}
478
479impl From<api::BasinInfo> for BasinInfo {
480 fn from(value: api::BasinInfo) -> Self {
481 let scope = value.scope().into();
482 let state = value.state().into();
483 let name = value.name;
484 Self { name, scope, state }
485 }
486}
487
488impl TryFrom<api::CreateBasinResponse> for BasinInfo {
489 type Error = ConvertError;
490 fn try_from(value: api::CreateBasinResponse) -> Result<Self, Self::Error> {
491 let api::CreateBasinResponse { info } = value;
492 let info = info.ok_or("missing basin info")?;
493 Ok(info.into())
494 }
495}
496
497#[sync_docs]
498#[derive(Debug, Clone, Default)]
499pub struct ListStreamsRequest {
500 pub prefix: String,
501 pub start_after: String,
502 pub limit: Option<usize>,
503}
504
505impl ListStreamsRequest {
506 pub fn new() -> Self {
508 Self::default()
509 }
510
511 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
513 Self {
514 prefix: prefix.into(),
515 ..self
516 }
517 }
518
519 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
521 Self {
522 start_after: start_after.into(),
523 ..self
524 }
525 }
526
527 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
529 Self {
530 limit: limit.into(),
531 ..self
532 }
533 }
534}
535
536impl TryFrom<ListStreamsRequest> for api::ListStreamsRequest {
537 type Error = ConvertError;
538 fn try_from(value: ListStreamsRequest) -> Result<Self, Self::Error> {
539 let ListStreamsRequest {
540 prefix,
541 start_after,
542 limit,
543 } = value;
544 Ok(Self {
545 prefix,
546 start_after,
547 limit: limit.map(|n| n as u64),
548 })
549 }
550}
551
// Stream information: its name, a `created_at` value and an optional `deleted_at` value.
// NOTE(review): the unit/epoch of the two `u32` time fields is not visible here — confirm.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct StreamInfo {
    pub name: String,
    pub created_at: u32,
    pub deleted_at: Option<u32>,
}
559
560impl From<api::StreamInfo> for StreamInfo {
561 fn from(value: api::StreamInfo) -> Self {
562 Self {
563 name: value.name,
564 created_at: value.created_at,
565 deleted_at: value.deleted_at,
566 }
567 }
568}
569
570impl TryFrom<api::CreateStreamResponse> for StreamInfo {
571 type Error = ConvertError;
572
573 fn try_from(value: api::CreateStreamResponse) -> Result<Self, Self::Error> {
574 let api::CreateStreamResponse { info } = value;
575 let info = info.ok_or("missing stream info")?;
576 Ok(info.into())
577 }
578}
579
// Response to a list-streams request: the returned streams and a `has_more` flag.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListStreamsResponse {
    pub streams: Vec<StreamInfo>,
    pub has_more: bool,
}
586
587impl From<api::ListStreamsResponse> for ListStreamsResponse {
588 fn from(value: api::ListStreamsResponse) -> Self {
589 let api::ListStreamsResponse { streams, has_more } = value;
590 let streams = streams.into_iter().map(Into::into).collect();
591 Self { streams, has_more }
592 }
593}
594
595impl TryFrom<api::GetBasinConfigResponse> for BasinConfig {
596 type Error = ConvertError;
597
598 fn try_from(value: api::GetBasinConfigResponse) -> Result<Self, Self::Error> {
599 let api::GetBasinConfigResponse { config } = value;
600 let config = config.ok_or("missing basin config")?;
601 Ok(config.into())
602 }
603}
604
605impl TryFrom<api::GetStreamConfigResponse> for StreamConfig {
606 type Error = ConvertError;
607
608 fn try_from(value: api::GetStreamConfigResponse) -> Result<Self, Self::Error> {
609 let api::GetStreamConfigResponse { config } = value;
610 let config = config.ok_or("missing stream config")?;
611 Ok(config.into())
612 }
613}
614
615#[sync_docs]
616#[derive(Debug, Clone)]
617pub struct CreateStreamRequest {
618 pub stream: String,
619 pub config: Option<StreamConfig>,
620}
621
622impl CreateStreamRequest {
623 pub fn new(stream: impl Into<String>) -> Self {
625 Self {
626 stream: stream.into(),
627 config: None,
628 }
629 }
630
631 pub fn with_config(self, config: StreamConfig) -> Self {
633 Self {
634 config: Some(config),
635 ..self
636 }
637 }
638}
639
640impl From<CreateStreamRequest> for api::CreateStreamRequest {
641 fn from(value: CreateStreamRequest) -> Self {
642 let CreateStreamRequest { stream, config } = value;
643 Self {
644 stream,
645 config: config.map(Into::into),
646 }
647 }
648}
649
650#[sync_docs]
651#[derive(Debug, Clone, Default)]
652pub struct ListBasinsRequest {
653 pub prefix: String,
654 pub start_after: String,
655 pub limit: Option<usize>,
656}
657
658impl ListBasinsRequest {
659 pub fn new() -> Self {
661 Self::default()
662 }
663
664 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
666 Self {
667 prefix: prefix.into(),
668 ..self
669 }
670 }
671
672 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
674 Self {
675 start_after: start_after.into(),
676 ..self
677 }
678 }
679
680 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
682 Self {
683 limit: limit.into(),
684 ..self
685 }
686 }
687}
688
689impl TryFrom<ListBasinsRequest> for api::ListBasinsRequest {
690 type Error = ConvertError;
691 fn try_from(value: ListBasinsRequest) -> Result<Self, Self::Error> {
692 let ListBasinsRequest {
693 prefix,
694 start_after,
695 limit,
696 } = value;
697 Ok(Self {
698 prefix,
699 start_after,
700 limit: limit
701 .map(TryInto::try_into)
702 .transpose()
703 .map_err(|_| "request limit does not fit into u64 bounds")?,
704 })
705 }
706}
707
// Response to a list-basins request: the returned basins and a `has_more` flag.
#[sync_docs]
#[derive(Debug, Clone)]
pub struct ListBasinsResponse {
    pub basins: Vec<BasinInfo>,
    pub has_more: bool,
}
714
715impl TryFrom<api::ListBasinsResponse> for ListBasinsResponse {
716 type Error = ConvertError;
717 fn try_from(value: api::ListBasinsResponse) -> Result<Self, ConvertError> {
718 let api::ListBasinsResponse { basins, has_more } = value;
719 Ok(Self {
720 basins: basins.into_iter().map(Into::into).collect(),
721 has_more,
722 })
723 }
724}
725
726#[sync_docs]
727#[derive(Debug, Clone)]
728pub struct DeleteBasinRequest {
729 pub basin: BasinName,
730 pub if_exists: bool,
732}
733
734impl DeleteBasinRequest {
735 pub fn new(basin: BasinName) -> Self {
737 Self {
738 basin,
739 if_exists: false,
740 }
741 }
742
743 pub fn with_if_exists(self, if_exists: bool) -> Self {
745 Self { if_exists, ..self }
746 }
747}
748
749impl From<DeleteBasinRequest> for api::DeleteBasinRequest {
750 fn from(value: DeleteBasinRequest) -> Self {
751 let DeleteBasinRequest { basin, .. } = value;
752 Self { basin: basin.0 }
753 }
754}
755
756#[sync_docs]
757#[derive(Debug, Clone)]
758pub struct DeleteStreamRequest {
759 pub stream: String,
760 pub if_exists: bool,
762}
763
764impl DeleteStreamRequest {
765 pub fn new(stream: impl Into<String>) -> Self {
767 Self {
768 stream: stream.into(),
769 if_exists: false,
770 }
771 }
772
773 pub fn with_if_exists(self, if_exists: bool) -> Self {
775 Self { if_exists, ..self }
776 }
777}
778
779impl From<DeleteStreamRequest> for api::DeleteStreamRequest {
780 fn from(value: DeleteStreamRequest) -> Self {
781 let DeleteStreamRequest { stream, .. } = value;
782 Self { stream }
783 }
784}
785
786#[sync_docs]
787#[derive(Debug, Clone)]
788pub struct ReconfigureBasinRequest {
789 pub basin: BasinName,
790 pub config: Option<BasinConfig>,
791 pub mask: Option<Vec<String>>,
792}
793
794impl ReconfigureBasinRequest {
795 pub fn new(basin: BasinName) -> Self {
797 Self {
798 basin,
799 config: None,
800 mask: None,
801 }
802 }
803
804 pub fn with_config(self, config: BasinConfig) -> Self {
806 Self {
807 config: Some(config),
808 ..self
809 }
810 }
811
812 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
814 Self {
815 mask: Some(mask.into()),
816 ..self
817 }
818 }
819}
820
821impl From<ReconfigureBasinRequest> for api::ReconfigureBasinRequest {
822 fn from(value: ReconfigureBasinRequest) -> Self {
823 let ReconfigureBasinRequest {
824 basin,
825 config,
826 mask,
827 } = value;
828 Self {
829 basin: basin.0,
830 config: config.map(Into::into),
831 mask: mask.map(|paths| prost_types::FieldMask { paths }),
832 }
833 }
834}
835
836impl TryFrom<api::ReconfigureBasinResponse> for BasinConfig {
837 type Error = ConvertError;
838 fn try_from(value: api::ReconfigureBasinResponse) -> Result<Self, Self::Error> {
839 let api::ReconfigureBasinResponse { config } = value;
840 let config = config.ok_or("missing basin config")?;
841 Ok(config.into())
842 }
843}
844
845#[sync_docs]
846#[derive(Debug, Clone)]
847pub struct ReconfigureStreamRequest {
848 pub stream: String,
849 pub config: Option<StreamConfig>,
850 pub mask: Option<Vec<String>>,
851}
852
853impl ReconfigureStreamRequest {
854 pub fn new(stream: impl Into<String>) -> Self {
856 Self {
857 stream: stream.into(),
858 config: None,
859 mask: None,
860 }
861 }
862
863 pub fn with_config(self, config: StreamConfig) -> Self {
865 Self {
866 config: Some(config),
867 ..self
868 }
869 }
870
871 pub fn with_mask(self, mask: impl Into<Vec<String>>) -> Self {
873 Self {
874 mask: Some(mask.into()),
875 ..self
876 }
877 }
878}
879
880impl From<ReconfigureStreamRequest> for api::ReconfigureStreamRequest {
881 fn from(value: ReconfigureStreamRequest) -> Self {
882 let ReconfigureStreamRequest {
883 stream,
884 config,
885 mask,
886 } = value;
887 Self {
888 stream,
889 config: config.map(Into::into),
890 mask: mask.map(|paths| prost_types::FieldMask { paths }),
891 }
892 }
893}
894
895impl TryFrom<api::ReconfigureStreamResponse> for StreamConfig {
896 type Error = ConvertError;
897 fn try_from(value: api::ReconfigureStreamResponse) -> Result<Self, Self::Error> {
898 let api::ReconfigureStreamResponse { config } = value;
899 let config = config.ok_or("missing stream config")?;
900 Ok(config.into())
901 }
902}
903
904impl From<api::CheckTailResponse> for StreamPosition {
905 fn from(value: api::CheckTailResponse) -> Self {
906 let api::CheckTailResponse {
907 next_seq_num,
908 last_timestamp,
909 } = value;
910 StreamPosition {
911 seq_num: next_seq_num,
912 timestamp: last_timestamp,
913 }
914 }
915}
916
/// Position in a stream: a sequence number paired with a timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamPosition {
    /// Sequence number of the position.
    pub seq_num: u64,
    /// Timestamp of the position.
    // NOTE(review): the timestamp unit is not visible in this file — confirm.
    pub timestamp: u64,
}
926
927#[sync_docs]
928#[derive(Debug, Clone, PartialEq, Eq)]
929pub struct Header {
930 pub name: Bytes,
931 pub value: Bytes,
932}
933
934impl Header {
935 pub fn new(name: impl Into<Bytes>, value: impl Into<Bytes>) -> Self {
937 Self {
938 name: name.into(),
939 value: value.into(),
940 }
941 }
942
943 pub fn from_value(value: impl Into<Bytes>) -> Self {
945 Self {
946 name: Bytes::new(),
947 value: value.into(),
948 }
949 }
950}
951
952impl From<Header> for api::Header {
953 fn from(value: Header) -> Self {
954 let Header { name, value } = value;
955 Self { name, value }
956 }
957}
958
959impl From<api::Header> for Header {
960 fn from(value: api::Header) -> Self {
961 let api::Header { name, value } = value;
962 Self { name, value }
963 }
964}
965
966#[derive(Debug, Clone, Default, PartialEq, Eq)]
970pub struct FencingToken(Bytes);
971
972impl FencingToken {
973 const MAX_BYTES: usize = 16;
974
975 pub fn new(bytes: impl Into<Bytes>) -> Result<Self, ConvertError> {
977 let bytes = bytes.into();
978 if bytes.len() > Self::MAX_BYTES {
979 Err(format!(
980 "Size of a fencing token cannot exceed {} bytes",
981 Self::MAX_BYTES
982 )
983 .into())
984 } else {
985 Ok(Self(bytes))
986 }
987 }
988
989 pub fn generate(n: usize) -> Result<Self, ConvertError> {
991 Self::new(
992 rand::rng()
993 .sample_iter(rand::distr::StandardUniform)
994 .take(n)
995 .collect::<Bytes>(),
996 )
997 }
998}
999
1000impl AsRef<Bytes> for FencingToken {
1001 fn as_ref(&self) -> &Bytes {
1002 &self.0
1003 }
1004}
1005
1006impl AsRef<[u8]> for FencingToken {
1007 fn as_ref(&self) -> &[u8] {
1008 &self.0
1009 }
1010}
1011
1012impl Deref for FencingToken {
1013 type Target = [u8];
1014
1015 fn deref(&self) -> &Self::Target {
1016 &self.0
1017 }
1018}
1019
1020impl From<FencingToken> for Bytes {
1021 fn from(value: FencingToken) -> Self {
1022 value.0
1023 }
1024}
1025
1026impl From<FencingToken> for Vec<u8> {
1027 fn from(value: FencingToken) -> Self {
1028 value.0.into()
1029 }
1030}
1031
1032impl TryFrom<Bytes> for FencingToken {
1033 type Error = ConvertError;
1034
1035 fn try_from(value: Bytes) -> Result<Self, Self::Error> {
1036 Self::new(value)
1037 }
1038}
1039
1040impl TryFrom<Vec<u8>> for FencingToken {
1041 type Error = ConvertError;
1042
1043 fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
1044 Self::new(value)
1045 }
1046}
1047
/// Command carried by a [`CommandRecord`].
///
/// When a command record is converted into an `AppendRecord` in this file, the command kind
/// becomes the value of a single header with an empty name (`fence` or `trim`) and the
/// payload becomes the record body.
#[derive(Debug, Clone)]
pub enum Command {
    /// Fence command; the body is the fencing token's bytes.
    Fence {
        /// Fencing token carried by the command.
        fencing_token: FencingToken,
    },
    /// Trim command; the body is `seq_num` encoded as 8 big-endian bytes.
    Trim {
        /// Sequence number carried by the command.
        seq_num: u64,
    },
}
1073
1074#[derive(Debug, Clone)]
1080pub struct CommandRecord {
1081 pub command: Command,
1083 pub timestamp: Option<u64>,
1085}
1086
1087impl CommandRecord {
1088 const FENCE: &[u8] = b"fence";
1089 const TRIM: &[u8] = b"trim";
1090
1091 pub fn fence(fencing_token: FencingToken) -> Self {
1093 Self {
1094 command: Command::Fence { fencing_token },
1095 timestamp: None,
1096 }
1097 }
1098
1099 pub fn trim(seq_num: impl Into<u64>) -> Self {
1101 Self {
1102 command: Command::Trim {
1103 seq_num: seq_num.into(),
1104 },
1105 timestamp: None,
1106 }
1107 }
1108
1109 pub fn with_timestamp(self, timestamp: u64) -> Self {
1111 Self {
1112 timestamp: Some(timestamp),
1113 ..self
1114 }
1115 }
1116}
1117
// Record to append: an optional timestamp, headers and a body. The fields are private;
// the constructors on `AppendRecord` validate the metered size before returning a record.
#[sync_docs]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRecord {
    timestamp: Option<u64>,
    headers: Vec<Header>,
    body: Bytes,
    // Test-only size limit used by validation in place of the fixed maximum.
    #[cfg(test)]
    max_bytes: u64,
}

// Implements `MeteredBytes` for `AppendRecord` through the `metered_impl!` macro.
metered_impl!(AppendRecord);
1129
1130impl AppendRecord {
1131 const MAX_BYTES: u64 = MIB_BYTES;
1132
1133 fn validated(self) -> Result<Self, ConvertError> {
1134 #[cfg(test)]
1135 let max_bytes = self.max_bytes;
1136 #[cfg(not(test))]
1137 let max_bytes = Self::MAX_BYTES;
1138
1139 if self.metered_bytes() > max_bytes {
1140 Err("AppendRecord should have metered size less than 1 MiB".into())
1141 } else {
1142 Ok(self)
1143 }
1144 }
1145
1146 pub fn new(body: impl Into<Bytes>) -> Result<Self, ConvertError> {
1148 Self {
1149 timestamp: None,
1150 headers: Vec::new(),
1151 body: body.into(),
1152 #[cfg(test)]
1153 max_bytes: Self::MAX_BYTES,
1154 }
1155 .validated()
1156 }
1157
1158 #[cfg(test)]
1159 pub(crate) fn with_max_bytes(
1160 max_bytes: u64,
1161 body: impl Into<Bytes>,
1162 ) -> Result<Self, ConvertError> {
1163 Self {
1164 timestamp: None,
1165 headers: Vec::new(),
1166 body: body.into(),
1167 max_bytes,
1168 }
1169 .validated()
1170 }
1171
1172 pub fn with_headers(self, headers: impl Into<Vec<Header>>) -> Result<Self, ConvertError> {
1174 Self {
1175 headers: headers.into(),
1176 ..self
1177 }
1178 .validated()
1179 }
1180
1181 pub fn with_timestamp(self, timestamp: u64) -> Self {
1183 Self {
1184 timestamp: Some(timestamp),
1185 ..self
1186 }
1187 }
1188
1189 pub fn body(&self) -> &[u8] {
1191 &self.body
1192 }
1193
1194 pub fn headers(&self) -> &[Header] {
1196 &self.headers
1197 }
1198
1199 pub fn timestamp(&self) -> Option<u64> {
1201 self.timestamp
1202 }
1203
1204 pub fn into_parts(self) -> AppendRecordParts {
1206 AppendRecordParts {
1207 timestamp: self.timestamp,
1208 headers: self.headers,
1209 body: self.body,
1210 }
1211 }
1212
1213 pub fn try_from_parts(parts: AppendRecordParts) -> Result<Self, ConvertError> {
1215 let record = Self::new(parts.body)?.with_headers(parts.headers)?;
1216 if let Some(timestamp) = parts.timestamp {
1217 Ok(record.with_timestamp(timestamp))
1218 } else {
1219 Ok(record)
1220 }
1221 }
1222}
1223
1224impl From<AppendRecord> for api::AppendRecord {
1225 fn from(value: AppendRecord) -> Self {
1226 Self {
1227 timestamp: value.timestamp,
1228 headers: value.headers.into_iter().map(Into::into).collect(),
1229 body: value.body,
1230 }
1231 }
1232}
1233
1234impl From<CommandRecord> for AppendRecord {
1235 fn from(value: CommandRecord) -> Self {
1236 let (header_value, body) = match value.command {
1237 Command::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
1238 Command::Trim { seq_num } => (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()),
1239 };
1240 AppendRecordParts {
1241 timestamp: value.timestamp,
1242 headers: vec![Header::from_value(header_value)],
1243 body: body.into(),
1244 }
1245 .try_into()
1246 .expect("command record is a valid append record")
1247 }
1248}
1249
// The public-field counterpart of `AppendRecord`: timestamp, headers and body. Converting
// parts back into an `AppendRecord` goes through `AppendRecord::try_from_parts`.
#[sync_docs(AppendRecordParts = "AppendRecord")]
#[derive(Debug, Clone)]
pub struct AppendRecordParts {
    pub timestamp: Option<u64>,
    pub headers: Vec<Header>,
    pub body: Bytes,
}
1257
1258impl From<AppendRecord> for AppendRecordParts {
1259 fn from(value: AppendRecord) -> Self {
1260 value.into_parts()
1261 }
1262}
1263
1264impl TryFrom<AppendRecordParts> for AppendRecord {
1265 type Error = ConvertError;
1266
1267 fn try_from(value: AppendRecordParts) -> Result<Self, Self::Error> {
1268 Self::try_from_parts(value)
1269 }
1270}
1271
/// Batch of append records bounded by a record count and a total metered size.
#[derive(Debug, Clone)]
pub struct AppendRecordBatch {
    /// Records currently in the batch.
    records: Vec<AppendRecord>,
    /// Running total of the metered sizes of `records`.
    metered_bytes: u64,
    /// Maximum number of records this batch accepts.
    max_capacity: usize,
    /// Test-only override of the batch size limit.
    #[cfg(test)]
    max_bytes: u64,
}
1281
1282impl PartialEq for AppendRecordBatch {
1283 fn eq(&self, other: &Self) -> bool {
1284 if self.records.eq(&other.records) {
1285 assert_eq!(self.metered_bytes, other.metered_bytes);
1286 true
1287 } else {
1288 false
1289 }
1290 }
1291}
1292
// Marker impl; equality itself is defined by the `PartialEq` impl for `AppendRecordBatch`.
impl Eq for AppendRecordBatch {}
1294
1295impl Default for AppendRecordBatch {
1296 fn default() -> Self {
1297 Self::new()
1298 }
1299}
1300
1301impl AppendRecordBatch {
1302 pub const MAX_CAPACITY: usize = 1000;
1306
1307 pub const MAX_BYTES: u64 = MIB_BYTES;
1309
1310 pub fn new() -> Self {
1312 Self::with_max_capacity(Self::MAX_CAPACITY)
1313 }
1314
1315 pub fn with_max_capacity(max_capacity: usize) -> Self {
1319 assert!(
1320 max_capacity > 0 && max_capacity <= Self::MAX_CAPACITY,
1321 "Batch capacity must be between 1 and 1000"
1322 );
1323
1324 Self {
1325 records: Vec::with_capacity(max_capacity),
1326 metered_bytes: 0,
1327 max_capacity,
1328 #[cfg(test)]
1329 max_bytes: Self::MAX_BYTES,
1330 }
1331 }
1332
1333 #[cfg(test)]
1334 pub(crate) fn with_max_capacity_and_bytes(max_capacity: usize, max_bytes: u64) -> Self {
1335 #[cfg(test)]
1336 assert!(
1337 max_bytes > 0 || max_bytes <= Self::MAX_BYTES,
1338 "Batch size must be between 1 byte and 1 MiB"
1339 );
1340
1341 Self {
1342 max_bytes,
1343 ..Self::with_max_capacity(max_capacity)
1344 }
1345 }
1346
1347 pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<AppendRecord>)>
1353 where
1354 R: Into<AppendRecord>,
1355 T: IntoIterator<Item = R>,
1356 {
1357 let mut records = Self::new();
1358 let mut pending = Vec::new();
1359
1360 let mut iter = iter.into_iter();
1361
1362 for record in iter.by_ref() {
1363 if let Err(record) = records.push(record) {
1364 pending.push(record);
1365 break;
1366 }
1367 }
1368
1369 if pending.is_empty() {
1370 Ok(records)
1371 } else {
1372 pending.extend(iter.map(Into::into));
1373 Err((records, pending))
1374 }
1375 }
1376
1377 pub fn is_empty(&self) -> bool {
1379 if self.records.is_empty() {
1380 assert_eq!(self.metered_bytes, 0);
1381 true
1382 } else {
1383 false
1384 }
1385 }
1386
1387 pub fn len(&self) -> usize {
1389 self.records.len()
1390 }
1391
1392 #[cfg(test)]
1393 fn max_bytes(&self) -> u64 {
1394 self.max_bytes
1395 }
1396
1397 #[cfg(not(test))]
1398 fn max_bytes(&self) -> u64 {
1399 Self::MAX_BYTES
1400 }
1401
1402 pub fn is_full(&self) -> bool {
1404 self.records.len() >= self.max_capacity || self.metered_bytes >= self.max_bytes()
1405 }
1406
1407 pub fn push(&mut self, record: impl Into<AppendRecord>) -> Result<(), AppendRecord> {
1409 assert!(self.records.len() <= self.max_capacity);
1410 assert!(self.metered_bytes <= self.max_bytes());
1411
1412 let record = record.into();
1413 let record_size = record.metered_bytes();
1414 if self.records.len() >= self.max_capacity
1415 || self.metered_bytes + record_size > self.max_bytes()
1416 {
1417 Err(record)
1418 } else {
1419 self.records.push(record);
1420 self.metered_bytes += record_size;
1421 Ok(())
1422 }
1423 }
1424}
1425
1426impl MeteredBytes for AppendRecordBatch {
1427 fn metered_bytes(&self) -> u64 {
1428 self.metered_bytes
1429 }
1430}
1431
1432impl IntoIterator for AppendRecordBatch {
1433 type Item = AppendRecord;
1434 type IntoIter = std::vec::IntoIter<Self::Item>;
1435
1436 fn into_iter(self) -> Self::IntoIter {
1437 self.records.into_iter()
1438 }
1439}
1440
1441impl<'a> IntoIterator for &'a AppendRecordBatch {
1442 type Item = &'a AppendRecord;
1443 type IntoIter = std::slice::Iter<'a, AppendRecord>;
1444
1445 fn into_iter(self) -> Self::IntoIter {
1446 self.records.iter()
1447 }
1448}
1449
1450impl AsRef<[AppendRecord]> for AppendRecordBatch {
1451 fn as_ref(&self) -> &[AppendRecord] {
1452 &self.records
1453 }
1454}
1455
// Input for an append: the batch of records plus an optional `match_seq_num` and an
// optional fencing token.
#[sync_docs]
#[derive(Debug, Default, Clone)]
pub struct AppendInput {
    pub records: AppendRecordBatch,
    pub match_seq_num: Option<u64>,
    pub fencing_token: Option<FencingToken>,
}
1463
1464impl MeteredBytes for AppendInput {
1465 fn metered_bytes(&self) -> u64 {
1466 self.records.metered_bytes()
1467 }
1468}
1469
1470impl AppendInput {
1471 pub fn new(records: impl Into<AppendRecordBatch>) -> Self {
1473 Self {
1474 records: records.into(),
1475 match_seq_num: None,
1476 fencing_token: None,
1477 }
1478 }
1479
1480 pub fn with_match_seq_num(self, match_seq_num: impl Into<u64>) -> Self {
1482 Self {
1483 match_seq_num: Some(match_seq_num.into()),
1484 ..self
1485 }
1486 }
1487
1488 pub fn with_fencing_token(self, fencing_token: FencingToken) -> Self {
1490 Self {
1491 fencing_token: Some(fencing_token),
1492 ..self
1493 }
1494 }
1495
1496 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::AppendInput {
1497 let Self {
1498 records,
1499 match_seq_num,
1500 fencing_token,
1501 } = self;
1502
1503 api::AppendInput {
1504 stream: stream.into(),
1505 records: records.into_iter().map(Into::into).collect(),
1506 match_seq_num,
1507 fencing_token: fencing_token.map(|f| f.0),
1508 }
1509 }
1510}
1511
1512#[derive(Debug, Clone)]
1514pub struct AppendAck {
1515 pub start: StreamPosition,
1517 pub end: StreamPosition,
1522 pub tail: StreamPosition,
1526}
1527
1528impl From<api::AppendOutput> for AppendAck {
1529 fn from(value: api::AppendOutput) -> Self {
1530 let api::AppendOutput {
1531 start_seq_num,
1532 start_timestamp,
1533 end_seq_num,
1534 end_timestamp,
1535 next_seq_num,
1536 last_timestamp,
1537 } = value;
1538 let start = StreamPosition {
1539 seq_num: start_seq_num,
1540 timestamp: start_timestamp,
1541 };
1542 let end = StreamPosition {
1543 seq_num: end_seq_num,
1544 timestamp: end_timestamp,
1545 };
1546 let tail = StreamPosition {
1547 seq_num: next_seq_num,
1548 timestamp: last_timestamp,
1549 };
1550 Self { start, end, tail }
1551 }
1552}
1553
1554impl TryFrom<api::AppendResponse> for AppendAck {
1555 type Error = ConvertError;
1556 fn try_from(value: api::AppendResponse) -> Result<Self, Self::Error> {
1557 let api::AppendResponse { output } = value;
1558 let output = output.ok_or("missing append output")?;
1559 Ok(output.into())
1560 }
1561}
1562
1563impl TryFrom<api::AppendSessionResponse> for AppendAck {
1564 type Error = ConvertError;
1565 fn try_from(value: api::AppendSessionResponse) -> Result<Self, Self::Error> {
1566 let api::AppendSessionResponse { output } = value;
1567 let output = output.ok_or("missing append output")?;
1568 Ok(output.into())
1569 }
1570}
1571
1572#[sync_docs]
1573#[derive(Debug, Clone, Default)]
1574pub struct ReadLimit {
1575 pub count: Option<u64>,
1576 pub bytes: Option<u64>,
1577}
1578
1579impl ReadLimit {
1580 pub fn new() -> Self {
1582 Self::default()
1583 }
1584
1585 pub fn with_count(self, count: u64) -> Self {
1587 Self {
1588 count: Some(count),
1589 ..self
1590 }
1591 }
1592
1593 pub fn with_bytes(self, bytes: u64) -> Self {
1595 Self {
1596 bytes: Some(bytes),
1597 ..self
1598 }
1599 }
1600}
1601
/// Where a read begins. Each variant carries a single `u64` and maps
/// one-to-one onto the `Start` variants of the `api` read requests.
#[derive(Debug, Clone)]
pub enum ReadStart {
    /// Start at the given sequence number.
    SeqNum(u64),
    /// Start at the given timestamp.
    Timestamp(u64),
    /// Start at the given offset relative to the tail.
    TailOffset(u64),
}

impl Default for ReadStart {
    /// The default start is sequence number `0`.
    fn default() -> Self {
        let first_seq_num = 0;
        ReadStart::SeqNum(first_seq_num)
    }
}
1618
1619impl From<ReadStart> for api::read_request::Start {
1620 fn from(start: ReadStart) -> Self {
1621 match start {
1622 ReadStart::SeqNum(seq_num) => api::read_request::Start::SeqNum(seq_num),
1623 ReadStart::Timestamp(timestamp) => api::read_request::Start::Timestamp(timestamp),
1624 ReadStart::TailOffset(offset) => api::read_request::Start::TailOffset(offset),
1625 }
1626 }
1627}
1628
1629impl From<ReadStart> for api::read_session_request::Start {
1630 fn from(start: ReadStart) -> Self {
1631 match start {
1632 ReadStart::SeqNum(seq_num) => api::read_session_request::Start::SeqNum(seq_num),
1633 ReadStart::Timestamp(timestamp) => {
1634 api::read_session_request::Start::Timestamp(timestamp)
1635 }
1636 ReadStart::TailOffset(offset) => api::read_session_request::Start::TailOffset(offset),
1637 }
1638 }
1639}
1640
1641#[sync_docs]
1642#[derive(Debug, Clone, Default)]
1643pub struct ReadRequest {
1644 pub start: ReadStart,
1645 pub limit: ReadLimit,
1646}
1647
1648impl ReadRequest {
1649 pub fn new(start: ReadStart) -> Self {
1651 Self {
1652 start,
1653 ..Default::default()
1654 }
1655 }
1656
1657 pub fn with_limit(self, limit: ReadLimit) -> Self {
1659 Self { limit, ..self }
1660 }
1661}
1662
1663impl ReadRequest {
1664 pub(crate) fn try_into_api_type(
1665 self,
1666 stream: impl Into<String>,
1667 ) -> Result<api::ReadRequest, ConvertError> {
1668 let Self { start, limit } = self;
1669
1670 let limit = if limit.count > Some(1000) {
1671 Err("read limit: count must not exceed 1000 for unary request")
1672 } else if limit.bytes > Some(MIB_BYTES) {
1673 Err("read limit: bytes must not exceed 1MiB for unary request")
1674 } else {
1675 Ok(api::ReadLimit {
1676 count: limit.count,
1677 bytes: limit.bytes,
1678 })
1679 }?;
1680
1681 Ok(api::ReadRequest {
1682 stream: stream.into(),
1683 start: Some(start.into()),
1684 limit: Some(limit),
1685 })
1686 }
1687}
1688
1689#[sync_docs]
1690#[derive(Debug, Clone)]
1691pub struct SequencedRecord {
1692 pub seq_num: u64,
1693 pub timestamp: u64,
1694 pub headers: Vec<Header>,
1695 pub body: Bytes,
1696}
1697
1698metered_impl!(SequencedRecord);
1699
1700impl From<api::SequencedRecord> for SequencedRecord {
1701 fn from(value: api::SequencedRecord) -> Self {
1702 let api::SequencedRecord {
1703 seq_num,
1704 timestamp,
1705 headers,
1706 body,
1707 } = value;
1708 Self {
1709 seq_num,
1710 timestamp,
1711 headers: headers.into_iter().map(Into::into).collect(),
1712 body,
1713 }
1714 }
1715}
1716
1717impl SequencedRecord {
1718 pub fn as_command_record(&self) -> Option<CommandRecord> {
1720 if self.headers.len() != 1 {
1721 return None;
1722 }
1723
1724 let header = self.headers.first().expect("pre-validated length");
1725
1726 if !header.name.is_empty() {
1727 return None;
1728 }
1729
1730 match header.value.as_ref() {
1731 CommandRecord::FENCE => {
1732 let fencing_token = FencingToken::new(self.body.clone()).ok()?;
1733 Some(CommandRecord {
1734 command: Command::Fence { fencing_token },
1735 timestamp: Some(self.timestamp),
1736 })
1737 }
1738 CommandRecord::TRIM => {
1739 let body: &[u8] = &self.body;
1740 let seq_num = u64::from_be_bytes(body.try_into().ok()?);
1741 Some(CommandRecord {
1742 command: Command::Trim { seq_num },
1743 timestamp: Some(self.timestamp),
1744 })
1745 }
1746 _ => None,
1747 }
1748 }
1749}
1750
1751#[sync_docs]
1752#[derive(Debug, Clone)]
1753pub struct SequencedRecordBatch {
1754 pub records: Vec<SequencedRecord>,
1755}
1756
1757impl MeteredBytes for SequencedRecordBatch {
1758 fn metered_bytes(&self) -> u64 {
1759 self.records.metered_bytes()
1760 }
1761}
1762
1763impl From<api::SequencedRecordBatch> for SequencedRecordBatch {
1764 fn from(value: api::SequencedRecordBatch) -> Self {
1765 let api::SequencedRecordBatch { records } = value;
1766 Self {
1767 records: records.into_iter().map(Into::into).collect(),
1768 }
1769 }
1770}
1771
1772#[sync_docs(ReadOutput = "Output")]
1773#[derive(Debug, Clone)]
1774pub enum ReadOutput {
1775 Batch(SequencedRecordBatch),
1776 NextSeqNum(u64),
1777}
1778
1779impl From<api::read_output::Output> for ReadOutput {
1780 fn from(value: api::read_output::Output) -> Self {
1781 match value {
1782 api::read_output::Output::Batch(batch) => Self::Batch(batch.into()),
1783 api::read_output::Output::NextSeqNum(next_seq_num) => Self::NextSeqNum(next_seq_num),
1784 }
1785 }
1786}
1787
1788impl TryFrom<api::ReadOutput> for ReadOutput {
1789 type Error = ConvertError;
1790 fn try_from(value: api::ReadOutput) -> Result<Self, Self::Error> {
1791 let api::ReadOutput { output } = value;
1792 let output = output.ok_or("missing read output")?;
1793 Ok(output.into())
1794 }
1795}
1796
1797impl TryFrom<api::ReadResponse> for ReadOutput {
1798 type Error = ConvertError;
1799 fn try_from(value: api::ReadResponse) -> Result<Self, Self::Error> {
1800 let api::ReadResponse { output } = value;
1801 let output = output.ok_or("missing output in read response")?;
1802 output.try_into()
1803 }
1804}
1805
1806#[sync_docs]
1807#[derive(Debug, Clone, Default)]
1808pub struct ReadSessionRequest {
1809 pub start: ReadStart,
1810 pub limit: ReadLimit,
1811}
1812
1813impl ReadSessionRequest {
1814 pub fn new(start: ReadStart) -> Self {
1816 Self {
1817 start,
1818 ..Default::default()
1819 }
1820 }
1821
1822 pub fn with_limit(self, limit: ReadLimit) -> Self {
1824 Self { limit, ..self }
1825 }
1826
1827 pub(crate) fn into_api_type(self, stream: impl Into<String>) -> api::ReadSessionRequest {
1828 let Self { start, limit } = self;
1829 api::ReadSessionRequest {
1830 stream: stream.into(),
1831 start: Some(start.into()),
1832 limit: Some(api::ReadLimit {
1833 count: limit.count,
1834 bytes: limit.bytes,
1835 }),
1836 heartbeats: false,
1837 }
1838 }
1839}
1840
1841impl TryFrom<api::ReadSessionResponse> for ReadOutput {
1842 type Error = ConvertError;
1843 fn try_from(value: api::ReadSessionResponse) -> Result<Self, Self::Error> {
1844 let api::ReadSessionResponse { output } = value;
1845 let output = output.ok_or("missing output in read session response")?;
1846 output.try_into()
1847 }
1848}
1849
/// Newtype over `String` holding a basin name.
#[derive(Debug, Clone)]
pub struct BasinName(String);

impl AsRef<str> for BasinName {
    /// Borrows the name as a string slice.
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}

impl Deref for BasinName {
    type Target = str;

    /// Lets a `BasinName` be used wherever a `&str` is expected.
    fn deref(&self) -> &Self::Target {
        &self.0[..]
    }
}
1869
1870impl TryFrom<String> for BasinName {
1871 type Error = ConvertError;
1872
1873 fn try_from(name: String) -> Result<Self, Self::Error> {
1874 if name.len() < 8 || name.len() > 48 {
1875 return Err("Basin name must be between 8 and 48 characters in length".into());
1876 }
1877
1878 static BASIN_NAME_REGEX: OnceLock<Regex> = OnceLock::new();
1879 let regex = BASIN_NAME_REGEX.get_or_init(|| {
1880 Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
1881 .expect("Failed to compile basin name regex")
1882 });
1883
1884 if !regex.is_match(&name) {
1885 return Err(
1886 "Basin name must comprise lowercase letters, numbers, and hyphens. \
1887 It cannot begin or end with a hyphen."
1888 .into(),
1889 );
1890 }
1891
1892 Ok(Self(name))
1893 }
1894}
1895
1896impl FromStr for BasinName {
1897 type Err = ConvertError;
1898
1899 fn from_str(s: &str) -> Result<Self, Self::Err> {
1900 s.to_string().try_into()
1901 }
1902}
1903
1904impl std::fmt::Display for BasinName {
1905 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1906 f.write_str(&self.0)
1907 }
1908}
1909
/// Newtype over `String` holding an access token ID.
#[derive(Debug, Clone)]
pub struct AccessTokenId(String);

impl AsRef<str> for AccessTokenId {
    /// Borrows the ID as a string slice.
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}

impl Deref for AccessTokenId {
    type Target = str;

    /// Lets an `AccessTokenId` be used wherever a `&str` is expected.
    fn deref(&self) -> &Self::Target {
        &self.0[..]
    }
}
1928
1929impl TryFrom<String> for AccessTokenId {
1930 type Error = ConvertError;
1931
1932 fn try_from(name: String) -> Result<Self, Self::Error> {
1933 if name.is_empty() {
1934 return Err("Access token ID must not be empty".into());
1935 }
1936
1937 if name.len() > 96 {
1938 return Err("Access token ID must not exceed 96 characters".into());
1939 }
1940
1941 Ok(Self(name))
1942 }
1943}
1944
1945impl From<AccessTokenId> for String {
1946 fn from(value: AccessTokenId) -> Self {
1947 value.0
1948 }
1949}
1950
1951impl FromStr for AccessTokenId {
1952 type Err = ConvertError;
1953
1954 fn from_str(s: &str) -> Result<Self, Self::Err> {
1955 s.to_string().try_into()
1956 }
1957}
1958
1959impl From<AccessTokenInfo> for api::IssueAccessTokenRequest {
1960 fn from(value: AccessTokenInfo) -> Self {
1961 Self {
1962 info: Some(value.into()),
1963 }
1964 }
1965}
1966
1967#[sync_docs]
1968#[derive(Debug, Clone)]
1969pub struct AccessTokenInfo {
1970 pub id: AccessTokenId,
1971 pub expires_at: Option<u32>,
1972 pub auto_prefix_streams: bool,
1973 pub scope: Option<AccessTokenScope>,
1974}
1975
1976impl AccessTokenInfo {
1977 pub fn new(id: AccessTokenId) -> Self {
1979 Self {
1980 id,
1981 expires_at: None,
1982 auto_prefix_streams: false,
1983 scope: None,
1984 }
1985 }
1986
1987 pub fn with_expires_at(self, expires_at: u32) -> Self {
1989 Self {
1990 expires_at: Some(expires_at),
1991 ..self
1992 }
1993 }
1994
1995 pub fn with_auto_prefix_streams(self, auto_prefix_streams: bool) -> Self {
1997 Self {
1998 auto_prefix_streams,
1999 ..self
2000 }
2001 }
2002
2003 pub fn with_scope(self, scope: AccessTokenScope) -> Self {
2005 Self {
2006 scope: Some(scope),
2007 ..self
2008 }
2009 }
2010}
2011
2012impl From<AccessTokenInfo> for api::AccessTokenInfo {
2013 fn from(value: AccessTokenInfo) -> Self {
2014 let AccessTokenInfo {
2015 id,
2016 expires_at,
2017 auto_prefix_streams,
2018 scope,
2019 } = value;
2020 Self {
2021 id: id.into(),
2022 expires_at,
2023 auto_prefix_streams,
2024 scope: scope.map(Into::into),
2025 }
2026 }
2027}
2028
2029impl TryFrom<api::AccessTokenInfo> for AccessTokenInfo {
2030 type Error = ConvertError;
2031
2032 fn try_from(value: api::AccessTokenInfo) -> Result<Self, Self::Error> {
2033 let api::AccessTokenInfo {
2034 id,
2035 expires_at,
2036 auto_prefix_streams,
2037 scope,
2038 } = value;
2039 Ok(Self {
2040 id: id.try_into()?,
2041 expires_at,
2042 auto_prefix_streams,
2043 scope: scope.map(Into::into),
2044 })
2045 }
2046}
2047
2048#[sync_docs]
2049#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2050pub enum Operation {
2051 ListBasins,
2052 CreateBasin,
2053 DeleteBasin,
2054 ReconfigureBasin,
2055 GetBasinConfig,
2056 IssueAccessToken,
2057 RevokeAccessToken,
2058 ListAccessTokens,
2059 ListStreams,
2060 CreateStream,
2061 DeleteStream,
2062 GetStreamConfig,
2063 ReconfigureStream,
2064 CheckTail,
2065 Append,
2066 Read,
2067 Trim,
2068 Fence,
2069}
2070
2071impl FromStr for Operation {
2072 type Err = ConvertError;
2073
2074 fn from_str(s: &str) -> Result<Self, Self::Err> {
2075 match s.to_lowercase().as_str() {
2076 "list-basins" => Ok(Self::ListBasins),
2077 "create-basin" => Ok(Self::CreateBasin),
2078 "delete-basin" => Ok(Self::DeleteBasin),
2079 "reconfigure-basin" => Ok(Self::ReconfigureBasin),
2080 "get-basin-config" => Ok(Self::GetBasinConfig),
2081 "issue-access-token" => Ok(Self::IssueAccessToken),
2082 "revoke-access-token" => Ok(Self::RevokeAccessToken),
2083 "list-access-tokens" => Ok(Self::ListAccessTokens),
2084 "list-streams" => Ok(Self::ListStreams),
2085 "create-stream" => Ok(Self::CreateStream),
2086 "delete-stream" => Ok(Self::DeleteStream),
2087 "get-stream-config" => Ok(Self::GetStreamConfig),
2088 "reconfigure-stream" => Ok(Self::ReconfigureStream),
2089 "check-tail" => Ok(Self::CheckTail),
2090 "append" => Ok(Self::Append),
2091 "read" => Ok(Self::Read),
2092 "trim" => Ok(Self::Trim),
2093 "fence" => Ok(Self::Fence),
2094 _ => Err("invalid operation".into()),
2095 }
2096 }
2097}
2098
2099impl From<Operation> for api::Operation {
2100 fn from(value: Operation) -> Self {
2101 match value {
2102 Operation::ListBasins => Self::ListBasins,
2103 Operation::CreateBasin => Self::CreateBasin,
2104 Operation::DeleteBasin => Self::DeleteBasin,
2105 Operation::ReconfigureBasin => Self::ReconfigureBasin,
2106 Operation::GetBasinConfig => Self::GetBasinConfig,
2107 Operation::IssueAccessToken => Self::IssueAccessToken,
2108 Operation::RevokeAccessToken => Self::RevokeAccessToken,
2109 Operation::ListAccessTokens => Self::ListAccessTokens,
2110 Operation::ListStreams => Self::ListStreams,
2111 Operation::CreateStream => Self::CreateStream,
2112 Operation::DeleteStream => Self::DeleteStream,
2113 Operation::GetStreamConfig => Self::GetStreamConfig,
2114 Operation::ReconfigureStream => Self::ReconfigureStream,
2115 Operation::CheckTail => Self::CheckTail,
2116 Operation::Append => Self::Append,
2117 Operation::Read => Self::Read,
2118 Operation::Trim => Self::Trim,
2119 Operation::Fence => Self::Fence,
2120 }
2121 }
2122}
2123
2124impl From<api::Operation> for Option<Operation> {
2125 fn from(value: api::Operation) -> Self {
2126 match value {
2127 api::Operation::Unspecified => None,
2128 api::Operation::ListBasins => Some(Operation::ListBasins),
2129 api::Operation::CreateBasin => Some(Operation::CreateBasin),
2130 api::Operation::DeleteBasin => Some(Operation::DeleteBasin),
2131 api::Operation::ReconfigureBasin => Some(Operation::ReconfigureBasin),
2132 api::Operation::GetBasinConfig => Some(Operation::GetBasinConfig),
2133 api::Operation::IssueAccessToken => Some(Operation::IssueAccessToken),
2134 api::Operation::RevokeAccessToken => Some(Operation::RevokeAccessToken),
2135 api::Operation::ListAccessTokens => Some(Operation::ListAccessTokens),
2136 api::Operation::ListStreams => Some(Operation::ListStreams),
2137 api::Operation::CreateStream => Some(Operation::CreateStream),
2138 api::Operation::DeleteStream => Some(Operation::DeleteStream),
2139 api::Operation::GetStreamConfig => Some(Operation::GetStreamConfig),
2140 api::Operation::ReconfigureStream => Some(Operation::ReconfigureStream),
2141 api::Operation::CheckTail => Some(Operation::CheckTail),
2142 api::Operation::Append => Some(Operation::Append),
2143 api::Operation::Read => Some(Operation::Read),
2144 api::Operation::Trim => Some(Operation::Trim),
2145 api::Operation::Fence => Some(Operation::Fence),
2146 }
2147 }
2148}
2149
2150#[sync_docs]
2151#[derive(Debug, Clone, Default)]
2152pub struct AccessTokenScope {
2153 pub basins: Option<ResourceSet>,
2154 pub streams: Option<ResourceSet>,
2155 pub access_tokens: Option<ResourceSet>,
2156 pub op_groups: Option<PermittedOperationGroups>,
2157 pub ops: HashSet<Operation>,
2158}
2159
2160impl AccessTokenScope {
2161 pub fn new() -> Self {
2163 Self::default()
2164 }
2165
2166 pub fn with_basins(self, basins: ResourceSet) -> Self {
2168 Self {
2169 basins: Some(basins),
2170 ..self
2171 }
2172 }
2173
2174 pub fn with_streams(self, streams: ResourceSet) -> Self {
2176 Self {
2177 streams: Some(streams),
2178 ..self
2179 }
2180 }
2181
2182 pub fn with_tokens(self, access_tokens: ResourceSet) -> Self {
2184 Self {
2185 access_tokens: Some(access_tokens),
2186 ..self
2187 }
2188 }
2189
2190 pub fn with_op_groups(self, op_groups: PermittedOperationGroups) -> Self {
2192 Self {
2193 op_groups: Some(op_groups),
2194 ..self
2195 }
2196 }
2197
2198 pub fn with_ops(self, ops: impl IntoIterator<Item = Operation>) -> Self {
2200 Self {
2201 ops: ops.into_iter().collect(),
2202 ..self
2203 }
2204 }
2205
2206 pub fn with_op(self, op: Operation) -> Self {
2208 let mut ops = self.ops;
2209 ops.insert(op);
2210 Self { ops, ..self }
2211 }
2212}
2213
2214impl From<AccessTokenScope> for api::AccessTokenScope {
2215 fn from(value: AccessTokenScope) -> Self {
2216 let AccessTokenScope {
2217 basins,
2218 streams,
2219 access_tokens,
2220 op_groups,
2221 ops,
2222 } = value;
2223 Self {
2224 basins: basins.map(Into::into),
2225 streams: streams.map(Into::into),
2226 access_tokens: access_tokens.map(Into::into),
2227 op_groups: op_groups.map(Into::into),
2228 ops: ops
2229 .into_iter()
2230 .map(api::Operation::from)
2231 .map(Into::into)
2232 .collect(),
2233 }
2234 }
2235}
2236
2237impl From<api::AccessTokenScope> for AccessTokenScope {
2238 fn from(value: api::AccessTokenScope) -> Self {
2239 let api::AccessTokenScope {
2240 basins,
2241 streams,
2242 access_tokens,
2243 op_groups,
2244 ops,
2245 } = value;
2246 Self {
2247 basins: basins.and_then(|set| set.matching.map(Into::into)),
2248 streams: streams.and_then(|set| set.matching.map(Into::into)),
2249 access_tokens: access_tokens.and_then(|set| set.matching.map(Into::into)),
2250 op_groups: op_groups.map(Into::into),
2251 ops: ops
2252 .into_iter()
2253 .map(api::Operation::try_from)
2254 .flat_map(Result::ok)
2255 .flat_map(<Option<Operation>>::from)
2256 .collect(),
2257 }
2258 }
2259}
2260
2261impl From<ResourceSet> for api::ResourceSet {
2262 fn from(value: ResourceSet) -> Self {
2263 Self {
2264 matching: Some(value.into()),
2265 }
2266 }
2267}
2268
2269#[sync_docs(ResourceSet = "Matching")]
2270#[derive(Debug, Clone)]
2271pub enum ResourceSet {
2272 Exact(String),
2273 Prefix(String),
2274}
2275
2276impl From<ResourceSet> for api::resource_set::Matching {
2277 fn from(value: ResourceSet) -> Self {
2278 match value {
2279 ResourceSet::Exact(name) => api::resource_set::Matching::Exact(name),
2280 ResourceSet::Prefix(name) => api::resource_set::Matching::Prefix(name),
2281 }
2282 }
2283}
2284
2285impl From<api::resource_set::Matching> for ResourceSet {
2286 fn from(value: api::resource_set::Matching) -> Self {
2287 match value {
2288 api::resource_set::Matching::Exact(name) => ResourceSet::Exact(name),
2289 api::resource_set::Matching::Prefix(name) => ResourceSet::Prefix(name),
2290 }
2291 }
2292}
2293
2294#[sync_docs]
2295#[derive(Debug, Clone, Default)]
2296pub struct PermittedOperationGroups {
2297 pub account: Option<ReadWritePermissions>,
2298 pub basin: Option<ReadWritePermissions>,
2299 pub stream: Option<ReadWritePermissions>,
2300}
2301
2302impl PermittedOperationGroups {
2303 pub fn new() -> Self {
2305 Self::default()
2306 }
2307
2308 pub fn with_account(self, account: ReadWritePermissions) -> Self {
2310 Self {
2311 account: Some(account),
2312 ..self
2313 }
2314 }
2315
2316 pub fn with_basin(self, basin: ReadWritePermissions) -> Self {
2318 Self {
2319 basin: Some(basin),
2320 ..self
2321 }
2322 }
2323
2324 pub fn with_stream(self, stream: ReadWritePermissions) -> Self {
2326 Self {
2327 stream: Some(stream),
2328 ..self
2329 }
2330 }
2331}
2332
2333impl From<PermittedOperationGroups> for api::PermittedOperationGroups {
2334 fn from(value: PermittedOperationGroups) -> Self {
2335 let PermittedOperationGroups {
2336 account,
2337 basin,
2338 stream,
2339 } = value;
2340 Self {
2341 account: account.map(Into::into),
2342 basin: basin.map(Into::into),
2343 stream: stream.map(Into::into),
2344 }
2345 }
2346}
2347
2348impl From<api::PermittedOperationGroups> for PermittedOperationGroups {
2349 fn from(value: api::PermittedOperationGroups) -> Self {
2350 let api::PermittedOperationGroups {
2351 account,
2352 basin,
2353 stream,
2354 } = value;
2355 Self {
2356 account: account.map(Into::into),
2357 basin: basin.map(Into::into),
2358 stream: stream.map(Into::into),
2359 }
2360 }
2361}
2362
2363#[sync_docs]
2364#[derive(Debug, Clone, Default)]
2365pub struct ReadWritePermissions {
2366 pub read: bool,
2367 pub write: bool,
2368}
2369
2370impl ReadWritePermissions {
2371 pub fn new() -> Self {
2373 Self::default()
2374 }
2375
2376 pub fn with_read(self, read: bool) -> Self {
2378 Self { read, ..self }
2379 }
2380
2381 pub fn with_write(self, write: bool) -> Self {
2383 Self { write, ..self }
2384 }
2385}
2386
2387impl From<ReadWritePermissions> for api::ReadWritePermissions {
2388 fn from(value: ReadWritePermissions) -> Self {
2389 let ReadWritePermissions { read, write } = value;
2390 Self { read, write }
2391 }
2392}
2393
2394impl From<api::ReadWritePermissions> for ReadWritePermissions {
2395 fn from(value: api::ReadWritePermissions) -> Self {
2396 let api::ReadWritePermissions { read, write } = value;
2397 Self { read, write }
2398 }
2399}
2400
2401impl From<api::IssueAccessTokenResponse> for String {
2402 fn from(value: api::IssueAccessTokenResponse) -> Self {
2403 value.access_token
2404 }
2405}
2406
2407impl From<AccessTokenId> for api::RevokeAccessTokenRequest {
2408 fn from(value: AccessTokenId) -> Self {
2409 Self { id: value.into() }
2410 }
2411}
2412
2413impl TryFrom<api::RevokeAccessTokenResponse> for AccessTokenInfo {
2414 type Error = ConvertError;
2415 fn try_from(value: api::RevokeAccessTokenResponse) -> Result<Self, Self::Error> {
2416 let token_info = value.info.ok_or("access token info is missing")?;
2417 token_info.try_into()
2418 }
2419}
2420
2421#[sync_docs]
2422#[derive(Debug, Clone, Default)]
2423pub struct ListAccessTokensRequest {
2424 pub prefix: String,
2425 pub start_after: String,
2426 pub limit: Option<usize>,
2427}
2428
2429impl ListAccessTokensRequest {
2430 pub fn new() -> Self {
2432 Self::default()
2433 }
2434
2435 pub fn with_prefix(self, prefix: impl Into<String>) -> Self {
2437 Self {
2438 prefix: prefix.into(),
2439 ..self
2440 }
2441 }
2442
2443 pub fn with_start_after(self, start_after: impl Into<String>) -> Self {
2445 Self {
2446 start_after: start_after.into(),
2447 ..self
2448 }
2449 }
2450
2451 pub fn with_limit(self, limit: impl Into<Option<usize>>) -> Self {
2453 Self {
2454 limit: limit.into(),
2455 ..self
2456 }
2457 }
2458}
2459
2460impl TryFrom<ListAccessTokensRequest> for api::ListAccessTokensRequest {
2461 type Error = ConvertError;
2462 fn try_from(value: ListAccessTokensRequest) -> Result<Self, Self::Error> {
2463 let ListAccessTokensRequest {
2464 prefix,
2465 start_after,
2466 limit,
2467 } = value;
2468 Ok(Self {
2469 prefix,
2470 start_after,
2471 limit: limit
2472 .map(TryInto::try_into)
2473 .transpose()
2474 .map_err(|_| "request limit does not fit into u64 bounds")?,
2475 })
2476 }
2477}
2478
2479#[sync_docs]
2480#[derive(Debug, Clone)]
2481pub struct ListAccessTokensResponse {
2482 pub access_tokens: Vec<AccessTokenInfo>,
2483 pub has_more: bool,
2484}
2485
2486impl TryFrom<api::ListAccessTokensResponse> for ListAccessTokensResponse {
2487 type Error = ConvertError;
2488 fn try_from(value: api::ListAccessTokensResponse) -> Result<Self, Self::Error> {
2489 let api::ListAccessTokensResponse {
2490 access_tokens,
2491 has_more,
2492 } = value;
2493 let access_tokens = access_tokens
2494 .into_iter()
2495 .map(TryInto::try_into)
2496 .collect::<Result<Vec<_>, _>>()?;
2497 Ok(Self {
2498 access_tokens,
2499 has_more,
2500 })
2501 }
2502}