Skip to main content

exoware_sdk/
lib.rs

1//! Store Rust SDK Client.
2//!
3//! Provides typed access to the store put/get/query APIs plus
4//! plain HTTP health/readiness probes.
5//!
6//! ## Errors
7//!
8//! RPC failures surface as [`ClientError::Rpc`] carrying a native [`ConnectError`]. Use
9//! [`ClientError::decoded_rpc_error`] or [`StoreClient::decode_error_details`] to unpack
10//! protobuf `google.rpc` details (and `store.query.v1.Detail` on query RPC errors), not string parsing.
11//! Idempotent reads honor `google.rpc.RetryInfo` when deciding backoff (see `retry_delay_for_error`).
12
13pub mod keys;
14pub mod kv_codec;
15pub mod match_key;
16pub mod proto;
17pub mod prune_policy;
18pub mod stream_filter;
19pub use keys::{Key, KeyCodec, KeyCodecError, KeyMut, KeyValidationError, Value, MAX_KEY_LEN};
20pub use proto::*;
21extern crate self as exoware_proto;
22
23use bytes::Bytes;
24use connectrpc::client::{ClientConfig, ServerStream as ConnectServerStream};
25use connectrpc::{ConnectError, ErrorCode};
26use exoware_proto::compact::ServiceClient as CompactServiceClient;
27use exoware_proto::ingest::ServiceClient as IngestServiceClient;
28use exoware_proto::query as proto_query;
29use exoware_proto::query::ServiceClient as QueryServiceClient;
30use exoware_proto::store::compact::v1::PruneRequest as ProtoPruneRequest;
31use exoware_proto::store::ingest::v1::PutRequest as ProtoPutRequest;
32use exoware_proto::store::query::v1::{
33    GetManyRequest as ProtoGetManyRequest, GetRequest as ProtoGetRequest,
34    RangeRequest as ProtoRangeRequest, ReduceRequest as ProtoWireReduceRequest,
35};
36use exoware_proto::RangeReduceRequest as DomainRangeReduceRequest;
37use exoware_proto::{
38    connect_compression_registry as proto_connect_compression_registry,
39    decode_connect_error as proto_decode_connect_error,
40    decode_query_detail_header_value as proto_decode_query_detail_header_value,
41    to_domain_reduce_response as proto_to_domain_reduce_response,
42    to_proto_reduce_params as proto_to_proto_reduce_params,
43    PreferZstdHttpClient as ProtoPreferZstdHttpClient,
44    QUERY_DETAIL_RESPONSE_HEADER as PROTO_QUERY_DETAIL_RESPONSE_HEADER,
45};
46use futures::future::BoxFuture;
47use http::HeaderMap;
48use keys::is_valid_key_size;
49use kv_codec::{KvExpr, KvFieldRef, KvReducedValue};
50use std::collections::HashMap;
51use std::sync::{
52    atomic::{AtomicU64, Ordering},
53    Arc,
54};
55use std::time::Duration;
56
/// Default cap on attempts for retried RPCs (initial try included).
/// NOTE(review): consumers are outside this chunk — presumably the idempotent-read
/// retry path mentioned in the module docs; confirm at use sites.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3;
/// Default initial backoff between retry attempts, in milliseconds.
const DEFAULT_RETRY_INITIAL_BACKOFF_MS: u64 = 100;
/// Default upper bound on backoff between retry attempts, in milliseconds.
const DEFAULT_RETRY_MAX_BACKOFF_MS: u64 = 2_000;
60
/// Codec used to compress **outgoing** RPC request bodies when compression applies.
///
/// connectrpc `ClientConfig::compress_requests` accepts a single encoding name; there is no
/// HTTP-style negotiation list for requests. Prefer [`Zstd`](Self::Zstd); use [`Gzip`](Self::Gzip)
/// when talking to a peer that only accepts gzip-compressed requests. Response decompression still
/// follows [`PreferZstdHttpClient`] and the shared [`connect_compression_registry`].
///
/// To drive this from configuration or environment variables, map your setting to this enum and
/// pass it to [`StoreClientBuilder::connect_request_compression`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ConnectRequestCompression {
    /// `compress_requests("zstd")` — the default request encoding.
    #[default]
    Zstd,
    /// `compress_requests("gzip")` — for peers that cannot accept zstd requests.
    Gzip,
}
78
79impl ConnectRequestCompression {
80    fn wire_name(self) -> &'static str {
81        match self {
82            Self::Zstd => "zstd",
83            Self::Gzip => "gzip",
84        }
85    }
86}
87
/// Default max decompressed RPC message size for client decode (matches the query worker).
///
/// The underlying client uses 4 MiB unless configured; large `Range` frames need headroom.
/// The store simulator uses the same 256 MiB cap for large `Range` frames.
const STORE_CLIENT_MAX_MESSAGE_BYTES: usize = 256 * 1024 * 1024; // 256 MiB
93
94/// Store client defaults: [`connect_compression_registry`] for codecs;
95/// [`PreferZstdHttpClient`] sets `Accept-Encoding: zstd, gzip` on responses.
96///
97/// Request body compression uses [`ConnectRequestCompression`] (default zstd); connectrpc only
98/// supports one request encoding per config (see [`ConnectRequestCompression`]).
99fn store_connect_client_config(
100    base_uri: http::Uri,
101    request_compression: ConnectRequestCompression,
102) -> ClientConfig {
103    ClientConfig::new(base_uri)
104        .compression(proto_connect_compression_registry())
105        .compress_requests(request_compression.wire_name())
106        .default_max_message_size(STORE_CLIENT_MAX_MESSAGE_BYTES)
107}
108
/// Store client error.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// Plain HTTP failure (e.g. the health/readiness probes, per module docs).
    #[error("HTTP error: {0}")]
    Http(#[from] reqwest::Error),
    /// RPC-level failure; boxed to keep the enum small. Decode structured
    /// details with [`ClientError::decoded_rpc_error`].
    #[error("RPC error ({0})")]
    Rpc(Box<ConnectError>),
    /// Logical key could not be mapped into (or out of) the prefixed keyspace.
    #[error("store key prefix error: {0}")]
    KeyPrefix(#[from] StoreKeyPrefixError),
    /// A key's length did not match what the caller's schema requires.
    #[error("invalid key length: expected {expected}, got {got}")]
    InvalidKeyLength { expected: usize, got: usize },
    /// Malformed wire payload or filter (message carried as plain text).
    #[error("wire format error: {0}")]
    WireFormat(String),
}
123
124impl ClientError {
125    pub fn rpc_error(&self) -> Option<&ConnectError> {
126        match self {
127            Self::Rpc(err) => Some(err.as_ref()),
128            _ => None,
129        }
130    }
131
132    pub fn rpc_code(&self) -> Option<ErrorCode> {
133        self.rpc_error().map(|err| err.code)
134    }
135
136    pub fn decoded_rpc_error(
137        &self,
138    ) -> Result<Option<exoware_proto::DecodedConnectError>, buffa::DecodeError> {
139        self.rpc_error().map(proto_decode_connect_error).transpose()
140    }
141}
142
/// Errors returned by [`StoreKeyPrefix`] when a logical key cannot be mapped
/// into the prefixed physical keyspace.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum StoreKeyPrefixError {
    /// A reserved-bits count beyond the 16-bit prefix budget was requested.
    #[error("reserved_bits {reserved_bits} exceeds 16")]
    ReservedBitsTooLarge { reserved_bits: u8 },
    /// The prefix value needs more bits than `reserved_bits` provides.
    #[error("prefix {prefix} does not fit in {reserved_bits} reserved bits")]
    PrefixTooLarge { reserved_bits: u8, prefix: u16 },
    /// Store prefix bits plus a caller's match-key bits exceed the 16-bit budget.
    #[error(
        "combined reserved bits exceed 16: store prefix bits {prefix_bits} + logical bits {logical_bits}"
    )]
    CombinedReservedBitsTooLarge { prefix_bits: u8, logical_bits: u8 },
    /// A physical key was decoded that does not carry this store's prefix.
    #[error("key does not belong to this store prefix")]
    PrefixMismatch,
    /// A bit offset shifted past the representable u16 range.
    #[error("key bit offset {offset} plus store prefix bits {prefix_bits} exceeds u16")]
    BitOffsetOverflow { offset: u16, prefix_bits: u8 },
    /// Underlying [`KeyCodec`] failure, forwarded verbatim.
    #[error("key codec error: {0}")]
    Codec(#[from] KeyCodecError),
}
162
/// A client-side namespace layered over raw Store keys.
///
/// The prefix consumes a small number of high bits in the physical Store key
/// and stores the caller's logical key in the remaining payload bits. QMDB,
/// SQL, and other higher-level instances continue to build their own logical
/// keys as before; a prefixed [`StoreClient`] maps those keys on the wire and
/// maps returned keys back before callers see them.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StoreKeyPrefix {
    // The codec both validates and performs the bit-level encode/decode.
    codec: KeyCodec,
}
174
impl StoreKeyPrefix {
    /// Build a prefix namespace; rejects `reserved_bits > 16` or a `prefix`
    /// that does not fit in `reserved_bits`.
    pub fn new(reserved_bits: u8, prefix: u16) -> Result<Self, StoreKeyPrefixError> {
        validate_prefix_bits(reserved_bits, prefix)?;
        Ok(Self {
            codec: KeyCodec::new(reserved_bits, prefix),
        })
    }

    /// Number of high bits this prefix consumes in the physical key.
    #[inline]
    pub fn reserved_bits(self) -> u8 {
        self.codec.reserved_bits()
    }

    /// The prefix value stored in the reserved bits.
    #[inline]
    pub fn prefix(self) -> u16 {
        self.codec.prefix()
    }

    /// Maximum logical key bytes available under this prefix.
    #[inline]
    pub fn max_logical_key_len(self) -> usize {
        self.codec.max_payload_capacity_bytes()
    }

    /// Encode a logical key into the physical Store keyspace.
    pub fn encode_key(self, key: &Key) -> Result<Key, StoreKeyPrefixError> {
        Ok(self.codec.encode(key)?)
    }

    /// Decode a physical Store key back into the logical keyspace.
    ///
    /// Fails with [`StoreKeyPrefixError::PrefixMismatch`] if `key` does not
    /// carry this prefix.
    pub fn decode_key(self, key: &Key) -> Result<Key, StoreKeyPrefixError> {
        if !self.codec.matches(key) {
            return Err(StoreKeyPrefixError::PrefixMismatch);
        }
        // Payload length is derived from the physical key length; offset 0
        // reads the whole logical payload.
        let payload_len = self.codec.payload_capacity_bytes_for_key_len(key.len());
        Ok(Bytes::from(self.codec.read_payload(key, 0, payload_len)?))
    }

    /// Encode an inclusive logical range into the physical Store keyspace.
    ///
    /// Empty `end` means unbounded in the logical keyspace and is narrowed to
    /// this prefix's physical upper bound. Long logical upper bounds are
    /// clamped to the maximum logical key length representable under this
    /// prefix; this preserves scans over existing `KeyCodec::prefix_bounds`
    /// ranges, whose upper bound is intentionally `MAX_KEY_LEN` bytes.
    pub fn encode_range(self, start: &Key, end: &Key) -> Result<(Key, Key), StoreKeyPrefixError> {
        let start = self.encode_key(start)?;
        let end = if end.is_empty() {
            // Unbounded logical end: use the prefix's own physical upper bound.
            self.codec.prefix_bounds().1
        } else {
            let max_len = self.max_logical_key_len();
            // Truncate an over-long upper bound (see doc comment above).
            let end = if end.len() > max_len {
                Bytes::copy_from_slice(&end[..max_len])
            } else {
                Bytes::copy_from_slice(end)
            };
            self.encode_key(&end)?
        };
        Ok((start, end))
    }

    /// Layer this store prefix under a caller's match key, keeping the
    /// caller's payload regex unchanged.
    fn prefix_match_key(
        self,
        match_key: &crate::match_key::MatchKey,
    ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
        self.prefix_match_key_with_regex(match_key, match_key.payload_regex.clone())
    }

    /// Layer this store prefix under a caller's match key, replacing the
    /// payload regex with a match-everything pattern (bytes mode, dot matches
    /// newline) — used for stream subscriptions where the client re-filters.
    fn prefix_stream_match_key(
        self,
        match_key: &crate::match_key::MatchKey,
    ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
        self.prefix_match_key_with_regex(match_key, crate::kv_codec::Utf8::from("(?s-u).*"))
    }

    /// Combine this store prefix with a logical match key: the store bits sit
    /// above the logical bits, and both prefix values are concatenated.
    fn prefix_match_key_with_regex(
        self,
        match_key: &crate::match_key::MatchKey,
        payload_regex: crate::kv_codec::Utf8,
    ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
        validate_prefix_bits(match_key.reserved_bits, match_key.prefix)?;
        // Total reserved bits = store bits + logical bits; must stay <= 16.
        let reserved_bits = self
            .reserved_bits()
            .checked_add(match_key.reserved_bits)
            .ok_or(StoreKeyPrefixError::CombinedReservedBitsTooLarge {
                prefix_bits: self.reserved_bits(),
                logical_bits: match_key.reserved_bits,
            })?;
        if reserved_bits > 16 {
            return Err(StoreKeyPrefixError::CombinedReservedBitsTooLarge {
                prefix_bits: self.reserved_bits(),
                logical_bits: match_key.reserved_bits,
            });
        }

        // Shift the store prefix above the logical bits, then OR in the
        // logical prefix. Widened to u32 so the shift cannot lose bits.
        let prefix = (u32::from(self.prefix()) << u32::from(match_key.reserved_bits))
            | u32::from(match_key.prefix);
        // If the combined value exceeds u16, the real prefix is unrepresentable;
        // u16::MAX is reported as a placeholder in the error.
        let prefix = u16::try_from(prefix).map_err(|_| StoreKeyPrefixError::PrefixTooLarge {
            reserved_bits,
            prefix: u16::MAX,
        })?;
        validate_prefix_bits(reserved_bits, prefix)?;
        Ok(crate::match_key::MatchKey {
            reserved_bits,
            prefix,
            payload_regex,
        })
    }
}
284
/// A physical Store write batch assembled from one or more logical clients.
///
/// Use [`Self::push`] with the specific prefixed client that produced each
/// logical key, then [`Self::commit`] once to submit all rows in one atomic
/// Store `Put`.
#[derive(Clone, Debug, Default)]
pub struct StoreWriteBatch {
    // Rows already mapped into the physical keyspace, in push order.
    entries: Vec<(Key, Bytes)>,
}
294
295impl StoreWriteBatch {
296    pub fn new() -> Self {
297        Self::default()
298    }
299
300    pub fn len(&self) -> usize {
301        self.entries.len()
302    }
303
304    pub fn is_empty(&self) -> bool {
305        self.entries.is_empty()
306    }
307
308    pub fn clear(&mut self) {
309        self.entries.clear();
310    }
311
312    pub fn push(
313        &mut self,
314        client: &StoreClient,
315        key: &Key,
316        value: &[u8],
317    ) -> Result<&mut Self, ClientError> {
318        self.entries
319            .push((client.encode_store_key(key)?, Bytes::copy_from_slice(value)));
320        Ok(self)
321    }
322
323    pub async fn commit(&self, client: &StoreClient) -> Result<u64, ClientError> {
324        let refs: Vec<(&Key, &[u8])> = self
325            .entries
326            .iter()
327            .map(|(key, value)| (key, value.as_ref()))
328            .collect();
329        client.put_physical(&refs).await
330    }
331}
332
/// A writer that can stage an already-prepared upload into a shared Store
/// write batch and then be notified of the batch outcome.
///
/// Implementations should keep `prepare_*` methods as inherent APIs because
/// each writer's input shape differs. Once a caller has a prepared handle,
/// this trait provides the common lifecycle:
///
/// 1. stage rows into a [`StoreWriteBatch`]
/// 2. commit that batch
/// 3. mark the prepared handle persisted with the returned Store sequence
///    number, or failed if staging/commit does not complete
pub trait StoreBatchUpload {
    /// Handle for an upload that has been prepared but not yet committed.
    type Prepared: Send;
    /// Value returned to the caller after a successful commit.
    type Receipt: Send;
    /// Writer-specific error type.
    type Error: std::fmt::Display + Send;

