1pub mod keys;
14pub mod kv_codec;
15pub mod match_key;
16pub mod proto;
17pub mod prune_policy;
18pub mod stream_filter;
19pub use keys::{Key, KeyCodec, KeyCodecError, KeyMut, KeyValidationError, Value, MAX_KEY_LEN};
20pub use proto::*;
21extern crate self as exoware_proto;
22
23use bytes::Bytes;
24use connectrpc::client::{ClientConfig, ServerStream as ConnectServerStream};
25use connectrpc::{ConnectError, ErrorCode};
26use exoware_proto::compact::ServiceClient as CompactServiceClient;
27use exoware_proto::ingest::ServiceClient as IngestServiceClient;
28use exoware_proto::query as proto_query;
29use exoware_proto::query::ServiceClient as QueryServiceClient;
30use exoware_proto::store::compact::v1::PruneRequest as ProtoPruneRequest;
31use exoware_proto::store::ingest::v1::PutRequest as ProtoPutRequest;
32use exoware_proto::store::query::v1::{
33 GetManyRequest as ProtoGetManyRequest, GetRequest as ProtoGetRequest,
34 RangeRequest as ProtoRangeRequest, ReduceRequest as ProtoWireReduceRequest,
35};
36use exoware_proto::RangeReduceRequest as DomainRangeReduceRequest;
37use exoware_proto::{
38 connect_compression_registry as proto_connect_compression_registry,
39 decode_connect_error as proto_decode_connect_error,
40 to_domain_reduce_response as proto_to_domain_reduce_response,
41 to_proto_reduce_params as proto_to_proto_reduce_params,
42 PreferZstdHttpClient as ProtoPreferZstdHttpClient,
43};
44use futures::future::BoxFuture;
45use keys::is_valid_key_size;
46use kv_codec::{KvExpr, KvFieldRef, KvReducedValue};
47use std::collections::HashMap;
48use std::sync::{
49 atomic::{AtomicU64, Ordering},
50 Arc,
51};
52use std::time::Duration;
53
/// Default `max_attempts` value used by [`RetryConfig::standard`].
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3;
/// Default initial backoff, in milliseconds, used by [`RetryConfig::standard`].
const DEFAULT_RETRY_INITIAL_BACKOFF_MS: u64 = 100;
/// Default maximum backoff, in milliseconds, used by [`RetryConfig::standard`].
const DEFAULT_RETRY_MAX_BACKOFF_MS: u64 = 2_000;
57
58pub trait IntoStoreWriteValue {
61 fn into_store_write_value(self) -> Bytes;
62}
63
64impl IntoStoreWriteValue for Bytes {
65 fn into_store_write_value(self) -> Bytes {
66 self
67 }
68}
69
70impl IntoStoreWriteValue for &Bytes {
71 fn into_store_write_value(self) -> Bytes {
72 self.clone()
73 }
74}
75
76impl IntoStoreWriteValue for Vec<u8> {
77 fn into_store_write_value(self) -> Bytes {
78 self.into()
79 }
80}
81
82impl IntoStoreWriteValue for &Vec<u8> {
83 fn into_store_write_value(self) -> Bytes {
84 Bytes::copy_from_slice(self)
85 }
86}
87
88impl IntoStoreWriteValue for &[u8] {
89 fn into_store_write_value(self) -> Bytes {
90 Bytes::copy_from_slice(self)
91 }
92}
93
94impl<const N: usize> IntoStoreWriteValue for &[u8; N] {
95 fn into_store_write_value(self) -> Bytes {
96 Bytes::copy_from_slice(self)
97 }
98}
99
/// Compression codec applied to outgoing Connect request bodies.
///
/// [`ConnectRequestCompression::Zstd`] is the default variant.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ConnectRequestCompression {
    /// Zstandard; wire name `"zstd"`.
    #[default]
    Zstd,
    /// Gzip; wire name `"gzip"`.
    Gzip,
}

