1pub mod keys;
14pub mod kv_codec;
15pub mod match_key;
16pub mod proto;
17pub mod prune_policy;
18pub mod stream_filter;
19pub use keys::{Key, KeyCodec, KeyCodecError, KeyMut, KeyValidationError, Value, MAX_KEY_LEN};
20pub use proto::*;
21extern crate self as exoware_proto;
22
23use bytes::Bytes;
24use connectrpc::client::{ClientConfig, ServerStream as ConnectServerStream};
25use connectrpc::{ConnectError, ErrorCode};
26use exoware_proto::compact::ServiceClient as CompactServiceClient;
27use exoware_proto::ingest::ServiceClient as IngestServiceClient;
28use exoware_proto::query as proto_query;
29use exoware_proto::query::ServiceClient as QueryServiceClient;
30use exoware_proto::store::compact::v1::PruneRequest as ProtoPruneRequest;
31use exoware_proto::store::ingest::v1::PutRequest as ProtoPutRequest;
32use exoware_proto::store::query::v1::{
33 GetManyRequest as ProtoGetManyRequest, GetRequest as ProtoGetRequest,
34 RangeRequest as ProtoRangeRequest, ReduceRequest as ProtoWireReduceRequest,
35};
36use exoware_proto::RangeReduceRequest as DomainRangeReduceRequest;
37use exoware_proto::{
38 connect_compression_registry as proto_connect_compression_registry,
39 decode_connect_error as proto_decode_connect_error,
40 decode_query_detail_header_value as proto_decode_query_detail_header_value,
41 to_domain_reduce_response as proto_to_domain_reduce_response,
42 to_proto_reduce_params as proto_to_proto_reduce_params,
43 PreferZstdHttpClient as ProtoPreferZstdHttpClient,
44 QUERY_DETAIL_RESPONSE_HEADER as PROTO_QUERY_DETAIL_RESPONSE_HEADER,
45};
46use futures::future::BoxFuture;
47use http::HeaderMap;
48use keys::is_valid_key_size;
49use kv_codec::{KvExpr, KvFieldRef, KvReducedValue};
50use std::collections::HashMap;
51use std::sync::{
52 atomic::{AtomicU64, Ordering},
53 Arc,
54};
55use std::time::Duration;
56
57const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3;
58const DEFAULT_RETRY_INITIAL_BACKOFF_MS: u64 = 100;
59const DEFAULT_RETRY_MAX_BACKOFF_MS: u64 = 2_000;
60
61#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
71pub enum ConnectRequestCompression {
72 #[default]
74 Zstd,
75 Gzip,
77}
78
79impl ConnectRequestCompression {
80 fn wire_name(self) -> &'static str {
81 match self {
82 Self::Zstd => "zstd",
83 Self::Gzip => "gzip",
84 }
85 }
86}
87
88const STORE_CLIENT_MAX_MESSAGE_BYTES: usize = 256 * 1024 * 1024;
93
94fn store_connect_client_config(
100 base_uri: http::Uri,
101 request_compression: ConnectRequestCompression,
102) -> ClientConfig {
103 ClientConfig::new(base_uri)
104 .compression(proto_connect_compression_registry())
105 .compress_requests(request_compression.wire_name())
106 .default_max_message_size(STORE_CLIENT_MAX_MESSAGE_BYTES)
107}
108
109#[derive(Debug, thiserror::Error)]
111pub enum ClientError {
112 #[error("HTTP error: {0}")]
113 Http(#[from] reqwest::Error),
114 #[error("RPC error ({0})")]
115 Rpc(Box<ConnectError>),
116 #[error("store key prefix error: {0}")]
117 KeyPrefix(#[from] StoreKeyPrefixError),
118 #[error("invalid key length: expected {expected}, got {got}")]
119 InvalidKeyLength { expected: usize, got: usize },
120 #[error("wire format error: {0}")]
121 WireFormat(String),
122}
123
124impl ClientError {
125 pub fn rpc_error(&self) -> Option<&ConnectError> {
126 match self {
127 Self::Rpc(err) => Some(err.as_ref()),
128 _ => None,
129 }
130 }
131
132 pub fn rpc_code(&self) -> Option<ErrorCode> {
133 self.rpc_error().map(|err| err.code)
134 }
135
136 pub fn decoded_rpc_error(
137 &self,
138 ) -> Result<Option<exoware_proto::DecodedConnectError>, buffa::DecodeError> {
139 self.rpc_error().map(proto_decode_connect_error).transpose()
140 }
141}
142
143#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
146pub enum StoreKeyPrefixError {
147 #[error("reserved_bits {reserved_bits} exceeds 16")]
148 ReservedBitsTooLarge { reserved_bits: u8 },
149 #[error("prefix {prefix} does not fit in {reserved_bits} reserved bits")]
150 PrefixTooLarge { reserved_bits: u8, prefix: u16 },
151 #[error(
152 "combined reserved bits exceed 16: store prefix bits {prefix_bits} + logical bits {logical_bits}"
153 )]
154 CombinedReservedBitsTooLarge { prefix_bits: u8, logical_bits: u8 },
155 #[error("key does not belong to this store prefix")]
156 PrefixMismatch,
157 #[error("key bit offset {offset} plus store prefix bits {prefix_bits} exceeds u16")]
158 BitOffsetOverflow { offset: u16, prefix_bits: u8 },
159 #[error("key codec error: {0}")]
160 Codec(#[from] KeyCodecError),
161}
162
163#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
171pub struct StoreKeyPrefix {
172 codec: KeyCodec,
173}
174
175impl StoreKeyPrefix {
176 pub fn new(reserved_bits: u8, prefix: u16) -> Result<Self, StoreKeyPrefixError> {
177 validate_prefix_bits(reserved_bits, prefix)?;
178 Ok(Self {
179 codec: KeyCodec::new(reserved_bits, prefix),
180 })
181 }
182
183 #[inline]
184 pub fn reserved_bits(self) -> u8 {
185 self.codec.reserved_bits()
186 }
187
188 #[inline]
189 pub fn prefix(self) -> u16 {
190 self.codec.prefix()
191 }
192
193 #[inline]
195 pub fn max_logical_key_len(self) -> usize {
196 self.codec.max_payload_capacity_bytes()
197 }
198
199 pub fn encode_key(self, key: &Key) -> Result<Key, StoreKeyPrefixError> {
201 Ok(self.codec.encode(key)?)
202 }
203
204 pub fn decode_key(self, key: &Key) -> Result<Key, StoreKeyPrefixError> {
206 if !self.codec.matches(key) {
207 return Err(StoreKeyPrefixError::PrefixMismatch);
208 }
209 let payload_len = self.codec.payload_capacity_bytes_for_key_len(key.len());
210 Ok(Bytes::from(self.codec.read_payload(key, 0, payload_len)?))
211 }
212
213 pub fn encode_range(self, start: &Key, end: &Key) -> Result<(Key, Key), StoreKeyPrefixError> {
221 let start = self.encode_key(start)?;
222 let end = if end.is_empty() {
223 self.codec.prefix_bounds().1
224 } else {
225 let max_len = self.max_logical_key_len();
226 let end = if end.len() > max_len {
227 Bytes::copy_from_slice(&end[..max_len])
228 } else {
229 Bytes::copy_from_slice(end)
230 };
231 self.encode_key(&end)?
232 };
233 Ok((start, end))
234 }
235
236 fn prefix_match_key(
237 self,
238 match_key: &crate::match_key::MatchKey,
239 ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
240 self.prefix_match_key_with_regex(match_key, match_key.payload_regex.clone())
241 }
242
243 fn prefix_stream_match_key(
244 self,
245 match_key: &crate::match_key::MatchKey,
246 ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
247 self.prefix_match_key_with_regex(match_key, crate::kv_codec::Utf8::from("(?s-u).*"))
248 }
249
250 fn prefix_match_key_with_regex(
251 self,
252 match_key: &crate::match_key::MatchKey,
253 payload_regex: crate::kv_codec::Utf8,
254 ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
255 validate_prefix_bits(match_key.reserved_bits, match_key.prefix)?;
256 let reserved_bits = self
257 .reserved_bits()
258 .checked_add(match_key.reserved_bits)
259 .ok_or(StoreKeyPrefixError::CombinedReservedBitsTooLarge {
260 prefix_bits: self.reserved_bits(),
261 logical_bits: match_key.reserved_bits,
262 })?;
263 if reserved_bits > 16 {
264 return Err(StoreKeyPrefixError::CombinedReservedBitsTooLarge {
265 prefix_bits: self.reserved_bits(),
266 logical_bits: match_key.reserved_bits,
267 });
268 }
269
270 let prefix = (u32::from(self.prefix()) << u32::from(match_key.reserved_bits))
271 | u32::from(match_key.prefix);
272 let prefix = u16::try_from(prefix).map_err(|_| StoreKeyPrefixError::PrefixTooLarge {
273 reserved_bits,
274 prefix: u16::MAX,
275 })?;
276 validate_prefix_bits(reserved_bits, prefix)?;
277 Ok(crate::match_key::MatchKey {
278 reserved_bits,
279 prefix,
280 payload_regex,
281 })
282 }
283}
284
285#[derive(Clone, Debug, Default)]
291pub struct StoreWriteBatch {
292 entries: Vec<(Key, Bytes)>,
293}
294
295impl StoreWriteBatch {
296 pub fn new() -> Self {
297 Self::default()
298 }
299
300 pub fn len(&self) -> usize {
301 self.entries.len()
302 }
303
304 pub fn is_empty(&self) -> bool {
305 self.entries.is_empty()
306 }
307
308 pub fn clear(&mut self) {
309 self.entries.clear();
310 }
311
312 pub fn push(
313 &mut self,
314 client: &StoreClient,
315 key: &Key,
316 value: &[u8],
317 ) -> Result<&mut Self, ClientError> {
318 self.entries
319 .push((client.encode_store_key(key)?, Bytes::copy_from_slice(value)));
320 Ok(self)
321 }
322
323 pub async fn commit(&self, client: &StoreClient) -> Result<u64, ClientError> {
324 let refs: Vec<(&Key, &[u8])> = self
325 .entries
326 .iter()
327 .map(|(key, value)| (key, value.as_ref()))
328 .collect();
329 client.put_physical(&refs).await
330 }
331}
332
333pub trait StoreBatchUpload {
345 type Prepared: Send;
346 type Receipt: Send;
347 type Error: std::fmt::Display + Send;
348
349 fn stage_upload(
350 &self,
351 prepared: &Self::Prepared,
352 batch: &mut StoreWriteBatch,
353 ) -> Result<(), Self::Error>;
354
355 fn commit_error(&self, error: ClientError) -> Self::Error;
356
357 fn mark_upload_persisted<'a>(
358 &'a self,
359 prepared: Self::Prepared,
360 sequence_number: u64,
361 ) -> BoxFuture<'a, Self::Receipt>
362 where
363 Self: Sync + 'a,
364 Self::Prepared: 'a;
365
366 fn mark_upload_failed<'a>(
367 &'a self,
368 prepared: Self::Prepared,
369 error: String,
370 ) -> BoxFuture<'a, ()>
371 where
372 Self: Sync + 'a,
373 Self::Prepared: 'a;
374
375 fn commit_upload<'a>(
376 &'a self,
377 client: &'a StoreClient,
378 prepared: Self::Prepared,
379 ) -> BoxFuture<'a, Result<Self::Receipt, Self::Error>>
380 where
381 Self: Sync + Sized + 'a,
382 Self::Prepared: 'a,
383 Self::Receipt: 'a,
384 Self::Error: 'a,
385 {
386 Box::pin(async move {
387 let mut batch = StoreWriteBatch::new();
388 if let Err(err) = self.stage_upload(&prepared, &mut batch) {
389 let message = err.to_string();
390 self.mark_upload_failed(prepared, message).await;
391 return Err(err);
392 }
393 match batch.commit(client).await {
394 Ok(sequence_number) => {
395 Ok(self.mark_upload_persisted(prepared, sequence_number).await)
396 }
397 Err(err) => {
398 let message = err.to_string();
399 self.mark_upload_failed(prepared, message).await;
400 Err(self.commit_error(err))
401 }
402 }
403 })
404 }
405}
406
407pub trait StoreBatchPublication {
415 type PreparedPublication: Send;
416 type PublicationReceipt: Send;
417 type Error: std::fmt::Display + Send;
418
419 fn stage_publication(
420 &self,
421 prepared: &Self::PreparedPublication,
422 batch: &mut StoreWriteBatch,
423 ) -> Result<(), Self::Error>;
424
425 fn publication_commit_error(&self, error: ClientError) -> Self::Error;
426
427 fn mark_publication_persisted<'a>(
428 &'a self,
429 prepared: Self::PreparedPublication,
430 sequence_number: u64,
431 ) -> BoxFuture<'a, Self::PublicationReceipt>
432 where
433 Self: Sync + 'a,
434 Self::PreparedPublication: 'a;
435
436 fn mark_publication_failed<'a>(
437 &'a self,
438 _prepared: Self::PreparedPublication,
439 _error: String,
440 ) -> BoxFuture<'a, ()>
441 where
442 Self: Sync + 'a,
443 Self::PreparedPublication: 'a,
444 {
445 Box::pin(async {})
446 }
447
448 fn commit_publication<'a>(
449 &'a self,
450 client: &'a StoreClient,
451 prepared: Self::PreparedPublication,
452 ) -> BoxFuture<'a, Result<Self::PublicationReceipt, Self::Error>>
453 where
454 Self: Sync + Sized + 'a,
455 Self::PreparedPublication: 'a,
456 Self::PublicationReceipt: 'a,
457 Self::Error: 'a,
458 {
459 Box::pin(async move {
460 let mut batch = StoreWriteBatch::new();
461 if let Err(err) = self.stage_publication(&prepared, &mut batch) {
462 let message = err.to_string();
463 self.mark_publication_failed(prepared, message).await;
464 return Err(err);
465 }
466 match batch.commit(client).await {
467 Ok(sequence_number) => Ok(self
468 .mark_publication_persisted(prepared, sequence_number)
469 .await),
470 Err(err) => {
471 let message = err.to_string();
472 self.mark_publication_failed(prepared, message).await;
473 Err(self.publication_commit_error(err))
474 }
475 }
476 })
477 }
478}
479
480pub trait StorePublicationFrontierWriter: StoreBatchPublication {
489 fn latest_publication_receipt<'a>(&'a self) -> BoxFuture<'a, Option<Self::PublicationReceipt>>
490 where
491 Self: Sync + 'a,
492 Self::PublicationReceipt: 'a;
493
494 fn prepare_publication<'a>(
495 &'a self,
496 ) -> BoxFuture<'a, Result<Option<Self::PreparedPublication>, Self::Error>>
497 where
498 Self: Sync + 'a,
499 Self::PreparedPublication: 'a,
500 Self::Error: 'a;
501
502 fn flush_publication_with_receipt<'a>(
503 &'a self,
504 ) -> BoxFuture<'a, Result<Option<Self::PublicationReceipt>, Self::Error>>
505 where
506 Self: Sync + 'a,
507 Self::PublicationReceipt: 'a,
508 Self::Error: 'a;
509
510 fn flush_publication<'a>(&'a self) -> BoxFuture<'a, Result<(), Self::Error>>
511 where
512 Self: Sync + 'a,
513 Self::PublicationReceipt: 'a,
514 Self::Error: 'a,
515 {
516 Box::pin(async move { self.flush_publication_with_receipt().await.map(|_| ()) })
517 }
518}
519
520#[derive(Clone, Copy, Debug, Eq, PartialEq)]
522pub enum RangeMode {
523 Forward,
524 Reverse,
525}
526
527pub struct RangeStream {
529 stream:
530 ConnectServerStream<hyper::body::Incoming, exoware_proto::query::RangeFrameView<'static>>,
531 pending_frame: Option<exoware_proto::query::RangeFrame>,
532 rows_seen: usize,
533 final_count: Option<usize>,
534 final_detail: Option<proto_query::Detail>,
535 finished: bool,
536 observed_sequence: Option<Arc<AtomicU64>>,
537 key_prefix: Option<StoreKeyPrefix>,
538}
539
540impl RangeStream {
541 fn from_connect_stream(
542 stream: ConnectServerStream<
543 hyper::body::Incoming,
544 exoware_proto::query::RangeFrameView<'static>,
545 >,
546 observed_sequence: Option<Arc<AtomicU64>>,
547 key_prefix: Option<StoreKeyPrefix>,
548 ) -> Self {
549 Self {
550 stream,
551 pending_frame: None,
552 rows_seen: 0,
553 final_count: None,
554 final_detail: None,
555 finished: false,
556 observed_sequence,
557 key_prefix,
558 }
559 }
560
561 pub fn final_count(&self) -> Option<usize> {
562 self.final_count
563 }
564
565 pub fn final_detail(&self) -> Option<proto_query::Detail> {
566 self.final_detail.clone()
567 }
568
569 fn observe_detail_from_stream_trailers(&mut self) {
570 self.final_detail = self
571 .stream
572 .trailers()
573 .and_then(query_detail_from_header_map);
574 if let (Some(sequence_store), Some(d)) =
575 (&self.observed_sequence, self.final_detail.as_ref())
576 {
577 sequence_store.fetch_max(d.sequence_number, Ordering::SeqCst);
578 }
579 }
580
581 async fn prefetch_first_frame(&mut self) -> Result<(), ConnectError> {
582 if self.pending_frame.is_some() || self.finished {
583 return Ok(());
584 }
585 match self.stream.message().await? {
586 Some(frame) => {
587 self.pending_frame = Some(frame.to_owned_message());
588 Ok(())
589 }
590 None => {
591 self.finished = true;
592 if let Some(err) = self.stream.error() {
593 Err(err.clone())
594 } else {
595 self.final_count = Some(self.rows_seen);
596 self.observe_detail_from_stream_trailers();
597 Ok(())
598 }
599 }
600 }
601 }
602
603 pub async fn next_chunk(&mut self) -> Result<Option<Vec<(Key, Bytes)>>, ClientError> {
604 if self.finished {
605 return Ok(None);
606 }
607
608 let frame = if let Some(frame) = self.pending_frame.take() {
609 frame
610 } else {
611 let Some(frame) = self
612 .stream
613 .message()
614 .await
615 .map_err(client_error_from_connect)?
616 else {
617 self.finished = true;
618 if let Some(err) = self.stream.error() {
619 return Err(client_error_from_connect(err.clone()));
620 }
621 self.final_count = Some(self.rows_seen);
622 self.observe_detail_from_stream_trailers();
623 return Ok(None);
624 };
625 frame.to_owned_message()
626 };
627
628 let n = frame.results.len();
629 self.rows_seen += n;
630 let mut out = Vec::with_capacity(n);
631 for entry in &frame.results {
632 let key = Bytes::copy_from_slice(&entry.key);
633 let key = match self.key_prefix {
634 Some(prefix) => prefix.decode_key(&key)?,
635 None => key,
636 };
637 out.push((key, Bytes::copy_from_slice(&entry.value)));
638 }
639 Ok(Some(out))
640 }
641
642 pub async fn collect(mut self) -> Result<Vec<(Key, Bytes)>, ClientError> {
643 let mut entries = Vec::new();
644 while let Some(chunk) = self.next_chunk().await? {
645 entries.extend(chunk);
646 }
647 Ok(entries)
648 }
649}
650
651pub struct GetManyStream {
652 stream:
653 ConnectServerStream<hyper::body::Incoming, exoware_proto::query::GetManyFrameView<'static>>,
654 pending_frame: Option<exoware_proto::query::GetManyFrame>,
655 entries_seen: usize,
656 final_detail: Option<proto_query::Detail>,
657 finished: bool,
658 observed_sequence: Option<Arc<AtomicU64>>,
659 key_prefix: Option<StoreKeyPrefix>,
660}
661
662impl GetManyStream {
663 pub fn final_detail(&self) -> Option<proto_query::Detail> {
664 self.final_detail.clone()
665 }
666
667 fn from_connect_stream(
668 stream: ConnectServerStream<
669 hyper::body::Incoming,
670 exoware_proto::query::GetManyFrameView<'static>,
671 >,
672 observed_sequence: Option<Arc<AtomicU64>>,
673 key_prefix: Option<StoreKeyPrefix>,
674 ) -> Self {
675 Self {
676 stream,
677 pending_frame: None,
678 entries_seen: 0,
679 final_detail: None,
680 finished: false,
681 observed_sequence,
682 key_prefix,
683 }
684 }
685
686 fn observe_detail_from_stream_trailers(&mut self) {
687 self.final_detail = self
688 .stream
689 .trailers()
690 .and_then(query_detail_from_header_map);
691 if let (Some(sequence_store), Some(d)) =
692 (&self.observed_sequence, self.final_detail.as_ref())
693 {
694 sequence_store.fetch_max(d.sequence_number, Ordering::SeqCst);
695 }
696 }
697
698 async fn prefetch_first_frame(&mut self) -> Result<(), ConnectError> {
699 if self.pending_frame.is_some() || self.finished {
700 return Ok(());
701 }
702 match self.stream.message().await? {
703 Some(frame) => {
704 self.pending_frame = Some(frame.to_owned_message());
705 Ok(())
706 }
707 None => {
708 self.finished = true;
709 if let Some(err) = self.stream.error() {
710 Err(err.clone())
711 } else {
712 self.observe_detail_from_stream_trailers();
713 Ok(())
714 }
715 }
716 }
717 }
718
719 pub async fn next_chunk(&mut self) -> Result<Option<Vec<(Key, Option<Bytes>)>>, ClientError> {
720 if self.finished {
721 return Ok(None);
722 }
723 let frame = if let Some(frame) = self.pending_frame.take() {
724 frame
725 } else {
726 let Some(frame) = self
727 .stream
728 .message()
729 .await
730 .map_err(client_error_from_connect)?
731 else {
732 self.finished = true;
733 if let Some(err) = self.stream.error() {
734 return Err(client_error_from_connect(err.clone()));
735 }
736 self.observe_detail_from_stream_trailers();
737 return Ok(None);
738 };
739 frame.to_owned_message()
740 };
741
742 self.entries_seen += frame.results.len();
743 let mut out = Vec::with_capacity(frame.results.len());
744 for entry in &frame.results {
745 let key = Bytes::copy_from_slice(&entry.key);
746 let key = match self.key_prefix {
747 Some(prefix) => prefix.decode_key(&key)?,
748 None => key,
749 };
750 let value = entry.value.as_ref().map(|v| Bytes::copy_from_slice(v));
751 out.push((key, value));
752 }
753 Ok(Some(out))
754 }
755
756 pub async fn collect(mut self) -> Result<HashMap<Key, Bytes>, ClientError> {
757 let mut map = HashMap::new();
758 while let Some(chunk) = self.next_chunk().await? {
759 for (key, value) in chunk {
760 if let Some(v) = value {
761 map.insert(key, v);
762 }
763 }
764 }
765 Ok(map)
766 }
767}
768
769impl RangeMode {
770 fn to_proto(self) -> proto_query::TraversalMode {
771 match self {
772 Self::Forward => proto_query::TraversalMode::TRAVERSAL_MODE_FORWARD,
773 Self::Reverse => proto_query::TraversalMode::TRAVERSAL_MODE_REVERSE,
774 }
775 }
776}
777
778#[inline]
779fn key_prefix_mask(bits: u8) -> Result<u16, StoreKeyPrefixError> {
780 if bits > 16 {
781 return Err(StoreKeyPrefixError::ReservedBitsTooLarge {
782 reserved_bits: bits,
783 });
784 }
785 Ok(if bits == 0 {
786 0
787 } else if bits == 16 {
788 u16::MAX
789 } else {
790 (1u16 << bits) - 1
791 })
792}
793
794fn validate_prefix_bits(reserved_bits: u8, prefix: u16) -> Result<(), StoreKeyPrefixError> {
795 let mask = key_prefix_mask(reserved_bits)?;
796 if prefix > mask {
797 return Err(StoreKeyPrefixError::PrefixTooLarge {
798 reserved_bits,
799 prefix,
800 });
801 }
802 Ok(())
803}
804
805#[derive(Clone, Debug)]
809pub struct StreamSubscriptionEntry {
810 pub key: Key,
811 pub value: Bytes,
812}
813
814#[derive(Clone, Debug)]
816pub struct StreamSubscriptionFrame {
817 pub sequence_number: u64,
818 pub entries: Vec<StreamSubscriptionEntry>,
819}
820
821pub struct StreamSubscription {
824 stream: ConnectServerStream<
825 hyper::body::Incoming,
826 exoware_proto::store::stream::v1::SubscribeResponseView<'static>,
827 >,
828 key_prefix: Option<StoreKeyPrefix>,
829 logical_filter: Option<ClientStreamFilter>,
830}
831
832impl std::fmt::Debug for StreamSubscription {
833 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
834 f.debug_struct("StreamSubscription").finish_non_exhaustive()
835 }
836}
837
838impl StreamSubscription {
839 pub async fn next(&mut self) -> Result<Option<StreamSubscriptionFrame>, ClientError> {
841 loop {
842 match self
843 .stream
844 .message()
845 .await
846 .map_err(client_error_from_connect)?
847 {
848 Some(view) => {
849 let owned = view.to_owned_message();
850 let mut entries = Vec::with_capacity(owned.entries.len());
851 for entry in owned.entries {
852 let key = Bytes::from(entry.key);
853 let key = match self.key_prefix {
854 Some(prefix) => prefix.decode_key(&key)?,
855 None => key,
856 };
857 let value = Bytes::from(entry.value);
858 if self
859 .logical_filter
860 .as_ref()
861 .is_none_or(|filter| filter.matches(&key, value.as_ref()))
862 {
863 entries.push(StreamSubscriptionEntry { key, value });
864 }
865 }
866 if entries.is_empty() {
867 continue;
868 }
869 let frame = StreamSubscriptionFrame {
870 sequence_number: owned.sequence_number,
871 entries,
872 };
873 return Ok(Some(frame));
874 }
875 None => {
876 if let Some(err) = self.stream.error() {
877 return Err(client_error_from_connect(err.clone()));
878 } else {
879 return Ok(None);
880 }
881 }
882 }
883 }
884 }
885}
886
887#[derive(Clone)]
888struct ClientKeyMatcher {
889 codec: KeyCodec,
890 regex: regex::bytes::Regex,
891}
892
893#[derive(Clone)]
894struct ClientStreamFilter {
895 keys: Vec<ClientKeyMatcher>,
896 values: Option<crate::stream_filter::CompiledBytesFilters>,
897}
898
899impl ClientStreamFilter {
900 fn compile(filter: &crate::stream_filter::StreamFilter) -> Result<Self, ClientError> {
901 crate::stream_filter::validate_filter(filter)
902 .map_err(|e| ClientError::WireFormat(e.to_string()))?;
903 let keys = filter
904 .match_keys
905 .iter()
906 .map(|mk| {
907 let regex = crate::match_key::compile_payload_regex(&mk.payload_regex)
908 .map_err(|e| ClientError::WireFormat(e.to_string()))?;
909 Ok(ClientKeyMatcher {
910 codec: KeyCodec::new(mk.reserved_bits, mk.prefix),
911 regex,
912 })
913 })
914 .collect::<Result<Vec<_>, ClientError>>()?;
915 let values = crate::stream_filter::CompiledBytesFilters::compile(&filter.value_filters)
916 .map_err(ClientError::WireFormat)?;
917 Ok(Self { keys, values })
918 }
919
920 fn matches(&self, key: &Key, value: &[u8]) -> bool {
921 if !self
922 .values
923 .as_ref()
924 .is_none_or(|filter| filter.matches(value))
925 {
926 return false;
927 }
928 self.keys.iter().any(|matcher| {
929 if !matcher.codec.matches(key) {
930 return false;
931 }
932 let payload_len = matcher.codec.payload_capacity_bytes_for_key_len(key.len());
933 matcher
934 .codec
935 .read_payload(key, 0, payload_len)
936 .is_ok_and(|payload| matcher.regex.is_match(&payload))
937 })
938 }
939}
940
941fn is_batch_missing_error(err: &ConnectError) -> bool {
944 match proto_decode_connect_error(err) {
945 Ok(decoded) => decoded.error_info.is_some_and(|info| {
946 info.domain == "store.stream"
947 && matches!(info.reason.as_str(), "BATCH_EVICTED" | "BATCH_NOT_FOUND")
948 }),
949 Err(_) => false,
950 }
951}
952
953#[derive(Clone, Copy, Debug)]
955pub struct RetryConfig {
956 max_attempts: usize,
957 initial_backoff: Duration,
958 max_backoff: Duration,
959}
960
961impl RetryConfig {
962 pub fn standard() -> Self {
963 Self {
964 max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
965 initial_backoff: Duration::from_millis(DEFAULT_RETRY_INITIAL_BACKOFF_MS),
966 max_backoff: Duration::from_millis(DEFAULT_RETRY_MAX_BACKOFF_MS),
967 }
968 }
969
970 pub fn disabled() -> Self {
971 Self::standard().with_max_attempts(1)
972 }
973
974 pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
975 self.max_attempts = max_attempts.max(1);
976 self
977 }
978
979 pub fn with_initial_backoff(mut self, initial_backoff: Duration) -> Self {
980 self.initial_backoff = initial_backoff;
981 self
982 }
983
984 pub fn with_max_backoff(mut self, max_backoff: Duration) -> Self {
985 self.max_backoff = max_backoff;
986 self
987 }
988
989 pub(crate) fn sanitized(self) -> Self {
990 let max_attempts = self.max_attempts.max(1);
991 let max_backoff = self.max_backoff.max(self.initial_backoff);
992 Self {
993 max_attempts,
994 initial_backoff: self.initial_backoff,
995 max_backoff,
996 }
997 }
998}
999
1000impl Default for RetryConfig {
1001 fn default() -> Self {
1002 Self::standard()
1003 }
1004}
1005
1006fn trim_connect_base(url: &str) -> String {
1007 url.trim_end_matches('/').to_string()
1008}
1009
1010fn query_detail_from_header_map(map: &HeaderMap) -> Option<proto_query::Detail> {
1011 let v = map.get(PROTO_QUERY_DETAIL_RESPONSE_HEADER)?;
1012 let s = v.to_str().ok()?;
1013 proto_decode_query_detail_header_value(s).ok()
1014}
1015
1016fn query_detail_from_unary_metadata(
1017 headers: &HeaderMap,
1018 trailers: &HeaderMap,
1019) -> Option<proto_query::Detail> {
1020 query_detail_from_header_map(headers).or_else(|| query_detail_from_header_map(trailers))
1021}
1022
1023fn new_http_client() -> reqwest::Client {
1024 reqwest::Client::builder()
1025 .pool_max_idle_per_host(32)
1026 .timeout(Duration::from_secs(30))
1027 .build()
1028 .expect("failed to build HTTP client")
1029}
1030
1031#[derive(Debug, thiserror::Error)]
1033pub enum ClientBuildError {
1034 #[error("StoreClientBuilder: missing health URL (set health_url or url)")]
1035 MissingHealthUrl,
1036 #[error("StoreClientBuilder: missing ingest URL (set ingest_url or url)")]
1037 MissingIngestUrl,
1038 #[error("StoreClientBuilder: missing query URL (set query_url or url)")]
1039 MissingQueryUrl,
1040 #[error("StoreClientBuilder: missing compact URL (set compact_url or url)")]
1041 MissingCompactUrl,
1042 #[error("StoreClientBuilder: invalid URL \"{url}\": {source}")]
1043 InvalidUrl {
1044 url: String,
1045 source: http::uri::InvalidUri,
1046 },
1047}
1048
1049#[derive(Debug, Default)]
1054pub struct StoreClientBuilder {
1055 health_url: Option<String>,
1056 ingest_url: Option<String>,
1057 query_url: Option<String>,
1058 compact_url: Option<String>,
1059 stream_url: Option<String>,
1060 key_prefix: Option<StoreKeyPrefix>,
1061 retry_config: RetryConfig,
1062 connect_request_compression: ConnectRequestCompression,
1063}
1064
1065impl StoreClientBuilder {
1066 pub fn url(mut self, url: &str) -> Self {
1068 let u = trim_connect_base(url);
1069 self.health_url = Some(u.clone());
1070 self.ingest_url = Some(u.clone());
1071 self.query_url = Some(u.clone());
1072 self.compact_url = Some(u.clone());
1073 self.stream_url = Some(u);
1074 self
1075 }
1076
1077 pub fn health_url(mut self, url: &str) -> Self {
1079 self.health_url = Some(trim_connect_base(url));
1080 self
1081 }
1082
1083 pub fn ingest_url(mut self, url: &str) -> Self {
1085 self.ingest_url = Some(trim_connect_base(url));
1086 self
1087 }
1088
1089 pub fn query_url(mut self, url: &str) -> Self {
1091 self.query_url = Some(trim_connect_base(url));
1092 self
1093 }
1094
1095 pub fn compact_url(mut self, url: &str) -> Self {
1097 self.compact_url = Some(trim_connect_base(url));
1098 self
1099 }
1100
1101 pub fn stream_url(mut self, url: &str) -> Self {
1104 self.stream_url = Some(trim_connect_base(url));
1105 self
1106 }
1107
1108 pub fn key_prefix(mut self, prefix: StoreKeyPrefix) -> Self {
1110 self.key_prefix = Some(prefix);
1111 self
1112 }
1113
1114 pub fn retry_config(mut self, retry: RetryConfig) -> Self {
1116 self.retry_config = retry.sanitized();
1117 self
1118 }
1119
1120 pub fn connect_request_compression(mut self, compression: ConnectRequestCompression) -> Self {
1122 self.connect_request_compression = compression;
1123 self
1124 }
1125
1126 pub fn build(self) -> Result<StoreClient, ClientBuildError> {
1128 let health_url = self.health_url.ok_or(ClientBuildError::MissingHealthUrl)?;
1129 let ingest_url = self.ingest_url.ok_or(ClientBuildError::MissingIngestUrl)?;
1130 let query_url = self.query_url.ok_or(ClientBuildError::MissingQueryUrl)?;
1131 let compact_url = self
1132 .compact_url
1133 .ok_or(ClientBuildError::MissingCompactUrl)?;
1134 let stream_url = self.stream_url.unwrap_or_else(|| ingest_url.clone());
1137 let ingest_uri: http::Uri =
1138 ingest_url
1139 .parse()
1140 .map_err(|e| ClientBuildError::InvalidUrl {
1141 url: ingest_url.clone(),
1142 source: e,
1143 })?;
1144 let query_uri: http::Uri = query_url
1145 .parse()
1146 .map_err(|e| ClientBuildError::InvalidUrl {
1147 url: query_url.clone(),
1148 source: e,
1149 })?;
1150 let compact_uri: http::Uri =
1151 compact_url
1152 .parse()
1153 .map_err(|e| ClientBuildError::InvalidUrl {
1154 url: compact_url.clone(),
1155 source: e,
1156 })?;
1157 let stream_uri: http::Uri =
1158 stream_url
1159 .parse()
1160 .map_err(|e| ClientBuildError::InvalidUrl {
1161 url: stream_url.clone(),
1162 source: e,
1163 })?;
1164 Ok(StoreClient {
1165 health_url,
1166 ingest_uri,
1167 query_uri,
1168 compact_uri,
1169 stream_uri,
1170 http: new_http_client(),
1171 connect_http: ProtoPreferZstdHttpClient::plaintext(),
1172 retry_config: self.retry_config,
1173 connect_request_compression: self.connect_request_compression,
1174 key_prefix: self.key_prefix,
1175 })
1176 }
1177}
1178
1179#[derive(Clone, Debug)]
1181pub struct StoreClient {
1182 pub(crate) health_url: String,
1184 ingest_uri: http::Uri,
1185 query_uri: http::Uri,
1186 compact_uri: http::Uri,
1187 stream_uri: http::Uri,
1188 http: reqwest::Client,
1189 connect_http: ProtoPreferZstdHttpClient,
1190 retry_config: RetryConfig,
1191 connect_request_compression: ConnectRequestCompression,
1192 key_prefix: Option<StoreKeyPrefix>,
1193}
1194
1195#[derive(Clone, Debug)]
1210pub struct SerializableReadSession {
1211 client: StoreClient,
1212 state: Arc<SessionState>,
1213}
1214
1215#[derive(Debug)]
1216struct SessionState {
1217 sequence: Arc<AtomicU64>,
1218 init_gate: tokio::sync::Mutex<()>,
1219}
1220
1221impl SessionState {
1222 fn fixed_sequence(&self) -> Option<u64> {
1223 let sequence = self.sequence.load(Ordering::Acquire);
1224 (sequence > 0).then_some(sequence)
1225 }
1226}
1227
1228#[derive(Default)]
1229struct RangeStreamReadOptions {
1230 min_sequence_number: Option<u64>,
1231 observed_sequence: Option<Arc<AtomicU64>>,
1232}
1233
1234impl StoreClient {
1235 pub fn builder() -> StoreClientBuilder {
1237 StoreClientBuilder::default()
1238 }
1239
1240 pub fn new(url: &str) -> Self {
1241 Self::with_retry_config(url, RetryConfig::standard())
1242 }
1243
1244 pub fn with_retry_config(url: &str, retry_config: RetryConfig) -> Self {
1245 Self::builder()
1246 .url(url)
1247 .retry_config(retry_config)
1248 .build()
1249 .expect("url sets health, ingest, and query URLs")
1250 }
1251
1252 pub fn with_split_urls(
1254 health_base: &str,
1255 ingest_base: &str,
1256 query_base: &str,
1257 compact_base: &str,
1258 ) -> Self {
1259 Self::with_split_urls_and_retry(
1260 health_base,
1261 ingest_base,
1262 query_base,
1263 compact_base,
1264 RetryConfig::standard(),
1265 )
1266 }
1267
1268 pub fn with_split_urls_and_retry(
1269 health_base: &str,
1270 ingest_base: &str,
1271 query_base: &str,
1272 compact_base: &str,
1273 retry_config: RetryConfig,
1274 ) -> Self {
1275 Self::builder()
1276 .health_url(health_base)
1277 .ingest_url(ingest_base)
1278 .query_url(query_base)
1279 .compact_url(compact_base)
1280 .retry_config(retry_config)
1281 .build()
1282 .expect("all service URLs are set")
1283 }
1284
1285 pub fn key_prefix(&self) -> Option<StoreKeyPrefix> {
1287 self.key_prefix
1288 }
1289
1290 pub fn with_key_prefix(&self, prefix: StoreKeyPrefix) -> Self {
1292 let mut out = self.clone();
1293 out.key_prefix = Some(prefix);
1294 out
1295 }
1296
1297 pub fn without_key_prefix(&self) -> Self {
1299 let mut out = self.clone();
1300 out.key_prefix = None;
1301 out
1302 }
1303
1304 pub fn encode_store_key(&self, key: &Key) -> Result<Key, ClientError> {
1306 match self.key_prefix {
1307 Some(prefix) => Ok(prefix.encode_key(key)?),
1308 None => Ok(Bytes::copy_from_slice(key)),
1309 }
1310 }
1311
1312 pub fn decode_store_key(&self, key: &Key) -> Result<Key, ClientError> {
1314 match self.key_prefix {
1315 Some(prefix) => Ok(prefix.decode_key(key)?),
1316 None => Ok(Bytes::copy_from_slice(key)),
1317 }
1318 }
1319
1320 fn encode_store_range(&self, start: &Key, end: &Key) -> Result<(Key, Key), ClientError> {
1321 match self.key_prefix {
1322 Some(prefix) => Ok(prefix.encode_range(start, end)?),
1323 None => Ok((Bytes::copy_from_slice(start), Bytes::copy_from_slice(end))),
1324 }
1325 }
1326
1327 pub fn connect_request_compression(&self) -> ConnectRequestCompression {
1329 self.connect_request_compression
1330 }
1331
1332 pub fn decode_error_details(
1333 err: &ConnectError,
1334 ) -> Result<exoware_proto::DecodedConnectError, buffa::DecodeError> {
1335 proto_decode_connect_error(err)
1336 }
1337
1338 pub fn create_session(&self) -> SerializableReadSession {
1344 self.create_session_with_sequence(0)
1345 }
1346
1347 pub fn create_session_with_sequence(&self, sequence: u64) -> SerializableReadSession {
1353 SerializableReadSession {
1354 client: self.clone(),
1355 state: Arc::new(SessionState {
1356 sequence: Arc::new(AtomicU64::new(sequence)),
1357 init_gate: tokio::sync::Mutex::new(()),
1358 }),
1359 }
1360 }
1361
1362 pub fn ingest(&self) -> Ingest<'_> {
1364 Ingest { c: self }
1365 }
1366
1367 pub fn query(&self) -> Query<'_> {
1369 Query { c: self }
1370 }
1371
1372 pub fn compact(&self) -> Compact<'_> {
1374 Compact { c: self }
1375 }
1376
1377 pub fn stream(&self) -> Stream<'_> {
1379 Stream { c: self }
1380 }
1381
1382 pub(crate) async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
1389 if self.key_prefix.is_none() {
1390 return self.put_physical(kvs).await;
1391 }
1392 let mut keys = Vec::with_capacity(kvs.len());
1393 for (key, _) in kvs {
1394 keys.push(self.encode_store_key(key)?);
1395 }
1396 let prefixed: Vec<(&Key, &[u8])> = keys
1397 .iter()
1398 .zip(kvs.iter())
1399 .map(|(key, (_, value))| (key, *value))
1400 .collect();
1401 self.put_physical(&prefixed).await
1402 }
1403
1404 async fn put_physical(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
1405 let mut proto_kvs = Vec::with_capacity(kvs.len());
1406 for (key, value) in kvs {
1407 if !is_valid_key_size(key.len()) {
1408 return Err(ClientError::WireFormat(format!(
1409 "key length {} is outside valid store key range ({}..={})",
1410 key.len(),
1411 keys::MIN_KEY_LEN,
1412 MAX_KEY_LEN
1413 )));
1414 }
1415 proto_kvs.push(exoware_proto::common::KvEntry {
1416 key: (*key).to_vec(),
1417 value: value.to_vec(),
1418 ..Default::default()
1419 });
1420 }
1421
1422 let config =
1423 store_connect_client_config(self.ingest_uri.clone(), self.connect_request_compression);
1424 let client = IngestServiceClient::new(self.connect_http.clone(), config);
1425 let response = client
1426 .put(ProtoPutRequest {
1427 kvs: proto_kvs,
1428 ..Default::default()
1429 })
1430 .await
1431 .map_err(client_error_from_connect)?;
1432 Ok(response.into_owned().sequence_number)
1433 }
1434
1435 pub(crate) async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
1436 self.get_internal(key, None).await
1437 }
1438
1439 pub(crate) async fn get_with_min_sequence_number(
1440 &self,
1441 key: &Key,
1442 min_sequence_number: u64,
1443 ) -> Result<Option<Bytes>, ClientError> {
1444 self.get_internal(key, Some(min_sequence_number)).await
1445 }
1446
1447 async fn get_internal(
1448 &self,
1449 key: &Key,
1450 min_sequence_number: Option<u64>,
1451 ) -> Result<Option<Bytes>, ClientError> {
1452 let (response, detail) = self
1453 .send_get(key, self.normalize_min_sequence_number(min_sequence_number))
1454 .await?;
1455 let _ = detail;
1456 Ok(response.value.map(Bytes::from))
1457 }
1458
1459 pub(crate) async fn get_many(
1460 &self,
1461 keys: &[&Key],
1462 batch_size: u32,
1463 ) -> Result<GetManyStream, ClientError> {
1464 self.get_many_internal(keys, batch_size, None, None).await
1465 }
1466
1467 pub(crate) async fn get_many_with_min_sequence_number(
1468 &self,
1469 keys: &[&Key],
1470 batch_size: u32,
1471 min_sequence_number: u64,
1472 ) -> Result<GetManyStream, ClientError> {
1473 self.get_many_internal(keys, batch_size, Some(min_sequence_number), None)
1474 .await
1475 }
1476
1477 async fn get_many_internal(
1478 &self,
1479 keys: &[&Key],
1480 batch_size: u32,
1481 min_sequence_number: Option<u64>,
1482 observed_sequence: Option<Arc<AtomicU64>>,
1483 ) -> Result<GetManyStream, ClientError> {
1484 for key in keys {
1485 if !is_valid_key_size(key.len()) {
1486 return Err(ClientError::WireFormat(format!(
1487 "key length {} is outside valid store key range ({}..={})",
1488 key.len(),
1489 keys::MIN_KEY_LEN,
1490 MAX_KEY_LEN
1491 )));
1492 }
1493 }
1494
1495 let config =
1496 store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1497 let client = QueryServiceClient::new(self.connect_http.clone(), config);
1498 let proto_keys: Vec<Vec<u8>> = keys
1499 .iter()
1500 .map(|k| self.encode_store_key(k).map(|key| key.to_vec()))
1501 .collect::<Result<Vec<_>, _>>()?;
1502 let effective_min = self.normalize_min_sequence_number(min_sequence_number);
1503 let max_attempts = self.retry_config.max_attempts.max(1);
1504 let mut attempt = 1usize;
1505 loop {
1506 match client
1507 .get_many(ProtoGetManyRequest {
1508 keys: proto_keys.clone(),
1509 min_sequence_number: effective_min,
1510 batch_size,
1511 ..Default::default()
1512 })
1513 .await
1514 {
1515 Ok(stream) => {
1516 let mut gms = GetManyStream::from_connect_stream(
1517 stream,
1518 observed_sequence.clone(),
1519 self.key_prefix,
1520 );
1521 if let Err(err) = gms.prefetch_first_frame().await {
1522 if attempt < max_attempts && is_retryable_error(&err) {
1523 let delay = retry_delay_for_error(&err, attempt, self.retry_config);
1524 tokio::time::sleep(delay).await;
1525 attempt += 1;
1526 continue;
1527 }
1528 return Err(client_error_from_connect(err));
1529 }
1530 return Ok(gms);
1531 }
1532 Err(err) => {
1533 if attempt < max_attempts && is_retryable_error(&err) {
1534 let delay = retry_delay_for_error(&err, attempt, self.retry_config);
1535 tokio::time::sleep(delay).await;
1536 attempt += 1;
1537 continue;
1538 }
1539 return Err(client_error_from_connect(err));
1540 }
1541 }
1542 }
1543 }
1544
1545 pub(crate) async fn range(
1548 &self,
1549 start: &Key,
1550 end: &Key,
1551 limit: usize,
1552 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1553 self.range_internal(start, end, limit, RangeMode::Forward, None)
1554 .await
1555 }
1556
1557 pub(crate) async fn range_with_mode(
1559 &self,
1560 start: &Key,
1561 end: &Key,
1562 limit: usize,
1563 mode: RangeMode,
1564 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1565 self.range_internal(start, end, limit, mode, None).await
1566 }
1567
1568 pub(crate) async fn range_with_min_sequence_number(
1569 &self,
1570 start: &Key,
1571 end: &Key,
1572 limit: usize,
1573 min_sequence_number: u64,
1574 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1575 self.range_internal(
1576 start,
1577 end,
1578 limit,
1579 RangeMode::Forward,
1580 Some(min_sequence_number),
1581 )
1582 .await
1583 }
1584
1585 pub(crate) async fn range_with_mode_and_min_sequence_number(
1586 &self,
1587 start: &Key,
1588 end: &Key,
1589 limit: usize,
1590 mode: RangeMode,
1591 min_sequence_number: u64,
1592 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1593 self.range_internal(start, end, limit, mode, Some(min_sequence_number))
1594 .await
1595 }
1596
1597 pub(crate) async fn range_stream(
1598 &self,
1599 start: &Key,
1600 end: &Key,
1601 limit: usize,
1602 batch_size: usize,
1603 ) -> Result<RangeStream, ClientError> {
1604 self.range_stream_internal(
1605 start,
1606 end,
1607 limit,
1608 batch_size,
1609 RangeMode::Forward,
1610 RangeStreamReadOptions::default(),
1611 )
1612 .await
1613 }
1614
1615 pub(crate) async fn range_stream_with_mode(
1616 &self,
1617 start: &Key,
1618 end: &Key,
1619 limit: usize,
1620 batch_size: usize,
1621 mode: RangeMode,
1622 ) -> Result<RangeStream, ClientError> {
1623 self.range_stream_internal(start, end, limit, batch_size, mode, Default::default())
1624 .await
1625 }
1626
1627 pub(crate) async fn range_stream_with_min_sequence_number(
1628 &self,
1629 start: &Key,
1630 end: &Key,
1631 limit: usize,
1632 batch_size: usize,
1633 min_sequence_number: u64,
1634 ) -> Result<RangeStream, ClientError> {
1635 self.range_stream_internal(
1636 start,
1637 end,
1638 limit,
1639 batch_size,
1640 RangeMode::Forward,
1641 RangeStreamReadOptions {
1642 min_sequence_number: Some(min_sequence_number),
1643 observed_sequence: None,
1644 },
1645 )
1646 .await
1647 }
1648
1649 pub(crate) async fn range_stream_with_mode_and_min_sequence_number(
1650 &self,
1651 start: &Key,
1652 end: &Key,
1653 limit: usize,
1654 batch_size: usize,
1655 mode: RangeMode,
1656 min_sequence_number: u64,
1657 ) -> Result<RangeStream, ClientError> {
1658 self.range_stream_internal(
1659 start,
1660 end,
1661 limit,
1662 batch_size,
1663 mode,
1664 RangeStreamReadOptions {
1665 min_sequence_number: Some(min_sequence_number),
1666 observed_sequence: None,
1667 },
1668 )
1669 .await
1670 }
1671
1672 pub(crate) async fn range_reduce(
1673 &self,
1674 start: &Key,
1675 end: &Key,
1676 request: &DomainRangeReduceRequest,
1677 ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
1678 let (response, _) = self
1679 .range_reduce_response_internal(start, end, request, None)
1680 .await?;
1681 let decoded = proto_to_domain_reduce_response(response).map_err(ClientError::WireFormat)?;
1682 if !decoded.groups.is_empty() {
1683 return Err(ClientError::WireFormat(
1684 "grouped range reduction response returned for scalar request".to_string(),
1685 ));
1686 }
1687 Ok(decoded
1688 .results
1689 .iter()
1690 .map(|result| result.value.clone())
1691 .collect())
1692 }
1693
1694 pub(crate) async fn range_reduce_with_min_sequence_number(
1695 &self,
1696 start: &Key,
1697 end: &Key,
1698 request: &DomainRangeReduceRequest,
1699 min_sequence_number: u64,
1700 ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
1701 let (response, _) = self
1702 .range_reduce_response_internal(start, end, request, Some(min_sequence_number))
1703 .await?;
1704 let decoded = proto_to_domain_reduce_response(response).map_err(ClientError::WireFormat)?;
1705 if !decoded.groups.is_empty() {
1706 return Err(ClientError::WireFormat(
1707 "grouped range reduction response returned for scalar request".to_string(),
1708 ));
1709 }
1710 Ok(decoded
1711 .results
1712 .iter()
1713 .map(|result| result.value.clone())
1714 .collect())
1715 }
1716
1717 pub(crate) async fn range_reduce_response(
1718 &self,
1719 start: &Key,
1720 end: &Key,
1721 request: &DomainRangeReduceRequest,
1722 ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
1723 let (body, _) = self
1724 .range_reduce_response_internal(start, end, request, None)
1725 .await?;
1726 Ok(body)
1727 }
1728
1729 pub(crate) async fn range_reduce_response_with_min_sequence_number(
1730 &self,
1731 start: &Key,
1732 end: &Key,
1733 request: &DomainRangeReduceRequest,
1734 min_sequence_number: u64,
1735 ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
1736 let (body, _) = self
1737 .range_reduce_response_internal(start, end, request, Some(min_sequence_number))
1738 .await?;
1739 Ok(body)
1740 }
1741
1742 pub(crate) async fn prune(
1743 &self,
1744 policies: &[crate::prune_policy::PrunePolicy],
1745 ) -> Result<(), ClientError> {
1746 let config =
1747 store_connect_client_config(self.compact_uri.clone(), self.connect_request_compression);
1748 let client = CompactServiceClient::new(self.connect_http.clone(), config);
1749 let policies = self.prefix_prune_policies(policies)?;
1750 client
1751 .prune(ProtoPruneRequest {
1752 policies: exoware_proto::prune_policies_to_proto(&policies),
1753 ..Default::default()
1754 })
1755 .await
1756 .map_err(client_error_from_connect)?;
1757 Ok(())
1758 }
1759
1760 pub async fn health(&self) -> Result<bool, ClientError> {
1761 let resp = self
1762 .http
1763 .get(format!("{}/health", self.health_url))
1764 .send()
1765 .await?;
1766 Ok(resp.status().is_success())
1767 }
1768
1769 pub async fn ready(&self) -> Result<bool, ClientError> {
1770 let resp = self
1771 .http
1772 .get(format!("{}/ready", self.health_url))
1773 .send()
1774 .await?;
1775 Ok(resp.status().is_success())
1776 }
1777
1778 fn normalize_min_sequence_number(&self, requested_sequence: Option<u64>) -> Option<u64> {
1779 requested_sequence.filter(|sequence| *sequence > 0)
1780 }
1781
1782 async fn send_get(
1783 &self,
1784 key: &Key,
1785 min_sequence_number: Option<u64>,
1786 ) -> Result<
1787 (
1788 exoware_proto::query::GetResponse,
1789 Option<proto_query::Detail>,
1790 ),
1791 ClientError,
1792 > {
1793 if !is_valid_key_size(key.len()) {
1794 return Err(ClientError::WireFormat(format!(
1795 "key length {} is outside valid store key range ({}..={})",
1796 key.len(),
1797 keys::MIN_KEY_LEN,
1798 MAX_KEY_LEN
1799 )));
1800 }
1801 let key = self.encode_store_key(key)?;
1802
1803 let config =
1804 store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1805 let client = QueryServiceClient::new(self.connect_http.clone(), config);
1806 let response = self
1807 .send_with_retry(|| async {
1808 client
1809 .get(ProtoGetRequest {
1810 key: key.to_vec(),
1811 min_sequence_number,
1812 ..Default::default()
1813 })
1814 .await
1815 })
1816 .await?;
1817 let detail = query_detail_from_unary_metadata(response.headers(), response.trailers());
1818 let owned = response.into_owned();
1819 Ok((owned, detail))
1820 }
1821
1822 #[cfg(test)]
1823 pub async fn send_get_for_tests(
1824 &self,
1825 key: &Key,
1826 min_sequence_number: Option<u64>,
1827 ) -> Result<
1828 (
1829 exoware_proto::query::GetResponse,
1830 Option<proto_query::Detail>,
1831 ),
1832 ClientError,
1833 > {
1834 self.send_get(key, min_sequence_number).await
1835 }
1836
1837 async fn range_internal(
1838 &self,
1839 start: &Key,
1840 end: &Key,
1841 limit: usize,
1842 mode: RangeMode,
1843 min_sequence_number: Option<u64>,
1844 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1845 let stream = self
1846 .range_stream_internal(
1847 start,
1848 end,
1849 limit,
1850 limit.max(1),
1851 mode,
1852 RangeStreamReadOptions {
1853 min_sequence_number,
1854 observed_sequence: None,
1855 },
1856 )
1857 .await?;
1858 stream.collect().await
1859 }
1860
1861 async fn range_stream_internal(
1862 &self,
1863 start: &Key,
1864 end: &Key,
1865 limit: usize,
1866 batch_size: usize,
1867 mode: RangeMode,
1868 options: RangeStreamReadOptions,
1869 ) -> Result<RangeStream, ClientError> {
1870 if !is_valid_key_size(start.len()) || !is_valid_key_size(end.len()) {
1871 return Err(ClientError::WireFormat(
1872 "range start/end key length is outside valid store key range".to_string(),
1873 ));
1874 }
1875 if batch_size == 0 {
1876 return Err(ClientError::WireFormat(
1877 "batch_size must be positive".to_string(),
1878 ));
1879 }
1880 let (start, end) = self.encode_store_range(start, end)?;
1881
1882 let config =
1883 store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1884 let client = QueryServiceClient::new(self.connect_http.clone(), config);
1885 let min_sequence_number = self.normalize_min_sequence_number(options.min_sequence_number);
1886 let max_attempts = self.retry_config.max_attempts.max(1);
1887 let mut attempt = 1usize;
1888 loop {
1889 let response = match client
1894 .range(ProtoRangeRequest {
1895 start: start.to_vec(),
1896 end: end.to_vec(),
1897 limit: Some(u32::try_from(limit).unwrap_or(u32::MAX)),
1898 batch_size: u32::try_from(batch_size).unwrap_or(u32::MAX),
1899 mode: mode.to_proto().into(),
1900 min_sequence_number,
1901 ..Default::default()
1902 })
1903 .await
1904 {
1905 Ok(response) => response,
1906 Err(err) => {
1907 if attempt < max_attempts && is_retryable_error(&err) {
1908 let delay = retry_delay_for_error(&err, attempt, self.retry_config);
1909 tracing::debug!(
1910 attempt,
1911 max_attempts,
1912 code = err.code.as_str(),
1913 delay_ms = delay.as_millis() as u64,
1914 "store client retrying transient range-open error",
1915 );
1916 tokio::time::sleep(delay).await;
1917 attempt += 1;
1918 continue;
1919 }
1920 return Err(client_error_from_connect(err));
1921 }
1922 };
1923
1924 let mut stream = RangeStream::from_connect_stream(
1925 response,
1926 options.observed_sequence.clone(),
1927 self.key_prefix,
1928 );
1929 if let Err(err) = stream.prefetch_first_frame().await {
1930 if attempt < max_attempts && is_retryable_error(&err) {
1931 let delay = retry_delay_for_error(&err, attempt, self.retry_config);
1932 tracing::debug!(
1933 attempt,
1934 max_attempts,
1935 code = err.code.as_str(),
1936 delay_ms = delay.as_millis() as u64,
1937 "store client retrying transient stream-open error",
1938 );
1939 tokio::time::sleep(delay).await;
1940 attempt += 1;
1941 continue;
1942 }
1943 return Err(client_error_from_connect(err));
1944 }
1945 return Ok(stream);
1946 }
1947 }
1948
1949 async fn range_reduce_response_internal(
1950 &self,
1951 start: &Key,
1952 end: &Key,
1953 request: &DomainRangeReduceRequest,
1954 min_sequence_number: Option<u64>,
1955 ) -> Result<
1956 (
1957 exoware_proto::query::ReduceResponse,
1958 Option<proto_query::Detail>,
1959 ),
1960 ClientError,
1961 > {
1962 let config =
1963 store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1964 let client = QueryServiceClient::new(self.connect_http.clone(), config);
1965 let (start, end) = self.encode_store_range(start, end)?;
1966 let request = self.prefix_reduce_request(request)?;
1967 let proto_params = proto_to_proto_reduce_params(request);
1968 let min_sequence_number = self.normalize_min_sequence_number(min_sequence_number);
1969 let response = self
1970 .send_with_retry(|| async {
1971 client
1972 .reduce(ProtoWireReduceRequest {
1973 start: start.to_vec(),
1974 end: end.to_vec(),
1975 params: Some(proto_params.clone()).into(),
1976 min_sequence_number,
1977 ..Default::default()
1978 })
1979 .await
1980 })
1981 .await?;
1982 let detail = query_detail_from_unary_metadata(response.headers(), response.trailers());
1983 let owned = response.into_owned();
1984 Ok((owned, detail))
1985 }
1986
1987 fn prefix_prune_policies(
1988 &self,
1989 policies: &[crate::prune_policy::PrunePolicy],
1990 ) -> Result<Vec<crate::prune_policy::PrunePolicy>, ClientError> {
1991 let Some(prefix) = self.key_prefix else {
1992 return Ok(policies.to_vec());
1993 };
1994 policies
1995 .iter()
1996 .map(|policy| {
1997 use crate::prune_policy::{PolicyScope, PrunePolicy};
1998 let scope = match &policy.scope {
1999 PolicyScope::Keys(scope) => {
2000 let mut scope = scope.clone();
2001 scope.match_key = prefix.prefix_match_key(&scope.match_key)?;
2002 PolicyScope::Keys(scope)
2003 }
2004 PolicyScope::Sequence => PolicyScope::Sequence,
2005 };
2006 Ok(PrunePolicy {
2007 scope,
2008 retain: policy.retain.clone(),
2009 })
2010 })
2011 .collect::<Result<Vec<_>, StoreKeyPrefixError>>()
2012 .map_err(ClientError::from)
2013 }
2014
2015 fn prefix_stream_filter(
2016 &self,
2017 filter: crate::stream_filter::StreamFilter,
2018 ) -> Result<crate::stream_filter::StreamFilter, ClientError> {
2019 let Some(prefix) = self.key_prefix else {
2020 return Ok(filter);
2021 };
2022 let match_keys = filter
2023 .match_keys
2024 .iter()
2025 .map(|mk| prefix.prefix_stream_match_key(mk))
2026 .collect::<Result<Vec<_>, _>>()?;
2027 Ok(crate::stream_filter::StreamFilter {
2028 match_keys,
2029 value_filters: filter.value_filters,
2030 })
2031 }
2032
2033 fn prefix_reduce_request(
2034 &self,
2035 request: &DomainRangeReduceRequest,
2036 ) -> Result<DomainRangeReduceRequest, ClientError> {
2037 let Some(prefix) = self.key_prefix else {
2038 return Ok(request.clone());
2039 };
2040 let mut request = request.clone();
2041 shift_reduce_request_key_offsets(prefix.reserved_bits(), &mut request)?;
2042 Ok(request)
2043 }
2044
2045 async fn send_with_retry<F, Fut, T>(&self, mut make_request: F) -> Result<T, ClientError>
2046 where
2047 F: FnMut() -> Fut,
2048 Fut: std::future::Future<Output = Result<T, ConnectError>>,
2049 {
2050 let max_attempts = self.retry_config.max_attempts.max(1);
2051 let mut attempt = 1usize;
2052 loop {
2053 match make_request().await {
2054 Ok(response) => return Ok(response),
2055 Err(err) => {
2056 if attempt < max_attempts && is_retryable_error(&err) {
2057 let delay = retry_delay_for_error(&err, attempt, self.retry_config);
2058 tracing::debug!(
2059 attempt,
2060 max_attempts,
2061 code = err.code.as_str(),
2062 delay_ms = delay.as_millis() as u64,
2063 "store client retrying transient RPC error",
2064 );
2065 tokio::time::sleep(delay).await;
2066 attempt += 1;
2067 continue;
2068 }
2069 return Err(client_error_from_connect(err));
2070 }
2071 }
2072 }
2073 }
2074}
2075
2076fn shift_reduce_request_key_offsets(
2077 prefix_bits: u8,
2078 request: &mut DomainRangeReduceRequest,
2079) -> Result<(), StoreKeyPrefixError> {
2080 for reducer in &mut request.reducers {
2081 if let Some(expr) = &mut reducer.expr {
2082 shift_expr_key_offsets(prefix_bits, expr)?;
2083 }
2084 }
2085 for expr in &mut request.group_by {
2086 shift_expr_key_offsets(prefix_bits, expr)?;
2087 }
2088 if let Some(filter) = &mut request.filter {
2089 for check in &mut filter.checks {
2090 shift_field_ref_key_offset(prefix_bits, &mut check.field)?;
2091 }
2092 }
2093 Ok(())
2094}
2095
2096fn shift_expr_key_offsets(prefix_bits: u8, expr: &mut KvExpr) -> Result<(), StoreKeyPrefixError> {
2097 match expr {
2098 KvExpr::Field(field) => shift_field_ref_key_offset(prefix_bits, field),
2099 KvExpr::Literal(_) => Ok(()),
2100 KvExpr::Add(left, right)
2101 | KvExpr::Sub(left, right)
2102 | KvExpr::Mul(left, right)
2103 | KvExpr::Div(left, right) => {
2104 shift_expr_key_offsets(prefix_bits, left)?;
2105 shift_expr_key_offsets(prefix_bits, right)
2106 }
2107 KvExpr::Lower(inner) | KvExpr::DateTruncDay(inner) => {
2108 shift_expr_key_offsets(prefix_bits, inner)
2109 }
2110 }
2111}
2112
2113fn shift_field_ref_key_offset(
2114 prefix_bits: u8,
2115 field: &mut KvFieldRef,
2116) -> Result<(), StoreKeyPrefixError> {
2117 match field {
2118 KvFieldRef::Key { bit_offset, .. } | KvFieldRef::ZOrderKey { bit_offset, .. } => {
2119 *bit_offset = bit_offset.checked_add(u16::from(prefix_bits)).ok_or(
2120 StoreKeyPrefixError::BitOffsetOverflow {
2121 offset: *bit_offset,
2122 prefix_bits,
2123 },
2124 )?;
2125 Ok(())
2126 }
2127 KvFieldRef::Value { .. } => Ok(()),
2128 }
2129}
2130
2131#[derive(Clone, Copy, Debug)]
2134pub struct Ingest<'a> {
2135 c: &'a StoreClient,
2136}
2137
2138#[derive(Clone, Copy, Debug)]
2139pub struct Query<'a> {
2140 c: &'a StoreClient,
2141}
2142
2143#[derive(Clone, Copy, Debug)]
2144pub struct Compact<'a> {
2145 c: &'a StoreClient,
2146}
2147
2148#[derive(Clone, Copy, Debug)]
2149pub struct Stream<'a> {
2150 c: &'a StoreClient,
2151}
2152
2153impl<'a> Ingest<'a> {
2154 pub async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
2155 self.c.put(kvs).await
2156 }
2157
2158 pub async fn put_prepared(&self, batch: &StoreWriteBatch) -> Result<u64, ClientError> {
2161 batch.commit(self.c).await
2162 }
2163}
2164
2165impl<'a> Query<'a> {
2166 pub async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
2167 self.c.get(key).await
2168 }
2169
2170 pub async fn get_with_min_sequence_number(
2171 &self,
2172 key: &Key,
2173 min_sequence_number: u64,
2174 ) -> Result<Option<Bytes>, ClientError> {
2175 self.c
2176 .get_with_min_sequence_number(key, min_sequence_number)
2177 .await
2178 }
2179
2180 pub async fn get_many(
2181 &self,
2182 keys: &[&Key],
2183 batch_size: u32,
2184 ) -> Result<GetManyStream, ClientError> {
2185 self.c.get_many(keys, batch_size).await
2186 }
2187
2188 pub async fn get_many_with_min_sequence_number(
2189 &self,
2190 keys: &[&Key],
2191 batch_size: u32,
2192 min_sequence_number: u64,
2193 ) -> Result<GetManyStream, ClientError> {
2194 self.c
2195 .get_many_with_min_sequence_number(keys, batch_size, min_sequence_number)
2196 .await
2197 }
2198
2199 pub async fn range(
2201 &self,
2202 start: &Key,
2203 end: &Key,
2204 limit: usize,
2205 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2206 self.c.range(start, end, limit).await
2207 }
2208
2209 pub async fn range_with_mode(
2210 &self,
2211 start: &Key,
2212 end: &Key,
2213 limit: usize,
2214 mode: RangeMode,
2215 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2216 self.c.range_with_mode(start, end, limit, mode).await
2217 }
2218
2219 pub async fn range_with_min_sequence_number(
2220 &self,
2221 start: &Key,
2222 end: &Key,
2223 limit: usize,
2224 min_sequence_number: u64,
2225 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2226 self.c
2227 .range_with_min_sequence_number(start, end, limit, min_sequence_number)
2228 .await
2229 }
2230
2231 pub async fn range_with_mode_and_min_sequence_number(
2232 &self,
2233 start: &Key,
2234 end: &Key,
2235 limit: usize,
2236 mode: RangeMode,
2237 min_sequence_number: u64,
2238 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2239 self.c
2240 .range_with_mode_and_min_sequence_number(start, end, limit, mode, min_sequence_number)
2241 .await
2242 }
2243
2244 pub async fn range_stream(
2245 &self,
2246 start: &Key,
2247 end: &Key,
2248 limit: usize,
2249 batch_size: usize,
2250 ) -> Result<RangeStream, ClientError> {
2251 self.c.range_stream(start, end, limit, batch_size).await
2252 }
2253
2254 pub async fn range_stream_with_mode(
2255 &self,
2256 start: &Key,
2257 end: &Key,
2258 limit: usize,
2259 batch_size: usize,
2260 mode: RangeMode,
2261 ) -> Result<RangeStream, ClientError> {
2262 self.c
2263 .range_stream_with_mode(start, end, limit, batch_size, mode)
2264 .await
2265 }
2266
2267 pub async fn range_stream_with_min_sequence_number(
2268 &self,
2269 start: &Key,
2270 end: &Key,
2271 limit: usize,
2272 batch_size: usize,
2273 min_sequence_number: u64,
2274 ) -> Result<RangeStream, ClientError> {
2275 self.c
2276 .range_stream_with_min_sequence_number(
2277 start,
2278 end,
2279 limit,
2280 batch_size,
2281 min_sequence_number,
2282 )
2283 .await
2284 }
2285
2286 pub async fn range_stream_with_mode_and_min_sequence_number(
2287 &self,
2288 start: &Key,
2289 end: &Key,
2290 limit: usize,
2291 batch_size: usize,
2292 mode: RangeMode,
2293 min_sequence_number: u64,
2294 ) -> Result<RangeStream, ClientError> {
2295 self.c
2296 .range_stream_with_mode_and_min_sequence_number(
2297 start,
2298 end,
2299 limit,
2300 batch_size,
2301 mode,
2302 min_sequence_number,
2303 )
2304 .await
2305 }
2306
2307 pub async fn range_reduce(
2308 &self,
2309 start: &Key,
2310 end: &Key,
2311 request: &DomainRangeReduceRequest,
2312 ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
2313 self.c.range_reduce(start, end, request).await
2314 }
2315
2316 pub async fn range_reduce_with_min_sequence_number(
2317 &self,
2318 start: &Key,
2319 end: &Key,
2320 request: &DomainRangeReduceRequest,
2321 min_sequence_number: u64,
2322 ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
2323 self.c
2324 .range_reduce_with_min_sequence_number(start, end, request, min_sequence_number)
2325 .await
2326 }
2327
2328 pub async fn range_reduce_response(
2329 &self,
2330 start: &Key,
2331 end: &Key,
2332 request: &DomainRangeReduceRequest,
2333 ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
2334 self.c.range_reduce_response(start, end, request).await
2335 }
2336
2337 pub async fn range_reduce_response_with_min_sequence_number(
2338 &self,
2339 start: &Key,
2340 end: &Key,
2341 request: &DomainRangeReduceRequest,
2342 min_sequence_number: u64,
2343 ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
2344 self.c
2345 .range_reduce_response_with_min_sequence_number(
2346 start,
2347 end,
2348 request,
2349 min_sequence_number,
2350 )
2351 .await
2352 }
2353}
2354
2355impl<'a> Compact<'a> {
2356 pub async fn prune(
2357 &self,
2358 policies: &[crate::prune_policy::PrunePolicy],
2359 ) -> Result<(), ClientError> {
2360 self.c.prune(policies).await
2361 }
2362}
2363
2364impl<'a> Stream<'a> {
2365 pub async fn subscribe(
2371 &self,
2372 filter: crate::stream_filter::StreamFilter,
2373 since_sequence_number: Option<u64>,
2374 ) -> Result<StreamSubscription, ClientError> {
2375 let logical_filter = self
2376 .c
2377 .key_prefix
2378 .is_some()
2379 .then(|| ClientStreamFilter::compile(&filter))
2380 .transpose()?;
2381 let filter = self.c.prefix_stream_filter(filter)?;
2382 crate::stream_filter::validate_filter(&filter)
2383 .map_err(|e| ClientError::WireFormat(e.to_string()))?;
2384 let match_keys = filter
2385 .match_keys
2386 .into_iter()
2387 .map(|mk| exoware_proto::store::common::v1::MatchKey {
2388 reserved_bits: u32::from(mk.reserved_bits),
2389 prefix: u32::from(mk.prefix),
2390 payload_regex: mk.payload_regex.0,
2391 ..Default::default()
2392 })
2393 .collect();
2394 let value_filters = filter
2395 .value_filters
2396 .into_iter()
2397 .map(|vf| {
2398 use crate::stream_filter::BytesFilter;
2399 use exoware_proto::store::common::v1::bytes_filter::Kind as ProtoKind;
2400 let kind = match vf {
2401 BytesFilter::Exact(bytes) => ProtoKind::Exact(bytes),
2402 BytesFilter::Prefix(bytes) => ProtoKind::Prefix(bytes),
2403 BytesFilter::Regex(pattern) => ProtoKind::Regex(pattern),
2404 };
2405 exoware_proto::store::common::v1::BytesFilter {
2406 kind: Some(kind),
2407 ..Default::default()
2408 }
2409 })
2410 .collect();
2411 let request = exoware_proto::store::stream::v1::SubscribeRequest {
2412 match_keys,
2413 value_filters,
2414 since_sequence_number,
2415 ..Default::default()
2416 };
2417 let config = store_connect_client_config(
2418 self.c.stream_uri.clone(),
2419 self.c.connect_request_compression,
2420 );
2421 let client = exoware_proto::store::stream::v1::ServiceClient::new(
2422 self.c.connect_http.clone(),
2423 config,
2424 );
2425 let stream = client
2426 .subscribe(request)
2427 .await
2428 .map_err(client_error_from_connect)?;
2429 Ok(StreamSubscription {
2430 stream,
2431 key_prefix: self.c.key_prefix,
2432 logical_filter,
2433 })
2434 }
2435
2436 pub async fn get(
2439 &self,
2440 sequence_number: u64,
2441 ) -> Result<Option<Vec<(Key, Bytes)>>, ClientError> {
2442 let config = store_connect_client_config(
2443 self.c.stream_uri.clone(),
2444 self.c.connect_request_compression,
2445 );
2446 let client = exoware_proto::store::stream::v1::ServiceClient::new(
2447 self.c.connect_http.clone(),
2448 config,
2449 );
2450 match client
2451 .get(exoware_proto::store::stream::v1::GetRequest {
2452 sequence_number,
2453 ..Default::default()
2454 })
2455 .await
2456 {
2457 Ok(resp) => {
2458 let owned = resp.into_owned();
2459 let mut entries = Vec::with_capacity(owned.entries.len());
2460 for entry in owned.entries {
2461 let key = Bytes::from(entry.key);
2462 match self.c.key_prefix {
2463 Some(prefix) if !prefix.codec.matches(&key) => {}
2464 Some(prefix) => {
2465 entries.push((prefix.decode_key(&key)?, Bytes::from(entry.value)))
2466 }
2467 None => entries.push((key, Bytes::from(entry.value))),
2468 }
2469 }
2470 Ok(Some(entries))
2471 }
2472 Err(err) => {
2473 if is_batch_missing_error(&err) {
2474 Ok(None)
2475 } else {
2476 Err(client_error_from_connect(err))
2477 }
2478 }
2479 }
2480 }
2481}
2482
2483impl SerializableReadSession {
2484 pub fn fixed_sequence(&self) -> Option<u64> {
2491 self.state.fixed_sequence()
2492 }
2493
2494 pub async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
2495 let seeded_client = self.client.clone();
2496 let unseeded_client = self.client.clone();
2497 self.run_read(
2498 move |sequence| {
2499 let client = seeded_client.clone();
2500 async move { client.get_with_min_sequence_number(key, sequence).await }
2501 },
2502 move |observed_sequence| {
2503 let client = unseeded_client.clone();
2504 async move {
2505 let (response, detail) = client.send_get(key, None).await?;
2506 if let Some(detail) = detail {
2507 observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2508 }
2509 Ok(response.value.map(Bytes::from))
2510 }
2511 },
2512 )
2513 .await
2514 }
2515
2516 pub async fn get_many(
2517 &self,
2518 keys: &[&Key],
2519 batch_size: u32,
2520 ) -> Result<GetManyStream, ClientError> {
2521 let keys_owned: Vec<Key> = keys.iter().map(|k| Bytes::copy_from_slice(k)).collect();
2522 let seeded_client = self.client.clone();
2523 let unseeded_client = self.client.clone();
2524 let keys_seeded = keys_owned.clone();
2525 let keys_unseeded = keys_owned;
2526 self.run_read(
2527 move |sequence| {
2528 let client = seeded_client.clone();
2529 let keys = keys_seeded.clone();
2530 async move {
2531 let refs: Vec<&Key> = keys.iter().collect();
2532 client
2533 .get_many_with_min_sequence_number(&refs, batch_size, sequence)
2534 .await
2535 }
2536 },
2537 move |observed_sequence| {
2538 let client = unseeded_client.clone();
2539 let keys = keys_unseeded.clone();
2540 async move {
2541 let refs: Vec<&Key> = keys.iter().collect();
2542 client
2543 .get_many_internal(&refs, batch_size, None, Some(observed_sequence))
2544 .await
2545 }
2546 },
2547 )
2548 .await
2549 }
2550
2551 pub async fn range(
2552 &self,
2553 start: &Key,
2554 end: &Key,
2555 limit: usize,
2556 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2557 self.range_with_mode(start, end, limit, RangeMode::Forward)
2558 .await
2559 }
2560
2561 pub async fn range_with_mode(
2562 &self,
2563 start: &Key,
2564 end: &Key,
2565 limit: usize,
2566 mode: RangeMode,
2567 ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2568 let seeded_client = self.client.clone();
2569 let unseeded_client = self.client.clone();
2570 self.run_read(
2571 move |sequence| {
2572 let client = seeded_client.clone();
2573 async move {
2574 client
2575 .range_with_mode_and_min_sequence_number(start, end, limit, mode, sequence)
2576 .await
2577 }
2578 },
2579 move |observed_sequence| {
2580 let client = unseeded_client.clone();
2581 async move {
2582 let stream = client
2583 .range_stream_internal(
2584 start,
2585 end,
2586 limit,
2587 limit.max(1),
2588 mode,
2589 RangeStreamReadOptions {
2590 min_sequence_number: None,
2591 observed_sequence: Some(observed_sequence),
2592 },
2593 )
2594 .await;
2595 stream?.collect().await
2596 }
2597 },
2598 )
2599 .await
2600 }
2601
2602 pub async fn range_stream(
2603 &self,
2604 start: &Key,
2605 end: &Key,
2606 limit: usize,
2607 batch_size: usize,
2608 ) -> Result<RangeStream, ClientError> {
2609 self.range_stream_with_mode(start, end, limit, batch_size, RangeMode::Forward)
2610 .await
2611 }
2612
2613 pub async fn range_stream_with_mode(
2614 &self,
2615 start: &Key,
2616 end: &Key,
2617 limit: usize,
2618 batch_size: usize,
2619 mode: RangeMode,
2620 ) -> Result<RangeStream, ClientError> {
2621 let seeded_client = self.client.clone();
2622 let unseeded_client = self.client.clone();
2623 self.run_read(
2624 move |sequence| {
2625 let client = seeded_client.clone();
2626 async move {
2627 client
2628 .range_stream_with_mode_and_min_sequence_number(
2629 start, end, limit, batch_size, mode, sequence,
2630 )
2631 .await
2632 }
2633 },
2634 move |observed_sequence| {
2635 let client = unseeded_client.clone();
2636 async move {
2637 client
2638 .range_stream_internal(
2639 start,
2640 end,
2641 limit,
2642 batch_size,
2643 mode,
2644 RangeStreamReadOptions {
2645 min_sequence_number: None,
2646 observed_sequence: Some(observed_sequence),
2647 },
2648 )
2649 .await
2650 }
2651 },
2652 )
2653 .await
2654 }
2655
2656 pub async fn range_reduce(
2657 &self,
2658 start: &Key,
2659 end: &Key,
2660 request: &DomainRangeReduceRequest,
2661 ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
2662 let seeded_client = self.client.clone();
2663 let unseeded_client = self.client.clone();
2664 let request_seeded = request.clone();
2665 let request_unseeded = request.clone();
2666 self.run_read(
2667 move |sequence| {
2668 let client = seeded_client.clone();
2669 let request = request_seeded.clone();
2670 async move {
2671 client
2672 .range_reduce_with_min_sequence_number(start, end, &request, sequence)
2673 .await
2674 }
2675 },
2676 move |observed_sequence| {
2677 let client = unseeded_client.clone();
2678 let request = request_unseeded.clone();
2679 async move {
2680 let (response, detail) = client
2681 .range_reduce_response_internal(start, end, &request, None)
2682 .await?;
2683 if let Some(detail) = detail {
2684 observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2685 }
2686 let decoded = proto_to_domain_reduce_response(response)
2687 .map_err(ClientError::WireFormat)?;
2688 if !decoded.groups.is_empty() {
2689 return Err(ClientError::WireFormat(
2690 "grouped range reduction response returned for scalar request"
2691 .to_string(),
2692 ));
2693 }
2694 Ok(decoded
2695 .results
2696 .iter()
2697 .map(|result| result.value.clone())
2698 .collect())
2699 }
2700 },
2701 )
2702 .await
2703 }
2704
2705 pub async fn range_reduce_response(
2706 &self,
2707 start: &Key,
2708 end: &Key,
2709 request: &DomainRangeReduceRequest,
2710 ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
2711 let seeded_client = self.client.clone();
2712 let unseeded_client = self.client.clone();
2713 let request_seeded = request.clone();
2714 let request_unseeded = request.clone();
2715 self.run_read(
2716 move |sequence| {
2717 let client = seeded_client.clone();
2718 let request = request_seeded.clone();
2719 async move {
2720 client
2721 .range_reduce_response_with_min_sequence_number(
2722 start, end, &request, sequence,
2723 )
2724 .await
2725 }
2726 },
2727 move |observed_sequence| {
2728 let client = unseeded_client.clone();
2729 let request = request_unseeded.clone();
2730 async move {
2731 let (response, detail) = client
2732 .range_reduce_response_internal(start, end, &request, None)
2733 .await?;
2734 if let Some(detail) = detail {
2735 observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2736 }
2737 Ok(response)
2738 }
2739 },
2740 )
2741 .await
2742 }
2743
2744 async fn run_read<T, SeededCall, SeededFut, UnseededCall, UnseededFut>(
2745 &self,
2746 seeded_call: SeededCall,
2747 unseeded_call: UnseededCall,
2748 ) -> Result<T, ClientError>
2749 where
2750 SeededCall: Fn(u64) -> SeededFut,
2751 SeededFut: std::future::Future<Output = Result<T, ClientError>>,
2752 UnseededCall: Fn(Arc<AtomicU64>) -> UnseededFut,
2753 UnseededFut: std::future::Future<Output = Result<T, ClientError>>,
2754 {
2755 if let Some(sequence) = self.fixed_sequence() {
2756 return seeded_call(sequence).await;
2757 }
2758
2759 let gate = self.state.init_gate.lock().await;
2760
2761 if let Some(sequence) = self.fixed_sequence() {
2762 drop(gate);
2763 return seeded_call(sequence).await;
2764 }
2765
2766 let result = unseeded_call(self.state.sequence.clone()).await;
2767 drop(gate);
2768 result
2769 }
2770}
2771
2772fn client_error_from_connect(err: ConnectError) -> ClientError {
2773 ClientError::Rpc(Box::new(err))
2774}
2775
2776fn is_retryable_error(err: &ConnectError) -> bool {
2777 matches!(
2778 err.code,
2779 ErrorCode::Aborted
2780 | ErrorCode::ResourceExhausted
2781 | ErrorCode::Unavailable
2782 | ErrorCode::Unknown
2783 | ErrorCode::Internal
2788 )
2789}
2790
2791fn retry_delay_for_error(
2792 err: &ConnectError,
2793 attempt: usize,
2794 retry_config: RetryConfig,
2795) -> Duration {
2796 if let Ok(decoded) = proto_decode_connect_error(err) {
2797 if let Some(retry_info) = decoded.retry_info {
2798 if let Some(delay) = retry_info.retry_delay.as_option() {
2799 let secs = u64::try_from(delay.seconds).unwrap_or(0);
2800 let nanos = u32::try_from(delay.nanos.max(0)).unwrap_or(0);
2801 let hinted = Duration::new(secs, nanos);
2802 if !hinted.is_zero() {
2803 return hinted.min(retry_config.max_backoff);
2804 }
2805 }
2806 }
2807 }
2808 retry_backoff_delay(attempt, retry_config)
2809}
2810
2811fn retry_backoff_delay(attempt: usize, retry_config: RetryConfig) -> Duration {
2812 let exponent = (attempt.saturating_sub(1)).min(20) as u32;
2813 let factor = 1u128 << exponent;
2814 let base_ms = retry_config.initial_backoff.as_millis();
2815 let capped_ms = base_ms
2816 .saturating_mul(factor)
2817 .min(retry_config.max_backoff.as_millis());
2818 Duration::from_millis(capped_ms.min(u64::MAX as u128) as u64)
2819}
2820
2821#[cfg(test)]
2822mod tests {
2823 use super::*;
2824 use crate::kv_codec::{KvFieldKind, KvPredicate, KvPredicateCheck, KvPredicateConstraint};
2825 use exoware_proto::query::TraversalMode as ProtoTraversalMode;
2826
2827 #[test]
2828 fn hex_round_trip() {
2829 let data = vec![0x00, 0x42, 0xFF, 0xAB];
2830 let encoded = hex_encode(&data);
2831 assert_eq!(encoded, "0042ffab");
2832 let decoded = hex_decode(&encoded).unwrap();
2833 assert_eq!(decoded, data);
2834 }
2835
2836 #[test]
2837 fn client_creation() {
2838 let client = StoreClient::new("http://localhost:10000");
2839 assert_eq!(client.health_url, "http://localhost:10000");
2840 assert_eq!(client.ingest_uri.to_string(), "http://localhost:10000/");
2841 assert_eq!(client.query_uri.to_string(), "http://localhost:10000/");
2842 }
2843
2844 #[test]
2845 fn builder_matches_new_and_split_urls() {
2846 let single = StoreClient::new("http://localhost:9/");
2847 let built = StoreClient::builder()
2848 .url("http://localhost:9/")
2849 .build()
2850 .unwrap();
2851 assert_eq!(single.health_url, built.health_url);
2852 assert_eq!(single.ingest_uri.to_string(), built.ingest_uri.to_string());
2853 assert_eq!(single.query_uri.to_string(), built.query_uri.to_string());
2854
2855 let split = StoreClient::with_split_urls("http://h", "http://i", "http://q", "http://c");
2856 let split_b = StoreClient::builder()
2857 .health_url("http://h")
2858 .ingest_url("http://i")
2859 .query_url("http://q")
2860 .compact_url("http://c")
2861 .build()
2862 .unwrap();
2863 assert_eq!(split.health_url, split_b.health_url);
2864 assert_eq!(split.ingest_uri.to_string(), split_b.ingest_uri.to_string());
2865 assert_eq!(split.query_uri.to_string(), split_b.query_uri.to_string());
2866 assert_eq!(
2867 split.compact_uri.to_string(),
2868 split_b.compact_uri.to_string()
2869 );
2870 }
2871
2872 #[test]
2873 fn builder_fails_until_all_urls_set() {
2874 assert!(matches!(
2875 StoreClient::builder().health_url("http://h").build(),
2876 Err(ClientBuildError::MissingIngestUrl)
2877 ));
2878 assert!(matches!(
2879 StoreClient::builder()
2880 .health_url("http://h")
2881 .ingest_url("http://i")
2882 .build(),
2883 Err(ClientBuildError::MissingQueryUrl)
2884 ));
2885 assert!(matches!(
2886 StoreClient::builder()
2887 .health_url("http://h")
2888 .ingest_url("http://i")
2889 .query_url("http://q")
2890 .build(),
2891 Err(ClientBuildError::MissingCompactUrl)
2892 ));
2893 }
2894
2895 #[test]
2896 fn client_trims_trailing_slash() {
2897 let client = StoreClient::new("http://localhost:10000/");
2898 assert_eq!(client.health_url, "http://localhost:10000");
2899 }
2900
2901 #[test]
2902 fn create_session_starts_unseeded() {
2903 let client = StoreClient::new("http://localhost:10000/");
2904 let session = client.create_session();
2905 assert_eq!(session.fixed_sequence(), None);
2906 }
2907
2908 #[test]
2909 fn range_mode_maps_to_proto_traversal() {
2910 assert_eq!(
2911 RangeMode::Forward.to_proto(),
2912 ProtoTraversalMode::TRAVERSAL_MODE_FORWARD
2913 );
2914 assert_eq!(
2915 RangeMode::Reverse.to_proto(),
2916 ProtoTraversalMode::TRAVERSAL_MODE_REVERSE
2917 );
2918 }
2919
2920 #[test]
2921 fn retry_config_standard_defaults_match_expected() {
2922 let config = RetryConfig::standard();
2923 assert_eq!(config.max_attempts, 3);
2924 assert_eq!(config.initial_backoff, Duration::from_millis(100));
2925 assert_eq!(config.max_backoff, Duration::from_millis(2_000));
2926 }
2927
2928 #[test]
2929 fn retry_config_clamps_attempts_and_backoff_bounds() {
2930 let config = RetryConfig::standard()
2931 .with_max_attempts(0)
2932 .with_initial_backoff(Duration::from_millis(250))
2933 .with_max_backoff(Duration::from_millis(50))
2934 .sanitized();
2935 assert_eq!(config.max_attempts, 1);
2936 assert_eq!(config.initial_backoff, Duration::from_millis(250));
2937 assert_eq!(config.max_backoff, Duration::from_millis(250));
2938 }
2939
2940 #[test]
2941 fn retryable_codes_include_connect_transients() {
2942 assert!(is_retryable_error(&ConnectError::aborted("retry")));
2943 assert!(is_retryable_error(&ConnectError::resource_exhausted(
2944 "retry"
2945 )));
2946 assert!(is_retryable_error(&ConnectError::unavailable("retry")));
2947 assert!(is_retryable_error(&ConnectError::internal("retry")));
2948 assert!(!is_retryable_error(&ConnectError::invalid_argument(
2949 "no retry"
2950 )));
2951 }
2952
2953 #[test]
2954 fn retry_backoff_delay_is_exponential_and_capped() {
2955 let config = RetryConfig::standard()
2956 .with_initial_backoff(Duration::from_millis(100))
2957 .with_max_backoff(Duration::from_millis(250));
2958 assert_eq!(retry_backoff_delay(1, config), Duration::from_millis(100));
2959 assert_eq!(retry_backoff_delay(2, config), Duration::from_millis(200));
2960 assert_eq!(retry_backoff_delay(3, config), Duration::from_millis(250));
2961 assert_eq!(retry_backoff_delay(4, config), Duration::from_millis(250));
2962 }
2963
2964 #[test]
2965 fn create_session_with_sequence_pins_explicit_floor() {
2966 let client = StoreClient::new("http://localhost:10000/");
2967 let session = client.create_session_with_sequence(27);
2968 assert_eq!(session.fixed_sequence(), Some(27));
2969 }
2970
2971 #[test]
2972 fn store_key_prefix_round_trips_logical_keys() {
2973 let prefix = StoreKeyPrefix::new(4, 0xA).unwrap();
2974 let logical = Bytes::from_static(b"hello");
2975 let physical = prefix.encode_key(&logical).unwrap();
2976 assert!(prefix.codec.matches(&physical));
2977 assert_eq!(prefix.decode_key(&physical).unwrap(), logical);
2978 }
2979
2980 #[test]
2981 fn store_key_prefix_clamps_long_logical_range_upper_bound() {
2982 let prefix = StoreKeyPrefix::new(4, 0x2).unwrap();
2983 let logical_codec = KeyCodec::new(4, 0x7);
2984 let (logical_start, logical_end) = logical_codec.prefix_bounds();
2985 assert_eq!(logical_end.len(), MAX_KEY_LEN);
2986
2987 let (physical_start, physical_end) =
2988 prefix.encode_range(&logical_start, &logical_end).unwrap();
2989 assert!(prefix.codec.matches(&physical_start));
2990 assert!(prefix.codec.matches(&physical_end));
2991 assert_eq!(physical_end.len(), MAX_KEY_LEN);
2992 assert_eq!(prefix.decode_key(&physical_start).unwrap(), logical_start);
2993 }
2994
2995 #[test]
2996 fn store_key_prefix_rewrites_match_key_family() {
2997 let prefix = StoreKeyPrefix::new(3, 0b101).unwrap();
2998 let logical = crate::match_key::MatchKey {
2999 reserved_bits: 4,
3000 prefix: 0b0110,
3001 payload_regex: crate::kv_codec::Utf8::from("(?s).*"),
3002 };
3003 let physical = prefix.prefix_match_key(&logical).unwrap();
3004 assert_eq!(physical.reserved_bits, 7);
3005 assert_eq!(physical.prefix, 0b101_0110);
3006 assert_eq!(physical.payload_regex, logical.payload_regex);
3007 }
3008
3009 #[test]
3010 fn store_key_prefix_broadens_stream_match_key_payload_regex() {
3011 let prefix = StoreKeyPrefix::new(3, 0b101).unwrap();
3012 let logical = crate::match_key::MatchKey {
3013 reserved_bits: 4,
3014 prefix: 0b0110,
3015 payload_regex: crate::kv_codec::Utf8::from("(?s).*"),
3016 };
3017 let physical = prefix.prefix_stream_match_key(&logical).unwrap();
3018 assert_eq!(physical.reserved_bits, 7);
3019 assert_eq!(physical.prefix, 0b101_0110);
3020 assert_eq!(&*physical.payload_regex, "(?s-u).*");
3021 }
3022
3023 #[test]
3024 fn prefixed_reduce_request_shifts_key_field_offsets() {
3025 let client = StoreClient::builder()
3026 .url("http://localhost:10000")
3027 .key_prefix(StoreKeyPrefix::new(5, 0b10101).unwrap())
3028 .build()
3029 .unwrap();
3030 let request = DomainRangeReduceRequest {
3031 reducers: vec![crate::RangeReducerSpec {
3032 op: crate::RangeReduceOp::SumField,
3033 expr: Some(KvExpr::Field(KvFieldRef::Key {
3034 bit_offset: 9,
3035 kind: KvFieldKind::UInt64,
3036 })),
3037 }],
3038 group_by: vec![KvExpr::Field(KvFieldRef::ZOrderKey {
3039 bit_offset: 12,
3040 field_position: 0,
3041 field_widths: vec![8],
3042 kind: KvFieldKind::UInt64,
3043 })],
3044 filter: Some(KvPredicate {
3045 checks: vec![KvPredicateCheck {
3046 field: KvFieldRef::Value {
3047 index: 0,
3048 kind: KvFieldKind::UInt64,
3049 nullable: false,
3050 },
3051 constraint: KvPredicateConstraint::UInt64Range {
3052 min: Some(1),
3053 max: Some(9),
3054 },
3055 }],
3056 contradiction: false,
3057 }),
3058 };
3059
3060 let shifted = client.prefix_reduce_request(&request).unwrap();
3061 let Some(KvExpr::Field(KvFieldRef::Key { bit_offset, .. })) =
3062 shifted.reducers[0].expr.as_ref()
3063 else {
3064 panic!("expected key field reducer");
3065 };
3066 assert_eq!(*bit_offset, 14);
3067 let KvExpr::Field(KvFieldRef::ZOrderKey { bit_offset, .. }) = &shifted.group_by[0] else {
3068 panic!("expected z-order group field");
3069 };
3070 assert_eq!(*bit_offset, 17);
3071 }
3072
3073 #[test]
3074 fn store_write_batch_uses_each_clients_prefix() {
3075 let base = StoreClient::new("http://localhost:10000");
3076 let a = base.with_key_prefix(StoreKeyPrefix::new(4, 1).unwrap());
3077 let b = base.with_key_prefix(StoreKeyPrefix::new(4, 2).unwrap());
3078 let key_a = Bytes::from_static(b"a");
3079 let key_b = Bytes::from_static(b"b");
3080
3081 let mut batch = StoreWriteBatch::new();
3082 batch.push(&a, &key_a, b"va").unwrap();
3083 batch.push(&b, &key_b, b"vb").unwrap();
3084
3085 assert_eq!(
3086 batch.entries[0].0,
3087 a.key_prefix().unwrap().encode_key(&key_a).unwrap()
3088 );
3089 assert_eq!(
3090 batch.entries[1].0,
3091 b.key_prefix().unwrap().encode_key(&key_b).unwrap()
3092 );
3093 }
3094
3095 fn hex_encode(data: &[u8]) -> String {
3096 hex::encode(data)
3097 }
3098
3099 fn hex_decode(s: &str) -> Option<Vec<u8>> {
3100 hex::decode(s).ok()
3101 }
3102}