    /// Stage the prepared upload's rows into `batch` without committing.
    fn stage_upload(
        &self,
        prepared: &Self::Prepared,
        batch: &mut StoreWriteBatch,
    ) -> Result<(), Self::Error>;

    /// Convert a batch-commit [`ClientError`] into the writer's error type.
    fn commit_error(&self, error: ClientError) -> Self::Error;

    /// Record that the upload was durably committed at `sequence_number`.
    fn mark_upload_persisted<'a>(
        &'a self,
        prepared: Self::Prepared,
        sequence_number: u64,
    ) -> BoxFuture<'a, Self::Receipt>
    where
        Self: Sync + 'a,
        Self::Prepared: 'a;

    /// Record that staging or commit failed; `error` is the display text.
    fn mark_upload_failed<'a>(
        &'a self,
        prepared: Self::Prepared,
        error: String,
    ) -> BoxFuture<'a, ()>
    where
        Self: Sync + 'a,
        Self::Prepared: 'a;

    /// Default end-to-end flow: stage, commit, then notify success/failure.
    /// On any error the prepared handle is marked failed before returning.
    fn commit_upload<'a>(
        &'a self,
        client: &'a StoreClient,
        prepared: Self::Prepared,
    ) -> BoxFuture<'a, Result<Self::Receipt, Self::Error>>
    where
        Self: Sync + Sized + 'a,
        Self::Prepared: 'a,
        Self::Receipt: 'a,
        Self::Error: 'a,
    {
        Box::pin(async move {
            let mut batch = StoreWriteBatch::new();
            if let Err(err) = self.stage_upload(&prepared, &mut batch) {
                // Staging failed: notify, then surface the original error.
                let message = err.to_string();
                self.mark_upload_failed(prepared, message).await;
                return Err(err);
            }
            match batch.commit(client).await {
                Ok(sequence_number) => {
                    Ok(self.mark_upload_persisted(prepared, sequence_number).await)
                }
                Err(err) => {
                    // Commit failed: notify, then wrap the client error.
                    let message = err.to_string();
                    self.mark_upload_failed(prepared, message).await;
                    Err(self.commit_error(err))
                }
            }
        })
    }
}
406
/// A writer that can stage an already-prepared publication record into a
/// shared Store write batch and then be notified of the batch outcome.
///
/// This is the companion to [`StoreBatchUpload`] for metadata that publishes
/// already-staged data, such as QMDB watermarks. Implementations should keep
/// `prepare_*` methods as inherent APIs because each publisher decides when a
/// publication is needed.
pub trait StoreBatchPublication {
    /// Handle for a publication that has been prepared but not yet committed.
    type PreparedPublication: Send;
    /// Value returned to the caller after a successful commit.
    type PublicationReceipt: Send;
    /// Publisher-specific error type.
    type Error: std::fmt::Display + Send;

    /// Stage the prepared publication's rows into `batch` without committing.
    fn stage_publication(
        &self,
        prepared: &Self::PreparedPublication,
        batch: &mut StoreWriteBatch,
    ) -> Result<(), Self::Error>;

    /// Convert a batch-commit [`ClientError`] into the publisher's error type.
    fn publication_commit_error(&self, error: ClientError) -> Self::Error;

    /// Record that the publication was durably committed at `sequence_number`.
    fn mark_publication_persisted<'a>(
        &'a self,
        prepared: Self::PreparedPublication,
        sequence_number: u64,
    ) -> BoxFuture<'a, Self::PublicationReceipt>
    where
        Self: Sync + 'a,
        Self::PreparedPublication: 'a;

    /// Failure hook; the default implementation is a no-op, since many
    /// publishers simply retry on the next publication cycle.
    fn mark_publication_failed<'a>(
        &'a self,
        _prepared: Self::PreparedPublication,
        _error: String,
    ) -> BoxFuture<'a, ()>
    where
        Self: Sync + 'a,
        Self::PreparedPublication: 'a,
    {
        Box::pin(async {})
    }

    /// Default end-to-end flow: stage, commit, then notify success/failure.
    fn commit_publication<'a>(
        &'a self,
        client: &'a StoreClient,
        prepared: Self::PreparedPublication,
    ) -> BoxFuture<'a, Result<Self::PublicationReceipt, Self::Error>>
    where
        Self: Sync + Sized + 'a,
        Self::PreparedPublication: 'a,
        Self::PublicationReceipt: 'a,
        Self::Error: 'a,
    {
        Box::pin(async move {
            let mut batch = StoreWriteBatch::new();
            if let Err(err) = self.stage_publication(&prepared, &mut batch) {
                // Staging failed: notify, then surface the original error.
                let message = err.to_string();
                self.mark_publication_failed(prepared, message).await;
                return Err(err);
            }
            match batch.commit(client).await {
                Ok(sequence_number) => Ok(self
                    .mark_publication_persisted(prepared, sequence_number)
                    .await),
                Err(err) => {
                    // Commit failed: notify, then wrap the client error.
                    let message = err.to_string();
                    self.mark_publication_failed(prepared, message).await;
                    Err(self.publication_commit_error(err))
                }
            }
        })
    }
}
479
/// A stateful writer that owns a durable publication frontier.
///
/// This extends [`StoreBatchPublication`] with the pieces needed by writers
/// that can prepare a catch-up publication after pending uploads drain. The
/// associated prepared publication, receipt, and error types come from
/// [`StoreBatchPublication`], so databases can use this for watermarks,
/// checkpoints, catalog versions, or any similar publication record without
/// sharing QMDB-specific concepts.
pub trait StorePublicationFrontierWriter: StoreBatchPublication {
    /// The most recently committed publication receipt, if any.
    fn latest_publication_receipt<'a>(&'a self) -> BoxFuture<'a, Option<Self::PublicationReceipt>>
    where
        Self: Sync + 'a,
        Self::PublicationReceipt: 'a;

    /// Prepare a catch-up publication; `Ok(None)` means nothing to publish.
    fn prepare_publication<'a>(
        &'a self,
    ) -> BoxFuture<'a, Result<Option<Self::PreparedPublication>, Self::Error>>
    where
        Self: Sync + 'a,
        Self::PreparedPublication: 'a,
        Self::Error: 'a;

    /// Prepare and commit a publication, returning its receipt (or `None`
    /// when nothing needed publishing).
    fn flush_publication_with_receipt<'a>(
        &'a self,
    ) -> BoxFuture<'a, Result<Option<Self::PublicationReceipt>, Self::Error>>
    where
        Self: Sync + 'a,
        Self::PublicationReceipt: 'a,
        Self::Error: 'a;

    /// Convenience wrapper over [`Self::flush_publication_with_receipt`] that
    /// discards the receipt.
    fn flush_publication<'a>(&'a self) -> BoxFuture<'a, Result<(), Self::Error>>
    where
        Self: Sync + 'a,
        Self::PublicationReceipt: 'a,
        Self::Error: 'a,
    {
        Box::pin(async move { self.flush_publication_with_receipt().await.map(|_| ()) })
    }
}
519
/// Traversal mode for range queries.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RangeMode {
    /// Maps to wire `TRAVERSAL_MODE_FORWARD`.
    Forward,
    /// Maps to wire `TRAVERSAL_MODE_REVERSE`.
    Reverse,
}
526
/// Iterator-like async range stream.
pub struct RangeStream {
    // Underlying connectrpc server stream of borrowed range frames.
    stream:
        ConnectServerStream<hyper::body::Incoming, exoware_proto::query::RangeFrameView<'static>>,
    // Frame pulled eagerly by `prefetch_first_frame` and not yet consumed.
    pending_frame: Option<exoware_proto::query::RangeFrame>,
    // Running count of rows handed to the caller so far.
    rows_seen: usize,
    // Total row count, set once the stream ends cleanly.
    final_count: Option<usize>,
    // Query detail decoded from the stream trailers at end of stream.
    final_detail: Option<proto_query::Detail>,
    // True after the stream has ended (cleanly or with an error).
    finished: bool,
    // Shared high-water mark updated with the observed sequence number.
    observed_sequence: Option<Arc<AtomicU64>>,
    // When set, physical keys are mapped back to the logical keyspace.
    key_prefix: Option<StoreKeyPrefix>,
}
539
impl RangeStream {
    /// Wrap a raw connect stream with fresh bookkeeping state.
    fn from_connect_stream(
        stream: ConnectServerStream<
            hyper::body::Incoming,
            exoware_proto::query::RangeFrameView<'static>,
        >,
        observed_sequence: Option<Arc<AtomicU64>>,
        key_prefix: Option<StoreKeyPrefix>,
    ) -> Self {
        Self {
            stream,
            pending_frame: None,
            rows_seen: 0,
            final_count: None,
            final_detail: None,
            finished: false,
            observed_sequence,
            key_prefix,
        }
    }

    /// Total row count; `None` until the stream has ended cleanly.
    pub fn final_count(&self) -> Option<usize> {
        self.final_count
    }

    /// Query detail from the trailers; `None` until the stream has ended.
    pub fn final_detail(&self) -> Option<proto_query::Detail> {
        self.final_detail.clone()
    }

    /// Decode the query detail trailer and bump the shared sequence-number
    /// high-water mark, if both are present.
    fn observe_detail_from_stream_trailers(&mut self) {
        self.final_detail = self
            .stream
            .trailers()
            .and_then(query_detail_from_header_map);
        if let (Some(sequence_store), Some(d)) =
            (&self.observed_sequence, self.final_detail.as_ref())
        {
            // fetch_max: only ever advance the observed sequence number.
            sequence_store.fetch_max(d.sequence_number, Ordering::SeqCst);
        }
    }

    /// Eagerly pull the first frame so connection/stream errors surface
    /// before the caller starts iterating. No-op once a frame is buffered
    /// or the stream has finished.
    async fn prefetch_first_frame(&mut self) -> Result<(), ConnectError> {
        if self.pending_frame.is_some() || self.finished {
            return Ok(());
        }
        match self.stream.message().await? {
            Some(frame) => {
                self.pending_frame = Some(frame.to_owned_message());
                Ok(())
            }
            None => {
                // Stream ended immediately: either an error or an empty result.
                self.finished = true;
                if let Some(err) = self.stream.error() {
                    Err(err.clone())
                } else {
                    self.final_count = Some(self.rows_seen);
                    self.observe_detail_from_stream_trailers();
                    Ok(())
                }
            }
        }
    }

    /// Pull the next chunk of rows. `Ok(None)` = stream ended cleanly;
    /// keys are mapped back through `key_prefix` when one is set.
    pub async fn next_chunk(&mut self) -> Result<Option<Vec<(Key, Bytes)>>, ClientError> {
        if self.finished {
            return Ok(None);
        }

        // Prefer the prefetched frame; otherwise read the next one.
        let frame = if let Some(frame) = self.pending_frame.take() {
            frame
        } else {
            let Some(frame) = self
                .stream
                .message()
                .await
                .map_err(client_error_from_connect)?
            else {
                // End of stream: record totals/trailers unless it errored.
                self.finished = true;
                if let Some(err) = self.stream.error() {
                    return Err(client_error_from_connect(err.clone()));
                }
                self.final_count = Some(self.rows_seen);
                self.observe_detail_from_stream_trailers();
                return Ok(None);
            };
            frame.to_owned_message()
        };

        let n = frame.results.len();
        self.rows_seen += n;
        let mut out = Vec::with_capacity(n);
        for entry in &frame.results {
            let key = Bytes::copy_from_slice(&entry.key);
            // Map the physical key back to the logical keyspace if prefixed.
            let key = match self.key_prefix {
                Some(prefix) => prefix.decode_key(&key)?,
                None => key,
            };
            out.push((key, Bytes::copy_from_slice(&entry.value)));
        }
        Ok(Some(out))
    }

    /// Drain the stream into a single vector of rows.
    pub async fn collect(mut self) -> Result<Vec<(Key, Bytes)>, ClientError> {
        let mut entries = Vec::new();
        while let Some(chunk) = self.next_chunk().await? {
            entries.extend(chunk);
        }
        Ok(entries)
    }
}
650
/// Iterator-like async stream of `GetMany` results; companion to
/// [`RangeStream`], but values are optional (missing keys yield `None`).
pub struct GetManyStream {
    // Underlying connectrpc server stream of borrowed get-many frames.
    stream:
        ConnectServerStream<hyper::body::Incoming, exoware_proto::query::GetManyFrameView<'static>>,
    // Frame pulled eagerly by `prefetch_first_frame` and not yet consumed.
    pending_frame: Option<exoware_proto::query::GetManyFrame>,
    // Running count of entries handed to the caller so far.
    entries_seen: usize,
    // Query detail decoded from the stream trailers at end of stream.
    final_detail: Option<proto_query::Detail>,
    // True after the stream has ended (cleanly or with an error).
    finished: bool,
    // Shared high-water mark updated with the observed sequence number.
    observed_sequence: Option<Arc<AtomicU64>>,
    // When set, physical keys are mapped back to the logical keyspace.
    key_prefix: Option<StoreKeyPrefix>,
}
661
impl GetManyStream {
    /// Query detail from the trailers; `None` until the stream has ended.
    pub fn final_detail(&self) -> Option<proto_query::Detail> {
        self.final_detail.clone()
    }

    /// Wrap a raw connect stream with fresh bookkeeping state.
    fn from_connect_stream(
        stream: ConnectServerStream<
            hyper::body::Incoming,
            exoware_proto::query::GetManyFrameView<'static>,
        >,
        observed_sequence: Option<Arc<AtomicU64>>,
        key_prefix: Option<StoreKeyPrefix>,
    ) -> Self {
        Self {
            stream,
            pending_frame: None,
            entries_seen: 0,
            final_detail: None,
            finished: false,
            observed_sequence,
            key_prefix,
        }
    }