impl ConnectRequestCompression {
    /// Returns the codec name handed to the Connect client configuration.
    fn wire_name(self) -> &'static str {
        const ZSTD: &str = "zstd";
        const GZIP: &str = "gzip";
        match self {
            Self::Gzip => GZIP,
            Self::Zstd => ZSTD,
        }
    }
}
126
/// Default maximum message size (256 MiB) configured on every Connect client
/// config built by `store_connect_client_config`.
const STORE_CLIENT_MAX_MESSAGE_BYTES: usize = 256 * 1024 * 1024;
132
133fn store_connect_client_config(
139 base_uri: http::Uri,
140 request_compression: ConnectRequestCompression,
141) -> ClientConfig {
142 ClientConfig::new(base_uri)
143 .with_compression(proto_connect_compression_registry())
144 .compress_requests(request_compression.wire_name())
145 .with_default_max_message_size(STORE_CLIENT_MAX_MESSAGE_BYTES)
146}
147
/// Errors returned by store client operations.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// Failure reported by the `reqwest` HTTP client.
    #[error("HTTP error: {0}")]
    Http(#[from] reqwest::Error),
    /// Connect RPC error; boxed to keep the enum small.
    #[error("RPC error ({0})")]
    Rpc(Box<ConnectError>),
    /// Failure while applying or removing the store key prefix.
    #[error("store key prefix error: {0}")]
    KeyPrefix(#[from] StoreKeyPrefixError),
    /// A key had a different length than expected.
    #[error("invalid key length: expected {expected}, got {got}")]
    InvalidKeyLength { expected: usize, got: usize },
    /// A payload or filter could not be interpreted; carries a description.
    #[error("wire format error: {0}")]
    WireFormat(String),
}
162
163impl ClientError {
164 pub fn rpc_error(&self) -> Option<&ConnectError> {
165 match self {
166 Self::Rpc(err) => Some(err.as_ref()),
167 _ => None,
168 }
169 }
170
171 pub fn rpc_code(&self) -> Option<ErrorCode> {
172 self.rpc_error().map(|err| err.code)
173 }
174
175 pub fn decoded_rpc_error(
176 &self,
177 ) -> Result<Option<exoware_proto::DecodedConnectError>, buffa::DecodeError> {
178 self.rpc_error().map(proto_decode_connect_error).transpose()
179 }
180}
181
/// Errors produced while validating or applying a [`StoreKeyPrefix`].
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum StoreKeyPrefixError {
    /// More than 16 reserved bits were requested.
    #[error("reserved_bits {reserved_bits} exceeds 16")]
    ReservedBitsTooLarge { reserved_bits: u8 },
    /// The prefix value needs more bits than `reserved_bits` provides.
    #[error("prefix {prefix} does not fit in {reserved_bits} reserved bits")]
    PrefixTooLarge { reserved_bits: u8, prefix: u16 },
    /// The store prefix bits plus a match key's reserved bits exceed 16.
    #[error(
        "combined reserved bits exceed 16: store prefix bits {prefix_bits} + logical bits {logical_bits}"
    )]
    CombinedReservedBitsTooLarge { prefix_bits: u8, logical_bits: u8 },
    /// A key passed to decoding does not carry this store prefix.
    #[error("key does not belong to this store prefix")]
    PrefixMismatch,
    /// Adding the store prefix bits to a key bit offset overflowed `u16`.
    #[error("key bit offset {offset} plus store prefix bits {prefix_bits} exceeds u16")]
    BitOffsetOverflow { offset: u16, prefix_bits: u8 },
    /// Error reported by the underlying key codec.
    #[error("key codec error: {0}")]
    Codec(#[from] KeyCodecError),
}
201
/// A validated key prefix (`reserved_bits` plus prefix value) that maps logical
/// keys to prefixed store keys and back.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StoreKeyPrefix {
    /// Codec constructed from the validated `reserved_bits` and `prefix`.
    codec: KeyCodec,
}
213
214impl StoreKeyPrefix {
215 pub fn new(reserved_bits: u8, prefix: u16) -> Result<Self, StoreKeyPrefixError> {
216 validate_prefix_bits(reserved_bits, prefix)?;
217 Ok(Self {
218 codec: KeyCodec::new(reserved_bits, prefix),
219 })
220 }
221
222 #[inline]
223 pub fn reserved_bits(self) -> u8 {
224 self.codec.reserved_bits()
225 }
226
227 #[inline]
228 pub fn prefix(self) -> u16 {
229 self.codec.prefix()
230 }
231
232 #[inline]
234 pub fn max_logical_key_len(self) -> usize {
235 self.codec.max_payload_capacity_bytes()
236 }
237
238 pub fn encode_key(self, key: &Key) -> Result<Key, StoreKeyPrefixError> {
240 Ok(self.codec.encode(key)?)
241 }
242
243 pub fn decode_key(self, key: &Key) -> Result<Key, StoreKeyPrefixError> {
245 if !self.codec.matches(key) {
246 return Err(StoreKeyPrefixError::PrefixMismatch);
247 }
248 let payload_len = self.codec.payload_capacity_bytes_for_key_len(key.len());
249 Ok(Bytes::from(self.codec.read_payload(key, 0, payload_len)?))
250 }
251
252 pub fn encode_range(self, start: &Key, end: &Key) -> Result<(Key, Key), StoreKeyPrefixError> {
260 let start = self.encode_key(start)?;
261 let end = if end.is_empty() {
262 self.codec.prefix_bounds().1
263 } else {
264 let max_len = self.max_logical_key_len();
265 let end = if end.len() > max_len {
266 Bytes::copy_from_slice(&end[..max_len])
267 } else {
268 Bytes::copy_from_slice(end)
269 };
270 self.encode_key(&end)?
271 };
272 Ok((start, end))
273 }
274
275 fn prefix_match_key(
276 self,
277 match_key: &crate::match_key::MatchKey,
278 ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
279 self.prefix_match_key_with_regex(match_key, match_key.payload_regex.clone())
280 }
281
282 fn prefix_stream_match_key(
283 self,
284 match_key: &crate::match_key::MatchKey,
285 ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
286 self.prefix_match_key_with_regex(match_key, crate::kv_codec::Utf8::from("(?s-u).*"))
287 }
288
289 fn prefix_match_key_with_regex(
290 self,
291 match_key: &crate::match_key::MatchKey,
292 payload_regex: crate::kv_codec::Utf8,
293 ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
294 validate_prefix_bits(match_key.reserved_bits, match_key.prefix)?;
295 let reserved_bits = self
296 .reserved_bits()
297 .checked_add(match_key.reserved_bits)
298 .ok_or(StoreKeyPrefixError::CombinedReservedBitsTooLarge {
299 prefix_bits: self.reserved_bits(),
300 logical_bits: match_key.reserved_bits,
301 })?;
302 if reserved_bits > 16 {
303 return Err(StoreKeyPrefixError::CombinedReservedBitsTooLarge {
304 prefix_bits: self.reserved_bits(),
305 logical_bits: match_key.reserved_bits,
306 });
307 }
308
309 let prefix = (u32::from(self.prefix()) << u32::from(match_key.reserved_bits))
310 | u32::from(match_key.prefix);
311 let prefix = u16::try_from(prefix).map_err(|_| StoreKeyPrefixError::PrefixTooLarge {
312 reserved_bits,
313 prefix: u16::MAX,
314 })?;
315 validate_prefix_bits(reserved_bits, prefix)?;
316 Ok(crate::match_key::MatchKey {
317 reserved_bits,
318 prefix,
319 payload_regex,
320 })
321 }
322}
323
/// A buffer of key/value writes that is committed to the store in one put.
#[derive(Clone, Debug, Default)]
pub struct StoreWriteBatch {
    /// Staged `(store key, value)` pairs; keys are already encoded by `push`.
    entries: Vec<(Key, Bytes)>,
}
333
334impl StoreWriteBatch {
335 pub fn new() -> Self {
336 Self::default()
337 }
338
339 pub fn len(&self) -> usize {
340 self.entries.len()
341 }
342
343 pub fn is_empty(&self) -> bool {
344 self.entries.is_empty()
345 }
346
347 pub fn clear(&mut self) {
348 self.entries.clear();
349 }
350
351 pub fn push(
352 &mut self,
353 client: &StoreClient,
354 key: &Key,
355 value: impl IntoStoreWriteValue,
356 ) -> Result<&mut Self, ClientError> {
357 self.entries.push((
358 client.encode_store_key(key)?,
359 value.into_store_write_value(),
360 ));
361 Ok(self)
362 }
363
364 pub async fn commit(&self, client: &StoreClient) -> Result<u64, ClientError> {
365 let refs: Vec<(&Key, &[u8])> = self
366 .entries
367 .iter()
368 .map(|(key, value)| (key, value.as_ref()))
369 .collect();
370 client.put_physical(&refs).await
371 }
372}
373
374pub trait StoreBatchUpload {
386 type Prepared: Send;
387 type Receipt: Send;
388 type Error: std::fmt::Display + Send;
389
390 fn stage_upload(
391 &self,
392 prepared: &Self::Prepared,
393 batch: &mut StoreWriteBatch,
394 ) -> Result<(), Self::Error>;
395
396 fn commit_error(&self, error: ClientError) -> Self::Error;
397
398 fn mark_upload_persisted<'a>(
399 &'a self,
400 prepared: Self::Prepared,
401 sequence_number: u64,
402 ) -> BoxFuture<'a, Self::Receipt>
403 where
404 Self: Sync + 'a,
405 Self::Prepared: 'a;
406
407 fn mark_upload_failed<'a>(
408 &'a self,
409 prepared: Self::Prepared,
410 error: String,
411 ) -> BoxFuture<'a, ()>
412 where
413 Self: Sync + 'a,
414 Self::Prepared: 'a;
415
416 fn commit_upload<'a>(
417 &'a self,
418 client: &'a StoreClient,
419 prepared: Self::Prepared,
420 ) -> BoxFuture<'a, Result<Self::Receipt, Self::Error>>
421 where
422 Self: Sync + Sized + 'a,
423 Self::Prepared: 'a,
424 Self::Receipt: 'a,
425 Self::Error: 'a,
426 {
427 Box::pin(async move {
428 let mut batch = StoreWriteBatch::new();
429 if let Err(err) = self.stage_upload(&prepared, &mut batch) {
430 let message = err.to_string();
431 self.mark_upload_failed(prepared, message).await;
432 return Err(err);
433 }
434 match batch.commit(client).await {
435 Ok(sequence_number) => {
436 Ok(self.mark_upload_persisted(prepared, sequence_number).await)
437 }
438 Err(err) => {
439 let message = err.to_string();
440 self.mark_upload_failed(prepared, message).await;
441 Err(self.commit_error(err))
442 }
443 }
444 })
445 }
446}
447
448pub trait StoreBatchPublication {
456 type PreparedPublication: Send;
457 type PublicationReceipt: Send;
458 type Error: std::fmt::Display + Send;
459
460 fn stage_publication(
461 &self,
462 prepared: &Self::PreparedPublication,
463 batch: &mut StoreWriteBatch,
464 ) -> Result<(), Self::Error>;
465
466 fn publication_commit_error(&self, error: ClientError) -> Self::Error;
467
468 fn mark_publication_persisted<'a>(
469 &'a self,
470 prepared: Self::PreparedPublication,
471 sequence_number: u64,
472 ) -> BoxFuture<'a, Self::PublicationReceipt>
473 where
474 Self: Sync + 'a,
475 Self::PreparedPublication: 'a;
476
477 fn mark_publication_failed<'a>(
478 &'a self,
479 _prepared: Self::PreparedPublication,
480 _error: String,
481 ) -> BoxFuture<'a, ()>
482 where
483 Self: Sync + 'a,
484 Self::PreparedPublication: 'a,
485 {
486 Box::pin(async {})
487 }
488
489 fn commit_publication<'a>(
490 &'a self,
491 client: &'a StoreClient,
492 prepared: Self::PreparedPublication,
493 ) -> BoxFuture<'a, Result<Self::PublicationReceipt, Self::Error>>
494 where
495 Self: Sync + Sized + 'a,
496 Self::PreparedPublication: 'a,
497 Self::PublicationReceipt: 'a,
498 Self::Error: 'a,
499 {
500 Box::pin(async move {
501 let mut batch = StoreWriteBatch::new();
502 if let Err(err) = self.stage_publication(&prepared, &mut batch) {
503 let message = err.to_string();
504 self.mark_publication_failed(prepared, message).await;
505 return Err(err);
506 }
507 match batch.commit(client).await {
508 Ok(sequence_number) => Ok(self
509 .mark_publication_persisted(prepared, sequence_number)
510 .await),
511 Err(err) => {
512 let message = err.to_string();
513 self.mark_publication_failed(prepared, message).await;
514 Err(self.publication_commit_error(err))
515 }
516 }
517 })
518 }
519}
520
521pub trait StorePublicationFrontierWriter: StoreBatchPublication {
530 fn latest_publication_receipt<'a>(&'a self) -> BoxFuture<'a, Option<Self::PublicationReceipt>>
531 where
532 Self: Sync + 'a,
533 Self::PublicationReceipt: 'a;
534
535 fn prepare_publication<'a>(
536 &'a self,
537 ) -> BoxFuture<'a, Result<Option<Self::PreparedPublication>, Self::Error>>
538 where
539 Self: Sync + 'a,
540 Self::PreparedPublication: 'a,
541 Self::Error: 'a;
542
543 fn flush_publication_with_receipt<'a>(
544 &'a self,
545 ) -> BoxFuture<'a, Result<Option<Self::PublicationReceipt>, Self::Error>>
546 where
547 Self: Sync + 'a,
548 Self::PublicationReceipt: 'a,
549 Self::Error: 'a;
550
551 fn flush_publication<'a>(&'a self) -> BoxFuture<'a, Result<(), Self::Error>>
552 where
553 Self: Sync + 'a,
554 Self::PublicationReceipt: 'a,
555 Self::Error: 'a,
556 {
557 Box::pin(async move { self.flush_publication_with_receipt().await.map(|_| ()) })
558 }
559}
560
/// Direction in which a range query is traversed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RangeMode {
    /// Maps to `TRAVERSAL_MODE_FORWARD` on the wire.
    Forward,
    /// Maps to `TRAVERSAL_MODE_REVERSE` on the wire.
    Reverse,
}
567
/// One chunk yielded by [`RangeStream::next_chunk`].
#[derive(Clone, Debug)]
pub struct RangeChunk {
    /// Key/value rows from one frame; keys are decoded when the stream has a
    /// key prefix.
    pub rows: Vec<(Key, Bytes)>,
    /// Detail attached to the frame, if any.
    pub detail: Option<proto_query::Detail>,
}
575
/// One chunk yielded by [`GetManyStream::next_chunk`].
#[derive(Clone, Debug)]
pub struct GetManyChunk {
    /// Keys from one frame paired with their value, or `None` when the frame
    /// carried no value for that key.
    pub entries: Vec<(Key, Option<Bytes>)>,
    /// Detail attached to the frame, if any.
    pub detail: Option<proto_query::Detail>,
}
583
/// Client-side reader over a server-streamed range query.
pub struct RangeStream {
    /// Underlying Connect server stream of range frames.
    stream:
        ConnectServerStream<hyper::body::Incoming, exoware_proto::query::RangeFrameView<'static>>,
    /// Frame fetched by `prefetch_first_frame` and not yet consumed by `next_chunk`.
    pending_frame: Option<exoware_proto::query::RangeFrame>,
    /// Running total of rows returned through `next_chunk`.
    rows_seen: usize,
    /// Set to `rows_seen` once the stream ends without an error.
    final_count: Option<usize>,
    /// True once the stream has reported end-of-stream.
    finished: bool,
    /// When set, raised (via `fetch_max`) to each frame detail's sequence number.
    observed_sequence: Option<Arc<AtomicU64>>,
    /// When set, used to decode returned keys back to logical keys.
    key_prefix: Option<StoreKeyPrefix>,
}
595
596impl RangeStream {
597 fn from_connect_stream(
598 stream: ConnectServerStream<
599 hyper::body::Incoming,
600 exoware_proto::query::RangeFrameView<'static>,
601 >,
602 observed_sequence: Option<Arc<AtomicU64>>,
603 key_prefix: Option<StoreKeyPrefix>,
604 ) -> Self {
605 Self {
606 stream,
607 pending_frame: None,
608 rows_seen: 0,
609 final_count: None,
610 finished: false,
611 observed_sequence,
612 key_prefix,
613 }
614 }
615
616 pub fn final_count(&self) -> Option<usize> {
617 self.final_count
618 }
619
620 async fn prefetch_first_frame(&mut self) -> Result<(), ConnectError> {
621 if self.pending_frame.is_some() || self.finished {
622 return Ok(());
623 }
624 match self.stream.message().await? {
625 Some(frame) => {
626 self.pending_frame = Some(frame.to_owned_message());
627 Ok(())
628 }
629 None => {
630 self.finished = true;
631 if let Some(err) = self.stream.error() {
632 Err(err.clone())
633 } else {
634 self.final_count = Some(self.rows_seen);
635 Ok(())
636 }
637 }
638 }
639 }
640
641 pub async fn next_chunk(&mut self) -> Result<Option<RangeChunk>, ClientError> {
642 loop {
643 if self.finished {
644 return Ok(None);
645 }
646
647 let frame = if let Some(frame) = self.pending_frame.take() {
648 frame
649 } else {
650 let Some(frame) = self
651 .stream
652 .message()
653 .await
654 .map_err(client_error_from_connect)?
655 else {
656 self.finished = true;
657 if let Some(err) = self.stream.error() {
658 return Err(client_error_from_connect(err.clone()));
659 }
660 self.final_count = Some(self.rows_seen);
661 return Ok(None);
662 };
663 frame.to_owned_message()
664 };
665
666 let detail = frame.detail.as_option().cloned();
667 if let (Some(sequence_store), Some(detail)) = (&self.observed_sequence, detail.as_ref())
668 {
669 sequence_store.fetch_max(detail.sequence_number, Ordering::SeqCst);
670 }
671 let n = frame.results.len();
672
673 if n == 0 && detail.is_none() {
675 continue;
676 }
677
678 let mut out = Vec::with_capacity(n);
679 for entry in &frame.results {
680 let key = Bytes::copy_from_slice(&entry.key);
681 let key = match self.key_prefix {
682 Some(prefix) => prefix.decode_key(&key)?,
683 None => key,
684 };
685 out.push((key, Bytes::copy_from_slice(&entry.value)));
686 }
687 self.rows_seen += n;
688 return Ok(Some(RangeChunk { rows: out, detail }));
689 }
690 }
691
692 pub async fn collect(mut self) -> Result<Vec<(Key, Bytes)>, ClientError> {
693 let mut entries = Vec::new();
694 while let Some(chunk) = self.next_chunk().await? {
695 entries.extend(chunk.rows);
696 }
697 Ok(entries)
698 }
699}
700
/// Client-side reader over a server-streamed multi-key lookup.
pub struct GetManyStream {
    /// Underlying Connect server stream of get-many frames.
    stream:
        ConnectServerStream<hyper::body::Incoming, exoware_proto::query::GetManyFrameView<'static>>,
    /// Frame fetched by `prefetch_first_frame` and not yet consumed by `next_chunk`.
    pending_frame: Option<exoware_proto::query::GetManyFrame>,
    /// True once the stream has reported end-of-stream.
    finished: bool,
    /// When set, raised (via `fetch_max`) to each frame detail's sequence number.
    observed_sequence: Option<Arc<AtomicU64>>,
    /// When set, used to decode returned keys back to logical keys.
    key_prefix: Option<StoreKeyPrefix>,
}
709
710impl GetManyStream {
711 fn from_connect_stream(
712 stream: ConnectServerStream<
713 hyper::body::Incoming,
714 exoware_proto::query::GetManyFrameView<'static>,
715 >,
716 observed_sequence: Option<Arc<AtomicU64>>,
717 key_prefix: Option<StoreKeyPrefix>,
718 ) -> Self {
719 Self {
720 stream,
721 pending_frame: None,
722 finished: false,
723 observed_sequence,
724 key_prefix,
725 }
726 }
727
728 async fn prefetch_first_frame(&mut self) -> Result<(), ConnectError> {
729 if self.pending_frame.is_some() || self.finished {
730 return Ok(());
731 }
732 match self.stream.message().await? {
733 Some(frame) => {
734 self.pending_frame = Some(frame.to_owned_message());
735 Ok(())
736 }
737 None => {
738 self.finished = true;
739 if let Some(err) = self.stream.error() {
740 Err(err.clone())
741 } else {
742 Ok(())
743 }
744 }
745 }
746 }
747
748 pub async fn next_chunk(&mut self) -> Result<Option<GetManyChunk>, ClientError> {
749 loop {
750 if self.finished {
751 return Ok(None);
752 }
753 let frame = if let Some(frame) = self.pending_frame.take() {
754 frame
755 } else {
756 let Some(frame) = self
757 .stream
758 .message()
759 .await
760 .map_err(client_error_from_connect)?
761 else {
762 self.finished = true;
763 if let Some(err) = self.stream.error() {
764 return Err(client_error_from_connect(err.clone()));
765 }
766 return Ok(None);
767 };
768 frame.to_owned_message()
769 };
770
771 let detail = frame.detail.as_option().cloned();
772 if let (Some(sequence_store), Some(detail)) = (&self.observed_sequence, detail.as_ref())
773 {
774 sequence_store.fetch_max(detail.sequence_number, Ordering::SeqCst);
775 }
776 let n = frame.results.len();
777
778 if n == 0 && detail.is_none() {
780 continue;
781 }
782
783 let mut out = Vec::with_capacity(n);
784 for entry in &frame.results {
785 let key = Bytes::copy_from_slice(&entry.key);
786 let key = match self.key_prefix {
787 Some(prefix) => prefix.decode_key(&key)?,
788 None => key,
789 };
790 let value = entry.value.as_ref().map(|v| Bytes::copy_from_slice(v));
791 out.push((key, value));
792 }
793 return Ok(Some(GetManyChunk {
794 entries: out,
795 detail,
796 }));
797 }
798 }
799
800 pub async fn collect(mut self) -> Result<HashMap<Key, Bytes>, ClientError> {
801 let mut map = HashMap::new();
802 while let Some(chunk) = self.next_chunk().await? {
803 for (key, value) in chunk.entries {
804 if let Some(v) = value {
805 map.insert(key, v);
806 }
807 }
808 }
809 Ok(map)
810 }
811}
812
813impl RangeMode {
814 fn to_proto(self) -> proto_query::TraversalMode {
815 match self {
816 Self::Forward => proto_query::TraversalMode::TRAVERSAL_MODE_FORWARD,
817 Self::Reverse => proto_query::TraversalMode::TRAVERSAL_MODE_REVERSE,
818 }
819 }
820}
821
822#[inline]
823fn key_prefix_mask(bits: u8) -> Result<u16, StoreKeyPrefixError> {
824 if bits > 16 {
825 return Err(StoreKeyPrefixError::ReservedBitsTooLarge {
826 reserved_bits: bits,
827 });
828 }
829 Ok(if bits == 0 {
830 0
831 } else if bits == 16 {
832 u16::MAX
833 } else {
834 (1u16 << bits) - 1
835 })
836}
837
838fn validate_prefix_bits(reserved_bits: u8, prefix: u16) -> Result<(), StoreKeyPrefixError> {
839 let mask = key_prefix_mask(reserved_bits)?;
840 if prefix > mask {
841 return Err(StoreKeyPrefixError::PrefixTooLarge {
842 reserved_bits,
843 prefix,
844 });
845 }
846 Ok(())
847}
848
/// One key/value entry delivered by a [`StreamSubscription`].
#[derive(Clone, Debug)]
pub struct StreamSubscriptionEntry {
    /// Entry key; decoded to the logical key when the subscription has a key prefix.
    pub key: Key,
    /// Entry value.
    pub value: Bytes,
}
857
/// One frame delivered by [`StreamSubscription::next`].
#[derive(Clone, Debug)]
pub struct StreamSubscriptionFrame {
    /// Sequence number carried by the server response.
    pub sequence_number: u64,
    /// Entries that passed the client-side filter; never empty when returned
    /// from `next`.
    pub entries: Vec<StreamSubscriptionEntry>,
}
864
/// Client-side reader over a server-streamed subscription.
pub struct StreamSubscription {
    /// Underlying Connect server stream of subscribe responses.
    stream: ConnectServerStream<
        hyper::body::Incoming,
        exoware_proto::store::stream::v1::SubscribeResponseView<'static>,
    >,
    /// When set, used to decode entry keys back to logical keys.
    key_prefix: Option<StoreKeyPrefix>,
    /// When set, entries are kept only if this filter matches the decoded key
    /// and value.
    logical_filter: Option<ClientStreamFilter>,
}
875
876impl std::fmt::Debug for StreamSubscription {
877 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
878 f.debug_struct("StreamSubscription").finish_non_exhaustive()
879 }
880}
881
882impl StreamSubscription {
883 pub async fn next(&mut self) -> Result<Option<StreamSubscriptionFrame>, ClientError> {
885 loop {
886 match self
887 .stream
888 .message()
889 .await
890 .map_err(client_error_from_connect)?
891 {
892 Some(view) => {
893 let owned = view.to_owned_message();
894 let mut entries = Vec::with_capacity(owned.entries.len());
895 for entry in owned.entries {
896 let key = Bytes::from(entry.key);
897 let key = match self.key_prefix {
898 Some(prefix) => prefix.decode_key(&key)?,
899 None => key,
900 };
901 let value = entry.value;
902 if self
903 .logical_filter
904 .as_ref()
905 .is_none_or(|filter| filter.matches(&key, value.as_ref()))
906 {
907 entries.push(StreamSubscriptionEntry { key, value });
908 }
909 }
910 if entries.is_empty() {
911 continue;
912 }
913 let frame = StreamSubscriptionFrame {
914 sequence_number: owned.sequence_number,
915 entries,
916 };
917 return Ok(Some(frame));
918 }
919 None => {
920 if let Some(err) = self.stream.error() {
921 return Err(client_error_from_connect(err.clone()));
922 } else {
923 return Ok(None);
924 }
925 }
926 }
927 }
928 }
929}
930
/// Compiled form of one match key used for client-side key filtering.
#[derive(Clone)]
struct ClientKeyMatcher {
    /// Codec built from the match key's reserved bits and prefix.
    codec: KeyCodec,
    /// Compiled payload regex applied to the key payload bytes.
    regex: regex::bytes::Regex,
}
936
/// Compiled stream filter evaluated on the client against decoded entries.
#[derive(Clone)]
struct ClientStreamFilter {
    /// Key matchers; an entry passes when any one of them matches its key.
    keys: Vec<ClientKeyMatcher>,
    /// Optional compiled value filters; when present they must match the value.
    values: Option<crate::stream_filter::CompiledBytesFilters>,
}
942
943impl ClientStreamFilter {
944 fn compile(filter: &crate::stream_filter::StreamFilter) -> Result<Self, ClientError> {
945 crate::stream_filter::validate_filter(filter)
946 .map_err(|e| ClientError::WireFormat(e.to_string()))?;
947 let keys = filter
948 .match_keys
949 .iter()
950 .map(|mk| {
951 let regex = crate::match_key::compile_payload_regex(&mk.payload_regex)
952 .map_err(|e| ClientError::WireFormat(e.to_string()))?;
953 Ok(ClientKeyMatcher {
954 codec: KeyCodec::new(mk.reserved_bits, mk.prefix),
955 regex,
956 })
957 })
958 .collect::<Result<Vec<_>, ClientError>>()?;
959 let values = crate::stream_filter::CompiledBytesFilters::compile(&filter.value_filters)
960 .map_err(ClientError::WireFormat)?;
961 Ok(Self { keys, values })
962 }
963
964 fn matches(&self, key: &Key, value: &[u8]) -> bool {
965 if !self
966 .values
967 .as_ref()
968 .is_none_or(|filter| filter.matches(value))
969 {
970 return false;
971 }
972 self.keys.iter().any(|matcher| {
973 if !matcher.codec.matches(key) {
974 return false;
975 }
976 let payload_len = matcher.codec.payload_capacity_bytes_for_key_len(key.len());
977 matcher
978 .codec
979 .read_payload(key, 0, payload_len)
980 .is_ok_and(|payload| matcher.regex.is_match(&payload))
981 })
982 }
983}
984
985fn is_batch_missing_error(err: &ConnectError) -> bool {
988 match proto_decode_connect_error(err) {
989 Ok(decoded) => decoded.error_info.is_some_and(|info| {
990 info.domain == "store.stream"
991 && matches!(info.reason.as_str(), "BATCH_EVICTED" | "BATCH_NOT_FOUND")
992 }),
993 Err(_) => false,
994 }
995}
996
/// Retry settings for the store client.
#[derive(Clone, Copy, Debug)]
pub struct RetryConfig {
    /// Attempt limit; the setters and `sanitized` keep it at 1 or more.
    max_attempts: usize,
    /// Initial backoff duration.
    initial_backoff: Duration,
    /// Maximum backoff duration; `sanitized` raises it to at least `initial_backoff`.
    max_backoff: Duration,
}
1004
1005impl RetryConfig {
1006 pub fn standard() -> Self {
1007 Self {
1008 max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
1009 initial_backoff: Duration::from_millis(DEFAULT_RETRY_INITIAL_BACKOFF_MS),
1010 max_backoff: Duration::from_millis(DEFAULT_RETRY_MAX_BACKOFF_MS),
1011 }
1012 }
1013
1014 pub fn disabled() -> Self {
1015 Self::standard().with_max_attempts(1)
1016 }
1017
1018 pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
1019 self.max_attempts = max_attempts.max(1);
1020 self
1021 }
1022
1023 pub fn with_initial_backoff(mut self, initial_backoff: Duration) -> Self {
1024 self.initial_backoff = initial_backoff;
1025 self
1026 }
1027
1028 pub fn with_max_backoff(mut self, max_backoff: Duration) -> Self {
1029 self.max_backoff = max_backoff;
1030 self
1031 }
1032
1033 pub(crate) fn sanitized(self) -> Self {
1034 let max_attempts = self.max_attempts.max(1);
1035 let max_backoff = self.max_backoff.max(self.initial_backoff);
1036 Self {
1037 max_attempts,
1038 initial_backoff: self.initial_backoff,
1039 max_backoff,
1040 }
1041 }
1042}
1043
1044impl Default for RetryConfig {
1045 fn default() -> Self {
1046 Self::standard()
1047 }
1048}
1049
/// Returns `url` with all trailing `/` characters removed.
fn trim_connect_base(url: &str) -> String {
    let trimmed = url.trim_end_matches('/');
    String::from(trimmed)
}
1053
1054fn new_http_client() -> reqwest::Client {
1055 reqwest::Client::builder()
1056 .pool_max_idle_per_host(32)
1057 .timeout(Duration::from_secs(30))
1058 .build()
1059 .expect("failed to build HTTP client")
1060}
1061
/// Errors returned by [`StoreClientBuilder::build`].
#[derive(Debug, thiserror::Error)]
pub enum ClientBuildError {
    /// No health URL was configured.
    #[error("StoreClientBuilder: missing health URL (set health_url or url)")]
    MissingHealthUrl,
    /// No ingest URL was configured.
    #[error("StoreClientBuilder: missing ingest URL (set ingest_url or url)")]
    MissingIngestUrl,
    /// No query URL was configured.
    #[error("StoreClientBuilder: missing query URL (set query_url or url)")]
    MissingQueryUrl,
    /// No compact URL was configured.
    #[error("StoreClientBuilder: missing compact URL (set compact_url or url)")]
    MissingCompactUrl,
    /// No stream URL was configured.
    #[error("StoreClientBuilder: missing stream URL (set stream_url or url)")]
    MissingStreamUrl,
    /// A configured URL could not be parsed as an `http::Uri`.
    #[error("StoreClientBuilder: invalid URL \"{url}\": {source}")]
    InvalidUrl {
        /// The offending URL, after trailing slashes were trimmed.
        url: String,
        /// The parse failure.
        source: http::uri::InvalidUri,
    },
}
1081
/// Builder for [`StoreClient`].
///
/// Every service URL must be set, either individually or all at once through
/// `url`, before `build` succeeds.
#[derive(Debug, Default)]
pub struct StoreClientBuilder {
    /// Health endpoint base URL.
    health_url: Option<String>,
    /// Ingest service base URL.
    ingest_url: Option<String>,
    /// Query service base URL.
    query_url: Option<String>,
    /// Compact service base URL.
    compact_url: Option<String>,
    /// Stream service base URL.
    stream_url: Option<String>,
    /// Optional store key prefix handed to the built client.
    key_prefix: Option<StoreKeyPrefix>,
    /// Retry settings; stored sanitized by the `retry_config` setter.
    retry_config: RetryConfig,
    /// Compression used for outgoing Connect requests.
    connect_request_compression: ConnectRequestCompression,
}
1097
1098impl StoreClientBuilder {
1099 pub fn url(mut self, url: &str) -> Self {
1101 let u = trim_connect_base(url);
1102 self.health_url = Some(u.clone());
1103 self.ingest_url = Some(u.clone());
1104 self.query_url = Some(u.clone());
1105 self.compact_url = Some(u.clone());
1106 self.stream_url = Some(u);
1107 self
1108 }
1109
1110 pub fn health_url(mut self, url: &str) -> Self {
1112 self.health_url = Some(trim_connect_base(url));
1113 self
1114 }
1115
1116 pub fn ingest_url(mut self, url: &str) -> Self {
1118 self.ingest_url = Some(trim_connect_base(url));
1119 self
1120 }
1121
1122 pub fn query_url(mut self, url: &str) -> Self {
1124 self.query_url = Some(trim_connect_base(url));
1125 self
1126 }
1127
1128 pub fn compact_url(mut self, url: &str) -> Self {
1130 self.compact_url = Some(trim_connect_base(url));
1131 self
1132 }
1133
1134 pub fn stream_url(mut self, url: &str) -> Self {
1136 self.stream_url = Some(trim_connect_base(url));
1137 self
1138 }
1139
1140 pub fn key_prefix(mut self, prefix: StoreKeyPrefix) -> Self {
1142 self.key_prefix = Some(prefix);
1143 self
1144 }
1145
1146 pub fn retry_config(mut self, retry: RetryConfig) -> Self {
1148 self.retry_config = retry.sanitized();
1149 self
1150 }
1151
1152 pub fn connect_request_compression(mut self, compression: ConnectRequestCompression) -> Self {
1154 self.connect_request_compression = compression;
1155 self
1156 }
1157
1158 pub fn build(self) -> Result<StoreClient, ClientBuildError> {
1160 let health_url = self.health_url.ok_or(ClientBuildError::MissingHealthUrl)?;
1161 let ingest_url = self.ingest_url.ok_or(ClientBuildError::MissingIngestUrl)?;
1162 let query_url = self.query_url.ok_or(ClientBuildError::MissingQueryUrl)?;
1163 let compact_url = self
1164 .compact_url
1165 .ok_or(ClientBuildError::MissingCompactUrl)?;
1166 let stream_url = self.stream_url.ok_or(ClientBuildError::MissingStreamUrl)?;
1167 let ingest_uri: http::Uri =
1168 ingest_url
1169 .parse()
1170 .map_err(|e| ClientBuildError::InvalidUrl {
1171 url: ingest_url.clone(),
1172 source: e,
1173 })?;
1174 let query_uri: http::Uri = query_url
1175 .parse()
1176 .map_err(|e| ClientBuildError::InvalidUrl {
1177 url: query_url.clone(),
1178 source: e,
1179 })?;
1180 let compact_uri: http::Uri =
1181 compact_url
1182 .parse()
1183 .map_err(|e| ClientBuildError::InvalidUrl {
1184 url: compact_url.clone(),
1185 source: e,
1186 })?;
1187 let stream_uri: http::Uri =
1188 stream_url
1189 .parse()
1190 .map_err(|e| ClientBuildError::InvalidUrl {
1191 url: stream_url.clone(),
1192 source: e,
1193 })?;
1194 Ok(StoreClient {
1195 health_url,
1196 ingest_uri,
1197 query_uri,
1198 compact_uri,
1199 stream_uri,
1200 http: new_http_client(),
1201 connect_http: ProtoPreferZstdHttpClient::plaintext(),
1202 retry_config: self.retry_config,
1203 connect_request_compression: self.connect_request_compression,
1204 key_prefix: self.key_prefix,
1205 })
1206 }
1207}
1208
/// Client for the store's ingest, query, compact and stream services.
///
/// Construct it with [`StoreClient::builder`], [`StoreClient::new`] or
/// [`StoreClient::with_retry_config`].
#[derive(Clone, Debug)]
pub struct StoreClient {
    /// Health endpoint base URL, kept as a string.
    pub(crate) health_url: String,
    /// Parsed base URI of the ingest service.
    ingest_uri: http::Uri,
    /// Parsed base URI of the query service.
    query_uri: http::Uri,
    /// Parsed base URI of the compact service.
    compact_uri: http::Uri,
    /// Parsed base URI of the stream service.
    stream_uri: http::Uri,
    /// `reqwest` client created by `new_http_client`.
    http: reqwest::Client,
    /// HTTP client used for Connect calls.
    connect_http: ProtoPreferZstdHttpClient,
    /// Retry settings.
    retry_config: RetryConfig,
    /// Compression used for outgoing Connect requests.
    connect_request_compression: ConnectRequestCompression,
    /// Optional store key prefix used to encode and decode keys.
    key_prefix: Option<StoreKeyPrefix>,
}
1224
/// Read session created by [`StoreClient::create_session`] or
/// [`StoreClient::create_session_with_sequence`].
///
/// Clones share the same session state.
#[derive(Clone, Debug)]
pub struct SerializableReadSession {
    /// Clone of the client the session was created from.
    client: StoreClient,
    /// Shared session state (sequence number and initialization gate).
    state: Arc<SessionState>,
}
1244
/// State shared by all clones of a [`SerializableReadSession`].
#[derive(Debug)]
struct SessionState {
    /// Session sequence number; `0` means no sequence has been fixed yet
    /// (see `fixed_sequence`).
    sequence: Arc<AtomicU64>,
    /// Async mutex; NOTE(review): presumably serializes first-read
    /// initialization of `sequence` — its users are outside this view, confirm.
    init_gate: tokio::sync::Mutex<()>,
}
1250
1251impl SessionState {
1252 fn fixed_sequence(&self) -> Option<u64> {
1253 let sequence = self.sequence.load(Ordering::Acquire);
1254 (sequence > 0).then_some(sequence)
1255 }
1256}
1257
/// Optional settings for opening a range stream; both default to `None`.
#[derive(Default)]
struct RangeStreamReadOptions {
    /// Minimum sequence number for the read.
    /// NOTE(review): exact server-side meaning is not visible here — confirm.
    min_sequence_number: Option<u64>,
    /// Shared counter intended to track observed sequence numbers.
    /// NOTE(review): presumably forwarded to `RangeStream` — confirm at the call site.
    observed_sequence: Option<Arc<AtomicU64>>,
}
1263
1264impl StoreClient {
1265 pub fn builder() -> StoreClientBuilder {
1267 StoreClientBuilder::default()
1268 }
1269
1270 pub fn new(url: &str) -> Self {
1271 Self::with_retry_config(url, RetryConfig::standard())
1272 }
1273
1274 pub fn with_retry_config(url: &str, retry_config: RetryConfig) -> Self {
1275 Self::builder()
1276 .url(url)
1277 .retry_config(retry_config)
1278 .build()
1279 .expect("url sets all service URLs")
1280 }
1281
    /// Returns the store key prefix configured on this client, if any.
    pub fn key_prefix(&self) -> Option<StoreKeyPrefix> {
        self.key_prefix
    }
1286
1287 pub fn with_key_prefix(&self, prefix: StoreKeyPrefix) -> Self {
1289 let mut out = self.clone();
1290 out.key_prefix = Some(prefix);
1291 out
1292 }
1293
1294 pub fn without_key_prefix(&self) -> Self {
1296 let mut out = self.clone();
1297 out.key_prefix = None;
1298 out
1299 }
1300
1301 pub fn encode_store_key(&self, key: &Key) -> Result<Key, ClientError> {
1303 match self.key_prefix {
1304 Some(prefix) => Ok(prefix.encode_key(key)?),
1305 None => Ok(Bytes::copy_from_slice(key)),
1306 }
1307 }
1308
1309 pub fn decode_store_key(&self, key: &Key) -> Result<Key, ClientError> {
1311 match self.key_prefix {
1312 Some(prefix) => Ok(prefix.decode_key(key)?),
1313 None => Ok(Bytes::copy_from_slice(key)),
1314 }
1315 }
1316
1317 fn encode_store_range(&self, start: &Key, end: &Key) -> Result<(Key, Key), ClientError> {
1318 match self.key_prefix {
1319 Some(prefix) => Ok(prefix.encode_range(start, end)?),
1320 None => Ok((Bytes::copy_from_slice(start), Bytes::copy_from_slice(end))),
1321 }
1322 }
1323
1324 pub fn connect_request_compression(&self) -> ConnectRequestCompression {
1326 self.connect_request_compression
1327 }
1328
1329 pub fn decode_error_details(
1330 err: &ConnectError,
1331 ) -> Result<exoware_proto::DecodedConnectError, buffa::DecodeError> {
1332 proto_decode_connect_error(err)
1333 }
1334
1335 pub fn create_session(&self) -> SerializableReadSession {
1341 self.create_session_with_sequence(0)
1342 }
1343
1344 pub fn create_session_with_sequence(&self, sequence: u64) -> SerializableReadSession {
1350 SerializableReadSession {
1351 client: self.clone(),
1352 state: Arc::new(SessionState {
1353 sequence: Arc::new(AtomicU64::new(sequence)),
1354 init_gate: tokio::sync::Mutex::new(()),
1355 }),
1356 }
1357 }
1358
1359 pub fn ingest(&self) -> Ingest<'_> {
1361 Ingest { c: self }
1362 }
1363
1364 pub fn query(&self) -> Query<'_> {
1366 Query { c: self }
1367 }
1368
1369 pub fn compact(&self) -> Compact<'_> {
1371 Compact { c: self }
1372 }
1373
1374 pub fn stream(&self) -> Stream<'_> {
1376 Stream { c: self }
1377 }
1378
1379 pub(crate) async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
1386 if self.key_prefix.is_none() {
1387 return self.put_physical(kvs).await;
1388 }
1389 let mut keys = Vec::with_capacity(kvs.len());
1390 for (key, _) in kvs {
1391 keys.push(self.encode_store_key(key)?);
1392 }
1393 let prefixed: Vec<(&Key, &[u8])> = keys
1394 .iter()
1395 .zip(kvs.iter())
1396 .map(|(key, (_, value))| (key, *value))
1397 .collect();
1398 self.put_physical(&prefixed).await
1399 }
1400
1401 async fn put_physical(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
1402 let mut proto_kvs = Vec::with_capacity(kvs.len());
1403 for (key, value) in kvs {
1404 if !is_valid_key_size(key.len()) {
1405 return Err(ClientError::WireFormat(format!(
1406 "key length {} is outside valid store key range ({}..={})",
1407 key.len(),
1408 keys::MIN_KEY_LEN,
1409 MAX_KEY_LEN
1410 )));
1411 }
1412 proto_kvs.push(exoware_proto::common::KvEntry {
1413 key: key.to_vec(),
1414 value: Bytes::copy_from_slice(value),
1415 ..Default::default()
1416 });
1417 }
1418
1419 let config =
1420 store_connect_client_config(self.ingest_uri.clone(), self.connect_request_compression);
1421 let client = IngestServiceClient::new(self.connect_http.clone(), config);
1422 let response = client
1423 .put(ProtoPutRequest {
1424 kvs: proto_kvs,
1425 ..Default::default()
1426 })
1427 .await
1428 .map_err(client_error_from_connect)?;
1429 Ok(response.into_owned().sequence_number)
1430 }
1431
1432 pub(crate) async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
1433 self.get_internal(key, None).await
1434 }
1435
1436 pub(crate) async fn get_with_min_sequence_number(
1437 &self,
1438 key: &Key,
1439 min_sequence_number: u64,
1440 ) -> Result<Option<Bytes>, ClientError> {
1441 self.get_internal(key, Some(min_sequence_number)).await
1442 }
1443
1444 async fn get_internal(
1445 &self,
1446 key: &Key,
1447 min_sequence_number: Option<u64>,
1448 ) -> Result<Option<Bytes>, ClientError> {
1449 let (response, _detail) = self
1450 .send_get(key, self.normalize_min_sequence_number(min_sequence_number))
1451 .await?;
1452 Ok(response.value)
1453 }
1454
1455 pub(crate) async fn get_many(
1456 &self,
1457 keys: &[&Key],
1458 batch_size: u32,
1459 ) -> Result<GetManyStream, ClientError> {
1460 self.get_many_internal(keys, batch_size, None, None).await
1461 }
1462
1463 pub(crate) async fn get_many_with_min_sequence_number(
1464 &self,
1465 keys: &[&Key],
1466 batch_size: u32,
1467 min_sequence_number: u64,
1468 ) -> Result<GetManyStream, ClientError> {
1469 self.get_many_internal(keys, batch_size, Some(min_sequence_number), None)
1470 .await
1471 }
1472
1473 async fn get_many_internal(
1474 &self,
1475 keys: &[&Key],
1476 batch_size: u32,
1477 min_sequence_number: Option<u64>,
1478 observed_sequence: Option<Arc<AtomicU64>>,
1479 ) -> Result<GetManyStream, ClientError> {
1480 for key in keys {
1481 if !is_valid_key_size(key.len()) {
1482 return Err(ClientError::WireFormat(format!(
1483 "key length {} is outside valid store key range ({}..={})",
1484 key.len(),
1485 keys::MIN_KEY_LEN,
1486 MAX_KEY_LEN
1487 )));
1488 }
1489 }
1490
1491 let config =
1492 store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1493 let client = QueryServiceClient::new(self.connect_http.clone(), config);
1494 let proto_keys: Vec<Vec<u8>> = keys
1495 .iter()
1496 .map(|k| match self.key_prefix {
1497 Some(_) => self.encode_store_key(k).map(|key| key.to_vec()),
1498 None => Ok(k.to_vec()),
1499 })
1500 .collect::<Result<Vec<_>, _>>()?;
1501 let effective_min = self.normalize_min_sequence_number(min_sequence_number);
1502 let max_attempts = self.retry_config.max_attempts.max(1);
1503 let mut attempt = 1usize;
1504 loop {
1505 match client
1506 .get_many(ProtoGetManyRequest {
1507 keys: proto_keys.clone(),
1508 min_sequence_number: effective_min,
1509 batch_size,
1510 ..Default::default()
1511 })
1512 .await
1513 {
1514 Ok(stream) => {
1515 let mut gms = GetManyStream::from_connect_stream(
1516 stream,
1517 observed_sequence.clone(),
1518 self.key_prefix,
1519 );
1520 if let Err(err) = gms.prefetch_first_frame().await {
1521 if attempt < max_attempts && is_retryable_error(&err) {
1522 let delay = retry_delay_for_error(&err, attempt, self.retry_config);
1523 tokio::time::sleep(delay).await;
1524 attempt += 1;
1525 continue;
1526 }
1527 return Err(client_error_from_connect(err));
1528 }
1529 return Ok(gms);
1530 }
1531 Err(err) => {
1532 if attempt < max_attempts && is_retryable_error(&err) {
1533 let delay = retry_delay_for_error(&err, attempt, self.retry_config);
1534 tokio::time::sleep(delay).await;
1535 attempt += 1;
1536 continue;
1537 }
1538 return Err(client_error_from_connect(err));
1539 }
1540 }
1541 }
1542 }
1543
1544 pub(crate) async fn range(
1547 &self,
1548 start: &Key,
1549 end: &Key,
1550 limit: usize,
1551 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1552 self.range_internal(start, end, limit, RangeMode::Forward, None)
1553 .await
1554 }
1555
1556 pub(crate) async fn range_with_mode(
1558 &self,
1559 start: &Key,
1560 end: &Key,
1561 limit: usize,
1562 mode: RangeMode,
1563 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1564 self.range_internal(start, end, limit, mode, None).await
1565 }
1566
1567 pub(crate) async fn range_with_min_sequence_number(
1568 &self,
1569 start: &Key,
1570 end: &Key,
1571 limit: usize,
1572 min_sequence_number: u64,
1573 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1574 self.range_internal(
1575 start,
1576 end,
1577 limit,
1578 RangeMode::Forward,
1579 Some(min_sequence_number),
1580 )
1581 .await
1582 }
1583
1584 pub(crate) async fn range_with_mode_and_min_sequence_number(
1585 &self,
1586 start: &Key,
1587 end: &Key,
1588 limit: usize,
1589 mode: RangeMode,
1590 min_sequence_number: u64,
1591 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1592 self.range_internal(start, end, limit, mode, Some(min_sequence_number))
1593 .await
1594 }
1595
1596 pub(crate) async fn range_stream(
1597 &self,
1598 start: &Key,
1599 end: &Key,
1600 limit: usize,
1601 batch_size: usize,
1602 ) -> Result<RangeStream, ClientError> {
1603 self.range_stream_internal(
1604 start,
1605 end,
1606 limit,
1607 batch_size,
1608 RangeMode::Forward,
1609 RangeStreamReadOptions::default(),
1610 )
1611 .await
1612 }
1613
1614 pub(crate) async fn range_stream_with_mode(
1615 &self,
1616 start: &Key,
1617 end: &Key,
1618 limit: usize,
1619 batch_size: usize,
1620 mode: RangeMode,
1621 ) -> Result<RangeStream, ClientError> {
1622 self.range_stream_internal(start, end, limit, batch_size, mode, Default::default())
1623 .await
1624 }
1625
1626 pub(crate) async fn range_stream_with_min_sequence_number(
1627 &self,
1628 start: &Key,
1629 end: &Key,
1630 limit: usize,
1631 batch_size: usize,
1632 min_sequence_number: u64,
1633 ) -> Result<RangeStream, ClientError> {
1634 self.range_stream_internal(
1635 start,
1636 end,
1637 limit,
1638 batch_size,
1639 RangeMode::Forward,
1640 RangeStreamReadOptions {
1641 min_sequence_number: Some(min_sequence_number),
1642 observed_sequence: None,
1643 },
1644 )
1645 .await
1646 }
1647
1648 pub(crate) async fn range_stream_with_mode_and_min_sequence_number(
1649 &self,
1650 start: &Key,
1651 end: &Key,
1652 limit: usize,
1653 batch_size: usize,
1654 mode: RangeMode,
1655 min_sequence_number: u64,
1656 ) -> Result<RangeStream, ClientError> {
1657 self.range_stream_internal(
1658 start,
1659 end,
1660 limit,
1661 batch_size,
1662 mode,
1663 RangeStreamReadOptions {
1664 min_sequence_number: Some(min_sequence_number),
1665 observed_sequence: None,
1666 },
1667 )
1668 .await
1669 }
1670
1671 pub(crate) async fn range_reduce(
1672 &self,
1673 start: &Key,
1674 end: &Key,
1675 request: &DomainRangeReduceRequest,
1676 ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
1677 let (response, _) = self
1678 .range_reduce_response_internal(start, end, request, None)
1679 .await?;
1680 let decoded = proto_to_domain_reduce_response(response).map_err(ClientError::WireFormat)?;
1681 if !decoded.groups.is_empty() {
1682 return Err(ClientError::WireFormat(
1683 "grouped range reduction response returned for scalar request".to_string(),
1684 ));
1685 }
1686 Ok(decoded
1687 .results
1688 .iter()
1689 .map(|result| result.value.clone())
1690 .collect())
1691 }
1692
1693 pub(crate) async fn range_reduce_with_min_sequence_number(
1694 &self,
1695 start: &Key,
1696 end: &Key,
1697 request: &DomainRangeReduceRequest,
1698 min_sequence_number: u64,
1699 ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
1700 let (response, _) = self
1701 .range_reduce_response_internal(start, end, request, Some(min_sequence_number))
1702 .await?;
1703 let decoded = proto_to_domain_reduce_response(response).map_err(ClientError::WireFormat)?;
1704 if !decoded.groups.is_empty() {
1705 return Err(ClientError::WireFormat(
1706 "grouped range reduction response returned for scalar request".to_string(),
1707 ));
1708 }
1709 Ok(decoded
1710 .results
1711 .iter()
1712 .map(|result| result.value.clone())
1713 .collect())
1714 }
1715
1716 pub(crate) async fn range_reduce_response(
1717 &self,
1718 start: &Key,
1719 end: &Key,
1720 request: &DomainRangeReduceRequest,
1721 ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
1722 let (body, _) = self
1723 .range_reduce_response_internal(start, end, request, None)
1724 .await?;
1725 Ok(body)
1726 }
1727
1728 pub(crate) async fn range_reduce_response_with_min_sequence_number(
1729 &self,
1730 start: &Key,
1731 end: &Key,
1732 request: &DomainRangeReduceRequest,
1733 min_sequence_number: u64,
1734 ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
1735 let (body, _) = self
1736 .range_reduce_response_internal(start, end, request, Some(min_sequence_number))
1737 .await?;
1738 Ok(body)
1739 }
1740
1741 pub(crate) async fn prune(
1742 &self,
1743 policies: &[crate::prune_policy::PrunePolicy],
1744 ) -> Result<(), ClientError> {
1745 let config =
1746 store_connect_client_config(self.compact_uri.clone(), self.connect_request_compression);
1747 let client = CompactServiceClient::new(self.connect_http.clone(), config);
1748 let policies = self.prefix_prune_policies(policies)?;
1749 client
1750 .prune(ProtoPruneRequest {
1751 policies: exoware_proto::prune_policies_to_proto(&policies),
1752 ..Default::default()
1753 })
1754 .await
1755 .map_err(client_error_from_connect)?;
1756 Ok(())
1757 }
1758
1759 pub async fn health(&self) -> Result<bool, ClientError> {
1760 let resp = self
1761 .http
1762 .get(format!("{}/health", self.health_url))
1763 .send()
1764 .await?;
1765 Ok(resp.status().is_success())
1766 }
1767
1768 pub async fn ready(&self) -> Result<bool, ClientError> {
1769 let resp = self
1770 .http
1771 .get(format!("{}/ready", self.health_url))
1772 .send()
1773 .await?;
1774 Ok(resp.status().is_success())
1775 }
1776
1777 fn normalize_min_sequence_number(&self, requested_sequence: Option<u64>) -> Option<u64> {
1778 requested_sequence.filter(|sequence| *sequence > 0)
1779 }
1780
1781 async fn send_get(
1782 &self,
1783 key: &Key,
1784 min_sequence_number: Option<u64>,
1785 ) -> Result<
1786 (
1787 exoware_proto::query::GetResponse,
1788 Option<proto_query::Detail>,
1789 ),
1790 ClientError,
1791 > {
1792 if !is_valid_key_size(key.len()) {
1793 return Err(ClientError::WireFormat(format!(
1794 "key length {} is outside valid store key range ({}..={})",
1795 key.len(),
1796 keys::MIN_KEY_LEN,
1797 MAX_KEY_LEN
1798 )));
1799 }
1800 let key = self.encode_store_key(key)?;
1801
1802 let config =
1803 store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1804 let client = QueryServiceClient::new(self.connect_http.clone(), config);
1805 let response = self
1806 .send_with_retry(|| async {
1807 client
1808 .get(ProtoGetRequest {
1809 key: key.clone().into(),
1810 min_sequence_number,
1811 ..Default::default()
1812 })
1813 .await
1814 })
1815 .await?;
1816 let owned = response.into_owned();
1817 let detail = owned.detail.as_option().cloned();
1818 Ok((owned, detail))
1819 }
1820
/// Test-only entry point exposing `send_get`, including the response detail.
#[cfg(test)]
pub async fn send_get_for_tests(
    &self,
    key: &Key,
    min_sequence_number: Option<u64>,
) -> Result<
    (
        exoware_proto::query::GetResponse,
        Option<proto_query::Detail>,
    ),
    ClientError,