    /// Decode the query detail trailer and bump the shared sequence-number
    /// high-water mark, if both are present.
    fn observe_detail_from_stream_trailers(&mut self) {
        self.final_detail = self
            .stream
            .trailers()
            .and_then(query_detail_from_header_map);
        if let (Some(sequence_store), Some(d)) =
            (&self.observed_sequence, self.final_detail.as_ref())
        {
            // fetch_max: only ever advance the observed sequence number.
            sequence_store.fetch_max(d.sequence_number, Ordering::SeqCst);
        }
    }

    /// Eagerly pull the first frame so connection/stream errors surface
    /// before the caller starts iterating. No-op once a frame is buffered
    /// or the stream has finished.
    async fn prefetch_first_frame(&mut self) -> Result<(), ConnectError> {
        if self.pending_frame.is_some() || self.finished {
            return Ok(());
        }
        match self.stream.message().await? {
            Some(frame) => {
                self.pending_frame = Some(frame.to_owned_message());
                Ok(())
            }
            None => {
                // Stream ended immediately: either an error or an empty result.
                self.finished = true;
                if let Some(err) = self.stream.error() {
                    Err(err.clone())
                } else {
                    self.observe_detail_from_stream_trailers();
                    Ok(())
                }
            }
        }
    }

    /// Pull the next chunk. Each entry pairs a key with `Some(value)` or
    /// `None` for a missing key. `Ok(None)` = stream ended cleanly.
    pub async fn next_chunk(&mut self) -> Result<Option<Vec<(Key, Option<Bytes>)>>, ClientError> {
        if self.finished {
            return Ok(None);
        }
        // Prefer the prefetched frame; otherwise read the next one.
        let frame = if let Some(frame) = self.pending_frame.take() {
            frame
        } else {
            let Some(frame) = self
                .stream
                .message()
                .await
                .map_err(client_error_from_connect)?
            else {
                self.finished = true;
                if let Some(err) = self.stream.error() {
                    return Err(client_error_from_connect(err.clone()));
                }
                self.observe_detail_from_stream_trailers();
                return Ok(None);
            };
            frame.to_owned_message()
        };

        self.entries_seen += frame.results.len();
        let mut out = Vec::with_capacity(frame.results.len());
        for entry in &frame.results {
            let key = Bytes::copy_from_slice(&entry.key);
            // Map the physical key back to the logical keyspace if prefixed.
            let key = match self.key_prefix {
                Some(prefix) => prefix.decode_key(&key)?,
                None => key,
            };
            let value = entry.value.as_ref().map(|v| Bytes::copy_from_slice(v));
            out.push((key, value));
        }
        Ok(Some(out))
    }

    /// Drain the stream into a map; keys with no value are omitted.
    pub async fn collect(mut self) -> Result<HashMap<Key, Bytes>, ClientError> {
        let mut map = HashMap::new();
        while let Some(chunk) = self.next_chunk().await? {
            for (key, value) in chunk {
                if let Some(v) = value {
                    map.insert(key, v);
                }
            }
        }
        Ok(map)
    }
}
768
769impl RangeMode {
770    fn to_proto(self) -> proto_query::TraversalMode {
771        match self {
772            Self::Forward => proto_query::TraversalMode::TRAVERSAL_MODE_FORWARD,
773            Self::Reverse => proto_query::TraversalMode::TRAVERSAL_MODE_REVERSE,
774        }
775    }
776}
777
778#[inline]
779fn key_prefix_mask(bits: u8) -> Result<u16, StoreKeyPrefixError> {
780    if bits > 16 {
781        return Err(StoreKeyPrefixError::ReservedBitsTooLarge {
782            reserved_bits: bits,
783        });
784    }
785    Ok(if bits == 0 {
786        0
787    } else if bits == 16 {
788        u16::MAX
789    } else {
790        (1u16 << bits) - 1
791    })
792}
793
794fn validate_prefix_bits(reserved_bits: u8, prefix: u16) -> Result<(), StoreKeyPrefixError> {
795    let mask = key_prefix_mask(reserved_bits)?;
796    if prefix > mask {
797        return Err(StoreKeyPrefixError::PrefixTooLarge {
798            reserved_bits,
799            prefix,
800        });
801    }
802    Ok(())
803}
804
/// One delivered (key, value) row from a stream subscription. The client
/// reapplies its own filter if it needs to know which match_key matched —
/// the wire frame doesn't carry the index.
#[derive(Clone, Debug)]
pub struct StreamSubscriptionEntry {
    /// Logical key (already mapped out of any store prefix).
    pub key: Key,
    /// Row value as delivered by the server.
    pub value: Bytes,
}
813
/// One atomic Put batch delivered to a subscriber.
#[derive(Clone, Debug)]
pub struct StreamSubscriptionFrame {
    /// Store sequence number of the originating Put.
    pub sequence_number: u64,
    /// Rows from that Put which passed the subscription's filters.
    pub entries: Vec<StreamSubscriptionEntry>,
}
820
/// Async stream of `StreamSubscriptionFrame`. Backed by the generated
/// connectrpc server stream.
pub struct StreamSubscription {
    // Underlying connectrpc server stream of subscribe responses.
    stream: ConnectServerStream<
        hyper::body::Incoming,
        exoware_proto::store::stream::v1::SubscribeResponseView<'static>,
    >,
    // When set, physical keys are mapped back to the logical keyspace.
    key_prefix: Option<StoreKeyPrefix>,
    // Client-side re-filter applied to decoded (key, value) rows.
    logical_filter: Option<ClientStreamFilter>,
}
831
832impl std::fmt::Debug for StreamSubscription {
833    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
834        f.debug_struct("StreamSubscription").finish_non_exhaustive()
835    }
836}
837
impl StreamSubscription {
    /// Pull the next frame. `Ok(None)` = server closed the stream cleanly.
    ///
    /// Frames whose entries are all filtered out client-side are skipped
    /// entirely (the loop continues to the next wire frame), so callers never
    /// see an empty `entries` vector.
    pub async fn next(&mut self) -> Result<Option<StreamSubscriptionFrame>, ClientError> {
        loop {
            match self
                .stream
                .message()
                .await
                .map_err(client_error_from_connect)?
            {
                Some(view) => {
                    let owned = view.to_owned_message();
                    let mut entries = Vec::with_capacity(owned.entries.len());
                    for entry in owned.entries {
                        let key = Bytes::from(entry.key);
                        // Map the physical key back to the logical keyspace if prefixed.
                        let key = match self.key_prefix {
                            Some(prefix) => prefix.decode_key(&key)?,
                            None => key,
                        };
                        let value = Bytes::from(entry.value);
                        // Keep the row when there is no logical filter, or it matches.
                        if self
                            .logical_filter
                            .as_ref()
                            .is_none_or(|filter| filter.matches(&key, value.as_ref()))
                        {
                            entries.push(StreamSubscriptionEntry { key, value });
                        }
                    }
                    if entries.is_empty() {
                        // Everything was filtered out — fetch the next frame.
                        continue;
                    }
                    let frame = StreamSubscriptionFrame {
                        sequence_number: owned.sequence_number,
                        entries,
                    };
                    return Ok(Some(frame));
                }
                None => {
                    // Stream ended: surface a stream-level error if one exists.
                    if let Some(err) = self.stream.error() {
                        return Err(client_error_from_connect(err.clone()));
                    } else {
                        return Ok(None);
                    }
                }
            }
        }
    }
}
886
/// One compiled `match_keys` entry: a key-shape codec plus a regex applied
/// to the key's payload bytes (see `ClientStreamFilter::matches`).
#[derive(Clone)]
struct ClientKeyMatcher {
    codec: KeyCodec,
    regex: regex::bytes::Regex,
}
892
/// Client-side compiled form of `stream_filter::StreamFilter`, applied to
/// decoded entries in `StreamSubscription::next`.
#[derive(Clone)]
struct ClientStreamFilter {
    // Key matchers; an entry must satisfy at least one of them (see `matches`).
    keys: Vec<ClientKeyMatcher>,
    // Optional value-byte filters; `None` leaves values unconstrained.
    values: Option<crate::stream_filter::CompiledBytesFilters>,
}
898
899impl ClientStreamFilter {
900    fn compile(filter: &crate::stream_filter::StreamFilter) -> Result<Self, ClientError> {
901        crate::stream_filter::validate_filter(filter)
902            .map_err(|e| ClientError::WireFormat(e.to_string()))?;
903        let keys = filter
904            .match_keys
905            .iter()
906            .map(|mk| {
907                let regex = crate::match_key::compile_payload_regex(&mk.payload_regex)
908                    .map_err(|e| ClientError::WireFormat(e.to_string()))?;
909                Ok(ClientKeyMatcher {
910                    codec: KeyCodec::new(mk.reserved_bits, mk.prefix),
911                    regex,
912                })
913            })
914            .collect::<Result<Vec<_>, ClientError>>()?;
915        let values = crate::stream_filter::CompiledBytesFilters::compile(&filter.value_filters)
916            .map_err(ClientError::WireFormat)?;
917        Ok(Self { keys, values })
918    }
919
920    fn matches(&self, key: &Key, value: &[u8]) -> bool {
921        if !self
922            .values
923            .as_ref()
924            .is_none_or(|filter| filter.matches(value))
925        {
926            return false;
927        }
928        self.keys.iter().any(|matcher| {
929            if !matcher.codec.matches(key) {
930                return false;
931            }
932            let payload_len = matcher.codec.payload_capacity_bytes_for_key_len(key.len());
933            matcher
934                .codec
935                .read_payload(key, 0, payload_len)
936                .is_ok_and(|payload| matcher.regex.is_match(&payload))
937        })
938    }
939}
940
941/// Inspect a Connect error for `store.stream.BATCH_EVICTED` / `BATCH_NOT_FOUND`
942/// `ErrorInfo` details. Used by `get_batch` to collapse both into `Ok(None)`.
943fn is_batch_missing_error(err: &ConnectError) -> bool {
944    match proto_decode_connect_error(err) {
945        Ok(decoded) => decoded.error_info.is_some_and(|info| {
946            info.domain == "store.stream"
947                && matches!(info.reason.as_str(), "BATCH_EVICTED" | "BATCH_NOT_FOUND")
948        }),
949        Err(_) => false,
950    }
951}
952
/// Retry policy for idempotent read operations.
#[derive(Clone, Copy, Debug)]
pub struct RetryConfig {
    // Total attempt budget, first try included; clamped to >= 1 by `sanitized`.
    max_attempts: usize,
    // Delay before the first retry.
    initial_backoff: Duration,
    // Upper bound on retry delays; raised to >= initial_backoff by `sanitized`.
    max_backoff: Duration,
}
960
961impl RetryConfig {
962    pub fn standard() -> Self {
963        Self {
964            max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
965            initial_backoff: Duration::from_millis(DEFAULT_RETRY_INITIAL_BACKOFF_MS),
966            max_backoff: Duration::from_millis(DEFAULT_RETRY_MAX_BACKOFF_MS),
967        }
968    }
969
970    pub fn disabled() -> Self {
971        Self::standard().with_max_attempts(1)
972    }
973
974    pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
975        self.max_attempts = max_attempts.max(1);
976        self
977    }
978
979    pub fn with_initial_backoff(mut self, initial_backoff: Duration) -> Self {
980        self.initial_backoff = initial_backoff;
981        self
982    }
983
984    pub fn with_max_backoff(mut self, max_backoff: Duration) -> Self {
985        self.max_backoff = max_backoff;
986        self
987    }
988
989    pub(crate) fn sanitized(self) -> Self {
990        let max_attempts = self.max_attempts.max(1);
991        let max_backoff = self.max_backoff.max(self.initial_backoff);
992        Self {
993            max_attempts,
994            initial_backoff: self.initial_backoff,
995            max_backoff,
996        }
997    }
998}
999
impl Default for RetryConfig {
    /// Equivalent to [`RetryConfig::standard`].
    fn default() -> Self {
        Self::standard()
    }
}
1005
/// Normalize a configured base URL by dropping every trailing `/`, so that
/// path segments can later be appended with a single separator.
fn trim_connect_base(url: &str) -> String {
    let mut base = url;
    while let Some(stripped) = base.strip_suffix('/') {
        base = stripped;
    }
    base.to_string()
}
1009
1010fn query_detail_from_header_map(map: &HeaderMap) -> Option<proto_query::Detail> {
1011    let v = map.get(PROTO_QUERY_DETAIL_RESPONSE_HEADER)?;
1012    let s = v.to_str().ok()?;
1013    proto_decode_query_detail_header_value(s).ok()
1014}
1015
1016fn query_detail_from_unary_metadata(
1017    headers: &HeaderMap,
1018    trailers: &HeaderMap,
1019) -> Option<proto_query::Detail> {
1020    query_detail_from_header_map(headers).or_else(|| query_detail_from_header_map(trailers))
1021}
1022
1023fn new_http_client() -> reqwest::Client {
1024    reqwest::Client::builder()
1025        .pool_max_idle_per_host(32)
1026        .timeout(Duration::from_secs(30))
1027        .build()
1028        .expect("failed to build HTTP client")
1029}
1030
/// Error returned when [`StoreClientBuilder`] is missing a required endpoint URL.
#[derive(Debug, thiserror::Error)]
pub enum ClientBuildError {
    /// No base URL for the plain-HTTP health/ready probes.
    #[error("StoreClientBuilder: missing health URL (set health_url or url)")]
    MissingHealthUrl,
    /// No base URL for the ingest service.
    #[error("StoreClientBuilder: missing ingest URL (set ingest_url or url)")]
    MissingIngestUrl,
    /// No base URL for the query service.
    #[error("StoreClientBuilder: missing query URL (set query_url or url)")]
    MissingQueryUrl,
    /// No base URL for the compact service.
    #[error("StoreClientBuilder: missing compact URL (set compact_url or url)")]
    MissingCompactUrl,
    /// A configured base URL failed to parse as an `http::Uri`.
    #[error("StoreClientBuilder: invalid URL \"{url}\": {source}")]
    InvalidUrl {
        url: String,
        source: http::uri::InvalidUri,
    },
}
1048
/// Configures a [`StoreClient`] with explicit bases for health probes and store services.
///
/// Use [`StoreClient::builder()`] to construct. Call [`Self::url`] to point health, ingest, and
/// query at the same origin, or set each base separately. Finish with [`Self::build`].
#[derive(Debug, Default)]
pub struct StoreClientBuilder {
    // Per-service base URLs; `None` means "not configured yet". All except
    // `stream_url` are required by `build`.
    health_url: Option<String>,
    ingest_url: Option<String>,
    query_url: Option<String>,
    compact_url: Option<String>,
    // Optional; `build` falls back to the ingest base when unset.
    stream_url: Option<String>,
    // Optional client-side key namespace.
    key_prefix: Option<StoreKeyPrefix>,
    // Retry policy for idempotent reads; defaults via `RetryConfig::default`.
    retry_config: RetryConfig,
    // Compression codec for outgoing Connect request bodies.
    connect_request_compression: ConnectRequestCompression,
}
1064
1065impl StoreClientBuilder {
1066    /// Sets the same base URL for all services (health, ingest, query, compact, stream).
1067    pub fn url(mut self, url: &str) -> Self {
1068        let u = trim_connect_base(url);
1069        self.health_url = Some(u.clone());
1070        self.ingest_url = Some(u.clone());
1071        self.query_url = Some(u.clone());
1072        self.compact_url = Some(u.clone());
1073        self.stream_url = Some(u);
1074        self
1075    }
1076
1077    /// Base URL for plain HTTP `GET /health` and `GET /ready` (often the query worker).
1078    pub fn health_url(mut self, url: &str) -> Self {
1079        self.health_url = Some(trim_connect_base(url));
1080        self
1081    }
1082
1083    /// Base URL for the ingest service (`store.ingest.v1.Service`).
1084    pub fn ingest_url(mut self, url: &str) -> Self {
1085        self.ingest_url = Some(trim_connect_base(url));
1086        self
1087    }
1088
1089    /// Base URL for the query service (`store.query.v1.Service`).
1090    pub fn query_url(mut self, url: &str) -> Self {
1091        self.query_url = Some(trim_connect_base(url));
1092        self
1093    }
1094
1095    /// Base URL for the compact service (`store.compact.v1.Service`).
1096    pub fn compact_url(mut self, url: &str) -> Self {
1097        self.compact_url = Some(trim_connect_base(url));
1098        self
1099    }
1100
1101    /// Base URL for the stream service (`store.stream.v1.Service`). Defaults
1102    /// to the ingest base when not set explicitly.
1103    pub fn stream_url(mut self, url: &str) -> Self {
1104        self.stream_url = Some(trim_connect_base(url));
1105        self
1106    }
1107
1108    /// Client-side key namespace applied to all user-key operations.
1109    pub fn key_prefix(mut self, prefix: StoreKeyPrefix) -> Self {
1110        self.key_prefix = Some(prefix);
1111        self
1112    }
1113
1114    /// Retry policy for idempotent read operations (get / range / reduce).
1115    pub fn retry_config(mut self, retry: RetryConfig) -> Self {
1116        self.retry_config = retry.sanitized();
1117        self
1118    }
1119
1120    /// Codec for compressing **outgoing** RPC request bodies (default [`ConnectRequestCompression::Zstd`]).
1121    pub fn connect_request_compression(mut self, compression: ConnectRequestCompression) -> Self {
1122        self.connect_request_compression = compression;
1123        self
1124    }
1125
1126    /// Build the client, or return an error if any required URL was not set.
1127    pub fn build(self) -> Result<StoreClient, ClientBuildError> {
1128        let health_url = self.health_url.ok_or(ClientBuildError::MissingHealthUrl)?;
1129        let ingest_url = self.ingest_url.ok_or(ClientBuildError::MissingIngestUrl)?;
1130        let query_url = self.query_url.ok_or(ClientBuildError::MissingQueryUrl)?;
1131        let compact_url = self
1132            .compact_url
1133            .ok_or(ClientBuildError::MissingCompactUrl)?;
1134        // Stream defaults to ingest when not explicitly configured; they
1135        // share the same origin in the reference simulator.
1136        let stream_url = self.stream_url.unwrap_or_else(|| ingest_url.clone());
1137        let ingest_uri: http::Uri =
1138            ingest_url
1139                .parse()
1140                .map_err(|e| ClientBuildError::InvalidUrl {
1141                    url: ingest_url.clone(),
1142                    source: e,
1143                })?;
1144        let query_uri: http::Uri = query_url
1145            .parse()
1146            .map_err(|e| ClientBuildError::InvalidUrl {
1147                url: query_url.clone(),
1148                source: e,
1149            })?;
1150        let compact_uri: http::Uri =
1151            compact_url
1152                .parse()
1153                .map_err(|e| ClientBuildError::InvalidUrl {
1154                    url: compact_url.clone(),
1155                    source: e,
1156                })?;
1157        let stream_uri: http::Uri =
1158            stream_url
1159                .parse()
1160                .map_err(|e| ClientBuildError::InvalidUrl {
1161                    url: stream_url.clone(),
1162                    source: e,
1163                })?;
1164        Ok(StoreClient {
1165            health_url,
1166            ingest_uri,
1167            query_uri,
1168            compact_uri,
1169            stream_uri,
1170            http: new_http_client(),
1171            connect_http: ProtoPreferZstdHttpClient::plaintext(),
1172            retry_config: self.retry_config,
1173            connect_request_compression: self.connect_request_compression,
1174            key_prefix: self.key_prefix,
1175        })
1176    }
1177}
1178
/// Typed Rust client for Store.
#[derive(Clone, Debug)]
pub struct StoreClient {
    /// Base URL for `health()` / `ready()` (typically the query worker).
    pub(crate) health_url: String,
    // Connect endpoint bases, parsed once at build time.
    ingest_uri: http::Uri,
    query_uri: http::Uri,
    compact_uri: http::Uri,
    stream_uri: http::Uri,
    // Plain HTTP client for the health/ready probes.
    http: reqwest::Client,
    // HTTP client shared by all Connect RPC service clients.
    connect_http: ProtoPreferZstdHttpClient,
    // Retry policy for idempotent reads (sanitized by the builder).
    retry_config: RetryConfig,
    // Compression applied to outgoing Connect request bodies.
    connect_request_compression: ConnectRequestCompression,
    // Optional client-side key namespace applied to user-key operations.
    key_prefix: Option<StoreKeyPrefix>,
}
1194
/// A session that enforces monotonic read consistency via a fixed `min_sequence_number` floor.
///
/// `StoreClient` itself does not retain any client-global observed sequence.
/// Plain query reads are stateless unless the caller passes an explicit
/// `min_sequence_number`.
///
/// A `SerializableReadSession` is the explicit consistency mechanism. The first
/// successful unary read seeds the session from the server-reported sequence,
/// and every later read passes that fixed floor so the server guarantees all
/// responses reflect at least that point in the write log.
///
/// Streamed query reads (`get_many`, `range_stream`) only expose their final
/// sequence in stream trailers, so if one of those is the first successful
/// read, the session is not pinned until that stream is fully drained.
#[derive(Clone, Debug)]
pub struct SerializableReadSession {
    // Owned client; clones of the session share state, not the client's reads.
    client: StoreClient,
    // Shared floor + seeding gate; `Arc` so all session clones see one floor.
    state: Arc<SessionState>,
}
1214
/// Shared state behind a [`SerializableReadSession`].
#[derive(Debug)]
struct SessionState {
    // Pinned sequence floor; 0 is the "unseeded" sentinel (see `fixed_sequence`).
    sequence: Arc<AtomicU64>,
    // NOTE(review): presumably serializes the one-time seeding of `sequence`
    // by the first successful read — confirm against the session read paths.
    init_gate: tokio::sync::Mutex<()>,
}
1220
1221impl SessionState {
1222    fn fixed_sequence(&self) -> Option<u64> {
1223        let sequence = self.sequence.load(Ordering::Acquire);
1224        (sequence > 0).then_some(sequence)
1225    }
1226}
1227
/// Read options threaded through the internal `range_stream` implementation.
#[derive(Default)]
struct RangeStreamReadOptions {
    // Optional sequence floor forwarded with the range request.
    min_sequence_number: Option<u64>,
    // NOTE(review): presumably receives the stream's observed sequence (e.g.
    // from trailers) for session seeding — confirm in `range_stream_internal`.
    observed_sequence: Option<Arc<AtomicU64>>,
}
1233
1234impl StoreClient {
    /// Start building a client with per-service base URLs.
    ///
    /// See [`StoreClientBuilder`] for the available settings.
    pub fn builder() -> StoreClientBuilder {
        StoreClientBuilder::default()
    }
1239
    /// Single-origin client with the standard retry policy: `url` serves
    /// health, ingest, query, compact, and stream alike.
    ///
    /// # Panics
    /// If `url` cannot be parsed as an `http::Uri` (see [`Self::with_retry_config`]).
    pub fn new(url: &str) -> Self {
        Self::with_retry_config(url, RetryConfig::standard())
    }
1243
    /// Single-origin client with an explicit retry policy for idempotent reads.
    ///
    /// # Panics
    /// If `url` cannot be parsed as an `http::Uri` — `build` only fails on a
    /// missing base (impossible here, `url` sets them all) or an invalid one.
    pub fn with_retry_config(url: &str, retry_config: RetryConfig) -> Self {
        Self::builder()
            .url(url)
            .retry_config(retry_config)
            .build()
            .expect("url sets health, ingest, and query URLs")
    }
1251
    /// Split endpoints for deployments where services run on different ports or hosts.
    ///
    /// The stream base defaults to `ingest_base` (no separate parameter here).
    ///
    /// # Panics
    /// If any base URL cannot be parsed as an `http::Uri`.
    pub fn with_split_urls(
        health_base: &str,
        ingest_base: &str,
        query_base: &str,
        compact_base: &str,
    ) -> Self {
        Self::with_split_urls_and_retry(
            health_base,
            ingest_base,
            query_base,
            compact_base,
            RetryConfig::standard(),
        )
    }
1267
    /// Like [`Self::with_split_urls`] but with an explicit retry policy for
    /// idempotent reads.
    ///
    /// # Panics
    /// If any base URL cannot be parsed as an `http::Uri`.
    pub fn with_split_urls_and_retry(
        health_base: &str,
        ingest_base: &str,
        query_base: &str,
        compact_base: &str,
        retry_config: RetryConfig,
    ) -> Self {
        Self::builder()
            .health_url(health_base)
            .ingest_url(ingest_base)
            .query_url(query_base)
            .compact_url(compact_base)
            .retry_config(retry_config)
            .build()
            .expect("all service URLs are set")
    }
1284
    /// Return this client's configured Store key prefix, if any.
    ///
    /// `None` means user keys are sent to the store unmodified.
    pub fn key_prefix(&self) -> Option<StoreKeyPrefix> {
        self.key_prefix
    }
1289
1290    /// Clone this client with a client-side Store key prefix.
1291    pub fn with_key_prefix(&self, prefix: StoreKeyPrefix) -> Self {
1292        let mut out = self.clone();
1293        out.key_prefix = Some(prefix);
1294        out
1295    }
1296
1297    /// Clone this client without client-side key prefixing.
1298    pub fn without_key_prefix(&self) -> Self {
1299        let mut out = self.clone();
1300        out.key_prefix = None;
1301        out
1302    }
1303
1304    /// Encode a logical key as it will appear in the physical Store.
1305    pub fn encode_store_key(&self, key: &Key) -> Result<Key, ClientError> {
1306        match self.key_prefix {
1307            Some(prefix) => Ok(prefix.encode_key(key)?),
1308            None => Ok(Bytes::copy_from_slice(key)),
1309        }
1310    }
1311
1312    /// Decode a physical Store key into this client's logical keyspace.
1313    pub fn decode_store_key(&self, key: &Key) -> Result<Key, ClientError> {
1314        match self.key_prefix {
1315            Some(prefix) => Ok(prefix.decode_key(key)?),
1316            None => Ok(Bytes::copy_from_slice(key)),
1317        }
1318    }
1319
1320    fn encode_store_range(&self, start: &Key, end: &Key) -> Result<(Key, Key), ClientError> {
1321        match self.key_prefix {
1322            Some(prefix) => Ok(prefix.encode_range(start, end)?),
1323            None => Ok((Bytes::copy_from_slice(start), Bytes::copy_from_slice(end))),
1324        }
1325    }
1326
    /// Outgoing Connect request body compression (see [`ConnectRequestCompression`]).
    ///
    /// Set via [`StoreClientBuilder::connect_request_compression`].
    pub fn connect_request_compression(&self) -> ConnectRequestCompression {
        self.connect_request_compression
    }
1331
    /// Decode the structured detail payload carried by a Connect RPC error
    /// (see the crate-level docs on error handling).
    ///
    /// # Errors
    /// Returns a decode error when the detail payload is malformed.
    pub fn decode_error_details(
        err: &ConnectError,
    ) -> Result<exoware_proto::DecodedConnectError, buffa::DecodeError> {
        proto_decode_connect_error(err)
    }
1337
    /// Create an unseeded serializable read session.
    ///
    /// The first successful unary read fixes the session's sequence floor from
    /// the server response. Streamed query reads fix that floor only once their
    /// trailers arrive.
    ///
    /// Equivalent to [`Self::create_session_with_sequence`] with `0` (the
    /// "unseeded" sentinel).
    pub fn create_session(&self) -> SerializableReadSession {
        self.create_session_with_sequence(0)
    }
1346
    /// Create a serializable read session pinned to at least `sequence`.
    ///
    /// This is intended for stream-driven read paths that already observed a
    /// concrete store batch sequence and need follow-up query/proof reads to
    /// see at least that sequence.
    ///
    /// Passing `sequence == 0` yields an unseeded session; 0 is the sentinel
    /// `SessionState::fixed_sequence` treats as "no floor".
    pub fn create_session_with_sequence(&self, sequence: u64) -> SerializableReadSession {
        SerializableReadSession {
            client: self.clone(),
            state: Arc::new(SessionState {
                sequence: Arc::new(AtomicU64::new(sequence)),
                init_gate: tokio::sync::Mutex::new(()),
            }),
        }
    }
1361
    /// Typed access to the `store.ingest.v1` service.
    ///
    /// Returns a lightweight handle borrowing this client; construct per call.
    pub fn ingest(&self) -> Ingest<'_> {
        Ingest { c: self }
    }
1366
    /// Typed access to the `store.query.v1` service.
    ///
    /// Returns a lightweight handle borrowing this client; construct per call.
    pub fn query(&self) -> Query<'_> {
        Query { c: self }
    }
1371
    /// Typed access to the `store.compact.v1` service.
    ///
    /// Returns a lightweight handle borrowing this client; construct per call.
    pub fn compact(&self) -> Compact<'_> {
        Compact { c: self }
    }
1376
    /// Typed access to the `store.stream.v1` service.
    ///
    /// Returns a lightweight handle borrowing this client; construct per call.
    pub fn stream(&self) -> Stream<'_> {
        Stream { c: self }
    }