> {
    let outcome = self.send_get(key, min_sequence_number).await;
    outcome
}
1835
1836 async fn range_internal(
1837 &self,
1838 start: &Key,
1839 end: &Key,
1840 limit: usize,
1841 mode: RangeMode,
1842 min_sequence_number: Option<u64>,
1843 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1844 let stream = self
1845 .range_stream_internal(
1846 start,
1847 end,
1848 limit,
1849 limit.max(1),
1850 mode,
1851 RangeStreamReadOptions {
1852 min_sequence_number,
1853 observed_sequence: None,
1854 },
1855 )
1856 .await?;
1857 stream.collect().await
1858 }
1859
/// Opens a streaming range read between `start` and `end`, retrying
/// transient failures.
///
/// Key lengths and `batch_size` are validated before anything is sent, then
/// the bounds are mapped through `encode_store_range`. The request is
/// re-issued while attempts remain and the error is classified as retryable,
/// both when opening the RPC and when prefetching the stream's first frame.
///
/// # Errors
///
/// Returns `ClientError::WireFormat` for invalid key lengths or a zero
/// `batch_size`, a range encoding error, or the converted Connect error once
/// retries are exhausted or the error is not retryable.
async fn range_stream_internal(
    &self,
    start: &Key,
    end: &Key,
    limit: usize,
    batch_size: usize,
    mode: RangeMode,
    options: RangeStreamReadOptions,
) -> Result<RangeStream, ClientError> {
    if !is_valid_key_size(start.len()) || !is_valid_key_size(end.len()) {
        return Err(ClientError::WireFormat(
            "range start/end key length is outside valid store key range".to_string(),
        ));
    }
    if batch_size == 0 {
        return Err(ClientError::WireFormat(
            "batch_size must be positive".to_string(),
        ));
    }
    let (start, end) = self.encode_store_range(start, end)?;

    let config =
        store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
    let client = QueryServiceClient::new(self.connect_http.clone(), config);
    let min_sequence_number = self.normalize_min_sequence_number(options.min_sequence_number);
    // At least one attempt is always made, even if the config says zero.
    let max_attempts = self.retry_config.max_attempts.max(1);
    let mut attempt = 1usize;
    loop {
        let response = match client
            .range(ProtoRangeRequest {
                start: start.clone().into(),
                end: end.clone().into(),
                // Values that do not fit in `u32` are clamped to `u32::MAX`.
                limit: Some(u32::try_from(limit).unwrap_or(u32::MAX)),
                batch_size: u32::try_from(batch_size).unwrap_or(u32::MAX),
                mode: mode.to_proto().into(),
                min_sequence_number,
                ..Default::default()
            })
            .await
        {
            Ok(response) => response,
            Err(err) => {
                if attempt < max_attempts && is_retryable_error(&err) {
                    let delay = retry_delay_for_error(&err, attempt, self.retry_config);
                    tracing::debug!(
                        attempt,
                        max_attempts,
                        code = err.code.as_str(),
                        delay_ms = delay.as_millis() as u64,
                        "store client retrying transient range-open error",
                    );
                    tokio::time::sleep(delay).await;
                    attempt += 1;
                    continue;
                }
                return Err(client_error_from_connect(err));
            }
        };

        let mut stream = RangeStream::from_connect_stream(
            response,
            options.observed_sequence.clone(),
            self.key_prefix,
        );
        // Pull the first frame eagerly so a failure here can still be
        // retried by re-issuing the whole request; both failure points share
        // the same attempt counter.
        if let Err(err) = stream.prefetch_first_frame().await {
            if attempt < max_attempts && is_retryable_error(&err) {
                let delay = retry_delay_for_error(&err, attempt, self.retry_config);
                tracing::debug!(
                    attempt,
                    max_attempts,
                    code = err.code.as_str(),
                    delay_ms = delay.as_millis() as u64,
                    "store client retrying transient stream-open error",
                );
                tokio::time::sleep(delay).await;
                attempt += 1;
                continue;
            }
            return Err(client_error_from_connect(err));
        }
        return Ok(stream);
    }
}
1947
1948 async fn range_reduce_response_internal(
1949 &self,
1950 start: &Key,
1951 end: &Key,
1952 request: &DomainRangeReduceRequest,
1953 min_sequence_number: Option<u64>,
1954 ) -> Result<
1955 (
1956 exoware_proto::query::ReduceResponse,
1957 Option<proto_query::Detail>,
1958 ),
1959 ClientError,
1960 > {
1961 let config =
1962 store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1963 let client = QueryServiceClient::new(self.connect_http.clone(), config);
1964 let (start, end) = self.encode_store_range(start, end)?;
1965 let request = self.prefix_reduce_request(request)?;
1966 let proto_params = proto_to_proto_reduce_params(request);
1967 let min_sequence_number = self.normalize_min_sequence_number(min_sequence_number);
1968 let response = self
1969 .send_with_retry(|| async {
1970 client
1971 .reduce(ProtoWireReduceRequest {
1972 start: start.clone().into(),
1973 end: end.clone().into(),
1974 params: Some(proto_params.clone()).into(),
1975 min_sequence_number,
1976 ..Default::default()
1977 })
1978 .await
1979 })
1980 .await?;
1981 let owned = response.into_owned();
1982 let detail = owned.detail.as_option().cloned();
1983 Ok((owned, detail))
1984 }
1985
1986 fn prefix_prune_policies(
1987 &self,
1988 policies: &[crate::prune_policy::PrunePolicy],
1989 ) -> Result<Vec<crate::prune_policy::PrunePolicy>, ClientError> {
1990 let Some(prefix) = self.key_prefix else {
1991 return Ok(policies.to_vec());
1992 };
1993 policies
1994 .iter()
1995 .map(|policy| {
1996 use crate::prune_policy::{PolicyScope, PrunePolicy};
1997 let scope = match &policy.scope {
1998 PolicyScope::Keys(scope) => {
1999 let mut scope = scope.clone();
2000 scope.match_key = prefix.prefix_match_key(&scope.match_key)?;
2001 PolicyScope::Keys(scope)
2002 }
2003 PolicyScope::Sequence => PolicyScope::Sequence,
2004 };
2005 Ok(PrunePolicy {
2006 scope,
2007 retain: policy.retain.clone(),
2008 })
2009 })
2010 .collect::<Result<Vec<_>, StoreKeyPrefixError>>()
2011 .map_err(ClientError::from)
2012 }
2013
2014 fn prefix_stream_filter(
2015 &self,
2016 filter: crate::stream_filter::StreamFilter,
2017 ) -> Result<crate::stream_filter::StreamFilter, ClientError> {
2018 let Some(prefix) = self.key_prefix else {
2019 return Ok(filter);
2020 };
2021 let match_keys = filter
2022 .match_keys
2023 .iter()
2024 .map(|mk| prefix.prefix_stream_match_key(mk))
2025 .collect::<Result<Vec<_>, _>>()?;
2026 Ok(crate::stream_filter::StreamFilter {
2027 match_keys,
2028 value_filters: filter.value_filters,
2029 })
2030 }
2031
2032 fn prefix_reduce_request(
2033 &self,
2034 request: &DomainRangeReduceRequest,
2035 ) -> Result<DomainRangeReduceRequest, ClientError> {
2036 let Some(prefix) = self.key_prefix else {
2037 return Ok(request.clone());
2038 };
2039 let mut request = request.clone();
2040 shift_reduce_request_key_offsets(prefix.reserved_bits(), &mut request)?;
2041 Ok(request)
2042 }
2043
2044 async fn send_with_retry<F, Fut, T>(&self, mut make_request: F) -> Result<T, ClientError>
2045 where
2046 F: FnMut() -> Fut,
2047 Fut: std::future::Future<Output = Result<T, ConnectError>>,
2048 {
2049 let max_attempts = self.retry_config.max_attempts.max(1);
2050 let mut attempt = 1usize;
2051 loop {
2052 match make_request().await {
2053 Ok(response) => return Ok(response),
2054 Err(err) => {
2055 if attempt < max_attempts && is_retryable_error(&err) {
2056 let delay = retry_delay_for_error(&err, attempt, self.retry_config);
2057 tracing::debug!(
2058 attempt,
2059 max_attempts,
2060 code = err.code.as_str(),
2061 delay_ms = delay.as_millis() as u64,
2062 "store client retrying transient RPC error",
2063 );
2064 tokio::time::sleep(delay).await;
2065 attempt += 1;
2066 continue;
2067 }
2068 return Err(client_error_from_connect(err));
2069 }
2070 }
2071 }
2072 }
2073}
2074
2075fn shift_reduce_request_key_offsets(
2076 prefix_bits: u8,
2077 request: &mut DomainRangeReduceRequest,
2078) -> Result<(), StoreKeyPrefixError> {
2079 for reducer in &mut request.reducers {
2080 if let Some(expr) = &mut reducer.expr {
2081 shift_expr_key_offsets(prefix_bits, expr)?;
2082 }
2083 }
2084 for expr in &mut request.group_by {
2085 shift_expr_key_offsets(prefix_bits, expr)?;
2086 }
2087 if let Some(filter) = &mut request.filter {
2088 for check in &mut filter.checks {
2089 shift_field_ref_key_offset(prefix_bits, &mut check.field)?;
2090 }
2091 }
2092 Ok(())
2093}
2094
2095fn shift_expr_key_offsets(prefix_bits: u8, expr: &mut KvExpr) -> Result<(), StoreKeyPrefixError> {
2096 match expr {
2097 KvExpr::Field(field) => shift_field_ref_key_offset(prefix_bits, field),
2098 KvExpr::Literal(_) => Ok(()),
2099 KvExpr::Add(left, right)
2100 | KvExpr::Sub(left, right)
2101 | KvExpr::Mul(left, right)
2102 | KvExpr::Div(left, right) => {
2103 shift_expr_key_offsets(prefix_bits, left)?;
2104 shift_expr_key_offsets(prefix_bits, right)
2105 }
2106 KvExpr::Lower(inner) | KvExpr::DateTruncDay(inner) => {
2107 shift_expr_key_offsets(prefix_bits, inner)
2108 }
2109 }
2110}
2111
2112fn shift_field_ref_key_offset(
2113 prefix_bits: u8,
2114 field: &mut KvFieldRef,
2115) -> Result<(), StoreKeyPrefixError> {
2116 match field {
2117 KvFieldRef::Key { bit_offset, .. } | KvFieldRef::ZOrderKey { bit_offset, .. } => {
2118 *bit_offset = bit_offset.checked_add(u16::from(prefix_bits)).ok_or(
2119 StoreKeyPrefixError::BitOffsetOverflow {
2120 offset: *bit_offset,
2121 prefix_bits,
2122 },
2123 )?;
2124 Ok(())
2125 }
2126 KvFieldRef::Value { .. } => Ok(()),
2127 }
2128}
2129
/// Write facade returned by `StoreClient::ingest`.
#[derive(Clone, Copy, Debug)]
pub struct Ingest<'a> {
    /// Client that performs the calls.
    c: &'a StoreClient,
}
2136
/// Read facade returned by `StoreClient::query`.
#[derive(Clone, Copy, Debug)]
pub struct Query<'a> {
    /// Client that performs the calls.
    c: &'a StoreClient,
}
2141
/// Compaction facade returned by `StoreClient::compact`.
#[derive(Clone, Copy, Debug)]
pub struct Compact<'a> {
    /// Client that performs the calls.
    c: &'a StoreClient,
}
2146
/// Stream facade returned by `StoreClient::stream`.
#[derive(Clone, Copy, Debug)]
pub struct Stream<'a> {
    /// Client that performs the calls.
    c: &'a StoreClient,
}
2151
2152impl<'a> Ingest<'a> {
2153 pub async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
2154 self.c.put(kvs).await
2155 }
2156
2157 pub async fn put_prepared(&self, batch: &StoreWriteBatch) -> Result<u64, ClientError> {
2160 batch.commit(self.c).await
2161 }
2162}
2163
2164impl<'a> Query<'a> {
2165 pub async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
2166 self.c.get(key).await
2167 }
2168
2169 pub async fn get_with_min_sequence_number(
2170 &self,
2171 key: &Key,
2172 min_sequence_number: u64,
2173 ) -> Result<Option<Bytes>, ClientError> {
2174 self.c
2175 .get_with_min_sequence_number(key, min_sequence_number)
2176 .await
2177 }
2178
2179 pub async fn get_many(
2180 &self,
2181 keys: &[&Key],
2182 batch_size: u32,
2183 ) -> Result<GetManyStream, ClientError> {
2184 self.c.get_many(keys, batch_size).await
2185 }
2186
2187 pub async fn get_many_with_min_sequence_number(
2188 &self,
2189 keys: &[&Key],
2190 batch_size: u32,
2191 min_sequence_number: u64,
2192 ) -> Result<GetManyStream, ClientError> {
2193 self.c
2194 .get_many_with_min_sequence_number(keys, batch_size, min_sequence_number)
2195 .await
2196 }
2197
2198 pub async fn range(
2200 &self,
2201 start: &Key,
2202 end: &Key,
2203 limit: usize,
2204 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2205 self.c.range(start, end, limit).await
2206 }
2207
2208 pub async fn range_with_mode(
2209 &self,
2210 start: &Key,
2211 end: &Key,
2212 limit: usize,
2213 mode: RangeMode,
2214 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2215 self.c.range_with_mode(start, end, limit, mode).await
2216 }
2217
2218 pub async fn range_with_min_sequence_number(
2219 &self,
2220 start: &Key,
2221 end: &Key,
2222 limit: usize,
2223 min_sequence_number: u64,
2224 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2225 self.c
2226 .range_with_min_sequence_number(start, end, limit, min_sequence_number)
2227 .await
2228 }
2229
2230 pub async fn range_with_mode_and_min_sequence_number(
2231 &self,
2232 start: &Key,
2233 end: &Key,
2234 limit: usize,
2235 mode: RangeMode,
2236 min_sequence_number: u64,
2237 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2238 self.c
2239 .range_with_mode_and_min_sequence_number(start, end, limit, mode, min_sequence_number)
2240 .await
2241 }
2242
2243 pub async fn range_stream(
2244 &self,
2245 start: &Key,
2246 end: &Key,
2247 limit: usize,
2248 batch_size: usize,
2249 ) -> Result<RangeStream, ClientError> {
2250 self.c.range_stream(start, end, limit, batch_size).await
2251 }
2252
2253 pub async fn range_stream_with_mode(
2254 &self,
2255 start: &Key,
2256 end: &Key,
2257 limit: usize,
2258 batch_size: usize,
2259 mode: RangeMode,
2260 ) -> Result<RangeStream, ClientError> {
2261 self.c
2262 .range_stream_with_mode(start, end, limit, batch_size, mode)
2263 .await
2264 }
2265
2266 pub async fn range_stream_with_min_sequence_number(
2267 &self,
2268 start: &Key,
2269 end: &Key,
2270 limit: usize,
2271 batch_size: usize,
2272 min_sequence_number: u64,
2273 ) -> Result<RangeStream, ClientError> {
2274 self.c
2275 .range_stream_with_min_sequence_number(
2276 start,
2277 end,
2278 limit,
2279 batch_size,
2280 min_sequence_number,
2281 )
2282 .await
2283 }
2284
2285 pub async fn range_stream_with_mode_and_min_sequence_number(
2286 &self,
2287 start: &Key,
2288 end: &Key,
2289 limit: usize,
2290 batch_size: usize,
2291 mode: RangeMode,
2292 min_sequence_number: u64,
2293 ) -> Result<RangeStream, ClientError> {
2294 self.c
2295 .range_stream_with_mode_and_min_sequence_number(
2296 start,
2297 end,
2298 limit,
2299 batch_size,
2300 mode,
2301 min_sequence_number,
2302 )
2303 .await
2304 }
2305
2306 pub async fn range_reduce(
2307 &self,
2308 start: &Key,
2309 end: &Key,
2310 request: &DomainRangeReduceRequest,
2311 ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
2312 self.c.range_reduce(start, end, request).await
2313 }
2314
2315 pub async fn range_reduce_with_min_sequence_number(
2316 &self,
2317 start: &Key,
2318 end: &Key,
2319 request: &DomainRangeReduceRequest,
2320 min_sequence_number: u64,
2321 ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
2322 self.c
2323 .range_reduce_with_min_sequence_number(start, end, request, min_sequence_number)
2324 .await
2325 }
2326
2327 pub async fn range_reduce_response(
2328 &self,
2329 start: &Key,
2330 end: &Key,
2331 request: &DomainRangeReduceRequest,
2332 ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
2333 self.c.range_reduce_response(start, end, request).await
2334 }
2335
2336 pub async fn range_reduce_response_with_min_sequence_number(
2337 &self,
2338 start: &Key,
2339 end: &Key,
2340 request: &DomainRangeReduceRequest,
2341 min_sequence_number: u64,
2342 ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
2343 self.c
2344 .range_reduce_response_with_min_sequence_number(
2345 start,
2346 end,
2347 request,
2348 min_sequence_number,
2349 )
2350 .await
2351 }
2352}
2353
2354impl<'a> Compact<'a> {
2355 pub async fn prune(
2356 &self,
2357 policies: &[crate::prune_policy::PrunePolicy],
2358 ) -> Result<(), ClientError> {
2359 self.c.prune(policies).await
2360 }
2361}
2362
2363impl<'a> Stream<'a> {
2364 pub async fn subscribe(
2370 &self,
2371 filter: crate::stream_filter::StreamFilter,
2372 since_sequence_number: Option<u64>,
2373 ) -> Result<StreamSubscription, ClientError> {
2374 let logical_filter = self
2375 .c
2376 .key_prefix
2377 .is_some()
2378 .then(|| ClientStreamFilter::compile(&filter))
2379 .transpose()?;
2380 let filter = self.c.prefix_stream_filter(filter)?;
2381 crate::stream_filter::validate_filter(&filter)
2382 .map_err(|e| ClientError::WireFormat(e.to_string()))?;
2383 let match_keys = filter
2384 .match_keys
2385 .into_iter()
2386 .map(|mk| exoware_proto::store::common::v1::MatchKey {
2387 reserved_bits: u32::from(mk.reserved_bits),
2388 prefix: u32::from(mk.prefix),
2389 payload_regex: mk.payload_regex.0,
2390 ..Default::default()
2391 })
2392 .collect();
2393 let value_filters = filter
2394 .value_filters
2395 .into_iter()
2396 .map(|vf| {
2397 use crate::stream_filter::BytesFilter;
2398 use exoware_proto::store::common::v1::bytes_filter::Kind as ProtoKind;
2399 let kind = match vf {
2400 BytesFilter::Exact(bytes) => ProtoKind::Exact(bytes),
2401 BytesFilter::Prefix(bytes) => ProtoKind::Prefix(bytes),
2402 BytesFilter::Regex(pattern) => ProtoKind::Regex(pattern),
2403 };
2404 exoware_proto::store::common::v1::BytesFilter {
2405 kind: Some(kind),
2406 ..Default::default()
2407 }
2408 })
2409 .collect();
2410 let request = exoware_proto::store::stream::v1::SubscribeRequest {
2411 match_keys,
2412 value_filters,
2413 since_sequence_number,
2414 ..Default::default()
2415 };
2416 let config = store_connect_client_config(
2417 self.c.stream_uri.clone(),
2418 self.c.connect_request_compression,
2419 );
2420 let client = exoware_proto::store::stream::v1::ServiceClient::new(
2421 self.c.connect_http.clone(),
2422 config,
2423 );
2424 let stream = client
2425 .subscribe(request)
2426 .await
2427 .map_err(client_error_from_connect)?;
2428 Ok(StreamSubscription {
2429 stream,
2430 key_prefix: self.c.key_prefix,
2431 logical_filter,
2432 })
2433 }
2434
2435 pub async fn get(
2438 &self,
2439 sequence_number: u64,
2440 ) -> Result<Option<Vec<(Key, Bytes)>>, ClientError> {
2441 let config = store_connect_client_config(
2442 self.c.stream_uri.clone(),
2443 self.c.connect_request_compression,
2444 );
2445 let client = exoware_proto::store::stream::v1::ServiceClient::new(
2446 self.c.connect_http.clone(),
2447 config,
2448 );
2449 match client
2450 .get(exoware_proto::store::stream::v1::GetRequest {
2451 sequence_number,
2452 ..Default::default()
2453 })
2454 .await
2455 {
2456 Ok(resp) => {
2457 let owned = resp.into_owned();
2458 let mut entries = Vec::with_capacity(owned.entries.len());
2459 for entry in owned.entries {
2460 let key = Bytes::from(entry.key);
2461 match self.c.key_prefix {
2462 Some(prefix) if !prefix.codec.matches(&key) => {}
2463 Some(prefix) => entries.push((prefix.decode_key(&key)?, entry.value)),
2464 None => entries.push((key, entry.value)),
2465 }
2466 }
2467 Ok(Some(entries))
2468 }
2469 Err(err) => {
2470 if is_batch_missing_error(&err) {
2471 Ok(None)
2472 } else {
2473 Err(client_error_from_connect(err))
2474 }
2475 }
2476 }
2477 }
2478}
2479
2480impl SerializableReadSession {
2481 pub fn fixed_sequence(&self) -> Option<u64> {
2488 self.state.fixed_sequence()
2489 }
2490
/// Reads the value stored under `key` within this session.
///
/// Two closures are handed to `run_read`: the first reads with a given
/// minimum sequence number, the second reads without one and records the
/// sequence number from the response detail. Presumably `run_read` picks the
/// first once the session sequence is fixed and the second otherwise (its
/// body is outside this section; TODO confirm).
pub async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
    // Each closure owns its own client clone because both are `move`.
    let seeded_client = self.client.clone();
    let unseeded_client = self.client.clone();
    self.run_read(
        move |sequence| {
            let client = seeded_client.clone();
            async move { client.get_with_min_sequence_number(key, sequence).await }
        },
        move |observed_sequence| {
            let client = unseeded_client.clone();
            async move {
                let (response, detail) = client.send_get(key, None).await?;
                if let Some(detail) = detail {
                    // Keep the highest sequence number seen so far.
                    observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
                }
                Ok(response.value)
            }
        },
    )
    .await
}
2512
/// Opens a multi-key read stream within this session.
///
/// Two closures are handed to `run_read`: the first reads with a given
/// minimum sequence number, the second reads without one and passes the
/// observed-sequence counter to `get_many_internal`. Presumably `run_read`
/// picks the first once the session sequence is fixed and the second
/// otherwise (its body is outside this section; TODO confirm).
pub async fn get_many(
    &self,
    keys: &[&Key],
    batch_size: u32,
) -> Result<GetManyStream, ClientError> {
    // Copy the keys into owned buffers so the `move` closures can hold them.
    let keys_owned: Vec<Key> = keys.iter().map(|k| Bytes::copy_from_slice(k)).collect();
    let seeded_client = self.client.clone();
    let unseeded_client = self.client.clone();
    let keys_seeded = keys_owned.clone();
    let keys_unseeded = keys_owned;
    self.run_read(
        move |sequence| {
            let client = seeded_client.clone();
            let keys = keys_seeded.clone();
            async move {
                let refs: Vec<&Key> = keys.iter().collect();
                client
                    .get_many_with_min_sequence_number(&refs, batch_size, sequence)
                    .await
            }
        },
        move |observed_sequence| {
            let client = unseeded_client.clone();
            let keys = keys_unseeded.clone();
            async move {
                let refs: Vec<&Key> = keys.iter().collect();
                client
                    .get_many_internal(&refs, batch_size, None, Some(observed_sequence))
                    .await
            }
        },
    )
    .await
}
2547
2548 pub async fn range(
2549 &self,
2550 start: &Key,
2551 end: &Key,
2552 limit: usize,
2553 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2554 self.range_with_mode(start, end, limit, RangeMode::Forward)
2555 .await
2556 }
2557
2558 pub async fn range_with_mode(
2559 &self,
2560 start: &Key,
2561 end: &Key,
2562 limit: usize,
2563 mode: RangeMode,
2564 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2565 let seeded_client = self.client.clone();
2566 let unseeded_client = self.client.clone();
2567 self.run_read(
2568 move |sequence| {
2569 let client = seeded_client.clone();
2570 async move {
2571 client
2572 .range_with_mode_and_min_sequence_number(start, end, limit, mode, sequence)
2573 .await
2574 }
2575 },
2576 move |observed_sequence| {
2577 let client = unseeded_client.clone();
2578 async move {
2579 let stream = client
2580 .range_stream_internal(
2581 start,
2582 end,
2583 limit,
2584 limit.max(1),
2585 mode,
2586 RangeStreamReadOptions {
2587 min_sequence_number: None,
2588 observed_sequence: Some(observed_sequence),
2589 },
2590 )
2591 .await;
2592 stream?.collect().await
2593 }
2594 },
2595 )
2596 .await
2597 }
2598
2599 pub async fn range_stream(
2600 &self,
2601 start: &Key,
2602 end: &Key,
2603 limit: usize,
2604 batch_size: usize,
2605 ) -> Result<RangeStream, ClientError> {
2606 self.range_stream_with_mode(start, end, limit, batch_size, RangeMode::Forward)
2607 .await
2608 }
2609
2610 pub async fn range_stream_with_mode(
2611 &self,
2612 start: &Key,
2613 end: &Key,
2614 limit: usize,
2615 batch_size: usize,
2616 mode: RangeMode,
2617 ) -> Result<RangeStream, ClientError> {
2618 let seeded_client = self.client.clone();
2619 let unseeded_client = self.client.clone();
2620 self.run_read(
2621 move |sequence| {
2622 let client = seeded_client.clone();
2623 async move {
2624 client
2625 .range_stream_with_mode_and_min_sequence_number(
2626 start, end, limit, batch_size, mode, sequence,
2627 )
2628 .await
2629 }
2630 },
2631 move |observed_sequence| {
2632 let client = unseeded_client.clone();
2633 async move {
2634 client
2635 .range_stream_internal(
2636 start,
2637 end,
2638 limit,
2639 batch_size,
2640 mode,
2641 RangeStreamReadOptions {
2642 min_sequence_number: None,
2643 observed_sequence: Some(observed_sequence),
2644 },
2645 )
2646 .await
2647 }
2648 },
2649 )
2650 .await
2651 }
2652
2653 pub async fn range_reduce(
2654 &self,
2655 start: &Key,
2656 end: &Key,
2657 request: &DomainRangeReduceRequest,
2658 ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
2659 let seeded_client = self.client.clone();
2660 let unseeded_client = self.client.clone();
2661 let request_seeded = request.clone();
2662 let request_unseeded = request.clone();
2663 self.run_read(
2664 move |sequence| {
2665 let client = seeded_client.clone();
2666 let request = request_seeded.clone();
2667 async move {
2668 client
2669 .range_reduce_with_min_sequence_number(start, end, &request, sequence)
2670 .await
2671 }
2672 },
2673 move |observed_sequence| {
2674 let client = unseeded_client.clone();
2675 let request = request_unseeded.clone();
2676 async move {
2677 let (response, detail) = client
2678 .range_reduce_response_internal(start, end, &request, None)
2679 .await?;
2680 if let Some(detail) = detail {
2681 observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2682 }
2683 let decoded = proto_to_domain_reduce_response(response)
2684 .map_err(ClientError::WireFormat)?;
2685 if !decoded.groups.is_empty() {
2686 return Err(ClientError::WireFormat(
2687 "grouped range reduction response returned for scalar request"
2688 .to_string(),
2689 ));
2690 }
2691 Ok(decoded
2692 .results
2693 .iter()
2694 .map(|result| result.value.clone())
2695 .collect())
2696 }
2697 },
2698 )
2699 .await
2700 }
2701
2702 pub async fn range_reduce_response(
2703 &self,
2704 start: &Key,
2705 end: &Key,
2706 request: &DomainRangeReduceRequest,
2707 ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
2708 let seeded_client = self.client.clone();
2709 let unseeded_client = self.client.clone();
2710 let request_seeded = request.clone();
2711 let request_unseeded = request.clone();
2712 self.run_read(
2713 move |sequence| {
2714 let client = seeded_client.clone();
2715 let request = request_seeded.clone();
2716 async move {
2717 client
2718 .range_reduce_response_with_min_sequence_number(
2719 start, end, &request, sequence,
2720 )
2721 .await
2722 }
2723 },
2724 move |observed_sequence| {
2725 let client = unseeded_client.clone();
2726 let request = request_unseeded.clone();
2727 async move {
2728 let (response, detail) = client
2729 .range_reduce_response_internal(start, end, &request, None)
2730 .await?;
2731 if let Some(detail) = detail {
2732 observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2733 }
2734 Ok(response)
2735 }
2736 },
2737 )
2738 .await
2739 }
2740
2741 async fn run_read<T, SeededCall, SeededFut, UnseededCall, UnseededFut>(
2742 &self,
2743 seeded_call: SeededCall,
2744 unseeded_call: UnseededCall,
2745 ) -> Result<T, ClientError>
2746 where
2747 SeededCall: Fn(u64) -> SeededFut,
2748 SeededFut: std::future::Future<Output = Result<T, ClientError>>,
2749 UnseededCall: Fn(Arc<AtomicU64>) -> UnseededFut,
2750 UnseededFut: std::future::Future<Output = Result<T, ClientError>>,
2751 {
2752 if let Some(sequence) = self.fixed_sequence() {
2753 return seeded_call(sequence).await;
2754 }
2755
2756 let gate = self.state.init_gate.lock().await;
2757
2758 if let Some(sequence) = self.fixed_sequence() {
2759 drop(gate);
2760 return seeded_call(sequence).await;
2761 }
2762
2763 let result = unseeded_call(self.state.sequence.clone()).await;
2764 drop(gate);
2765 result
2766 }
2767}
2768
2769fn client_error_from_connect(err: ConnectError) -> ClientError {
2770 ClientError::Rpc(Box::new(err))
2771}
2772
2773fn is_retryable_error(err: &ConnectError) -> bool {
2774 matches!(
2775 err.code,
2776 ErrorCode::Aborted
2777 | ErrorCode::ResourceExhausted
2778 | ErrorCode::Unavailable
2779 | ErrorCode::Unknown
2780 | ErrorCode::Internal
2785 )
2786}
2787
2788fn retry_delay_for_error(
2789 err: &ConnectError,
2790 attempt: usize,
2791 retry_config: RetryConfig,
2792) -> Duration {
2793 if let Ok(decoded) = proto_decode_connect_error(err) {
2794 if let Some(retry_info) = decoded.retry_info {
2795 if let Some(delay) = retry_info.retry_delay.as_option() {
2796 let secs = u64::try_from(delay.seconds).unwrap_or(0);
2797 let nanos = u32::try_from(delay.nanos.max(0)).unwrap_or(0);
2798 let hinted = Duration::new(secs, nanos);
2799 if !hinted.is_zero() {
2800 return hinted.min(retry_config.max_backoff);
2801 }
2802 }
2803 }
2804 }
2805 retry_backoff_delay(attempt, retry_config)
2806}
2807
2808fn retry_backoff_delay(attempt: usize, retry_config: RetryConfig) -> Duration {
2809 let exponent = (attempt.saturating_sub(1)).min(20) as u32;
2810 let factor = 1u128 << exponent;
2811 let base_ms = retry_config.initial_backoff.as_millis();
2812 let capped_ms = base_ms
2813 .saturating_mul(factor)
2814 .min(retry_config.max_backoff.as_millis());
2815 Duration::from_millis(capped_ms.min(u64::MAX as u128) as u64)
2816}
2817
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv_codec::{KvFieldKind, KvPredicate, KvPredicateCheck, KvPredicateConstraint};
    use exoware_proto::query::TraversalMode as ProtoTraversalMode;

    /// Hex helpers encode to lowercase and decode back to the original bytes.
    #[test]
    fn hex_round_trip() {
        let raw = vec![0x00, 0x42, 0xFF, 0xAB];
        let text = hex_encode(&raw);
        assert_eq!(text, "0042ffab");
        let round_tripped = hex_decode(&text).unwrap();
        assert_eq!(round_tripped, raw);
    }

    /// A client built from one URL uses it for health and every RPC endpoint.
    #[test]
    fn client_creation() {
        let client = StoreClient::new("http://localhost:10000");
        assert_eq!(client.health_url, "http://localhost:10000");

        let ingest = client.ingest_uri.to_string();
        let query = client.query_uri.to_string();
        let stream = client.stream_uri.to_string();
        assert_eq!(ingest, "http://localhost:10000/");
        assert_eq!(query, "http://localhost:10000/");
        assert_eq!(stream, "http://localhost:10000/");
    }

    /// Each missing endpoint URL is reported in order as the builder is filled in.
    #[test]
    fn builder_fails_until_all_urls_set() {
        let health_only = StoreClient::builder().health_url("http://h").build();
        assert!(matches!(
            health_only,
            Err(ClientBuildError::MissingIngestUrl)
        ));

        let with_ingest = StoreClient::builder()
            .health_url("http://h")
            .ingest_url("http://i")
            .build();
        assert!(matches!(with_ingest, Err(ClientBuildError::MissingQueryUrl)));

        let with_query = StoreClient::builder()
            .health_url("http://h")
            .ingest_url("http://i")
            .query_url("http://q")
            .build();
        assert!(matches!(
            with_query,
            Err(ClientBuildError::MissingCompactUrl)
        ));

        let with_compact = StoreClient::builder()
            .health_url("http://h")
            .ingest_url("http://i")
            .query_url("http://q")
            .compact_url("http://c")
            .build();
        assert!(matches!(
            with_compact,
            Err(ClientBuildError::MissingStreamUrl)
        ));
    }

    /// A trailing slash on the base URL is removed from the health URL.
    #[test]
    fn client_trims_trailing_slash() {
        let health_url = StoreClient::new("http://localhost:10000/").health_url;
        assert_eq!(health_url, "http://localhost:10000");
    }

    /// A plain session has no fixed sequence until a read seeds it.
    #[test]
    fn create_session_starts_unseeded() {
        let session = StoreClient::new("http://localhost:10000/").create_session();
        assert_eq!(session.fixed_sequence(), None);
    }

    /// Range modes translate to the matching proto traversal modes.
    #[test]
    fn range_mode_maps_to_proto_traversal() {
        let forward = RangeMode::Forward.to_proto();
        let reverse = RangeMode::Reverse.to_proto();
        assert_eq!(forward, ProtoTraversalMode::TRAVERSAL_MODE_FORWARD);
        assert_eq!(reverse, ProtoTraversalMode::TRAVERSAL_MODE_REVERSE);
    }

    /// The standard retry configuration uses the documented defaults.
    #[test]
    fn retry_config_standard_defaults_match_expected() {
        let defaults = RetryConfig::standard();
        assert_eq!(defaults.max_attempts, 3);
        assert_eq!(defaults.initial_backoff, Duration::from_millis(100));
        assert_eq!(defaults.max_backoff, Duration::from_millis(2_000));
    }

    /// Sanitizing raises zero attempts to one and lifts the max backoff to the initial one.
    #[test]
    fn retry_config_clamps_attempts_and_backoff_bounds() {
        let initial = Duration::from_millis(250);
        let too_small_max = Duration::from_millis(50);
        let sanitized = RetryConfig::standard()
            .with_max_attempts(0)
            .with_initial_backoff(initial)
            .with_max_backoff(too_small_max)
            .sanitized();
        assert_eq!(sanitized.max_attempts, 1);
        assert_eq!(sanitized.initial_backoff, Duration::from_millis(250));
        assert_eq!(sanitized.max_backoff, Duration::from_millis(250));
    }

    /// Transient Connect codes are retryable; an invalid argument is not.
    #[test]
    fn retryable_codes_include_connect_transients() {
        let transient = [
            ConnectError::aborted("retry"),
            ConnectError::resource_exhausted("retry"),
            ConnectError::unavailable("retry"),
            ConnectError::internal("retry"),
        ];
        for err in transient {
            assert!(is_retryable_error(&err));
        }

        let permanent = ConnectError::invalid_argument("no retry");
        assert!(!is_retryable_error(&permanent));
    }

    /// Backoff doubles per attempt and stops growing at the configured maximum.
    #[test]
    fn retry_backoff_delay_is_exponential_and_capped() {
        let config = RetryConfig::standard()
            .with_initial_backoff(Duration::from_millis(100))
            .with_max_backoff(Duration::from_millis(250));
        for (attempt, expected_ms) in [(1, 100), (2, 200), (3, 250), (4, 250)] {
            assert_eq!(
                retry_backoff_delay(attempt, config),
                Duration::from_millis(expected_ms)
            );
        }
    }

    /// A session created with an explicit sequence reports it as fixed.
    #[test]
    fn create_session_with_sequence_pins_explicit_floor() {
        let session = StoreClient::new("http://localhost:10000/").create_session_with_sequence(27);
        assert_eq!(session.fixed_sequence(), Some(27));
    }

    /// Encoding then decoding a logical key through a prefix returns the same key.
    #[test]
    fn store_key_prefix_round_trips_logical_keys() {
        let prefix = StoreKeyPrefix::new(4, 0xA).unwrap();
        let logical_key = Bytes::from_static(b"hello");
        let physical_key = prefix.encode_key(&logical_key).unwrap();
        assert!(prefix.codec.matches(&physical_key));
        let decoded_key = prefix.decode_key(&physical_key).unwrap();
        assert_eq!(decoded_key, logical_key);
    }

    /// A maximum-length logical upper bound still yields a valid physical range.
    #[test]
    fn store_key_prefix_clamps_long_logical_range_upper_bound() {
        let prefix = StoreKeyPrefix::new(4, 0x2).unwrap();
        let (logical_start, logical_end) = KeyCodec::new(4, 0x7).prefix_bounds();
        assert_eq!(logical_end.len(), MAX_KEY_LEN);

        let physical_bounds = prefix.encode_range(&logical_start, &logical_end).unwrap();
        let (physical_start, physical_end) = physical_bounds;
        assert!(prefix.codec.matches(&physical_start));
        assert!(prefix.codec.matches(&physical_end));
        assert_eq!(physical_end.len(), MAX_KEY_LEN);
        assert_eq!(prefix.decode_key(&physical_start).unwrap(), logical_start);
    }

    /// Prefixing a match key prepends the store prefix bits and keeps the payload regex.
    #[test]
    fn store_key_prefix_rewrites_match_key_family() {
        let logical = crate::match_key::MatchKey {
            reserved_bits: 4,
            prefix: 0b0110,
            payload_regex: crate::kv_codec::Utf8::from("(?s).*"),
        };
        let store_prefix = StoreKeyPrefix::new(3, 0b101).unwrap();
        let physical = store_prefix.prefix_match_key(&logical).unwrap();
        assert_eq!(physical.reserved_bits, 7);
        assert_eq!(physical.prefix, 0b101_0110);
        assert_eq!(physical.payload_regex, logical.payload_regex);
    }

    /// The stream variant prepends the prefix bits and rewrites the payload regex.
    #[test]
    fn store_key_prefix_broadens_stream_match_key_payload_regex() {
        let logical = crate::match_key::MatchKey {
            reserved_bits: 4,
            prefix: 0b0110,
            payload_regex: crate::kv_codec::Utf8::from("(?s).*"),
        };
        let store_prefix = StoreKeyPrefix::new(3, 0b101).unwrap();
        let physical = store_prefix.prefix_stream_match_key(&logical).unwrap();
        assert_eq!(physical.reserved_bits, 7);
        assert_eq!(physical.prefix, 0b101_0110);
        assert_eq!(&*physical.payload_regex, "(?s-u).*");
    }

    /// Key-based field offsets in a reduce request move by the prefix width (5 bits here).
    #[test]
    fn prefixed_reduce_request_shifts_key_field_offsets() {
        let client = StoreClient::builder()
            .url("http://localhost:10000")
            .key_prefix(StoreKeyPrefix::new(5, 0b10101).unwrap())
            .build()
            .unwrap();

        let key_sum_reducer = crate::RangeReducerSpec {
            op: crate::RangeReduceOp::SumField,
            expr: Some(KvExpr::Field(KvFieldRef::Key {
                bit_offset: 9,
                kind: KvFieldKind::UInt64,
            })),
        };
        let z_order_group = KvExpr::Field(KvFieldRef::ZOrderKey {
            bit_offset: 12,
            field_position: 0,
            field_widths: vec![8],
            kind: KvFieldKind::UInt64,
        });
        let value_range_filter = KvPredicate {
            checks: vec![KvPredicateCheck {
                field: KvFieldRef::Value {
                    index: 0,
                    kind: KvFieldKind::UInt64,
                    nullable: false,
                },
                constraint: KvPredicateConstraint::UInt64Range {
                    min: Some(1),
                    max: Some(9),
                },
            }],
            contradiction: false,
        };
        let request = DomainRangeReduceRequest {
            reducers: vec![key_sum_reducer],
            group_by: vec![z_order_group],
            filter: Some(value_range_filter),
        };

        let shifted = client.prefix_reduce_request(&request).unwrap();
        match shifted.reducers[0].expr.as_ref() {
            Some(KvExpr::Field(KvFieldRef::Key { bit_offset, .. })) => {
                assert_eq!(*bit_offset, 14);
            }
            _ => panic!("expected key field reducer"),
        }
        match &shifted.group_by[0] {
            KvExpr::Field(KvFieldRef::ZOrderKey { bit_offset, .. }) => {
                assert_eq!(*bit_offset, 17);
            }
            _ => panic!("expected z-order group field"),
        }
    }

    /// Entries pushed through differently-prefixed clients are encoded with each own prefix.
    #[test]
    fn store_write_batch_uses_each_clients_prefix() {
        let base = StoreClient::new("http://localhost:10000");
        let client_a = base.with_key_prefix(StoreKeyPrefix::new(4, 1).unwrap());
        let client_b = base.with_key_prefix(StoreKeyPrefix::new(4, 2).unwrap());
        let key_a = Bytes::from_static(b"a");
        let key_b = Bytes::from_static(b"b");

        let mut batch = StoreWriteBatch::new();
        batch.push(&client_a, &key_a, b"va").unwrap();
        batch.push(&client_b, &key_b, b"vb").unwrap();

        let expected_a = client_a.key_prefix().unwrap().encode_key(&key_a).unwrap();
        let expected_b = client_b.key_prefix().unwrap().encode_key(&key_b).unwrap();
        assert_eq!(batch.entries[0].0, expected_a);
        assert_eq!(batch.entries[1].0, expected_b);
    }

    /// Test helper: hex-encodes `data`.
    fn hex_encode(data: &[u8]) -> String {
        hex::encode(data)
    }

    /// Test helper: hex-decodes `s`, returning `None` on malformed input.
    fn hex_decode(s: &str) -> Option<Vec<u8>> {
        let decoded = hex::decode(s);
        decoded.ok()
    }
}