1381
1382    /// Submit a KV batch via Connect `Put`.
1383    ///
1384    /// On success returns the **store sequence number** from the response. Use it for immediate
1385    /// `get_with_min_sequence_number` / range calls or to seed
1386    /// [`Self::create_session_with_sequence`].
1387    /// If the request succeeds, the server accepts the full batch (count is `kvs.len()`).
1388    pub(crate) async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
1389        if self.key_prefix.is_none() {
1390            return self.put_physical(kvs).await;
1391        }
1392        let mut keys = Vec::with_capacity(kvs.len());
1393        for (key, _) in kvs {
1394            keys.push(self.encode_store_key(key)?);
1395        }
1396        let prefixed: Vec<(&Key, &[u8])> = keys
1397            .iter()
1398            .zip(kvs.iter())
1399            .map(|(key, (_, value))| (key, *value))
1400            .collect();
1401        self.put_physical(&prefixed).await
1402    }
1403
1404    async fn put_physical(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
1405        let mut proto_kvs = Vec::with_capacity(kvs.len());
1406        for (key, value) in kvs {
1407            if !is_valid_key_size(key.len()) {
1408                return Err(ClientError::WireFormat(format!(
1409                    "key length {} is outside valid store key range ({}..={})",
1410                    key.len(),
1411                    keys::MIN_KEY_LEN,
1412                    MAX_KEY_LEN
1413                )));
1414            }
1415            proto_kvs.push(exoware_proto::common::KvEntry {
1416                key: (*key).to_vec(),
1417                value: value.to_vec(),
1418                ..Default::default()
1419            });
1420        }
1421
1422        let config =
1423            store_connect_client_config(self.ingest_uri.clone(), self.connect_request_compression);
1424        let client = IngestServiceClient::new(self.connect_http.clone(), config);
1425        let response = client
1426            .put(ProtoPutRequest {
1427                kvs: proto_kvs,
1428                ..Default::default()
1429            })
1430            .await
1431            .map_err(client_error_from_connect)?;
1432        Ok(response.into_owned().sequence_number)
1433    }
1434
    /// Point lookup without a sequence floor; `Ok(None)` when the server
    /// returns no value for `key`.
    pub(crate) async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
        self.get_internal(key, None).await
    }
1438
    /// Point lookup carrying an explicit `min_sequence_number` floor
    /// (forwarded to the server via the get request).
    pub(crate) async fn get_with_min_sequence_number(
        &self,
        key: &Key,
        min_sequence_number: u64,
    ) -> Result<Option<Bytes>, ClientError> {
        self.get_internal(key, Some(min_sequence_number)).await
    }
1446
1447    async fn get_internal(
1448        &self,
1449        key: &Key,
1450        min_sequence_number: Option<u64>,
1451    ) -> Result<Option<Bytes>, ClientError> {
1452        let (response, detail) = self
1453            .send_get(key, self.normalize_min_sequence_number(min_sequence_number))
1454            .await?;
1455        let _ = detail;
1456        Ok(response.value.map(Bytes::from))
1457    }
1458
    /// Streamed multi-key lookup without a sequence floor; frames arrive in
    /// batches of up to `batch_size`.
    pub(crate) async fn get_many(
        &self,
        keys: &[&Key],
        batch_size: u32,
    ) -> Result<GetManyStream, ClientError> {
        self.get_many_internal(keys, batch_size, None, None).await
    }
1466
    /// Streamed multi-key lookup carrying an explicit `min_sequence_number`
    /// floor.
    pub(crate) async fn get_many_with_min_sequence_number(
        &self,
        keys: &[&Key],
        batch_size: u32,
        min_sequence_number: u64,
    ) -> Result<GetManyStream, ClientError> {
        self.get_many_internal(keys, batch_size, Some(min_sequence_number), None)
            .await
    }
1476
    /// Shared implementation for `get_many*`: validates key sizes, encodes the
    /// client key prefix, opens the server stream, and retries retryable
    /// failures on both stream open and the first frame.
    async fn get_many_internal(
        &self,
        keys: &[&Key],
        batch_size: u32,
        min_sequence_number: Option<u64>,
        observed_sequence: Option<Arc<AtomicU64>>,
    ) -> Result<GetManyStream, ClientError> {
        // Reject invalid key sizes up front, before any network work.
        for key in keys {
            if !is_valid_key_size(key.len()) {
                return Err(ClientError::WireFormat(format!(
                    "key length {} is outside valid store key range ({}..={})",
                    key.len(),
                    keys::MIN_KEY_LEN,
                    MAX_KEY_LEN
                )));
            }
        }

        let config =
            store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
        let client = QueryServiceClient::new(self.connect_http.clone(), config);
        // Physical (prefix-encoded) keys as raw bytes for the wire request.
        let proto_keys: Vec<Vec<u8>> = keys
            .iter()
            .map(|k| self.encode_store_key(k).map(|key| key.to_vec()))
            .collect::<Result<Vec<_>, _>>()?;
        let effective_min = self.normalize_min_sequence_number(min_sequence_number);
        let max_attempts = self.retry_config.max_attempts.max(1);
        let mut attempt = 1usize;
        loop {
            match client
                .get_many(ProtoGetManyRequest {
                    keys: proto_keys.clone(),
                    min_sequence_number: effective_min,
                    batch_size,
                    ..Default::default()
                })
                .await
            {
                Ok(stream) => {
                    let mut gms = GetManyStream::from_connect_stream(
                        stream,
                        observed_sequence.clone(),
                        self.key_prefix,
                    );
                    // Force the first frame so errors that only surface after
                    // the stream opens still go through the retry policy.
                    if let Err(err) = gms.prefetch_first_frame().await {
                        if attempt < max_attempts && is_retryable_error(&err) {
                            let delay = retry_delay_for_error(&err, attempt, self.retry_config);
                            tokio::time::sleep(delay).await;
                            attempt += 1;
                            continue;
                        }
                        return Err(client_error_from_connect(err));
                    }
                    return Ok(gms);
                }
                Err(err) => {
                    // Stream open failed outright; retry if error and budget allow.
                    if attempt < max_attempts && is_retryable_error(&err) {
                        let delay = retry_delay_for_error(&err, attempt, self.retry_config);
                        tokio::time::sleep(delay).await;
                        attempt += 1;
                        continue;
                    }
                    return Err(client_error_from_connect(err));
                }
            }
        }
    }
1544
    /// Key range is inclusive: `start <= key <= end` when `end` is non-empty; empty `end` is
    /// unbounded above (matches `store.query.v1.RangeRequest`).
    ///
    /// Forward iteration, no sequence floor; at most `limit` entries.
    pub(crate) async fn range(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
        self.range_internal(start, end, limit, RangeMode::Forward, None)
            .await
    }
1556
    /// See [`StoreClient::range`] for `end` semantics.
    ///
    /// Directional variant: `mode` picks the iteration direction.
    pub(crate) async fn range_with_mode(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        mode: RangeMode,
    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
        self.range_internal(start, end, limit, mode, None).await
    }
1567
    /// Forward range read carrying an explicit `min_sequence_number` floor;
    /// see [`StoreClient::range`] for `end` semantics.
    pub(crate) async fn range_with_min_sequence_number(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        min_sequence_number: u64,
    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
        self.range_internal(
            start,
            end,
            limit,
            RangeMode::Forward,
            Some(min_sequence_number),
        )
        .await
    }
1584
    /// Directional range read with an explicit `min_sequence_number` floor;
    /// see [`StoreClient::range`] for `end` semantics.
    pub(crate) async fn range_with_mode_and_min_sequence_number(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        mode: RangeMode,
        min_sequence_number: u64,
    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
        self.range_internal(start, end, limit, mode, Some(min_sequence_number))
            .await
    }
1596
    /// Streaming forward range read (frames of up to `batch_size`); see
    /// [`StoreClient::range`] for `end` semantics.
    pub(crate) async fn range_stream(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        batch_size: usize,
    ) -> Result<RangeStream, ClientError> {
        self.range_stream_internal(
            start,
            end,
            limit,
            batch_size,
            RangeMode::Forward,
            RangeStreamReadOptions::default(),
        )
        .await
    }
1614
    /// Streaming directional range read; see [`StoreClient::range`] for `end`
    /// semantics.
    pub(crate) async fn range_stream_with_mode(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        batch_size: usize,
        mode: RangeMode,
    ) -> Result<RangeStream, ClientError> {
        self.range_stream_internal(start, end, limit, batch_size, mode, Default::default())
            .await
    }
1626
1627    pub(crate) async fn range_stream_with_min_sequence_number(
1628        &self,
1629        start: &Key,
1630        end: &Key,
1631        limit: usize,
1632        batch_size: usize,
1633        min_sequence_number: u64,
1634    ) -> Result<RangeStream, ClientError> {
1635        self.range_stream_internal(
1636            start,
1637            end,
1638            limit,
1639            batch_size,
1640            RangeMode::Forward,
1641            RangeStreamReadOptions {
1642                min_sequence_number: Some(min_sequence_number),
1643                observed_sequence: None,
1644            },
1645        )
1646        .await
1647    }
1648
1649    pub(crate) async fn range_stream_with_mode_and_min_sequence_number(
1650        &self,
1651        start: &Key,
1652        end: &Key,
1653        limit: usize,
1654        batch_size: usize,
1655        mode: RangeMode,
1656        min_sequence_number: u64,
1657    ) -> Result<RangeStream, ClientError> {
1658        self.range_stream_internal(
1659            start,
1660            end,
1661            limit,
1662            batch_size,
1663            mode,
1664            RangeStreamReadOptions {
1665                min_sequence_number: Some(min_sequence_number),
1666                observed_sequence: None,
1667            },
1668        )
1669        .await
1670    }
1671
1672    pub(crate) async fn range_reduce(
1673        &self,
1674        start: &Key,
1675        end: &Key,
1676        request: &DomainRangeReduceRequest,
1677    ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
1678        let (response, _) = self
1679            .range_reduce_response_internal(start, end, request, None)
1680            .await?;
1681        let decoded = proto_to_domain_reduce_response(response).map_err(ClientError::WireFormat)?;
1682        if !decoded.groups.is_empty() {
1683            return Err(ClientError::WireFormat(
1684                "grouped range reduction response returned for scalar request".to_string(),
1685            ));
1686        }
1687        Ok(decoded
1688            .results
1689            .iter()
1690            .map(|result| result.value.clone())
1691            .collect())
1692    }
1693
1694    pub(crate) async fn range_reduce_with_min_sequence_number(
1695        &self,
1696        start: &Key,
1697        end: &Key,
1698        request: &DomainRangeReduceRequest,
1699        min_sequence_number: u64,
1700    ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
1701        let (response, _) = self
1702            .range_reduce_response_internal(start, end, request, Some(min_sequence_number))
1703            .await?;
1704        let decoded = proto_to_domain_reduce_response(response).map_err(ClientError::WireFormat)?;
1705        if !decoded.groups.is_empty() {
1706            return Err(ClientError::WireFormat(
1707                "grouped range reduction response returned for scalar request".to_string(),
1708            ));
1709        }
1710        Ok(decoded
1711            .results
1712            .iter()
1713            .map(|result| result.value.clone())
1714            .collect())
1715    }
1716
    /// Raw proto `ReduceResponse` for a range reduction, without decoding
    /// into domain types; no sequence floor.
    pub(crate) async fn range_reduce_response(
        &self,
        start: &Key,
        end: &Key,
        request: &DomainRangeReduceRequest,
    ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
        let (body, _) = self
            .range_reduce_response_internal(start, end, request, None)
            .await?;
        Ok(body)
    }
1728
1729    pub(crate) async fn range_reduce_response_with_min_sequence_number(
1730        &self,
1731        start: &Key,
1732        end: &Key,
1733        request: &DomainRangeReduceRequest,
1734        min_sequence_number: u64,
1735    ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
1736        let (body, _) = self
1737            .range_reduce_response_internal(start, end, request, Some(min_sequence_number))
1738            .await?;
1739        Ok(body)
1740    }
1741
1742    pub(crate) async fn prune(
1743        &self,
1744        policies: &[crate::prune_policy::PrunePolicy],
1745    ) -> Result<(), ClientError> {
1746        let config =
1747            store_connect_client_config(self.compact_uri.clone(), self.connect_request_compression);
1748        let client = CompactServiceClient::new(self.connect_http.clone(), config);
1749        let policies = self.prefix_prune_policies(policies)?;
1750        client
1751            .prune(ProtoPruneRequest {
1752                policies: exoware_proto::prune_policies_to_proto(&policies),
1753                ..Default::default()
1754            })
1755            .await
1756            .map_err(client_error_from_connect)?;
1757        Ok(())
1758    }
1759
1760    pub async fn health(&self) -> Result<bool, ClientError> {
1761        let resp = self
1762            .http
1763            .get(format!("{}/health", self.health_url))
1764            .send()
1765            .await?;
1766        Ok(resp.status().is_success())
1767    }
1768
1769    pub async fn ready(&self) -> Result<bool, ClientError> {
1770        let resp = self
1771            .http
1772            .get(format!("{}/ready", self.health_url))
1773            .send()
1774            .await?;
1775        Ok(resp.status().is_success())
1776    }
1777
1778    fn normalize_min_sequence_number(&self, requested_sequence: Option<u64>) -> Option<u64> {
1779        requested_sequence.filter(|sequence| *sequence > 0)
1780    }
1781
1782    async fn send_get(
1783        &self,
1784        key: &Key,
1785        min_sequence_number: Option<u64>,
1786    ) -> Result<
1787        (
1788            exoware_proto::query::GetResponse,
1789            Option<proto_query::Detail>,
1790        ),
1791        ClientError,
1792    > {
1793        if !is_valid_key_size(key.len()) {
1794            return Err(ClientError::WireFormat(format!(
1795                "key length {} is outside valid store key range ({}..={})",
1796                key.len(),
1797                keys::MIN_KEY_LEN,
1798                MAX_KEY_LEN
1799            )));
1800        }
1801        let key = self.encode_store_key(key)?;
1802
1803        let config =
1804            store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1805        let client = QueryServiceClient::new(self.connect_http.clone(), config);
1806        let response = self
1807            .send_with_retry(|| async {
1808                client
1809                    .get(ProtoGetRequest {
1810                        key: key.to_vec(),
1811                        min_sequence_number,
1812                        ..Default::default()
1813                    })
1814                    .await
1815            })
1816            .await?;
1817        let detail = query_detail_from_unary_metadata(response.headers(), response.trailers());
1818        let owned = response.into_owned();
1819        Ok((owned, detail))
1820    }
1821
    /// Test-only passthrough exposing the private `send_get` path, including
    /// its decoded detail metadata.
    #[cfg(test)]
    pub async fn send_get_for_tests(
        &self,
        key: &Key,
        min_sequence_number: Option<u64>,
    ) -> Result<
        (
            exoware_proto::query::GetResponse,
            Option<proto_query::Detail>,
        ),
        ClientError,
    > {
        self.send_get(key, min_sequence_number).await
    }
1836
1837    async fn range_internal(
1838        &self,
1839        start: &Key,
1840        end: &Key,
1841        limit: usize,
1842        mode: RangeMode,
1843        min_sequence_number: Option<u64>,
1844    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1845        let stream = self
1846            .range_stream_internal(
1847                start,
1848                end,
1849                limit,
1850                limit.max(1),
1851                mode,
1852                RangeStreamReadOptions {
1853                    min_sequence_number,
1854                    observed_sequence: None,
1855                },
1856            )
1857            .await?;
1858        stream.collect().await
1859    }
1860
    /// Opens a server-streaming `Range` RPC and returns a [`RangeStream`].
    ///
    /// Retries transient errors both while opening the stream and while
    /// waiting for the first frame, drawing from a single shared attempt
    /// budget (`retry_config.max_attempts`). Once the first frame has been
    /// received, mid-stream failures are not retried here.
    ///
    /// Returns [`ClientError::WireFormat`] for locally detectable problems:
    /// out-of-range start/end key lengths or a zero `batch_size`.
    async fn range_stream_internal(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        batch_size: usize,
        mode: RangeMode,
        options: RangeStreamReadOptions,
    ) -> Result<RangeStream, ClientError> {
        if !is_valid_key_size(start.len()) || !is_valid_key_size(end.len()) {
            return Err(ClientError::WireFormat(
                "range start/end key length is outside valid store key range".to_string(),
            ));
        }
        if batch_size == 0 {
            return Err(ClientError::WireFormat(
                "batch_size must be positive".to_string(),
            ));
        }
        // Rewrite the logical range into the physical (possibly prefixed) keyspace.
        let (start, end) = self.encode_store_range(start, end)?;

        let config =
            store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
        let client = QueryServiceClient::new(self.connect_http.clone(), config);
        let min_sequence_number = self.normalize_min_sequence_number(options.min_sequence_number);
        let max_attempts = self.retry_config.max_attempts.max(1);
        let mut attempt = 1usize;
        loop {
            // Server-streaming RPCs cannot transparently recover mid-stream failures,
            // but retrying a transient error while opening the stream or before the
            // first frame arrives is still safe. Treat both phases as a single
            // attempt budget so range opens do not multiply retries quadratically.
            let response = match client
                .range(ProtoRangeRequest {
                    start: start.to_vec(),
                    end: end.to_vec(),
                    // Saturate rather than fail when usize exceeds the wire's u32.
                    limit: Some(u32::try_from(limit).unwrap_or(u32::MAX)),
                    batch_size: u32::try_from(batch_size).unwrap_or(u32::MAX),
                    mode: mode.to_proto().into(),
                    min_sequence_number,
                    ..Default::default()
                })
                .await
            {
                Ok(response) => response,
                Err(err) => {
                    if attempt < max_attempts && is_retryable_error(&err) {
                        let delay = retry_delay_for_error(&err, attempt, self.retry_config);
                        tracing::debug!(
                            attempt,
                            max_attempts,
                            code = err.code.as_str(),
                            delay_ms = delay.as_millis() as u64,
                            "store client retrying transient range-open error",
                        );
                        tokio::time::sleep(delay).await;
                        attempt += 1;
                        continue;
                    }
                    return Err(client_error_from_connect(err));
                }
            };

            let mut stream = RangeStream::from_connect_stream(
                response,
                options.observed_sequence.clone(),
                self.key_prefix,
            );
            // Pull the first frame eagerly so stream-open failures can still be
            // retried under the same attempt counter as the open itself.
            if let Err(err) = stream.prefetch_first_frame().await {
                if attempt < max_attempts && is_retryable_error(&err) {
                    let delay = retry_delay_for_error(&err, attempt, self.retry_config);
                    tracing::debug!(
                        attempt,
                        max_attempts,
                        code = err.code.as_str(),
                        delay_ms = delay.as_millis() as u64,
                        "store client retrying transient stream-open error",
                    );
                    tokio::time::sleep(delay).await;
                    attempt += 1;
                    continue;
                }
                return Err(client_error_from_connect(err));
            }
            return Ok(stream);
        }
    }
1948
1949    async fn range_reduce_response_internal(
1950        &self,
1951        start: &Key,
1952        end: &Key,
1953        request: &DomainRangeReduceRequest,
1954        min_sequence_number: Option<u64>,
1955    ) -> Result<
1956        (
1957            exoware_proto::query::ReduceResponse,
1958            Option<proto_query::Detail>,
1959        ),
1960        ClientError,
1961    > {
1962        let config =
1963            store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1964        let client = QueryServiceClient::new(self.connect_http.clone(), config);
1965        let (start, end) = self.encode_store_range(start, end)?;
1966        let request = self.prefix_reduce_request(request)?;
1967        let proto_params = proto_to_proto_reduce_params(request);
1968        let min_sequence_number = self.normalize_min_sequence_number(min_sequence_number);
1969        let response = self
1970            .send_with_retry(|| async {
1971                client
1972                    .reduce(ProtoWireReduceRequest {
1973                        start: start.to_vec(),
1974                        end: end.to_vec(),
1975                        params: Some(proto_params.clone()).into(),
1976                        min_sequence_number,
1977                        ..Default::default()
1978                    })
1979                    .await
1980            })
1981            .await?;
1982        let detail = query_detail_from_unary_metadata(response.headers(), response.trailers());
1983        let owned = response.into_owned();
1984        Ok((owned, detail))
1985    }
1986
1987    fn prefix_prune_policies(
1988        &self,
1989        policies: &[crate::prune_policy::PrunePolicy],
1990    ) -> Result<Vec<crate::prune_policy::PrunePolicy>, ClientError> {
1991        let Some(prefix) = self.key_prefix else {
1992            return Ok(policies.to_vec());
1993        };
1994        policies
1995            .iter()
1996            .map(|policy| {
1997                use crate::prune_policy::{PolicyScope, PrunePolicy};
1998                let scope = match &policy.scope {
1999                    PolicyScope::Keys(scope) => {
2000                        let mut scope = scope.clone();
2001                        scope.match_key = prefix.prefix_match_key(&scope.match_key)?;
2002                        PolicyScope::Keys(scope)
2003                    }
2004                    PolicyScope::Sequence => PolicyScope::Sequence,
2005                };
2006                Ok(PrunePolicy {
2007                    scope,
2008                    retain: policy.retain.clone(),
2009                })
2010            })
2011            .collect::<Result<Vec<_>, StoreKeyPrefixError>>()
2012            .map_err(ClientError::from)
2013    }
2014
2015    fn prefix_stream_filter(
2016        &self,
2017        filter: crate::stream_filter::StreamFilter,
2018    ) -> Result<crate::stream_filter::StreamFilter, ClientError> {
2019        let Some(prefix) = self.key_prefix else {
2020            return Ok(filter);
2021        };
2022        let match_keys = filter
2023            .match_keys
2024            .iter()
2025            .map(|mk| prefix.prefix_stream_match_key(mk))
2026            .collect::<Result<Vec<_>, _>>()?;
2027        Ok(crate::stream_filter::StreamFilter {
2028            match_keys,
2029            value_filters: filter.value_filters,
2030        })
2031    }
2032
2033    fn prefix_reduce_request(
2034        &self,
2035        request: &DomainRangeReduceRequest,
2036    ) -> Result<DomainRangeReduceRequest, ClientError> {
2037        let Some(prefix) = self.key_prefix else {
2038            return Ok(request.clone());
2039        };
2040        let mut request = request.clone();
2041        shift_reduce_request_key_offsets(prefix.reserved_bits(), &mut request)?;
2042        Ok(request)
2043    }
2044
2045    async fn send_with_retry<F, Fut, T>(&self, mut make_request: F) -> Result<T, ClientError>
2046    where
2047        F: FnMut() -> Fut,
2048        Fut: std::future::Future<Output = Result<T, ConnectError>>,
2049    {
2050        let max_attempts = self.retry_config.max_attempts.max(1);
2051        let mut attempt = 1usize;
2052        loop {
2053            match make_request().await {
2054                Ok(response) => return Ok(response),
2055                Err(err) => {
2056                    if attempt < max_attempts && is_retryable_error(&err) {
2057                        let delay = retry_delay_for_error(&err, attempt, self.retry_config);
2058                        tracing::debug!(
2059                            attempt,
2060                            max_attempts,
2061                            code = err.code.as_str(),
2062                            delay_ms = delay.as_millis() as u64,
2063                            "store client retrying transient RPC error",
2064                        );
2065                        tokio::time::sleep(delay).await;
2066                        attempt += 1;
2067                        continue;
2068                    }
2069                    return Err(client_error_from_connect(err));
2070                }
2071            }
2072        }
2073    }
2074}
2075
2076fn shift_reduce_request_key_offsets(
2077    prefix_bits: u8,
2078    request: &mut DomainRangeReduceRequest,
2079) -> Result<(), StoreKeyPrefixError> {
2080    for reducer in &mut request.reducers {
2081        if let Some(expr) = &mut reducer.expr {
2082            shift_expr_key_offsets(prefix_bits, expr)?;
2083        }
2084    }
2085    for expr in &mut request.group_by {
2086        shift_expr_key_offsets(prefix_bits, expr)?;
2087    }
2088    if let Some(filter) = &mut request.filter {
2089        for check in &mut filter.checks {
2090            shift_field_ref_key_offset(prefix_bits, &mut check.field)?;
2091        }
2092    }
2093    Ok(())
2094}
2095
2096fn shift_expr_key_offsets(prefix_bits: u8, expr: &mut KvExpr) -> Result<(), StoreKeyPrefixError> {
2097    match expr {
2098        KvExpr::Field(field) => shift_field_ref_key_offset(prefix_bits, field),
2099        KvExpr::Literal(_) => Ok(()),
2100        KvExpr::Add(left, right)
2101        | KvExpr::Sub(left, right)
2102        | KvExpr::Mul(left, right)
2103        | KvExpr::Div(left, right) => {
2104            shift_expr_key_offsets(prefix_bits, left)?;
2105            shift_expr_key_offsets(prefix_bits, right)
2106        }
2107        KvExpr::Lower(inner) | KvExpr::DateTruncDay(inner) => {
2108            shift_expr_key_offsets(prefix_bits, inner)
2109        }
2110    }
2111}
2112
2113fn shift_field_ref_key_offset(
2114    prefix_bits: u8,
2115    field: &mut KvFieldRef,
2116) -> Result<(), StoreKeyPrefixError> {
2117    match field {
2118        KvFieldRef::Key { bit_offset, .. } | KvFieldRef::ZOrderKey { bit_offset, .. } => {
2119            *bit_offset = bit_offset.checked_add(u16::from(prefix_bits)).ok_or(
2120                StoreKeyPrefixError::BitOffsetOverflow {
2121                    offset: *bit_offset,
2122                    prefix_bits,
2123                },
2124            )?;
2125            Ok(())
2126        }
2127        KvFieldRef::Value { .. } => Ok(()),
2128    }
2129}
2130
2131// --- Service-grouped accessors ---------------------------------------------
2132
/// Ingest-service accessor: a cheap, copyable borrow of a [`StoreClient`]
/// whose methods delegate to the client's put path.
#[derive(Clone, Copy, Debug)]
pub struct Ingest<'a> {
    // Backing client; all methods forward to it.
    c: &'a StoreClient,
}
2137
/// Query-service accessor: a cheap, copyable borrow of a [`StoreClient`]
/// whose methods delegate to the client's get/range/reduce paths.
#[derive(Clone, Copy, Debug)]
pub struct Query<'a> {
    // Backing client; all methods forward to it.
    c: &'a StoreClient,
}
2142
/// Compact-service accessor: a cheap, copyable borrow of a [`StoreClient`]
/// whose methods delegate to the client's prune path.
#[derive(Clone, Copy, Debug)]
pub struct Compact<'a> {
    // Backing client; all methods forward to it.
    c: &'a StoreClient,
}
2147
/// Stream-service accessor: a cheap, copyable borrow of a [`StoreClient`]
/// used for subscribe/get against `store.stream.v1.Service`.
#[derive(Clone, Copy, Debug)]
pub struct Stream<'a> {
    // Backing client; all methods forward to it.
    c: &'a StoreClient,
}
2152
impl<'a> Ingest<'a> {
    /// Puts the given key/value pairs; forwards to the client's `put`.
    pub async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
        self.c.put(kvs).await
    }

    /// Submit a [`StoreWriteBatch`] that has already been encoded into the
    /// physical Store keyspace.
    pub async fn put_prepared(&self, batch: &StoreWriteBatch) -> Result<u64, ClientError> {
        batch.commit(self.c).await
    }
}
2164
impl<'a> Query<'a> {
    // Every method in this impl is a thin delegation to the same-named
    // method on the backing `StoreClient`; see those for error semantics.

    /// Single-key read; forwards to the client's `get`.
    pub async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
        self.c.get(key).await
    }

    /// Single-key read with a minimum-sequence floor.
    pub async fn get_with_min_sequence_number(
        &self,
        key: &Key,
        min_sequence_number: u64,
    ) -> Result<Option<Bytes>, ClientError> {
        self.c
            .get_with_min_sequence_number(key, min_sequence_number)
            .await
    }

    /// Streamed multi-key read; forwards to the client's `get_many`.
    pub async fn get_many(
        &self,
        keys: &[&Key],
        batch_size: u32,
    ) -> Result<GetManyStream, ClientError> {
        self.c.get_many(keys, batch_size).await
    }

    /// Streamed multi-key read with a minimum-sequence floor.
    pub async fn get_many_with_min_sequence_number(
        &self,
        keys: &[&Key],
        batch_size: u32,
        min_sequence_number: u64,
    ) -> Result<GetManyStream, ClientError> {
        self.c
            .get_many_with_min_sequence_number(keys, batch_size, min_sequence_number)
            .await
    }

    /// Collect a `Range` into a `Vec`. Use `range_stream` for large scans.
    pub async fn range(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
        self.c.range(start, end, limit).await
    }

    /// Buffered range read with an explicit [`RangeMode`].
    pub async fn range_with_mode(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        mode: RangeMode,
    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
        self.c.range_with_mode(start, end, limit, mode).await
    }

    /// Buffered range read with a minimum-sequence floor.
    pub async fn range_with_min_sequence_number(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        min_sequence_number: u64,
    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
        self.c
            .range_with_min_sequence_number(start, end, limit, min_sequence_number)
            .await
    }

    /// Buffered range read with both an explicit mode and a sequence floor.
    pub async fn range_with_mode_and_min_sequence_number(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        mode: RangeMode,
        min_sequence_number: u64,
    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
        self.c
            .range_with_mode_and_min_sequence_number(start, end, limit, mode, min_sequence_number)
            .await
    }

    /// Streaming range read; forwards to the client's `range_stream`.
    pub async fn range_stream(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        batch_size: usize,
    ) -> Result<RangeStream, ClientError> {
        self.c.range_stream(start, end, limit, batch_size).await
    }

    /// Streaming range read with an explicit [`RangeMode`].
    pub async fn range_stream_with_mode(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        batch_size: usize,
        mode: RangeMode,
    ) -> Result<RangeStream, ClientError> {
        self.c
            .range_stream_with_mode(start, end, limit, batch_size, mode)
            .await
    }

    /// Streaming range read with a minimum-sequence floor.
    pub async fn range_stream_with_min_sequence_number(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        batch_size: usize,
        min_sequence_number: u64,
    ) -> Result<RangeStream, ClientError> {
        self.c
            .range_stream_with_min_sequence_number(
                start,
                end,
                limit,
                batch_size,
                min_sequence_number,
            )
            .await
    }

    /// Streaming range read with both an explicit mode and a sequence floor.
    pub async fn range_stream_with_mode_and_min_sequence_number(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        batch_size: usize,
        mode: RangeMode,
        min_sequence_number: u64,
    ) -> Result<RangeStream, ClientError> {
        self.c
            .range_stream_with_mode_and_min_sequence_number(
                start,
                end,
                limit,
                batch_size,
                mode,
                min_sequence_number,
            )
            .await
    }

    /// Scalar range reduction; forwards to the client's `range_reduce`.
    pub async fn range_reduce(
        &self,
        start: &Key,
        end: &Key,
        request: &DomainRangeReduceRequest,
    ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
        self.c.range_reduce(start, end, request).await
    }

    /// Scalar range reduction with a minimum-sequence floor.
    pub async fn range_reduce_with_min_sequence_number(
        &self,
        start: &Key,
        end: &Key,
        request: &DomainRangeReduceRequest,
        min_sequence_number: u64,
    ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
        self.c
            .range_reduce_with_min_sequence_number(start, end, request, min_sequence_number)
            .await
    }

    /// Raw wire-form reduce response (undecoded).
    pub async fn range_reduce_response(
        &self,
        start: &Key,
        end: &Key,
        request: &DomainRangeReduceRequest,
    ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
        self.c.range_reduce_response(start, end, request).await
    }

    /// Raw wire-form reduce response with a minimum-sequence floor.
    pub async fn range_reduce_response_with_min_sequence_number(
        &self,
        start: &Key,
        end: &Key,
        request: &DomainRangeReduceRequest,
        min_sequence_number: u64,
    ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
        self.c
            .range_reduce_response_with_min_sequence_number(
                start,
                end,
                request,
                min_sequence_number,
            )
            .await
    }
}
2354
impl<'a> Compact<'a> {
    /// Issues a prune for the given policies; forwards to the client's
    /// `prune`, which prefixes key-scoped policies before sending.
    pub async fn prune(
        &self,
        policies: &[crate::prune_policy::PrunePolicy],
    ) -> Result<(), ClientError> {
        self.c.prune(policies).await
    }
}
2363
impl<'a> Stream<'a> {
    /// `store.stream.v1.Service.Subscribe` — see `StreamSubscription::next`
    /// for consuming delivered frames. `since_sequence_number = None` starts
    /// live from the next Put; `Some(N)` replays retained batches before
    /// transitioning to live. An evicted `since` returns a
    /// `ConnectError::out_of_range` carrying `ErrorInfo.reason = "BATCH_EVICTED"`.
    pub async fn subscribe(
        &self,
        filter: crate::stream_filter::StreamFilter,
        since_sequence_number: Option<u64>,
    ) -> Result<StreamSubscription, ClientError> {
        // When a key prefix is configured, compile the caller's *unprefixed*
        // filter for client-side use; it is carried on the subscription.
        // NOTE(review): presumably re-applied against decoded logical keys in
        // StreamSubscription::next — confirm there. Order matters: this must
        // happen before the filter is rewritten into the physical keyspace.
        let logical_filter = self
            .c
            .key_prefix
            .is_some()
            .then(|| ClientStreamFilter::compile(&filter))
            .transpose()?;
        // Rewrite the filter into the prefixed keyspace, then validate it.
        let filter = self.c.prefix_stream_filter(filter)?;
        crate::stream_filter::validate_filter(&filter)
            .map_err(|e| ClientError::WireFormat(e.to_string()))?;
        // Convert match keys to their wire representation.
        let match_keys = filter
            .match_keys
            .into_iter()
            .map(|mk| exoware_proto::store::common::v1::MatchKey {
                reserved_bits: u32::from(mk.reserved_bits),
                prefix: u32::from(mk.prefix),
                payload_regex: mk.payload_regex.0,
                ..Default::default()
            })
            .collect();
        // Convert value filters to their wire representation.
        let value_filters = filter
            .value_filters
            .into_iter()
            .map(|vf| {
                use crate::stream_filter::BytesFilter;
                use exoware_proto::store::common::v1::bytes_filter::Kind as ProtoKind;
                let kind = match vf {
                    BytesFilter::Exact(bytes) => ProtoKind::Exact(bytes),
                    BytesFilter::Prefix(bytes) => ProtoKind::Prefix(bytes),
                    BytesFilter::Regex(pattern) => ProtoKind::Regex(pattern),
                };
                exoware_proto::store::common::v1::BytesFilter {
                    kind: Some(kind),
                    ..Default::default()
                }
            })
            .collect();
        let request = exoware_proto::store::stream::v1::SubscribeRequest {
            match_keys,
            value_filters,
            since_sequence_number,
            ..Default::default()
        };
        let config = store_connect_client_config(
            self.c.stream_uri.clone(),
            self.c.connect_request_compression,
        );
        let client = exoware_proto::store::stream::v1::ServiceClient::new(
            self.c.connect_http.clone(),
            config,
        );
        let stream = client
            .subscribe(request)
            .await
            .map_err(client_error_from_connect)?;
        Ok(StreamSubscription {
            stream,
            key_prefix: self.c.key_prefix,
            logical_filter,
        })
    }

    /// `store.stream.v1.Service.Get` — `Ok(None)` collapses the server's
    /// `BATCH_EVICTED` / `BATCH_NOT_FOUND` error details.
    pub async fn get(
        &self,
        sequence_number: u64,
    ) -> Result<Option<Vec<(Key, Bytes)>>, ClientError> {
        let config = store_connect_client_config(
            self.c.stream_uri.clone(),
            self.c.connect_request_compression,
        );
        let client = exoware_proto::store::stream::v1::ServiceClient::new(
            self.c.connect_http.clone(),
            config,
        );
        match client
            .get(exoware_proto::store::stream::v1::GetRequest {
                sequence_number,
                ..Default::default()
            })
            .await
        {
            Ok(resp) => {
                let owned = resp.into_owned();
                let mut entries = Vec::with_capacity(owned.entries.len());
                for entry in owned.entries {
                    let key = Bytes::from(entry.key);
                    match self.c.key_prefix {
                        // Entries whose keys do not match the configured
                        // prefix codec are silently dropped.
                        Some(prefix) if !prefix.codec.matches(&key) => {}
                        // Matching entries are decoded back into logical keys.
                        Some(prefix) => {
                            entries.push((prefix.decode_key(&key)?, Bytes::from(entry.value)))
                        }
                        None => entries.push((key, Bytes::from(entry.value))),
                    }
                }
                Ok(Some(entries))
            }
            Err(err) => {
                // Missing/evicted batches are a normal outcome, not an error.
                if is_batch_missing_error(&err) {
                    Ok(None)
                } else {
                    Err(client_error_from_connect(err))
                }
            }
        }
    }
}
2482
2483impl SerializableReadSession {
    /// Fixed sequence floor for this session, if one has been established yet.
    ///
    /// Fresh sessions start with `None` unless created via
    /// [`StoreClient::create_session_with_sequence`]. A first streamed query
    /// read (`get_many`, `range_stream`) only sets this once the stream is
    /// fully drained and trailers are available.
    ///
    /// This is a plain accessor over session state; it performs no RPC.
    pub fn fixed_sequence(&self) -> Option<u64> {
        self.state.fixed_sequence()
    }
2493
2494    pub async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
2495        let seeded_client = self.client.clone();
2496        let unseeded_client = self.client.clone();
2497        self.run_read(
2498            move |sequence| {
2499                let client = seeded_client.clone();
2500                async move { client.get_with_min_sequence_number(key, sequence).await }
2501            },
2502            move |observed_sequence| {
2503                let client = unseeded_client.clone();
2504                async move {
2505                    let (response, detail) = client.send_get(key, None).await?;
2506                    if let Some(detail) = detail {
2507                        observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2508                    }
2509                    Ok(response.value.map(Bytes::from))
2510                }
2511            },
2512        )
2513        .await
2514    }
2515
2516    pub async fn get_many(
2517        &self,
2518        keys: &[&Key],
2519        batch_size: u32,
2520    ) -> Result<GetManyStream, ClientError> {
2521        let keys_owned: Vec<Key> = keys.iter().map(|k| Bytes::copy_from_slice(k)).collect();
2522        let seeded_client = self.client.clone();
2523        let unseeded_client = self.client.clone();
2524        let keys_seeded = keys_owned.clone();
2525        let keys_unseeded = keys_owned;
2526        self.run_read(
2527            move |sequence| {
2528                let client = seeded_client.clone();
2529                let keys = keys_seeded.clone();
2530                async move {
2531                    let refs: Vec<&Key> = keys.iter().collect();
2532                    client
2533                        .get_many_with_min_sequence_number(&refs, batch_size, sequence)
2534                        .await
2535                }
2536            },
2537            move |observed_sequence| {
2538                let client = unseeded_client.clone();
2539                let keys = keys_unseeded.clone();
2540                async move {
2541                    let refs: Vec<&Key> = keys.iter().collect();
2542                    client
2543                        .get_many_internal(&refs, batch_size, None, Some(observed_sequence))
2544                        .await
2545                }
2546            },
2547        )
2548        .await
2549    }
2550
    /// Session-scoped buffered range read in forward mode; delegates to
    /// `range_with_mode`.
    pub async fn range(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
        self.range_with_mode(start, end, limit, RangeMode::Forward)
            .await
    }
2560
    /// Session-scoped buffered range read with an explicit [`RangeMode`].
    ///
    /// Seeded sessions read pinned at the session's fixed sequence; unseeded
    /// sessions stream with `observed_sequence` tracking so the session can
    /// establish its floor from the drained stream.
    pub async fn range_with_mode(
        &self,
        start: &Key,
        end: &Key,
        limit: usize,
        mode: RangeMode,
    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
        let seeded_client = self.client.clone();
        let unseeded_client = self.client.clone();
        self.run_read(
            move |sequence| {
                let client = seeded_client.clone();
                async move {
                    client
                        .range_with_mode_and_min_sequence_number(start, end, limit, mode, sequence)
                        .await
                }
            },
            move |observed_sequence| {
                let client = unseeded_client.clone();
                async move {
                    let stream = client
                        .range_stream_internal(
                            start,
                            end,
                            limit,
                            // Clamped to 1: range_stream_internal rejects a
                            // zero batch size.
                            limit.max(1),
                            mode,
                            RangeStreamReadOptions {
                                min_sequence_number: None,
                                observed_sequence: Some(observed_sequence),
                            },
                        )
                        .await;
                    stream?.collect().await
                }
            },
        )
        .await
    }
2601
2602    pub async fn range_stream(
2603        &self,
2604        start: &Key,
2605        end: &Key,
2606        limit: usize,
2607        batch_size: usize,
2608    ) -> Result<RangeStream, ClientError> {
2609        self.range_stream_with_mode(start, end, limit, batch_size, RangeMode::Forward)
2610            .await
2611    }
2612
2613    pub async fn range_stream_with_mode(
2614        &self,
2615        start: &Key,
2616        end: &Key,
2617        limit: usize,
2618        batch_size: usize,
2619        mode: RangeMode,
2620    ) -> Result<RangeStream, ClientError> {
2621        let seeded_client = self.client.clone();
2622        let unseeded_client = self.client.clone();
2623        self.run_read(
2624            move |sequence| {
2625                let client = seeded_client.clone();
2626                async move {
2627                    client
2628                        .range_stream_with_mode_and_min_sequence_number(
2629                            start, end, limit, batch_size, mode, sequence,
2630                        )
2631                        .await
2632                }
2633            },
2634            move |observed_sequence| {
2635                let client = unseeded_client.clone();
2636                async move {
2637                    client
2638                        .range_stream_internal(
2639                            start,
2640                            end,
2641                            limit,
2642                            batch_size,
2643                            mode,
2644                            RangeStreamReadOptions {
2645                                min_sequence_number: None,
2646                                observed_sequence: Some(observed_sequence),
2647                            },
2648                        )
2649                        .await
2650                }
2651            },
2652        )
2653        .await
2654    }
2655
2656    pub async fn range_reduce(
2657        &self,
2658        start: &Key,
2659        end: &Key,
2660        request: &DomainRangeReduceRequest,
2661    ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
2662        let seeded_client = self.client.clone();
2663        let unseeded_client = self.client.clone();
2664        let request_seeded = request.clone();
2665        let request_unseeded = request.clone();
2666        self.run_read(
2667            move |sequence| {
2668                let client = seeded_client.clone();
2669                let request = request_seeded.clone();
2670                async move {
2671                    client
2672                        .range_reduce_with_min_sequence_number(start, end, &request, sequence)
2673                        .await
2674                }
2675            },
2676            move |observed_sequence| {
2677                let client = unseeded_client.clone();
2678                let request = request_unseeded.clone();
2679                async move {
2680                    let (response, detail) = client
2681                        .range_reduce_response_internal(start, end, &request, None)
2682                        .await?;
2683                    if let Some(detail) = detail {
2684                        observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2685                    }
2686                    let decoded = proto_to_domain_reduce_response(response)
2687                        .map_err(ClientError::WireFormat)?;
2688                    if !decoded.groups.is_empty() {
2689                        return Err(ClientError::WireFormat(
2690                            "grouped range reduction response returned for scalar request"
2691                                .to_string(),
2692                        ));
2693                    }
2694                    Ok(decoded
2695                        .results
2696                        .iter()
2697                        .map(|result| result.value.clone())
2698                        .collect())
2699                }
2700            },
2701        )
2702        .await
2703    }
2704
2705    pub async fn range_reduce_response(
2706        &self,
2707        start: &Key,
2708        end: &Key,
2709        request: &DomainRangeReduceRequest,
2710    ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
2711        let seeded_client = self.client.clone();
2712        let unseeded_client = self.client.clone();
2713        let request_seeded = request.clone();
2714        let request_unseeded = request.clone();
2715        self.run_read(
2716            move |sequence| {
2717                let client = seeded_client.clone();
2718                let request = request_seeded.clone();
2719                async move {
2720                    client
2721                        .range_reduce_response_with_min_sequence_number(
2722                            start, end, &request, sequence,
2723                        )
2724                        .await
2725                }
2726            },
2727            move |observed_sequence| {
2728                let client = unseeded_client.clone();
2729                let request = request_unseeded.clone();
2730                async move {
2731                    let (response, detail) = client
2732                        .range_reduce_response_internal(start, end, &request, None)
2733                        .await?;
2734                    if let Some(detail) = detail {
2735                        observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2736                    }
2737                    Ok(response)
2738                }
2739            },
2740        )
2741        .await
2742    }
2743
    /// Dispatches a session read to the seeded or the unseeded code path.
    ///
    /// - `seeded_call` receives the session's fixed sequence number when one
    ///   is already established.
    /// - `unseeded_call` receives the shared `state.sequence` counter so the
    ///   callee can publish the sequence number it observes (callers in this
    ///   file `fetch_max` into it).
    ///
    /// Unseeded reads are serialized behind `state.init_gate`, and the fixed
    /// sequence is re-checked after acquiring the gate: a concurrent read may
    /// have seeded the session while we waited, in which case we take the
    /// seeded path instead of issuing another unseeded call.
    async fn run_read<T, SeededCall, SeededFut, UnseededCall, UnseededFut>(
        &self,
        seeded_call: SeededCall,
        unseeded_call: UnseededCall,
    ) -> Result<T, ClientError>
    where
        SeededCall: Fn(u64) -> SeededFut,
        SeededFut: std::future::Future<Output = Result<T, ClientError>>,
        UnseededCall: Fn(Arc<AtomicU64>) -> UnseededFut,
        UnseededFut: std::future::Future<Output = Result<T, ClientError>>,
    {
        // Fast path: an already-established sequence needs no gate.
        if let Some(sequence) = self.fixed_sequence() {
            return seeded_call(sequence).await;
        }

        let gate = self.state.init_gate.lock().await;

        // Double-check after acquiring the gate: another task may have
        // seeded the session while we were waiting.
        if let Some(sequence) = self.fixed_sequence() {
            drop(gate);
            return seeded_call(sequence).await;
        }

        // Hold the gate across the unseeded call so only one in-flight read
        // establishes the session's observed sequence at a time.
        let result = unseeded_call(self.state.sequence.clone()).await;
        drop(gate);
        result
    }
2770}
2771
2772fn client_error_from_connect(err: ConnectError) -> ClientError {
2773    ClientError::Rpc(Box::new(err))
2774}
2775
2776fn is_retryable_error(err: &ConnectError) -> bool {
2777    matches!(
2778        err.code,
2779        ErrorCode::Aborted
2780            | ErrorCode::ResourceExhausted
2781            | ErrorCode::Unavailable
2782            | ErrorCode::Unknown
2783            // Retrying `internal` is a trade-off: proxies and load balancers sometimes surface
2784            // transient faults this way; idempotent reads use a small attempt budget so we do not
2785            // spin forever. Prefer interpreting `google.rpc.RetryInfo` when present (see
2786            // `retry_delay_for_error`).
2787            | ErrorCode::Internal
2788    )
2789}
2790
2791fn retry_delay_for_error(
2792    err: &ConnectError,
2793    attempt: usize,
2794    retry_config: RetryConfig,
2795) -> Duration {
2796    if let Ok(decoded) = proto_decode_connect_error(err) {
2797        if let Some(retry_info) = decoded.retry_info {
2798            if let Some(delay) = retry_info.retry_delay.as_option() {
2799                let secs = u64::try_from(delay.seconds).unwrap_or(0);
2800                let nanos = u32::try_from(delay.nanos.max(0)).unwrap_or(0);
2801                let hinted = Duration::new(secs, nanos);
2802                if !hinted.is_zero() {
2803                    return hinted.min(retry_config.max_backoff);
2804                }
2805            }
2806        }
2807    }
2808    retry_backoff_delay(attempt, retry_config)
2809}
2810
2811fn retry_backoff_delay(attempt: usize, retry_config: RetryConfig) -> Duration {
2812    let exponent = (attempt.saturating_sub(1)).min(20) as u32;
2813    let factor = 1u128 << exponent;
2814    let base_ms = retry_config.initial_backoff.as_millis();
2815    let capped_ms = base_ms
2816        .saturating_mul(factor)
2817        .min(retry_config.max_backoff.as_millis());
2818    Duration::from_millis(capped_ms.min(u64::MAX as u128) as u64)
2819}
2820
// Unit tests covering the pure, in-process pieces of the SDK: hex helpers,
// client/builder construction, session seeding state, retry policy math,
// key-prefix encoding, and request rewriting. No network I/O is performed.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv_codec::{KvFieldKind, KvPredicate, KvPredicateCheck, KvPredicateConstraint};
    use exoware_proto::query::TraversalMode as ProtoTraversalMode;

    // The local hex helpers round-trip bytes losslessly (lowercase output).
    #[test]
    fn hex_round_trip() {
        let data = vec![0x00, 0x42, 0xFF, 0xAB];
        let encoded = hex_encode(&data);
        assert_eq!(encoded, "0042ffab");
        let decoded = hex_decode(&encoded).unwrap();
        assert_eq!(decoded, data);
    }

    // A single-URL client points health/ingest/query at the same host.
    #[test]
    fn client_creation() {
        let client = StoreClient::new("http://localhost:10000");
        assert_eq!(client.health_url, "http://localhost:10000");
        assert_eq!(client.ingest_uri.to_string(), "http://localhost:10000/");
        assert_eq!(client.query_uri.to_string(), "http://localhost:10000/");
    }

    // The builder produces the same endpoints as `new` (single URL) and as
    // `with_split_urls` (separate health/ingest/query/compact URLs).
    #[test]
    fn builder_matches_new_and_split_urls() {
        let single = StoreClient::new("http://localhost:9/");
        let built = StoreClient::builder()
            .url("http://localhost:9/")
            .build()
            .unwrap();
        assert_eq!(single.health_url, built.health_url);
        assert_eq!(single.ingest_uri.to_string(), built.ingest_uri.to_string());
        assert_eq!(single.query_uri.to_string(), built.query_uri.to_string());

        let split = StoreClient::with_split_urls("http://h", "http://i", "http://q", "http://c");
        let split_b = StoreClient::builder()
            .health_url("http://h")
            .ingest_url("http://i")
            .query_url("http://q")
            .compact_url("http://c")
            .build()
            .unwrap();
        assert_eq!(split.health_url, split_b.health_url);
        assert_eq!(split.ingest_uri.to_string(), split_b.ingest_uri.to_string());
        assert_eq!(split.query_uri.to_string(), split_b.query_uri.to_string());
        assert_eq!(
            split.compact_uri.to_string(),
            split_b.compact_uri.to_string()
        );
    }

    // Each missing URL surfaces as its own distinct build error, in order.
    #[test]
    fn builder_fails_until_all_urls_set() {
        assert!(matches!(
            StoreClient::builder().health_url("http://h").build(),
            Err(ClientBuildError::MissingIngestUrl)
        ));
        assert!(matches!(
            StoreClient::builder()
                .health_url("http://h")
                .ingest_url("http://i")
                .build(),
            Err(ClientBuildError::MissingQueryUrl)
        ));
        assert!(matches!(
            StoreClient::builder()
                .health_url("http://h")
                .ingest_url("http://i")
                .query_url("http://q")
                .build(),
            Err(ClientBuildError::MissingCompactUrl)
        ));
    }

    // A trailing slash on the input URL is stripped from `health_url`.
    #[test]
    fn client_trims_trailing_slash() {
        let client = StoreClient::new("http://localhost:10000/");
        assert_eq!(client.health_url, "http://localhost:10000");
    }

    // Fresh sessions have no fixed sequence number until a read seeds one.
    #[test]
    fn create_session_starts_unseeded() {
        let client = StoreClient::new("http://localhost:10000/");
        let session = client.create_session();
        assert_eq!(session.fixed_sequence(), None);
    }

    // Domain `RangeMode` maps onto the proto traversal-mode enum.
    #[test]
    fn range_mode_maps_to_proto_traversal() {
        assert_eq!(
            RangeMode::Forward.to_proto(),
            ProtoTraversalMode::TRAVERSAL_MODE_FORWARD
        );
        assert_eq!(
            RangeMode::Reverse.to_proto(),
            ProtoTraversalMode::TRAVERSAL_MODE_REVERSE
        );
    }

    // Pins the standard retry defaults: 3 attempts, 100ms initial, 2s cap.
    #[test]
    fn retry_config_standard_defaults_match_expected() {
        let config = RetryConfig::standard();
        assert_eq!(config.max_attempts, 3);
        assert_eq!(config.initial_backoff, Duration::from_millis(100));
        assert_eq!(config.max_backoff, Duration::from_millis(2_000));
    }

    // `sanitized()` raises zero attempts to 1 and lifts max_backoff up to at
    // least initial_backoff.
    #[test]
    fn retry_config_clamps_attempts_and_backoff_bounds() {
        let config = RetryConfig::standard()
            .with_max_attempts(0)
            .with_initial_backoff(Duration::from_millis(250))
            .with_max_backoff(Duration::from_millis(50))
            .sanitized();
        assert_eq!(config.max_attempts, 1);
        assert_eq!(config.initial_backoff, Duration::from_millis(250));
        assert_eq!(config.max_backoff, Duration::from_millis(250));
    }

    // Transient Connect codes (incl. `internal`) retry; `invalid_argument`
    // does not.
    #[test]
    fn retryable_codes_include_connect_transients() {
        assert!(is_retryable_error(&ConnectError::aborted("retry")));
        assert!(is_retryable_error(&ConnectError::resource_exhausted(
            "retry"
        )));
        assert!(is_retryable_error(&ConnectError::unavailable("retry")));
        assert!(is_retryable_error(&ConnectError::internal("retry")));
        assert!(!is_retryable_error(&ConnectError::invalid_argument(
            "no retry"
        )));
    }

    // Backoff doubles per attempt and saturates at max_backoff.
    #[test]
    fn retry_backoff_delay_is_exponential_and_capped() {
        let config = RetryConfig::standard()
            .with_initial_backoff(Duration::from_millis(100))
            .with_max_backoff(Duration::from_millis(250));
        assert_eq!(retry_backoff_delay(1, config), Duration::from_millis(100));
        assert_eq!(retry_backoff_delay(2, config), Duration::from_millis(200));
        assert_eq!(retry_backoff_delay(3, config), Duration::from_millis(250));
        assert_eq!(retry_backoff_delay(4, config), Duration::from_millis(250));
    }

    // An explicitly seeded session reports the given sequence as fixed.
    #[test]
    fn create_session_with_sequence_pins_explicit_floor() {
        let client = StoreClient::new("http://localhost:10000/");
        let session = client.create_session_with_sequence(27);
        assert_eq!(session.fixed_sequence(), Some(27));
    }

    // Prefix encode/decode of a logical key is lossless and the physical key
    // matches the prefix codec.
    #[test]
    fn store_key_prefix_round_trips_logical_keys() {
        let prefix = StoreKeyPrefix::new(4, 0xA).unwrap();
        let logical = Bytes::from_static(b"hello");
        let physical = prefix.encode_key(&logical).unwrap();
        assert!(prefix.codec.matches(&physical));
        assert_eq!(prefix.decode_key(&physical).unwrap(), logical);
    }

    // A maximal-length logical range upper bound stays within MAX_KEY_LEN
    // after prefixing, and the lower bound still round-trips.
    #[test]
    fn store_key_prefix_clamps_long_logical_range_upper_bound() {
        let prefix = StoreKeyPrefix::new(4, 0x2).unwrap();
        let logical_codec = KeyCodec::new(4, 0x7);
        let (logical_start, logical_end) = logical_codec.prefix_bounds();
        assert_eq!(logical_end.len(), MAX_KEY_LEN);

        let (physical_start, physical_end) =
            prefix.encode_range(&logical_start, &logical_end).unwrap();
        assert!(prefix.codec.matches(&physical_start));
        assert!(prefix.codec.matches(&physical_end));
        assert_eq!(physical_end.len(), MAX_KEY_LEN);
        assert_eq!(prefix.decode_key(&physical_start).unwrap(), logical_start);
    }

    // Prefixing a MatchKey concatenates the reserved bits and bit prefixes
    // while leaving the payload regex unchanged.
    #[test]
    fn store_key_prefix_rewrites_match_key_family() {
        let prefix = StoreKeyPrefix::new(3, 0b101).unwrap();
        let logical = crate::match_key::MatchKey {
            reserved_bits: 4,
            prefix: 0b0110,
            payload_regex: crate::kv_codec::Utf8::from("(?s).*"),
        };
        let physical = prefix.prefix_match_key(&logical).unwrap();
        assert_eq!(physical.reserved_bits, 7);
        assert_eq!(physical.prefix, 0b101_0110);
        assert_eq!(physical.payload_regex, logical.payload_regex);
    }

    // The stream variant additionally rewrites the payload regex to the
    // byte-oriented form `(?s-u).*`.
    #[test]
    fn store_key_prefix_broadens_stream_match_key_payload_regex() {
        let prefix = StoreKeyPrefix::new(3, 0b101).unwrap();
        let logical = crate::match_key::MatchKey {
            reserved_bits: 4,
            prefix: 0b0110,
            payload_regex: crate::kv_codec::Utf8::from("(?s).*"),
        };
        let physical = prefix.prefix_stream_match_key(&logical).unwrap();
        assert_eq!(physical.reserved_bits, 7);
        assert_eq!(physical.prefix, 0b101_0110);
        assert_eq!(&*physical.payload_regex, "(?s-u).*");
    }

    // Key-relative bit offsets in reduce requests shift by the prefix width
    // (5 bits here): 9 -> 14 for plain key fields, 12 -> 17 for z-order.
    #[test]
    fn prefixed_reduce_request_shifts_key_field_offsets() {
        let client = StoreClient::builder()
            .url("http://localhost:10000")
            .key_prefix(StoreKeyPrefix::new(5, 0b10101).unwrap())
            .build()
            .unwrap();
        let request = DomainRangeReduceRequest {
            reducers: vec![crate::RangeReducerSpec {
                op: crate::RangeReduceOp::SumField,
                expr: Some(KvExpr::Field(KvFieldRef::Key {
                    bit_offset: 9,
                    kind: KvFieldKind::UInt64,
                })),
            }],
            group_by: vec![KvExpr::Field(KvFieldRef::ZOrderKey {
                bit_offset: 12,
                field_position: 0,
                field_widths: vec![8],
                kind: KvFieldKind::UInt64,
            })],
            filter: Some(KvPredicate {
                checks: vec![KvPredicateCheck {
                    field: KvFieldRef::Value {
                        index: 0,
                        kind: KvFieldKind::UInt64,
                        nullable: false,
                    },
                    constraint: KvPredicateConstraint::UInt64Range {
                        min: Some(1),
                        max: Some(9),
                    },
                }],
                contradiction: false,
            }),
        };

        let shifted = client.prefix_reduce_request(&request).unwrap();
        let Some(KvExpr::Field(KvFieldRef::Key { bit_offset, .. })) =
            shifted.reducers[0].expr.as_ref()
        else {
            panic!("expected key field reducer");
        };
        assert_eq!(*bit_offset, 14);
        let KvExpr::Field(KvFieldRef::ZOrderKey { bit_offset, .. }) = &shifted.group_by[0] else {
            panic!("expected z-order group field");
        };
        assert_eq!(*bit_offset, 17);
    }

    // A write batch encodes each entry with the prefix of the client it was
    // pushed through, not a single shared prefix.
    #[test]
    fn store_write_batch_uses_each_clients_prefix() {
        let base = StoreClient::new("http://localhost:10000");
        let a = base.with_key_prefix(StoreKeyPrefix::new(4, 1).unwrap());
        let b = base.with_key_prefix(StoreKeyPrefix::new(4, 2).unwrap());
        let key_a = Bytes::from_static(b"a");
        let key_b = Bytes::from_static(b"b");

        let mut batch = StoreWriteBatch::new();
        batch.push(&a, &key_a, b"va").unwrap();
        batch.push(&b, &key_b, b"vb").unwrap();

        assert_eq!(
            batch.entries[0].0,
            a.key_prefix().unwrap().encode_key(&key_a).unwrap()
        );
        assert_eq!(
            batch.entries[1].0,
            b.key_prefix().unwrap().encode_key(&key_b).unwrap()
        );
    }

    // Thin wrappers over the `hex` crate used by the round-trip test above.
    fn hex_encode(data: &[u8]) -> String {
        hex::encode(data)
    }

    fn hex_decode(s: &str) -> Option<Vec<u8>> {
        hex::decode(s).ok()
    }
}