Skip to main content

exoware_sdk/
lib.rs

1//! Store Rust SDK Client.
2//!
3//! Provides typed access to the store put/get/query APIs plus
4//! plain HTTP health/readiness probes.
5//!
6//! ## Errors
7//!
8//! RPC failures surface as [`ClientError::Rpc`] carrying a native [`ConnectError`]. Use
9//! [`ClientError::decoded_rpc_error`] or [`StoreClient::decode_error_details`] to unpack
10//! protobuf `google.rpc` details (and `store.query.v1.Detail` on query RPC errors), not string parsing.
11//! Idempotent reads honor `google.rpc.RetryInfo` when deciding backoff (see `retry_delay_for_error`).
12
13pub mod keys;
14pub mod kv_codec;
15pub mod match_key;
16pub mod proto;
17pub mod prune_policy;
18pub mod stream_filter;
19pub use keys::{Key, KeyCodec, KeyCodecError, KeyMut, KeyValidationError, Value, MAX_KEY_LEN};
20pub use proto::*;
21extern crate self as exoware_proto;
22
23use bytes::Bytes;
24use connectrpc::client::{ClientConfig, ServerStream as ConnectServerStream};
25use connectrpc::{ConnectError, ErrorCode};
26use exoware_proto::compact::ServiceClient as CompactServiceClient;
27use exoware_proto::ingest::ServiceClient as IngestServiceClient;
28use exoware_proto::query as proto_query;
29use exoware_proto::query::ServiceClient as QueryServiceClient;
30use exoware_proto::store::compact::v1::PruneRequest as ProtoPruneRequest;
31use exoware_proto::store::ingest::v1::PutRequest as ProtoPutRequest;
32use exoware_proto::store::query::v1::{
33    GetManyRequest as ProtoGetManyRequest, GetRequest as ProtoGetRequest,
34    RangeRequest as ProtoRangeRequest, ReduceRequest as ProtoWireReduceRequest,
35};
36use exoware_proto::RangeReduceRequest as DomainRangeReduceRequest;
37use exoware_proto::{
38    connect_compression_registry as proto_connect_compression_registry,
39    decode_connect_error as proto_decode_connect_error,
40    to_domain_reduce_response as proto_to_domain_reduce_response,
41    to_proto_reduce_params as proto_to_proto_reduce_params,
42    PreferZstdHttpClient as ProtoPreferZstdHttpClient,
43};
44use futures::future::BoxFuture;
45use keys::is_valid_key_size;
46use kv_codec::{KvExpr, KvFieldRef, KvReducedValue};
47use std::collections::HashMap;
48use std::sync::{
49    atomic::{AtomicU64, Ordering},
50    Arc,
51};
52use std::time::Duration;
53
// Retry-policy defaults. Their consumers are outside this view; presumably they
// seed the backoff used for idempotent reads described in the module docs — confirm.

/// Default maximum number of attempts.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3;
/// Default initial backoff, in milliseconds.
const DEFAULT_RETRY_INITIAL_BACKOFF_MS: u64 = 100;
/// Default backoff ceiling, in milliseconds.
const DEFAULT_RETRY_MAX_BACKOFF_MS: u64 = 2_000;
57
58/// Converts caller-provided write values into the byte owner stored by
59/// [`StoreWriteBatch`].
60pub trait IntoStoreWriteValue {
61    fn into_store_write_value(self) -> Bytes;
62}
63
64impl IntoStoreWriteValue for Bytes {
65    fn into_store_write_value(self) -> Bytes {
66        self
67    }
68}
69
70impl IntoStoreWriteValue for &Bytes {
71    fn into_store_write_value(self) -> Bytes {
72        self.clone()
73    }
74}
75
76impl IntoStoreWriteValue for Vec<u8> {
77    fn into_store_write_value(self) -> Bytes {
78        self.into()
79    }
80}
81
82impl IntoStoreWriteValue for &Vec<u8> {
83    fn into_store_write_value(self) -> Bytes {
84        Bytes::copy_from_slice(self)
85    }
86}
87
88impl IntoStoreWriteValue for &[u8] {
89    fn into_store_write_value(self) -> Bytes {
90        Bytes::copy_from_slice(self)
91    }
92}
93
94impl<const N: usize> IntoStoreWriteValue for &[u8; N] {
95    fn into_store_write_value(self) -> Bytes {
96        Bytes::copy_from_slice(self)
97    }
98}
99
/// Encoding applied to **outgoing** RPC request bodies when compression applies.
///
/// connectrpc's `ClientConfig::compress_requests` takes a single encoding name.
/// Unlike responses, requests have no negotiation list. Prefer
/// [`Zstd`](Self::Zstd), and choose [`Gzip`](Self::Gzip) for peers that only accept
/// gzip-compressed requests.
///
/// Response decompression is unaffected: it still follows [`PreferZstdHttpClient`]
/// and the shared [`connect_compression_registry`].
///
/// To drive this from configuration or environment variables, map the setting onto
/// this enum and pass it to [`StoreClientBuilder::connect_request_compression`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ConnectRequestCompression {
    /// Sent as `compress_requests("zstd")`; this is the default.
    #[default]
    Zstd,
    /// Sent as `compress_requests("gzip")`.
    Gzip,
}

impl ConnectRequestCompression {
    /// Encoding name handed to `ClientConfig::compress_requests`.
    fn wire_name(self) -> &'static str {
        match self {
            Self::Gzip => "gzip",
            Self::Zstd => "zstd",
        }
    }
}
126
/// Largest decompressed RPC message, in bytes, that the client will decode (256 MiB).
///
/// The underlying connectrpc client uses 4 MiB unless configured, and large `Range`
/// frames need more headroom than that. 256 MiB matches the cap used by both the
/// query worker and the store simulator.
const STORE_CLIENT_MAX_MESSAGE_BYTES: usize = 256 << 20;
132
133/// Store client defaults: [`connect_compression_registry`] for codecs;
134/// [`PreferZstdHttpClient`] sets `Accept-Encoding: zstd, gzip` on responses.
135///
136/// Request body compression uses [`ConnectRequestCompression`] (default zstd); connectrpc only
137/// supports one request encoding per config (see [`ConnectRequestCompression`]).
138fn store_connect_client_config(
139    base_uri: http::Uri,
140    request_compression: ConnectRequestCompression,
141) -> ClientConfig {
142    ClientConfig::new(base_uri)
143        .with_compression(proto_connect_compression_registry())
144        .compress_requests(request_compression.wire_name())
145        .with_default_max_message_size(STORE_CLIENT_MAX_MESSAGE_BYTES)
146}
147
148/// Store client error.
149#[derive(Debug, thiserror::Error)]
150pub enum ClientError {
151    #[error("HTTP error: {0}")]
152    Http(#[from] reqwest::Error),
153    #[error("RPC error ({0})")]
154    Rpc(Box<ConnectError>),
155    #[error("store key prefix error: {0}")]
156    KeyPrefix(#[from] StoreKeyPrefixError),
157    #[error("invalid key length: expected {expected}, got {got}")]
158    InvalidKeyLength { expected: usize, got: usize },
159    #[error("wire format error: {0}")]
160    WireFormat(String),
161}
162
163impl ClientError {
164    pub fn rpc_error(&self) -> Option<&ConnectError> {
165        match self {
166            Self::Rpc(err) => Some(err.as_ref()),
167            _ => None,
168        }
169    }
170
171    pub fn rpc_code(&self) -> Option<ErrorCode> {
172        self.rpc_error().map(|err| err.code)
173    }
174
175    pub fn decoded_rpc_error(
176        &self,
177    ) -> Result<Option<exoware_proto::DecodedConnectError>, buffa::DecodeError> {
178        self.rpc_error().map(proto_decode_connect_error).transpose()
179    }
180}
181
182/// Errors returned by [`StoreKeyPrefix`] when a logical key cannot be mapped
183/// into the prefixed physical keyspace.
184#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
185pub enum StoreKeyPrefixError {
186    #[error("reserved_bits {reserved_bits} exceeds 16")]
187    ReservedBitsTooLarge { reserved_bits: u8 },
188    #[error("prefix {prefix} does not fit in {reserved_bits} reserved bits")]
189    PrefixTooLarge { reserved_bits: u8, prefix: u16 },
190    #[error(
191        "combined reserved bits exceed 16: store prefix bits {prefix_bits} + logical bits {logical_bits}"
192    )]
193    CombinedReservedBitsTooLarge { prefix_bits: u8, logical_bits: u8 },
194    #[error("key does not belong to this store prefix")]
195    PrefixMismatch,
196    #[error("key bit offset {offset} plus store prefix bits {prefix_bits} exceeds u16")]
197    BitOffsetOverflow { offset: u16, prefix_bits: u8 },
198    #[error("key codec error: {0}")]
199    Codec(#[from] KeyCodecError),
200}
201
202/// A client-side namespace layered over raw Store keys.
203///
204/// The prefix consumes a small number of high bits in the physical Store key
205/// and stores the caller's logical key in the remaining payload bits. QMDB,
206/// SQL, and other higher-level instances continue to build their own logical
207/// keys as before; a prefixed [`StoreClient`] maps those keys on the wire and
208/// maps returned keys back before callers see them.
209#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
210pub struct StoreKeyPrefix {
211    codec: KeyCodec,
212}
213
214impl StoreKeyPrefix {
215    pub fn new(reserved_bits: u8, prefix: u16) -> Result<Self, StoreKeyPrefixError> {
216        validate_prefix_bits(reserved_bits, prefix)?;
217        Ok(Self {
218            codec: KeyCodec::new(reserved_bits, prefix),
219        })
220    }
221
222    #[inline]
223    pub fn reserved_bits(self) -> u8 {
224        self.codec.reserved_bits()
225    }
226
227    #[inline]
228    pub fn prefix(self) -> u16 {
229        self.codec.prefix()
230    }
231
232    /// Maximum logical key bytes available under this prefix.
233    #[inline]
234    pub fn max_logical_key_len(self) -> usize {
235        self.codec.max_payload_capacity_bytes()
236    }
237
238    /// Encode a logical key into the physical Store keyspace.
239    pub fn encode_key(self, key: &Key) -> Result<Key, StoreKeyPrefixError> {
240        Ok(self.codec.encode(key)?)
241    }
242
243    /// Decode a physical Store key back into the logical keyspace.
244    pub fn decode_key(self, key: &Key) -> Result<Key, StoreKeyPrefixError> {
245        if !self.codec.matches(key) {
246            return Err(StoreKeyPrefixError::PrefixMismatch);
247        }
248        let payload_len = self.codec.payload_capacity_bytes_for_key_len(key.len());
249        Ok(Bytes::from(self.codec.read_payload(key, 0, payload_len)?))
250    }
251
252    /// Encode an inclusive logical range into the physical Store keyspace.
253    ///
254    /// Empty `end` means unbounded in the logical keyspace and is narrowed to
255    /// this prefix's physical upper bound. Long logical upper bounds are
256    /// clamped to the maximum logical key length representable under this
257    /// prefix; this preserves scans over existing `KeyCodec::prefix_bounds`
258    /// ranges, whose upper bound is intentionally `MAX_KEY_LEN` bytes.
259    pub fn encode_range(self, start: &Key, end: &Key) -> Result<(Key, Key), StoreKeyPrefixError> {
260        let start = self.encode_key(start)?;
261        let end = if end.is_empty() {
262            self.codec.prefix_bounds().1
263        } else {
264            let max_len = self.max_logical_key_len();
265            let end = if end.len() > max_len {
266                Bytes::copy_from_slice(&end[..max_len])
267            } else {
268                Bytes::copy_from_slice(end)
269            };
270            self.encode_key(&end)?
271        };
272        Ok((start, end))
273    }
274
275    fn prefix_match_key(
276        self,
277        match_key: &crate::match_key::MatchKey,
278    ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
279        self.prefix_match_key_with_regex(match_key, match_key.payload_regex.clone())
280    }
281
282    fn prefix_stream_match_key(
283        self,
284        match_key: &crate::match_key::MatchKey,
285    ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
286        self.prefix_match_key_with_regex(match_key, crate::kv_codec::Utf8::from("(?s-u).*"))
287    }
288
289    fn prefix_match_key_with_regex(
290        self,
291        match_key: &crate::match_key::MatchKey,
292        payload_regex: crate::kv_codec::Utf8,
293    ) -> Result<crate::match_key::MatchKey, StoreKeyPrefixError> {
294        validate_prefix_bits(match_key.reserved_bits, match_key.prefix)?;
295        let reserved_bits = self
296            .reserved_bits()
297            .checked_add(match_key.reserved_bits)
298            .ok_or(StoreKeyPrefixError::CombinedReservedBitsTooLarge {
299                prefix_bits: self.reserved_bits(),
300                logical_bits: match_key.reserved_bits,
301            })?;
302        if reserved_bits > 16 {
303            return Err(StoreKeyPrefixError::CombinedReservedBitsTooLarge {
304                prefix_bits: self.reserved_bits(),
305                logical_bits: match_key.reserved_bits,
306            });
307        }
308
309        let prefix = (u32::from(self.prefix()) << u32::from(match_key.reserved_bits))
310            | u32::from(match_key.prefix);
311        let prefix = u16::try_from(prefix).map_err(|_| StoreKeyPrefixError::PrefixTooLarge {
312            reserved_bits,
313            prefix: u16::MAX,
314        })?;
315        validate_prefix_bits(reserved_bits, prefix)?;
316        Ok(crate::match_key::MatchKey {
317            reserved_bits,
318            prefix,
319            payload_regex,
320        })
321    }
322}
323
324/// A physical Store write batch assembled from one or more logical clients.
325///
326/// Use [`Self::push`] with the specific prefixed client that produced each
327/// logical key, then [`Self::commit`] once to submit all rows in one atomic
328/// Store `Put`.
329#[derive(Clone, Debug, Default)]
330pub struct StoreWriteBatch {
331    entries: Vec<(Key, Bytes)>,
332}
333
334impl StoreWriteBatch {
335    pub fn new() -> Self {
336        Self::default()
337    }
338
339    pub fn len(&self) -> usize {
340        self.entries.len()
341    }
342
343    pub fn is_empty(&self) -> bool {
344        self.entries.is_empty()
345    }
346
347    pub fn clear(&mut self) {
348        self.entries.clear();
349    }
350
351    pub fn push(
352        &mut self,
353        client: &StoreClient,
354        key: &Key,
355        value: impl IntoStoreWriteValue,
356    ) -> Result<&mut Self, ClientError> {
357        self.entries.push((
358            client.encode_store_key(key)?,
359            value.into_store_write_value(),
360        ));
361        Ok(self)
362    }
363
364    pub async fn commit(&self, client: &StoreClient) -> Result<u64, ClientError> {
365        let refs: Vec<(&Key, &[u8])> = self
366            .entries
367            .iter()
368            .map(|(key, value)| (key, value.as_ref()))
369            .collect();
370        client.put_physical(&refs).await
371    }
372}
373
374/// A writer that can stage an already-prepared upload into a shared Store
375/// write batch and then be notified of the batch outcome.
376///
377/// Implementations should keep `prepare_*` methods as inherent APIs because
378/// each writer's input shape differs. Once a caller has a prepared handle,
379/// this trait provides the common lifecycle:
380///
381/// 1. stage rows into a [`StoreWriteBatch`]
382/// 2. commit that batch
383/// 3. mark the prepared handle persisted with the returned Store sequence
384///    number, or failed if staging/commit does not complete
385pub trait StoreBatchUpload {
386    type Prepared: Send;
387    type Receipt: Send;
388    type Error: std::fmt::Display + Send;
389
390    fn stage_upload(
391        &self,
392        prepared: &Self::Prepared,
393        batch: &mut StoreWriteBatch,
394    ) -> Result<(), Self::Error>;
395
396    fn commit_error(&self, error: ClientError) -> Self::Error;
397
398    fn mark_upload_persisted<'a>(
399        &'a self,
400        prepared: Self::Prepared,
401        sequence_number: u64,
402    ) -> BoxFuture<'a, Self::Receipt>
403    where
404        Self: Sync + 'a,
405        Self::Prepared: 'a;
406
407    fn mark_upload_failed<'a>(
408        &'a self,
409        prepared: Self::Prepared,
410        error: String,
411    ) -> BoxFuture<'a, ()>
412    where
413        Self: Sync + 'a,
414        Self::Prepared: 'a;
415
416    fn commit_upload<'a>(
417        &'a self,
418        client: &'a StoreClient,
419        prepared: Self::Prepared,
420    ) -> BoxFuture<'a, Result<Self::Receipt, Self::Error>>
421    where
422        Self: Sync + Sized + 'a,
423        Self::Prepared: 'a,
424        Self::Receipt: 'a,
425        Self::Error: 'a,
426    {
427        Box::pin(async move {
428            let mut batch = StoreWriteBatch::new();
429            if let Err(err) = self.stage_upload(&prepared, &mut batch) {
430                let message = err.to_string();
431                self.mark_upload_failed(prepared, message).await;
432                return Err(err);
433            }
434            match batch.commit(client).await {
435                Ok(sequence_number) => {
436                    Ok(self.mark_upload_persisted(prepared, sequence_number).await)
437                }
438                Err(err) => {
439                    let message = err.to_string();
440                    self.mark_upload_failed(prepared, message).await;
441                    Err(self.commit_error(err))
442                }
443            }
444        })
445    }
446}
447
448/// A writer that can stage an already-prepared publication record into a
449/// shared Store write batch and then be notified of the batch outcome.
450///
451/// This is the companion to [`StoreBatchUpload`] for metadata that publishes
452/// already-staged data, such as QMDB watermarks. Implementations should keep
453/// `prepare_*` methods as inherent APIs because each publisher decides when a
454/// publication is needed.
455pub trait StoreBatchPublication {
456    type PreparedPublication: Send;
457    type PublicationReceipt: Send;
458    type Error: std::fmt::Display + Send;
459
460    fn stage_publication(
461        &self,
462        prepared: &Self::PreparedPublication,
463        batch: &mut StoreWriteBatch,
464    ) -> Result<(), Self::Error>;
465
466    fn publication_commit_error(&self, error: ClientError) -> Self::Error;
467
468    fn mark_publication_persisted<'a>(
469        &'a self,
470        prepared: Self::PreparedPublication,
471        sequence_number: u64,
472    ) -> BoxFuture<'a, Self::PublicationReceipt>
473    where
474        Self: Sync + 'a,
475        Self::PreparedPublication: 'a;
476
477    fn mark_publication_failed<'a>(
478        &'a self,
479        _prepared: Self::PreparedPublication,
480        _error: String,
481    ) -> BoxFuture<'a, ()>
482    where
483        Self: Sync + 'a,
484        Self::PreparedPublication: 'a,
485    {
486        Box::pin(async {})
487    }
488
489    fn commit_publication<'a>(
490        &'a self,
491        client: &'a StoreClient,
492        prepared: Self::PreparedPublication,
493    ) -> BoxFuture<'a, Result<Self::PublicationReceipt, Self::Error>>
494    where
495        Self: Sync + Sized + 'a,
496        Self::PreparedPublication: 'a,
497        Self::PublicationReceipt: 'a,
498        Self::Error: 'a,
499    {
500        Box::pin(async move {
501            let mut batch = StoreWriteBatch::new();
502            if let Err(err) = self.stage_publication(&prepared, &mut batch) {
503                let message = err.to_string();
504                self.mark_publication_failed(prepared, message).await;
505                return Err(err);
506            }
507            match batch.commit(client).await {
508                Ok(sequence_number) => Ok(self
509                    .mark_publication_persisted(prepared, sequence_number)
510                    .await),
511                Err(err) => {
512                    let message = err.to_string();
513                    self.mark_publication_failed(prepared, message).await;
514                    Err(self.publication_commit_error(err))
515                }
516            }
517        })
518    }
519}
520
521/// A stateful writer that owns a durable publication frontier.
522///
523/// This extends [`StoreBatchPublication`] with the pieces needed by writers
524/// that can prepare a catch-up publication after pending uploads drain. The
525/// associated prepared publication, receipt, and error types come from
526/// [`StoreBatchPublication`], so databases can use this for watermarks,
527/// checkpoints, catalog versions, or any similar publication record without
528/// sharing QMDB-specific concepts.
529pub trait StorePublicationFrontierWriter: StoreBatchPublication {
530    fn latest_publication_receipt<'a>(&'a self) -> BoxFuture<'a, Option<Self::PublicationReceipt>>
531    where
532        Self: Sync + 'a,
533        Self::PublicationReceipt: 'a;
534
535    fn prepare_publication<'a>(
536        &'a self,
537    ) -> BoxFuture<'a, Result<Option<Self::PreparedPublication>, Self::Error>>
538    where
539        Self: Sync + 'a,
540        Self::PreparedPublication: 'a,
541        Self::Error: 'a;
542
543    fn flush_publication_with_receipt<'a>(
544        &'a self,
545    ) -> BoxFuture<'a, Result<Option<Self::PublicationReceipt>, Self::Error>>
546    where
547        Self: Sync + 'a,
548        Self::PublicationReceipt: 'a,
549        Self::Error: 'a;
550
551    fn flush_publication<'a>(&'a self) -> BoxFuture<'a, Result<(), Self::Error>>
552    where
553        Self: Sync + 'a,
554        Self::PublicationReceipt: 'a,
555        Self::Error: 'a,
556    {
557        Box::pin(async move { self.flush_publication_with_receipt().await.map(|_| ()) })
558    }
559}
560
/// Direction in which a range query walks the keyspace.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RangeMode {
    /// Forward traversal (`TRAVERSAL_MODE_FORWARD` on the wire).
    Forward,
    /// Reverse traversal (`TRAVERSAL_MODE_REVERSE` on the wire).
    Reverse,
}
567
568#[derive(Clone, Debug)]
569pub struct RangeChunk {
570    /// Rows returned in this stream frame.
571    pub rows: Vec<(Key, Bytes)>,
572    /// Query detail reported after reading this chunk.
573    pub detail: Option<proto_query::Detail>,
574}
575
576#[derive(Clone, Debug)]
577pub struct GetManyChunk {
578    /// Lookup entries returned in this stream frame.
579    pub entries: Vec<(Key, Option<Bytes>)>,
580    /// Query detail reported after reading this chunk.
581    pub detail: Option<proto_query::Detail>,
582}
583
584/// Iterator-like async range stream.
585pub struct RangeStream {
586    stream:
587        ConnectServerStream<hyper::body::Incoming, exoware_proto::query::RangeFrameView<'static>>,
588    pending_frame: Option<exoware_proto::query::RangeFrame>,
589    rows_seen: usize,
590    final_count: Option<usize>,
591    finished: bool,
592    observed_sequence: Option<Arc<AtomicU64>>,
593    key_prefix: Option<StoreKeyPrefix>,
594}
595
596impl RangeStream {
597    fn from_connect_stream(
598        stream: ConnectServerStream<
599            hyper::body::Incoming,
600            exoware_proto::query::RangeFrameView<'static>,
601        >,
602        observed_sequence: Option<Arc<AtomicU64>>,
603        key_prefix: Option<StoreKeyPrefix>,
604    ) -> Self {
605        Self {
606            stream,
607            pending_frame: None,
608            rows_seen: 0,
609            final_count: None,
610            finished: false,
611            observed_sequence,
612            key_prefix,
613        }
614    }
615
616    pub fn final_count(&self) -> Option<usize> {
617        self.final_count
618    }
619
620    async fn prefetch_first_frame(&mut self) -> Result<(), ConnectError> {
621        if self.pending_frame.is_some() || self.finished {
622            return Ok(());
623        }
624        match self.stream.message().await? {
625            Some(frame) => {
626                self.pending_frame = Some(frame.to_owned_message());
627                Ok(())
628            }
629            None => {
630                self.finished = true;
631                if let Some(err) = self.stream.error() {
632                    Err(err.clone())
633                } else {
634                    self.final_count = Some(self.rows_seen);
635                    Ok(())
636                }
637            }
638        }
639    }
640
641    pub async fn next_chunk(&mut self) -> Result<Option<RangeChunk>, ClientError> {
642        loop {
643            if self.finished {
644                return Ok(None);
645            }
646
647            let frame = if let Some(frame) = self.pending_frame.take() {
648                frame
649            } else {
650                let Some(frame) = self
651                    .stream
652                    .message()
653                    .await
654                    .map_err(client_error_from_connect)?
655                else {
656                    self.finished = true;
657                    if let Some(err) = self.stream.error() {
658                        return Err(client_error_from_connect(err.clone()));
659                    }
660                    self.final_count = Some(self.rows_seen);
661                    return Ok(None);
662                };
663                frame.to_owned_message()
664            };
665
666            let detail = frame.detail.as_option().cloned();
667            if let (Some(sequence_store), Some(detail)) = (&self.observed_sequence, detail.as_ref())
668            {
669                sequence_store.fetch_max(detail.sequence_number, Ordering::SeqCst);
670            }
671            let n = frame.results.len();
672
673            // Hide default/empty wire frames from the SDK's semantic chunk stream.
674            if n == 0 && detail.is_none() {
675                continue;
676            }
677
678            let mut out = Vec::with_capacity(n);
679            for entry in &frame.results {
680                let key = Bytes::copy_from_slice(&entry.key);
681                let key = match self.key_prefix {
682                    Some(prefix) => prefix.decode_key(&key)?,
683                    None => key,
684                };
685                out.push((key, Bytes::copy_from_slice(&entry.value)));
686            }
687            self.rows_seen += n;
688            return Ok(Some(RangeChunk { rows: out, detail }));
689        }
690    }
691
692    pub async fn collect(mut self) -> Result<Vec<(Key, Bytes)>, ClientError> {
693        let mut entries = Vec::new();
694        while let Some(chunk) = self.next_chunk().await? {
695            entries.extend(chunk.rows);
696        }
697        Ok(entries)
698    }
699}
700
701pub struct GetManyStream {
702    stream:
703        ConnectServerStream<hyper::body::Incoming, exoware_proto::query::GetManyFrameView<'static>>,
704    pending_frame: Option<exoware_proto::query::GetManyFrame>,
705    finished: bool,
706    observed_sequence: Option<Arc<AtomicU64>>,
707    key_prefix: Option<StoreKeyPrefix>,
708}
709
710impl GetManyStream {
711    fn from_connect_stream(
712        stream: ConnectServerStream<
713            hyper::body::Incoming,
714            exoware_proto::query::GetManyFrameView<'static>,
715        >,
716        observed_sequence: Option<Arc<AtomicU64>>,
717        key_prefix: Option<StoreKeyPrefix>,
718    ) -> Self {
719        Self {
720            stream,
721            pending_frame: None,
722            finished: false,
723            observed_sequence,
724            key_prefix,
725        }
726    }
727
728    async fn prefetch_first_frame(&mut self) -> Result<(), ConnectError> {
729        if self.pending_frame.is_some() || self.finished {
730            return Ok(());
731        }
732        match self.stream.message().await? {
733            Some(frame) => {
734                self.pending_frame = Some(frame.to_owned_message());
735                Ok(())
736            }
737            None => {
738                self.finished = true;
739                if let Some(err) = self.stream.error() {
740                    Err(err.clone())
741                } else {
742                    Ok(())
743                }
744            }
745        }
746    }
747
748    pub async fn next_chunk(&mut self) -> Result<Option<GetManyChunk>, ClientError> {
749        loop {
750            if self.finished {
751                return Ok(None);
752            }
753            let frame = if let Some(frame) = self.pending_frame.take() {
754                frame
755            } else {
756                let Some(frame) = self
757                    .stream
758                    .message()
759                    .await
760                    .map_err(client_error_from_connect)?
761                else {
762                    self.finished = true;
763                    if let Some(err) = self.stream.error() {
764                        return Err(client_error_from_connect(err.clone()));
765                    }
766                    return Ok(None);
767                };
768                frame.to_owned_message()
769            };
770
771            let detail = frame.detail.as_option().cloned();
772            if let (Some(sequence_store), Some(detail)) = (&self.observed_sequence, detail.as_ref())
773            {
774                sequence_store.fetch_max(detail.sequence_number, Ordering::SeqCst);
775            }
776            let n = frame.results.len();
777
778            // Hide default/empty wire frames from the SDK's semantic chunk stream.
779            if n == 0 && detail.is_none() {
780                continue;
781            }
782
783            let mut out = Vec::with_capacity(n);
784            for entry in &frame.results {
785                let key = Bytes::copy_from_slice(&entry.key);
786                let key = match self.key_prefix {
787                    Some(prefix) => prefix.decode_key(&key)?,
788                    None => key,
789                };
790                let value = entry.value.as_ref().map(|v| Bytes::copy_from_slice(v));
791                out.push((key, value));
792            }
793            return Ok(Some(GetManyChunk {
794                entries: out,
795                detail,
796            }));
797        }
798    }
799
800    pub async fn collect(mut self) -> Result<HashMap<Key, Bytes>, ClientError> {
801        let mut map = HashMap::new();
802        while let Some(chunk) = self.next_chunk().await? {
803            for (key, value) in chunk.entries {
804                if let Some(v) = value {
805                    map.insert(key, v);
806                }
807            }
808        }
809        Ok(map)
810    }
811}
812
813impl RangeMode {
814    fn to_proto(self) -> proto_query::TraversalMode {
815        match self {
816            Self::Forward => proto_query::TraversalMode::TRAVERSAL_MODE_FORWARD,
817            Self::Reverse => proto_query::TraversalMode::TRAVERSAL_MODE_REVERSE,
818        }
819    }
820}
821
822#[inline]
823fn key_prefix_mask(bits: u8) -> Result<u16, StoreKeyPrefixError> {
824    if bits > 16 {
825        return Err(StoreKeyPrefixError::ReservedBitsTooLarge {
826            reserved_bits: bits,
827        });
828    }
829    Ok(if bits == 0 {
830        0
831    } else if bits == 16 {
832        u16::MAX
833    } else {
834        (1u16 << bits) - 1
835    })
836}
837
838fn validate_prefix_bits(reserved_bits: u8, prefix: u16) -> Result<(), StoreKeyPrefixError> {
839    let mask = key_prefix_mask(reserved_bits)?;
840    if prefix > mask {
841        return Err(StoreKeyPrefixError::PrefixTooLarge {
842            reserved_bits,
843            prefix,
844        });
845    }
846    Ok(())
847}
848
849/// One delivered (key, value) row from a stream subscription. The client
850/// reapplies its own filter if it needs to know which match_key matched —
851/// the wire frame doesn't carry the index.
852#[derive(Clone, Debug)]
853pub struct StreamSubscriptionEntry {
854    pub key: Key,
855    pub value: Bytes,
856}
857
858/// One atomic Put batch delivered to a subscriber.
859#[derive(Clone, Debug)]
860pub struct StreamSubscriptionFrame {
861    pub sequence_number: u64,
862    pub entries: Vec<StreamSubscriptionEntry>,
863}
864
865/// Async stream of `StreamSubscriptionFrame`. Backed by the generated
866/// connectrpc server stream.
867pub struct StreamSubscription {
868    stream: ConnectServerStream<
869        hyper::body::Incoming,
870        exoware_proto::store::stream::v1::SubscribeResponseView<'static>,
871    >,
872    key_prefix: Option<StoreKeyPrefix>,
873    logical_filter: Option<ClientStreamFilter>,
874}
875
876impl std::fmt::Debug for StreamSubscription {
877    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
878        f.debug_struct("StreamSubscription").finish_non_exhaustive()
879    }
880}
881
882impl StreamSubscription {
883    /// Pull the next frame. `Ok(None)` = server closed the stream cleanly.
884    pub async fn next(&mut self) -> Result<Option<StreamSubscriptionFrame>, ClientError> {
885        loop {
886            match self
887                .stream
888                .message()
889                .await
890                .map_err(client_error_from_connect)?
891            {
892                Some(view) => {
893                    let owned = view.to_owned_message();
894                    let mut entries = Vec::with_capacity(owned.entries.len());
895                    for entry in owned.entries {
896                        let key = Bytes::from(entry.key);
897                        let key = match self.key_prefix {
898                            Some(prefix) => prefix.decode_key(&key)?,
899                            None => key,
900                        };
901                        let value = entry.value;
902                        if self
903                            .logical_filter
904                            .as_ref()
905                            .is_none_or(|filter| filter.matches(&key, value.as_ref()))
906                        {
907                            entries.push(StreamSubscriptionEntry { key, value });
908                        }
909                    }
910                    if entries.is_empty() {
911                        continue;
912                    }
913                    let frame = StreamSubscriptionFrame {
914                        sequence_number: owned.sequence_number,
915                        entries,
916                    };
917                    return Ok(Some(frame));
918                }
919                None => {
920                    if let Some(err) = self.stream.error() {
921                        return Err(client_error_from_connect(err.clone()));
922                    } else {
923                        return Ok(None);
924                    }
925                }
926            }
927        }
928    }
929}
930
/// Client-side matcher for a single `match_key` of a stream filter.
#[derive(Clone)]
struct ClientKeyMatcher {
    /// Codec built from the match key's reserved bits and prefix. It is used
    /// to test whether a key belongs to the match key and to read its payload.
    codec: KeyCodec,
    /// Compiled `payload_regex`, applied to the key's payload bytes.
    regex: regex::bytes::Regex,
}
936
/// Locally compiled copy of a `StreamFilter`, used to re-check delivered rows
/// on the client.
#[derive(Clone)]
struct ClientStreamFilter {
    /// A row's key must satisfy at least one of these matchers (so an empty
    /// list matches nothing).
    keys: Vec<ClientKeyMatcher>,
    /// Optional value filters; `None` accepts every value.
    values: Option<crate::stream_filter::CompiledBytesFilters>,
}
942
943impl ClientStreamFilter {
944    fn compile(filter: &crate::stream_filter::StreamFilter) -> Result<Self, ClientError> {
945        crate::stream_filter::validate_filter(filter)
946            .map_err(|e| ClientError::WireFormat(e.to_string()))?;
947        let keys = filter
948            .match_keys
949            .iter()
950            .map(|mk| {
951                let regex = crate::match_key::compile_payload_regex(&mk.payload_regex)
952                    .map_err(|e| ClientError::WireFormat(e.to_string()))?;
953                Ok(ClientKeyMatcher {
954                    codec: KeyCodec::new(mk.reserved_bits, mk.prefix),
955                    regex,
956                })
957            })
958            .collect::<Result<Vec<_>, ClientError>>()?;
959        let values = crate::stream_filter::CompiledBytesFilters::compile(&filter.value_filters)
960            .map_err(ClientError::WireFormat)?;
961        Ok(Self { keys, values })
962    }
963
964    fn matches(&self, key: &Key, value: &[u8]) -> bool {
965        if !self
966            .values
967            .as_ref()
968            .is_none_or(|filter| filter.matches(value))
969        {
970            return false;
971        }
972        self.keys.iter().any(|matcher| {
973            if !matcher.codec.matches(key) {
974                return false;
975            }
976            let payload_len = matcher.codec.payload_capacity_bytes_for_key_len(key.len());
977            matcher
978                .codec
979                .read_payload(key, 0, payload_len)
980                .is_ok_and(|payload| matcher.regex.is_match(&payload))
981        })
982    }
983}
984
985/// Inspect a Connect error for `store.stream.BATCH_EVICTED` / `BATCH_NOT_FOUND`
986/// `ErrorInfo` details. Used by `get_batch` to collapse both into `Ok(None)`.
987fn is_batch_missing_error(err: &ConnectError) -> bool {
988    match proto_decode_connect_error(err) {
989        Ok(decoded) => decoded.error_info.is_some_and(|info| {
990            info.domain == "store.stream"
991                && matches!(info.reason.as_str(), "BATCH_EVICTED" | "BATCH_NOT_FOUND")
992        }),
993        Err(_) => false,
994    }
995}
996
/// Retry policy for idempotent read operations.
///
/// Start from [`RetryConfig::standard`] (also the `Default`) or
/// [`RetryConfig::disabled`], then adjust it with the `with_*` methods.
#[derive(Clone, Copy, Debug)]
pub struct RetryConfig {
    /// Total number of attempts, including the first call. `with_max_attempts`
    /// and `sanitized` clamp it to at least 1.
    max_attempts: usize,
    /// Initial backoff delay. NOTE(review): exact use lives in
    /// `retry_delay_for_error`; confirm there.
    initial_backoff: Duration,
    /// Upper bound for the backoff delay. `sanitized` raises it to at least
    /// `initial_backoff`.
    max_backoff: Duration,
}
1004
1005impl RetryConfig {
1006    pub fn standard() -> Self {
1007        Self {
1008            max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
1009            initial_backoff: Duration::from_millis(DEFAULT_RETRY_INITIAL_BACKOFF_MS),
1010            max_backoff: Duration::from_millis(DEFAULT_RETRY_MAX_BACKOFF_MS),
1011        }
1012    }
1013
1014    pub fn disabled() -> Self {
1015        Self::standard().with_max_attempts(1)
1016    }
1017
1018    pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
1019        self.max_attempts = max_attempts.max(1);
1020        self
1021    }
1022
1023    pub fn with_initial_backoff(mut self, initial_backoff: Duration) -> Self {
1024        self.initial_backoff = initial_backoff;
1025        self
1026    }
1027
1028    pub fn with_max_backoff(mut self, max_backoff: Duration) -> Self {
1029        self.max_backoff = max_backoff;
1030        self
1031    }
1032
1033    pub(crate) fn sanitized(self) -> Self {
1034        let max_attempts = self.max_attempts.max(1);
1035        let max_backoff = self.max_backoff.max(self.initial_backoff);
1036        Self {
1037            max_attempts,
1038            initial_backoff: self.initial_backoff,
1039            max_backoff,
1040        }
1041    }
1042}
1043
1044impl Default for RetryConfig {
1045    fn default() -> Self {
1046        Self::standard()
1047    }
1048}
1049
/// Strips every trailing `/` from a base URL so that paths such as `/health`
/// can be appended with exactly one separator.
fn trim_connect_base(url: &str) -> String {
    String::from(url.trim_end_matches('/'))
}
1053
1054fn new_http_client() -> reqwest::Client {
1055    reqwest::Client::builder()
1056        .pool_max_idle_per_host(32)
1057        .timeout(Duration::from_secs(30))
1058        .build()
1059        .expect("failed to build HTTP client")
1060}
1061
/// Error returned by [`StoreClientBuilder::build`] when a required endpoint URL
/// is missing or a service URL fails to parse.
#[derive(Debug, thiserror::Error)]
pub enum ClientBuildError {
    /// No health URL was configured.
    #[error("StoreClientBuilder: missing health URL (set health_url or url)")]
    MissingHealthUrl,
    /// No ingest URL was configured.
    #[error("StoreClientBuilder: missing ingest URL (set ingest_url or url)")]
    MissingIngestUrl,
    /// No query URL was configured.
    #[error("StoreClientBuilder: missing query URL (set query_url or url)")]
    MissingQueryUrl,
    /// No compact URL was configured.
    #[error("StoreClientBuilder: missing compact URL (set compact_url or url)")]
    MissingCompactUrl,
    /// No stream URL was configured.
    #[error("StoreClientBuilder: missing stream URL (set stream_url or url)")]
    MissingStreamUrl,
    /// A configured service URL could not be parsed as an `http::Uri`.
    #[error("StoreClientBuilder: invalid URL \"{url}\": {source}")]
    InvalidUrl {
        /// The offending URL as configured (trailing `/` already trimmed).
        url: String,
        /// The underlying parse error.
        source: http::uri::InvalidUri,
    },
}
1081
/// Configures a [`StoreClient`] with explicit bases for health probes and store services.
///
/// Use [`StoreClient::builder()`] to construct. Call [`Self::url`] to point every
/// service at the same origin, or set each base separately. Finish with [`Self::build`].
#[derive(Debug, Default)]
pub struct StoreClientBuilder {
    /// Base for the plain HTTP `GET /health` and `GET /ready` probes.
    health_url: Option<String>,
    /// Base for `store.ingest.v1.Service`.
    ingest_url: Option<String>,
    /// Base for `store.query.v1.Service`.
    query_url: Option<String>,
    /// Base for `store.compact.v1.Service`.
    compact_url: Option<String>,
    /// Base for `store.stream.v1.Service`.
    stream_url: Option<String>,
    /// Optional client-side key namespace.
    key_prefix: Option<StoreKeyPrefix>,
    /// Retry policy for idempotent reads; the setter stores a sanitized copy.
    retry_config: RetryConfig,
    /// Compression for outgoing Connect request bodies.
    connect_request_compression: ConnectRequestCompression,
}
1097
1098impl StoreClientBuilder {
1099    /// Sets the same base URL for all services (health, ingest, query, compact, stream).
1100    pub fn url(mut self, url: &str) -> Self {
1101        let u = trim_connect_base(url);
1102        self.health_url = Some(u.clone());
1103        self.ingest_url = Some(u.clone());
1104        self.query_url = Some(u.clone());
1105        self.compact_url = Some(u.clone());
1106        self.stream_url = Some(u);
1107        self
1108    }
1109
1110    /// Base URL for plain HTTP `GET /health` and `GET /ready` (often the query worker).
1111    pub fn health_url(mut self, url: &str) -> Self {
1112        self.health_url = Some(trim_connect_base(url));
1113        self
1114    }
1115
1116    /// Base URL for the ingest service (`store.ingest.v1.Service`).
1117    pub fn ingest_url(mut self, url: &str) -> Self {
1118        self.ingest_url = Some(trim_connect_base(url));
1119        self
1120    }
1121
1122    /// Base URL for the query service (`store.query.v1.Service`).
1123    pub fn query_url(mut self, url: &str) -> Self {
1124        self.query_url = Some(trim_connect_base(url));
1125        self
1126    }
1127
1128    /// Base URL for the compact service (`store.compact.v1.Service`).
1129    pub fn compact_url(mut self, url: &str) -> Self {
1130        self.compact_url = Some(trim_connect_base(url));
1131        self
1132    }
1133
1134    /// Base URL for the stream service (`store.stream.v1.Service`).
1135    pub fn stream_url(mut self, url: &str) -> Self {
1136        self.stream_url = Some(trim_connect_base(url));
1137        self
1138    }
1139
1140    /// Client-side key namespace applied to all user-key operations.
1141    pub fn key_prefix(mut self, prefix: StoreKeyPrefix) -> Self {
1142        self.key_prefix = Some(prefix);
1143        self
1144    }
1145
1146    /// Retry policy for idempotent read operations (get / range / reduce).
1147    pub fn retry_config(mut self, retry: RetryConfig) -> Self {
1148        self.retry_config = retry.sanitized();
1149        self
1150    }
1151
1152    /// Codec for compressing **outgoing** RPC request bodies (default [`ConnectRequestCompression::Zstd`]).
1153    pub fn connect_request_compression(mut self, compression: ConnectRequestCompression) -> Self {
1154        self.connect_request_compression = compression;
1155        self
1156    }
1157
1158    /// Build the client, or return an error if any required URL was not set.
1159    pub fn build(self) -> Result<StoreClient, ClientBuildError> {
1160        let health_url = self.health_url.ok_or(ClientBuildError::MissingHealthUrl)?;
1161        let ingest_url = self.ingest_url.ok_or(ClientBuildError::MissingIngestUrl)?;
1162        let query_url = self.query_url.ok_or(ClientBuildError::MissingQueryUrl)?;
1163        let compact_url = self
1164            .compact_url
1165            .ok_or(ClientBuildError::MissingCompactUrl)?;
1166        let stream_url = self.stream_url.ok_or(ClientBuildError::MissingStreamUrl)?;
1167        let ingest_uri: http::Uri =
1168            ingest_url
1169                .parse()
1170                .map_err(|e| ClientBuildError::InvalidUrl {
1171                    url: ingest_url.clone(),
1172                    source: e,
1173                })?;
1174        let query_uri: http::Uri = query_url
1175            .parse()
1176            .map_err(|e| ClientBuildError::InvalidUrl {
1177                url: query_url.clone(),
1178                source: e,
1179            })?;
1180        let compact_uri: http::Uri =
1181            compact_url
1182                .parse()
1183                .map_err(|e| ClientBuildError::InvalidUrl {
1184                    url: compact_url.clone(),
1185                    source: e,
1186                })?;
1187        let stream_uri: http::Uri =
1188            stream_url
1189                .parse()
1190                .map_err(|e| ClientBuildError::InvalidUrl {
1191                    url: stream_url.clone(),
1192                    source: e,
1193                })?;
1194        Ok(StoreClient {
1195            health_url,
1196            ingest_uri,
1197            query_uri,
1198            compact_uri,
1199            stream_uri,
1200            http: new_http_client(),
1201            connect_http: ProtoPreferZstdHttpClient::plaintext(),
1202            retry_config: self.retry_config,
1203            connect_request_compression: self.connect_request_compression,
1204            key_prefix: self.key_prefix,
1205        })
1206    }
1207}
1208
/// Typed Rust client for Store.
///
/// Construct with [`StoreClient::new`] or [`StoreClient::builder`].
#[derive(Clone, Debug)]
pub struct StoreClient {
    /// Base URL for `health()` / `ready()` (typically the query worker).
    pub(crate) health_url: String,
    /// Parsed base URI of the ingest service.
    ingest_uri: http::Uri,
    /// Parsed base URI of the query service.
    query_uri: http::Uri,
    /// Parsed base URI of the compact service.
    compact_uri: http::Uri,
    /// Parsed base URI of the stream service.
    stream_uri: http::Uri,
    /// Plain HTTP client; used by `health()` / `ready()`.
    http: reqwest::Client,
    /// HTTP transport handed to the generated Connect service clients.
    connect_http: ProtoPreferZstdHttpClient,
    /// Retry policy for idempotent reads.
    retry_config: RetryConfig,
    /// Compression for outgoing Connect request bodies.
    connect_request_compression: ConnectRequestCompression,
    /// Optional client-side key namespace applied to user keys.
    key_prefix: Option<StoreKeyPrefix>,
}
1224
/// A session that enforces monotonic read consistency via a fixed `min_sequence_number` floor.
///
/// `StoreClient` itself does not retain any client-global observed sequence.
/// Plain query reads are stateless unless the caller passes an explicit
/// `min_sequence_number`.
///
/// A `SerializableReadSession` is the explicit consistency mechanism. The first
/// successful unary read seeds the session from the server-reported sequence.
/// Every later read passes that fixed floor, so the server guarantees all
/// responses reflect at least that point in the write log.
///
/// Streamed query reads (`get_many`, `range_stream`) expose their sequence in
/// the detail of each returned chunk. If one of those is the first successful
/// read, the session is pinned once the first detail-bearing chunk is consumed.
#[derive(Clone, Debug)]
pub struct SerializableReadSession {
    /// Client used to issue the session's reads.
    client: StoreClient,
    /// State shared by every clone of this session (held behind an `Arc`).
    state: Arc<SessionState>,
}
1244
/// State shared by all clones of a [`SerializableReadSession`].
#[derive(Debug)]
struct SessionState {
    /// Pinned sequence floor; `0` means the session is not seeded yet (see
    /// `fixed_sequence`). NOTE(review): kept in its own `Arc`, presumably so it
    /// can be handed to streamed reads as an `observed_sequence` sink; confirm.
    sequence: Arc<AtomicU64>,
    /// Async lock; presumably serializes the first (seeding) read. Its use is
    /// outside this chunk, so TODO confirm.
    init_gate: tokio::sync::Mutex<()>,
}
1250
1251impl SessionState {
1252    fn fixed_sequence(&self) -> Option<u64> {
1253        let sequence = self.sequence.load(Ordering::Acquire);
1254        (sequence > 0).then_some(sequence)
1255    }
1256}
1257
/// Optional read parameters threaded into the internal range-stream call.
#[derive(Default)]
struct RangeStreamReadOptions {
    /// Requested `min_sequence_number` floor, if any.
    min_sequence_number: Option<u64>,
    /// NOTE(review): presumably a sink the stream writes the server-reported
    /// sequence into (for read sessions); confirm against `range_stream_internal`.
    observed_sequence: Option<Arc<AtomicU64>>,
}
1263
1264impl StoreClient {
1265    /// Start building a client with per-service base URLs.
1266    pub fn builder() -> StoreClientBuilder {
1267        StoreClientBuilder::default()
1268    }
1269
1270    pub fn new(url: &str) -> Self {
1271        Self::with_retry_config(url, RetryConfig::standard())
1272    }
1273
1274    pub fn with_retry_config(url: &str, retry_config: RetryConfig) -> Self {
1275        Self::builder()
1276            .url(url)
1277            .retry_config(retry_config)
1278            .build()
1279            .expect("url sets all service URLs")
1280    }
1281
1282    /// Return this client's configured Store key prefix, if any.
1283    pub fn key_prefix(&self) -> Option<StoreKeyPrefix> {
1284        self.key_prefix
1285    }
1286
1287    /// Clone this client with a client-side Store key prefix.
1288    pub fn with_key_prefix(&self, prefix: StoreKeyPrefix) -> Self {
1289        let mut out = self.clone();
1290        out.key_prefix = Some(prefix);
1291        out
1292    }
1293
1294    /// Clone this client without client-side key prefixing.
1295    pub fn without_key_prefix(&self) -> Self {
1296        let mut out = self.clone();
1297        out.key_prefix = None;
1298        out
1299    }
1300
1301    /// Encode a logical key as it will appear in the physical Store.
1302    pub fn encode_store_key(&self, key: &Key) -> Result<Key, ClientError> {
1303        match self.key_prefix {
1304            Some(prefix) => Ok(prefix.encode_key(key)?),
1305            None => Ok(Bytes::copy_from_slice(key)),
1306        }
1307    }
1308
1309    /// Decode a physical Store key into this client's logical keyspace.
1310    pub fn decode_store_key(&self, key: &Key) -> Result<Key, ClientError> {
1311        match self.key_prefix {
1312            Some(prefix) => Ok(prefix.decode_key(key)?),
1313            None => Ok(Bytes::copy_from_slice(key)),
1314        }
1315    }
1316
1317    fn encode_store_range(&self, start: &Key, end: &Key) -> Result<(Key, Key), ClientError> {
1318        match self.key_prefix {
1319            Some(prefix) => Ok(prefix.encode_range(start, end)?),
1320            None => Ok((Bytes::copy_from_slice(start), Bytes::copy_from_slice(end))),
1321        }
1322    }
1323
1324    /// Outgoing Connect request body compression (see [`ConnectRequestCompression`]).
1325    pub fn connect_request_compression(&self) -> ConnectRequestCompression {
1326        self.connect_request_compression
1327    }
1328
1329    pub fn decode_error_details(
1330        err: &ConnectError,
1331    ) -> Result<exoware_proto::DecodedConnectError, buffa::DecodeError> {
1332        proto_decode_connect_error(err)
1333    }
1334
1335    /// Create an unseeded serializable read session.
1336    ///
1337    /// The first successful unary read fixes the session's sequence floor from
1338    /// the server response. Streamed query reads fix that floor once a
1339    /// detail-bearing chunk is consumed.
1340    pub fn create_session(&self) -> SerializableReadSession {
1341        self.create_session_with_sequence(0)
1342    }
1343
1344    /// Create a serializable read session pinned to at least `sequence`.
1345    ///
1346    /// This is intended for stream-driven read paths that already observed a
1347    /// concrete store batch sequence and need follow-up query/proof reads to
1348    /// see at least that sequence.
1349    pub fn create_session_with_sequence(&self, sequence: u64) -> SerializableReadSession {
1350        SerializableReadSession {
1351            client: self.clone(),
1352            state: Arc::new(SessionState {
1353                sequence: Arc::new(AtomicU64::new(sequence)),
1354                init_gate: tokio::sync::Mutex::new(()),
1355            }),
1356        }
1357    }
1358
1359    /// Typed access to the `store.ingest.v1` service.
1360    pub fn ingest(&self) -> Ingest<'_> {
1361        Ingest { c: self }
1362    }
1363
1364    /// Typed access to the `store.query.v1` service.
1365    pub fn query(&self) -> Query<'_> {
1366        Query { c: self }
1367    }
1368
1369    /// Typed access to the `store.compact.v1` service.
1370    pub fn compact(&self) -> Compact<'_> {
1371        Compact { c: self }
1372    }
1373
1374    /// Typed access to the `store.stream.v1` service.
1375    pub fn stream(&self) -> Stream<'_> {
1376        Stream { c: self }
1377    }
1378
1379    /// Submit a KV batch via Connect `Put`.
1380    ///
1381    /// On success returns the **store sequence number** from the response. Use it for immediate
1382    /// `get_with_min_sequence_number` / range calls or to seed
1383    /// [`Self::create_session_with_sequence`].
1384    /// If the request succeeds, the server accepts the full batch (count is `kvs.len()`).
1385    pub(crate) async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
1386        if self.key_prefix.is_none() {
1387            return self.put_physical(kvs).await;
1388        }
1389        let mut keys = Vec::with_capacity(kvs.len());
1390        for (key, _) in kvs {
1391            keys.push(self.encode_store_key(key)?);
1392        }
1393        let prefixed: Vec<(&Key, &[u8])> = keys
1394            .iter()
1395            .zip(kvs.iter())
1396            .map(|(key, (_, value))| (key, *value))
1397            .collect();
1398        self.put_physical(&prefixed).await
1399    }
1400
1401    async fn put_physical(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
1402        let mut proto_kvs = Vec::with_capacity(kvs.len());
1403        for (key, value) in kvs {
1404            if !is_valid_key_size(key.len()) {
1405                return Err(ClientError::WireFormat(format!(
1406                    "key length {} is outside valid store key range ({}..={})",
1407                    key.len(),
1408                    keys::MIN_KEY_LEN,
1409                    MAX_KEY_LEN
1410                )));
1411            }
1412            proto_kvs.push(exoware_proto::common::KvEntry {
1413                key: key.to_vec(),
1414                value: Bytes::copy_from_slice(value),
1415                ..Default::default()
1416            });
1417        }
1418
1419        let config =
1420            store_connect_client_config(self.ingest_uri.clone(), self.connect_request_compression);
1421        let client = IngestServiceClient::new(self.connect_http.clone(), config);
1422        let response = client
1423            .put(ProtoPutRequest {
1424                kvs: proto_kvs,
1425                ..Default::default()
1426            })
1427            .await
1428            .map_err(client_error_from_connect)?;
1429        Ok(response.into_owned().sequence_number)
1430    }
1431
1432    pub(crate) async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
1433        self.get_internal(key, None).await
1434    }
1435
1436    pub(crate) async fn get_with_min_sequence_number(
1437        &self,
1438        key: &Key,
1439        min_sequence_number: u64,
1440    ) -> Result<Option<Bytes>, ClientError> {
1441        self.get_internal(key, Some(min_sequence_number)).await
1442    }
1443
1444    async fn get_internal(
1445        &self,
1446        key: &Key,
1447        min_sequence_number: Option<u64>,
1448    ) -> Result<Option<Bytes>, ClientError> {
1449        let (response, _detail) = self
1450            .send_get(key, self.normalize_min_sequence_number(min_sequence_number))
1451            .await?;
1452        Ok(response.value)
1453    }
1454
1455    pub(crate) async fn get_many(
1456        &self,
1457        keys: &[&Key],
1458        batch_size: u32,
1459    ) -> Result<GetManyStream, ClientError> {
1460        self.get_many_internal(keys, batch_size, None, None).await
1461    }
1462
1463    pub(crate) async fn get_many_with_min_sequence_number(
1464        &self,
1465        keys: &[&Key],
1466        batch_size: u32,
1467        min_sequence_number: u64,
1468    ) -> Result<GetManyStream, ClientError> {
1469        self.get_many_internal(keys, batch_size, Some(min_sequence_number), None)
1470            .await
1471    }
1472
1473    async fn get_many_internal(
1474        &self,
1475        keys: &[&Key],
1476        batch_size: u32,
1477        min_sequence_number: Option<u64>,
1478        observed_sequence: Option<Arc<AtomicU64>>,
1479    ) -> Result<GetManyStream, ClientError> {
1480        for key in keys {
1481            if !is_valid_key_size(key.len()) {
1482                return Err(ClientError::WireFormat(format!(
1483                    "key length {} is outside valid store key range ({}..={})",
1484                    key.len(),
1485                    keys::MIN_KEY_LEN,
1486                    MAX_KEY_LEN
1487                )));
1488            }
1489        }
1490
1491        let config =
1492            store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1493        let client = QueryServiceClient::new(self.connect_http.clone(), config);
1494        let proto_keys: Vec<Vec<u8>> = keys
1495            .iter()
1496            .map(|k| match self.key_prefix {
1497                Some(_) => self.encode_store_key(k).map(|key| key.to_vec()),
1498                None => Ok(k.to_vec()),
1499            })
1500            .collect::<Result<Vec<_>, _>>()?;
1501        let effective_min = self.normalize_min_sequence_number(min_sequence_number);
1502        let max_attempts = self.retry_config.max_attempts.max(1);
1503        let mut attempt = 1usize;
1504        loop {
1505            match client
1506                .get_many(ProtoGetManyRequest {
1507                    keys: proto_keys.clone(),
1508                    min_sequence_number: effective_min,
1509                    batch_size,
1510                    ..Default::default()
1511                })
1512                .await
1513            {
1514                Ok(stream) => {
1515                    let mut gms = GetManyStream::from_connect_stream(
1516                        stream,
1517                        observed_sequence.clone(),
1518                        self.key_prefix,
1519                    );
1520                    if let Err(err) = gms.prefetch_first_frame().await {
1521                        if attempt < max_attempts && is_retryable_error(&err) {
1522                            let delay = retry_delay_for_error(&err, attempt, self.retry_config);
1523                            tokio::time::sleep(delay).await;
1524                            attempt += 1;
1525                            continue;
1526                        }
1527                        return Err(client_error_from_connect(err));
1528                    }
1529                    return Ok(gms);
1530                }
1531                Err(err) => {
1532                    if attempt < max_attempts && is_retryable_error(&err) {
1533                        let delay = retry_delay_for_error(&err, attempt, self.retry_config);
1534                        tokio::time::sleep(delay).await;
1535                        attempt += 1;
1536                        continue;
1537                    }
1538                    return Err(client_error_from_connect(err));
1539                }
1540            }
1541        }
1542    }
1543
1544    /// Key range is inclusive: `start <= key <= end` when `end` is non-empty; empty `end` is
1545    /// unbounded above (matches `store.query.v1.RangeRequest`).
1546    pub(crate) async fn range(
1547        &self,
1548        start: &Key,
1549        end: &Key,
1550        limit: usize,
1551    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1552        self.range_internal(start, end, limit, RangeMode::Forward, None)
1553            .await
1554    }
1555
1556    /// See [`StoreClient::range`] for `end` semantics.
1557    pub(crate) async fn range_with_mode(
1558        &self,
1559        start: &Key,
1560        end: &Key,
1561        limit: usize,
1562        mode: RangeMode,
1563    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1564        self.range_internal(start, end, limit, mode, None).await
1565    }
1566
1567    pub(crate) async fn range_with_min_sequence_number(
1568        &self,
1569        start: &Key,
1570        end: &Key,
1571        limit: usize,
1572        min_sequence_number: u64,
1573    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1574        self.range_internal(
1575            start,
1576            end,
1577            limit,
1578            RangeMode::Forward,
1579            Some(min_sequence_number),
1580        )
1581        .await
1582    }
1583
1584    pub(crate) async fn range_with_mode_and_min_sequence_number(
1585        &self,
1586        start: &Key,
1587        end: &Key,
1588        limit: usize,
1589        mode: RangeMode,
1590        min_sequence_number: u64,
1591    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1592        self.range_internal(start, end, limit, mode, Some(min_sequence_number))
1593            .await
1594    }
1595
1596    pub(crate) async fn range_stream(
1597        &self,
1598        start: &Key,
1599        end: &Key,
1600        limit: usize,
1601        batch_size: usize,
1602    ) -> Result<RangeStream, ClientError> {
1603        self.range_stream_internal(
1604            start,
1605            end,
1606            limit,
1607            batch_size,
1608            RangeMode::Forward,
1609            RangeStreamReadOptions::default(),
1610        )
1611        .await
1612    }
1613
1614    pub(crate) async fn range_stream_with_mode(
1615        &self,
1616        start: &Key,
1617        end: &Key,
1618        limit: usize,
1619        batch_size: usize,
1620        mode: RangeMode,
1621    ) -> Result<RangeStream, ClientError> {
1622        self.range_stream_internal(start, end, limit, batch_size, mode, Default::default())
1623            .await
1624    }
1625
1626    pub(crate) async fn range_stream_with_min_sequence_number(
1627        &self,
1628        start: &Key,
1629        end: &Key,
1630        limit: usize,
1631        batch_size: usize,
1632        min_sequence_number: u64,
1633    ) -> Result<RangeStream, ClientError> {
1634        self.range_stream_internal(
1635            start,
1636            end,
1637            limit,
1638            batch_size,
1639            RangeMode::Forward,
1640            RangeStreamReadOptions {
1641                min_sequence_number: Some(min_sequence_number),
1642                observed_sequence: None,
1643            },
1644        )
1645        .await
1646    }
1647
1648    pub(crate) async fn range_stream_with_mode_and_min_sequence_number(
1649        &self,
1650        start: &Key,
1651        end: &Key,
1652        limit: usize,
1653        batch_size: usize,
1654        mode: RangeMode,
1655        min_sequence_number: u64,
1656    ) -> Result<RangeStream, ClientError> {
1657        self.range_stream_internal(
1658            start,
1659            end,
1660            limit,
1661            batch_size,
1662            mode,
1663            RangeStreamReadOptions {
1664                min_sequence_number: Some(min_sequence_number),
1665                observed_sequence: None,
1666            },
1667        )
1668        .await
1669    }
1670
1671    pub(crate) async fn range_reduce(
1672        &self,
1673        start: &Key,
1674        end: &Key,
1675        request: &DomainRangeReduceRequest,
1676    ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
1677        let (response, _) = self
1678            .range_reduce_response_internal(start, end, request, None)
1679            .await?;
1680        let decoded = proto_to_domain_reduce_response(response).map_err(ClientError::WireFormat)?;
1681        if !decoded.groups.is_empty() {
1682            return Err(ClientError::WireFormat(
1683                "grouped range reduction response returned for scalar request".to_string(),
1684            ));
1685        }
1686        Ok(decoded
1687            .results
1688            .iter()
1689            .map(|result| result.value.clone())
1690            .collect())
1691    }
1692
1693    pub(crate) async fn range_reduce_with_min_sequence_number(
1694        &self,
1695        start: &Key,
1696        end: &Key,
1697        request: &DomainRangeReduceRequest,
1698        min_sequence_number: u64,
1699    ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
1700        let (response, _) = self
1701            .range_reduce_response_internal(start, end, request, Some(min_sequence_number))
1702            .await?;
1703        let decoded = proto_to_domain_reduce_response(response).map_err(ClientError::WireFormat)?;
1704        if !decoded.groups.is_empty() {
1705            return Err(ClientError::WireFormat(
1706                "grouped range reduction response returned for scalar request".to_string(),
1707            ));
1708        }
1709        Ok(decoded
1710            .results
1711            .iter()
1712            .map(|result| result.value.clone())
1713            .collect())
1714    }
1715
1716    pub(crate) async fn range_reduce_response(
1717        &self,
1718        start: &Key,
1719        end: &Key,
1720        request: &DomainRangeReduceRequest,
1721    ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
1722        let (body, _) = self
1723            .range_reduce_response_internal(start, end, request, None)
1724            .await?;
1725        Ok(body)
1726    }
1727
1728    pub(crate) async fn range_reduce_response_with_min_sequence_number(
1729        &self,
1730        start: &Key,
1731        end: &Key,
1732        request: &DomainRangeReduceRequest,
1733        min_sequence_number: u64,
1734    ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
1735        let (body, _) = self
1736            .range_reduce_response_internal(start, end, request, Some(min_sequence_number))
1737            .await?;
1738        Ok(body)
1739    }
1740
1741    pub(crate) async fn prune(
1742        &self,
1743        policies: &[crate::prune_policy::PrunePolicy],
1744    ) -> Result<(), ClientError> {
1745        let config =
1746            store_connect_client_config(self.compact_uri.clone(), self.connect_request_compression);
1747        let client = CompactServiceClient::new(self.connect_http.clone(), config);
1748        let policies = self.prefix_prune_policies(policies)?;
1749        client
1750            .prune(ProtoPruneRequest {
1751                policies: exoware_proto::prune_policies_to_proto(&policies),
1752                ..Default::default()
1753            })
1754            .await
1755            .map_err(client_error_from_connect)?;
1756        Ok(())
1757    }
1758
1759    pub async fn health(&self) -> Result<bool, ClientError> {
1760        let resp = self
1761            .http
1762            .get(format!("{}/health", self.health_url))
1763            .send()
1764            .await?;
1765        Ok(resp.status().is_success())
1766    }
1767
1768    pub async fn ready(&self) -> Result<bool, ClientError> {
1769        let resp = self
1770            .http
1771            .get(format!("{}/ready", self.health_url))
1772            .send()
1773            .await?;
1774        Ok(resp.status().is_success())
1775    }
1776
1777    fn normalize_min_sequence_number(&self, requested_sequence: Option<u64>) -> Option<u64> {
1778        requested_sequence.filter(|sequence| *sequence > 0)
1779    }
1780
1781    async fn send_get(
1782        &self,
1783        key: &Key,
1784        min_sequence_number: Option<u64>,
1785    ) -> Result<
1786        (
1787            exoware_proto::query::GetResponse,
1788            Option<proto_query::Detail>,
1789        ),
1790        ClientError,
1791    > {
1792        if !is_valid_key_size(key.len()) {
1793            return Err(ClientError::WireFormat(format!(
1794                "key length {} is outside valid store key range ({}..={})",
1795                key.len(),
1796                keys::MIN_KEY_LEN,
1797                MAX_KEY_LEN
1798            )));
1799        }
1800        let key = self.encode_store_key(key)?;
1801
1802        let config =
1803            store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1804        let client = QueryServiceClient::new(self.connect_http.clone(), config);
1805        let response = self
1806            .send_with_retry(|| async {
1807                client
1808                    .get(ProtoGetRequest {
1809                        key: key.clone().into(),
1810                        min_sequence_number,
1811                        ..Default::default()
1812                    })
1813                    .await
1814            })
1815            .await?;
1816        let owned = response.into_owned();
1817        let detail = owned.detail.as_option().cloned();
1818        Ok((owned, detail))
1819    }
1820
1821    #[cfg(test)]
1822    pub async fn send_get_for_tests(
1823        &self,
1824        key: &Key,
1825        min_sequence_number: Option<u64>,
1826    ) -> Result<
1827        (
1828            exoware_proto::query::GetResponse,
1829            Option<proto_query::Detail>,
1830        ),
1831        ClientError,
1832    > {
1833        self.send_get(key, min_sequence_number).await
1834    }
1835
1836    async fn range_internal(
1837        &self,
1838        start: &Key,
1839        end: &Key,
1840        limit: usize,
1841        mode: RangeMode,
1842        min_sequence_number: Option<u64>,
1843    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
1844        let stream = self
1845            .range_stream_internal(
1846                start,
1847                end,
1848                limit,
1849                limit.max(1),
1850                mode,
1851                RangeStreamReadOptions {
1852                    min_sequence_number,
1853                    observed_sequence: None,
1854                },
1855            )
1856            .await?;
1857        stream.collect().await
1858    }
1859
1860    async fn range_stream_internal(
1861        &self,
1862        start: &Key,
1863        end: &Key,
1864        limit: usize,
1865        batch_size: usize,
1866        mode: RangeMode,
1867        options: RangeStreamReadOptions,
1868    ) -> Result<RangeStream, ClientError> {
1869        if !is_valid_key_size(start.len()) || !is_valid_key_size(end.len()) {
1870            return Err(ClientError::WireFormat(
1871                "range start/end key length is outside valid store key range".to_string(),
1872            ));
1873        }
1874        if batch_size == 0 {
1875            return Err(ClientError::WireFormat(
1876                "batch_size must be positive".to_string(),
1877            ));
1878        }
1879        let (start, end) = self.encode_store_range(start, end)?;
1880
1881        let config =
1882            store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1883        let client = QueryServiceClient::new(self.connect_http.clone(), config);
1884        let min_sequence_number = self.normalize_min_sequence_number(options.min_sequence_number);
1885        let max_attempts = self.retry_config.max_attempts.max(1);
1886        let mut attempt = 1usize;
1887        loop {
1888            // Server-streaming RPCs cannot transparently recover mid-stream failures,
1889            // but retrying a transient error while opening the stream or before the
1890            // first frame arrives is still safe. Treat both phases as a single
1891            // attempt budget so range opens do not multiply retries quadratically.
1892            let response = match client
1893                .range(ProtoRangeRequest {
1894                    start: start.clone().into(),
1895                    end: end.clone().into(),
1896                    limit: Some(u32::try_from(limit).unwrap_or(u32::MAX)),
1897                    batch_size: u32::try_from(batch_size).unwrap_or(u32::MAX),
1898                    mode: mode.to_proto().into(),
1899                    min_sequence_number,
1900                    ..Default::default()
1901                })
1902                .await
1903            {
1904                Ok(response) => response,
1905                Err(err) => {
1906                    if attempt < max_attempts && is_retryable_error(&err) {
1907                        let delay = retry_delay_for_error(&err, attempt, self.retry_config);
1908                        tracing::debug!(
1909                            attempt,
1910                            max_attempts,
1911                            code = err.code.as_str(),
1912                            delay_ms = delay.as_millis() as u64,
1913                            "store client retrying transient range-open error",
1914                        );
1915                        tokio::time::sleep(delay).await;
1916                        attempt += 1;
1917                        continue;
1918                    }
1919                    return Err(client_error_from_connect(err));
1920                }
1921            };
1922
1923            let mut stream = RangeStream::from_connect_stream(
1924                response,
1925                options.observed_sequence.clone(),
1926                self.key_prefix,
1927            );
1928            if let Err(err) = stream.prefetch_first_frame().await {
1929                if attempt < max_attempts && is_retryable_error(&err) {
1930                    let delay = retry_delay_for_error(&err, attempt, self.retry_config);
1931                    tracing::debug!(
1932                        attempt,
1933                        max_attempts,
1934                        code = err.code.as_str(),
1935                        delay_ms = delay.as_millis() as u64,
1936                        "store client retrying transient stream-open error",
1937                    );
1938                    tokio::time::sleep(delay).await;
1939                    attempt += 1;
1940                    continue;
1941                }
1942                return Err(client_error_from_connect(err));
1943            }
1944            return Ok(stream);
1945        }
1946    }
1947
1948    async fn range_reduce_response_internal(
1949        &self,
1950        start: &Key,
1951        end: &Key,
1952        request: &DomainRangeReduceRequest,
1953        min_sequence_number: Option<u64>,
1954    ) -> Result<
1955        (
1956            exoware_proto::query::ReduceResponse,
1957            Option<proto_query::Detail>,
1958        ),
1959        ClientError,
1960    > {
1961        let config =
1962            store_connect_client_config(self.query_uri.clone(), self.connect_request_compression);
1963        let client = QueryServiceClient::new(self.connect_http.clone(), config);
1964        let (start, end) = self.encode_store_range(start, end)?;
1965        let request = self.prefix_reduce_request(request)?;
1966        let proto_params = proto_to_proto_reduce_params(request);
1967        let min_sequence_number = self.normalize_min_sequence_number(min_sequence_number);
1968        let response = self
1969            .send_with_retry(|| async {
1970                client
1971                    .reduce(ProtoWireReduceRequest {
1972                        start: start.clone().into(),
1973                        end: end.clone().into(),
1974                        params: Some(proto_params.clone()).into(),
1975                        min_sequence_number,
1976                        ..Default::default()
1977                    })
1978                    .await
1979            })
1980            .await?;
1981        let owned = response.into_owned();
1982        let detail = owned.detail.as_option().cloned();
1983        Ok((owned, detail))
1984    }
1985
1986    fn prefix_prune_policies(
1987        &self,
1988        policies: &[crate::prune_policy::PrunePolicy],
1989    ) -> Result<Vec<crate::prune_policy::PrunePolicy>, ClientError> {
1990        let Some(prefix) = self.key_prefix else {
1991            return Ok(policies.to_vec());
1992        };
1993        policies
1994            .iter()
1995            .map(|policy| {
1996                use crate::prune_policy::{PolicyScope, PrunePolicy};
1997                let scope = match &policy.scope {
1998                    PolicyScope::Keys(scope) => {
1999                        let mut scope = scope.clone();
2000                        scope.match_key = prefix.prefix_match_key(&scope.match_key)?;
2001                        PolicyScope::Keys(scope)
2002                    }
2003                    PolicyScope::Sequence => PolicyScope::Sequence,
2004                };
2005                Ok(PrunePolicy {
2006                    scope,
2007                    retain: policy.retain.clone(),
2008                })
2009            })
2010            .collect::<Result<Vec<_>, StoreKeyPrefixError>>()
2011            .map_err(ClientError::from)
2012    }
2013
2014    fn prefix_stream_filter(
2015        &self,
2016        filter: crate::stream_filter::StreamFilter,
2017    ) -> Result<crate::stream_filter::StreamFilter, ClientError> {
2018        let Some(prefix) = self.key_prefix else {
2019            return Ok(filter);
2020        };
2021        let match_keys = filter
2022            .match_keys
2023            .iter()
2024            .map(|mk| prefix.prefix_stream_match_key(mk))
2025            .collect::<Result<Vec<_>, _>>()?;
2026        Ok(crate::stream_filter::StreamFilter {
2027            match_keys,
2028            value_filters: filter.value_filters,
2029        })
2030    }
2031
2032    fn prefix_reduce_request(
2033        &self,
2034        request: &DomainRangeReduceRequest,
2035    ) -> Result<DomainRangeReduceRequest, ClientError> {
2036        let Some(prefix) = self.key_prefix else {
2037            return Ok(request.clone());
2038        };
2039        let mut request = request.clone();
2040        shift_reduce_request_key_offsets(prefix.reserved_bits(), &mut request)?;
2041        Ok(request)
2042    }
2043
2044    async fn send_with_retry<F, Fut, T>(&self, mut make_request: F) -> Result<T, ClientError>
2045    where
2046        F: FnMut() -> Fut,
2047        Fut: std::future::Future<Output = Result<T, ConnectError>>,
2048    {
2049        let max_attempts = self.retry_config.max_attempts.max(1);
2050        let mut attempt = 1usize;
2051        loop {
2052            match make_request().await {
2053                Ok(response) => return Ok(response),
2054                Err(err) => {
2055                    if attempt < max_attempts && is_retryable_error(&err) {
2056                        let delay = retry_delay_for_error(&err, attempt, self.retry_config);
2057                        tracing::debug!(
2058                            attempt,
2059                            max_attempts,
2060                            code = err.code.as_str(),
2061                            delay_ms = delay.as_millis() as u64,
2062                            "store client retrying transient RPC error",
2063                        );
2064                        tokio::time::sleep(delay).await;
2065                        attempt += 1;
2066                        continue;
2067                    }
2068                    return Err(client_error_from_connect(err));
2069                }
2070            }
2071        }
2072    }
2073}
2074
2075fn shift_reduce_request_key_offsets(
2076    prefix_bits: u8,
2077    request: &mut DomainRangeReduceRequest,
2078) -> Result<(), StoreKeyPrefixError> {
2079    for reducer in &mut request.reducers {
2080        if let Some(expr) = &mut reducer.expr {
2081            shift_expr_key_offsets(prefix_bits, expr)?;
2082        }
2083    }
2084    for expr in &mut request.group_by {
2085        shift_expr_key_offsets(prefix_bits, expr)?;
2086    }
2087    if let Some(filter) = &mut request.filter {
2088        for check in &mut filter.checks {
2089            shift_field_ref_key_offset(prefix_bits, &mut check.field)?;
2090        }
2091    }
2092    Ok(())
2093}
2094
2095fn shift_expr_key_offsets(prefix_bits: u8, expr: &mut KvExpr) -> Result<(), StoreKeyPrefixError> {
2096    match expr {
2097        KvExpr::Field(field) => shift_field_ref_key_offset(prefix_bits, field),
2098        KvExpr::Literal(_) => Ok(()),
2099        KvExpr::Add(left, right)
2100        | KvExpr::Sub(left, right)
2101        | KvExpr::Mul(left, right)
2102        | KvExpr::Div(left, right) => {
2103            shift_expr_key_offsets(prefix_bits, left)?;
2104            shift_expr_key_offsets(prefix_bits, right)
2105        }
2106        KvExpr::Lower(inner) | KvExpr::DateTruncDay(inner) => {
2107            shift_expr_key_offsets(prefix_bits, inner)
2108        }
2109    }
2110}
2111
2112fn shift_field_ref_key_offset(
2113    prefix_bits: u8,
2114    field: &mut KvFieldRef,
2115) -> Result<(), StoreKeyPrefixError> {
2116    match field {
2117        KvFieldRef::Key { bit_offset, .. } | KvFieldRef::ZOrderKey { bit_offset, .. } => {
2118            *bit_offset = bit_offset.checked_add(u16::from(prefix_bits)).ok_or(
2119                StoreKeyPrefixError::BitOffsetOverflow {
2120                    offset: *bit_offset,
2121                    prefix_bits,
2122                },
2123            )?;
2124            Ok(())
2125        }
2126        KvFieldRef::Value { .. } => Ok(()),
2127    }
2128}
2129
2130// --- Service-grouped accessors ---------------------------------------------
2131
2132#[derive(Clone, Copy, Debug)]
2133pub struct Ingest<'a> {
2134    c: &'a StoreClient,
2135}
2136
2137#[derive(Clone, Copy, Debug)]
2138pub struct Query<'a> {
2139    c: &'a StoreClient,
2140}
2141
2142#[derive(Clone, Copy, Debug)]
2143pub struct Compact<'a> {
2144    c: &'a StoreClient,
2145}
2146
2147#[derive(Clone, Copy, Debug)]
2148pub struct Stream<'a> {
2149    c: &'a StoreClient,
2150}
2151
2152impl<'a> Ingest<'a> {
2153    pub async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
2154        self.c.put(kvs).await
2155    }
2156
2157    /// Submit a [`StoreWriteBatch`] that has already been encoded into the
2158    /// physical Store keyspace.
2159    pub async fn put_prepared(&self, batch: &StoreWriteBatch) -> Result<u64, ClientError> {
2160        batch.commit(self.c).await
2161    }
2162}
2163
2164impl<'a> Query<'a> {
2165    pub async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
2166        self.c.get(key).await
2167    }
2168
2169    pub async fn get_with_min_sequence_number(
2170        &self,
2171        key: &Key,
2172        min_sequence_number: u64,
2173    ) -> Result<Option<Bytes>, ClientError> {
2174        self.c
2175            .get_with_min_sequence_number(key, min_sequence_number)
2176            .await
2177    }
2178
2179    pub async fn get_many(
2180        &self,
2181        keys: &[&Key],
2182        batch_size: u32,
2183    ) -> Result<GetManyStream, ClientError> {
2184        self.c.get_many(keys, batch_size).await
2185    }
2186
2187    pub async fn get_many_with_min_sequence_number(
2188        &self,
2189        keys: &[&Key],
2190        batch_size: u32,
2191        min_sequence_number: u64,
2192    ) -> Result<GetManyStream, ClientError> {
2193        self.c
2194            .get_many_with_min_sequence_number(keys, batch_size, min_sequence_number)
2195            .await
2196    }
2197
2198    /// Collect a `Range` into a `Vec`. Use `range_stream` for large scans.
2199    pub async fn range(
2200        &self,
2201        start: &Key,
2202        end: &Key,
2203        limit: usize,
2204    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2205        self.c.range(start, end, limit).await
2206    }
2207
2208    pub async fn range_with_mode(
2209        &self,
2210        start: &Key,
2211        end: &Key,
2212        limit: usize,
2213        mode: RangeMode,
2214    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2215        self.c.range_with_mode(start, end, limit, mode).await
2216    }
2217
2218    pub async fn range_with_min_sequence_number(
2219        &self,
2220        start: &Key,
2221        end: &Key,
2222        limit: usize,
2223        min_sequence_number: u64,
2224    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2225        self.c
2226            .range_with_min_sequence_number(start, end, limit, min_sequence_number)
2227            .await
2228    }
2229
2230    pub async fn range_with_mode_and_min_sequence_number(
2231        &self,
2232        start: &Key,
2233        end: &Key,
2234        limit: usize,
2235        mode: RangeMode,
2236        min_sequence_number: u64,
2237    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2238        self.c
2239            .range_with_mode_and_min_sequence_number(start, end, limit, mode, min_sequence_number)
2240            .await
2241    }
2242
2243    pub async fn range_stream(
2244        &self,
2245        start: &Key,
2246        end: &Key,
2247        limit: usize,
2248        batch_size: usize,
2249    ) -> Result<RangeStream, ClientError> {
2250        self.c.range_stream(start, end, limit, batch_size).await
2251    }
2252
2253    pub async fn range_stream_with_mode(
2254        &self,
2255        start: &Key,
2256        end: &Key,
2257        limit: usize,
2258        batch_size: usize,
2259        mode: RangeMode,
2260    ) -> Result<RangeStream, ClientError> {
2261        self.c
2262            .range_stream_with_mode(start, end, limit, batch_size, mode)
2263            .await
2264    }
2265
2266    pub async fn range_stream_with_min_sequence_number(
2267        &self,
2268        start: &Key,
2269        end: &Key,
2270        limit: usize,
2271        batch_size: usize,
2272        min_sequence_number: u64,
2273    ) -> Result<RangeStream, ClientError> {
2274        self.c
2275            .range_stream_with_min_sequence_number(
2276                start,
2277                end,
2278                limit,
2279                batch_size,
2280                min_sequence_number,
2281            )
2282            .await
2283    }
2284
2285    pub async fn range_stream_with_mode_and_min_sequence_number(
2286        &self,
2287        start: &Key,
2288        end: &Key,
2289        limit: usize,
2290        batch_size: usize,
2291        mode: RangeMode,
2292        min_sequence_number: u64,
2293    ) -> Result<RangeStream, ClientError> {
2294        self.c
2295            .range_stream_with_mode_and_min_sequence_number(
2296                start,
2297                end,
2298                limit,
2299                batch_size,
2300                mode,
2301                min_sequence_number,
2302            )
2303            .await
2304    }
2305
2306    pub async fn range_reduce(
2307        &self,
2308        start: &Key,
2309        end: &Key,
2310        request: &DomainRangeReduceRequest,
2311    ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
2312        self.c.range_reduce(start, end, request).await
2313    }
2314
2315    pub async fn range_reduce_with_min_sequence_number(
2316        &self,
2317        start: &Key,
2318        end: &Key,
2319        request: &DomainRangeReduceRequest,
2320        min_sequence_number: u64,
2321    ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
2322        self.c
2323            .range_reduce_with_min_sequence_number(start, end, request, min_sequence_number)
2324            .await
2325    }
2326
2327    pub async fn range_reduce_response(
2328        &self,
2329        start: &Key,
2330        end: &Key,
2331        request: &DomainRangeReduceRequest,
2332    ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
2333        self.c.range_reduce_response(start, end, request).await
2334    }
2335
2336    pub async fn range_reduce_response_with_min_sequence_number(
2337        &self,
2338        start: &Key,
2339        end: &Key,
2340        request: &DomainRangeReduceRequest,
2341        min_sequence_number: u64,
2342    ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
2343        self.c
2344            .range_reduce_response_with_min_sequence_number(
2345                start,
2346                end,
2347                request,
2348                min_sequence_number,
2349            )
2350            .await
2351    }
2352}
2353
2354impl<'a> Compact<'a> {
2355    pub async fn prune(
2356        &self,
2357        policies: &[crate::prune_policy::PrunePolicy],
2358    ) -> Result<(), ClientError> {
2359        self.c.prune(policies).await
2360    }
2361}
2362
2363impl<'a> Stream<'a> {
2364    /// `store.stream.v1.Service.Subscribe` — see `StreamSubscription::next`
2365    /// for consuming delivered frames. `since_sequence_number = None` starts
2366    /// live from the next Put; `Some(N)` replays retained batches before
2367    /// transitioning to live. An evicted `since` returns a
2368    /// `ConnectError::out_of_range` carrying `ErrorInfo.reason = "BATCH_EVICTED"`.
2369    pub async fn subscribe(
2370        &self,
2371        filter: crate::stream_filter::StreamFilter,
2372        since_sequence_number: Option<u64>,
2373    ) -> Result<StreamSubscription, ClientError> {
2374        let logical_filter = self
2375            .c
2376            .key_prefix
2377            .is_some()
2378            .then(|| ClientStreamFilter::compile(&filter))
2379            .transpose()?;
2380        let filter = self.c.prefix_stream_filter(filter)?;
2381        crate::stream_filter::validate_filter(&filter)
2382            .map_err(|e| ClientError::WireFormat(e.to_string()))?;
2383        let match_keys = filter
2384            .match_keys
2385            .into_iter()
2386            .map(|mk| exoware_proto::store::common::v1::MatchKey {
2387                reserved_bits: u32::from(mk.reserved_bits),
2388                prefix: u32::from(mk.prefix),
2389                payload_regex: mk.payload_regex.0,
2390                ..Default::default()
2391            })
2392            .collect();
2393        let value_filters = filter
2394            .value_filters
2395            .into_iter()
2396            .map(|vf| {
2397                use crate::stream_filter::BytesFilter;
2398                use exoware_proto::store::common::v1::bytes_filter::Kind as ProtoKind;
2399                let kind = match vf {
2400                    BytesFilter::Exact(bytes) => ProtoKind::Exact(bytes),
2401                    BytesFilter::Prefix(bytes) => ProtoKind::Prefix(bytes),
2402                    BytesFilter::Regex(pattern) => ProtoKind::Regex(pattern),
2403                };
2404                exoware_proto::store::common::v1::BytesFilter {
2405                    kind: Some(kind),
2406                    ..Default::default()
2407                }
2408            })
2409            .collect();
2410        let request = exoware_proto::store::stream::v1::SubscribeRequest {
2411            match_keys,
2412            value_filters,
2413            since_sequence_number,
2414            ..Default::default()
2415        };
2416        let config = store_connect_client_config(
2417            self.c.stream_uri.clone(),
2418            self.c.connect_request_compression,
2419        );
2420        let client = exoware_proto::store::stream::v1::ServiceClient::new(
2421            self.c.connect_http.clone(),
2422            config,
2423        );
2424        let stream = client
2425            .subscribe(request)
2426            .await
2427            .map_err(client_error_from_connect)?;
2428        Ok(StreamSubscription {
2429            stream,
2430            key_prefix: self.c.key_prefix,
2431            logical_filter,
2432        })
2433    }
2434
2435    /// `store.stream.v1.Service.Get` — `Ok(None)` collapses the server's
2436    /// `BATCH_EVICTED` / `BATCH_NOT_FOUND` error details.
2437    pub async fn get(
2438        &self,
2439        sequence_number: u64,
2440    ) -> Result<Option<Vec<(Key, Bytes)>>, ClientError> {
2441        let config = store_connect_client_config(
2442            self.c.stream_uri.clone(),
2443            self.c.connect_request_compression,
2444        );
2445        let client = exoware_proto::store::stream::v1::ServiceClient::new(
2446            self.c.connect_http.clone(),
2447            config,
2448        );
2449        match client
2450            .get(exoware_proto::store::stream::v1::GetRequest {
2451                sequence_number,
2452                ..Default::default()
2453            })
2454            .await
2455        {
2456            Ok(resp) => {
2457                let owned = resp.into_owned();
2458                let mut entries = Vec::with_capacity(owned.entries.len());
2459                for entry in owned.entries {
2460                    let key = Bytes::from(entry.key);
2461                    match self.c.key_prefix {
2462                        Some(prefix) if !prefix.codec.matches(&key) => {}
2463                        Some(prefix) => entries.push((prefix.decode_key(&key)?, entry.value)),
2464                        None => entries.push((key, entry.value)),
2465                    }
2466                }
2467                Ok(Some(entries))
2468            }
2469            Err(err) => {
2470                if is_batch_missing_error(&err) {
2471                    Ok(None)
2472                } else {
2473                    Err(client_error_from_connect(err))
2474                }
2475            }
2476        }
2477    }
2478}
2479
2480impl SerializableReadSession {
    /// Fixed sequence floor for this session, if one has been established yet.
    ///
    /// Fresh sessions start with `None` unless created via
    /// [`StoreClient::create_session_with_sequence`]. A first streamed query
    /// read (`get_many`, `range_stream`) sets this once a detail-bearing chunk
    /// is consumed.
    ///
    /// This is a plain read of the session's shared state; calling it does not
    /// establish a floor.
    pub fn fixed_sequence(&self) -> Option<u64> {
        self.state.fixed_sequence()
    }
2490
2491    pub async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
2492        let seeded_client = self.client.clone();
2493        let unseeded_client = self.client.clone();
2494        self.run_read(
2495            move |sequence| {
2496                let client = seeded_client.clone();
2497                async move { client.get_with_min_sequence_number(key, sequence).await }
2498            },
2499            move |observed_sequence| {
2500                let client = unseeded_client.clone();
2501                async move {
2502                    let (response, detail) = client.send_get(key, None).await?;
2503                    if let Some(detail) = detail {
2504                        observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2505                    }
2506                    Ok(response.value)
2507                }
2508            },
2509        )
2510        .await
2511    }
2512
2513    pub async fn get_many(
2514        &self,
2515        keys: &[&Key],
2516        batch_size: u32,
2517    ) -> Result<GetManyStream, ClientError> {
2518        let keys_owned: Vec<Key> = keys.iter().map(|k| Bytes::copy_from_slice(k)).collect();
2519        let seeded_client = self.client.clone();
2520        let unseeded_client = self.client.clone();
2521        let keys_seeded = keys_owned.clone();
2522        let keys_unseeded = keys_owned;
2523        self.run_read(
2524            move |sequence| {
2525                let client = seeded_client.clone();
2526                let keys = keys_seeded.clone();
2527                async move {
2528                    let refs: Vec<&Key> = keys.iter().collect();
2529                    client
2530                        .get_many_with_min_sequence_number(&refs, batch_size, sequence)
2531                        .await
2532                }
2533            },
2534            move |observed_sequence| {
2535                let client = unseeded_client.clone();
2536                let keys = keys_unseeded.clone();
2537                async move {
2538                    let refs: Vec<&Key> = keys.iter().collect();
2539                    client
2540                        .get_many_internal(&refs, batch_size, None, Some(observed_sequence))
2541                        .await
2542                }
2543            },
2544        )
2545        .await
2546    }
2547
2548    pub async fn range(
2549        &self,
2550        start: &Key,
2551        end: &Key,
2552        limit: usize,
2553    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2554        self.range_with_mode(start, end, limit, RangeMode::Forward)
2555            .await
2556    }
2557
2558    pub async fn range_with_mode(
2559        &self,
2560        start: &Key,
2561        end: &Key,
2562        limit: usize,
2563        mode: RangeMode,
2564    ) -> Result<Vec<(Key, Bytes)>, ClientError> {
2565        let seeded_client = self.client.clone();
2566        let unseeded_client = self.client.clone();
2567        self.run_read(
2568            move |sequence| {
2569                let client = seeded_client.clone();
2570                async move {
2571                    client
2572                        .range_with_mode_and_min_sequence_number(start, end, limit, mode, sequence)
2573                        .await
2574                }
2575            },
2576            move |observed_sequence| {
2577                let client = unseeded_client.clone();
2578                async move {
2579                    let stream = client
2580                        .range_stream_internal(
2581                            start,
2582                            end,
2583                            limit,
2584                            limit.max(1),
2585                            mode,
2586                            RangeStreamReadOptions {
2587                                min_sequence_number: None,
2588                                observed_sequence: Some(observed_sequence),
2589                            },
2590                        )
2591                        .await;
2592                    stream?.collect().await
2593                }
2594            },
2595        )
2596        .await
2597    }
2598
2599    pub async fn range_stream(
2600        &self,
2601        start: &Key,
2602        end: &Key,
2603        limit: usize,
2604        batch_size: usize,
2605    ) -> Result<RangeStream, ClientError> {
2606        self.range_stream_with_mode(start, end, limit, batch_size, RangeMode::Forward)
2607            .await
2608    }
2609
2610    pub async fn range_stream_with_mode(
2611        &self,
2612        start: &Key,
2613        end: &Key,
2614        limit: usize,
2615        batch_size: usize,
2616        mode: RangeMode,
2617    ) -> Result<RangeStream, ClientError> {
2618        let seeded_client = self.client.clone();
2619        let unseeded_client = self.client.clone();
2620        self.run_read(
2621            move |sequence| {
2622                let client = seeded_client.clone();
2623                async move {
2624                    client
2625                        .range_stream_with_mode_and_min_sequence_number(
2626                            start, end, limit, batch_size, mode, sequence,
2627                        )
2628                        .await
2629                }
2630            },
2631            move |observed_sequence| {
2632                let client = unseeded_client.clone();
2633                async move {
2634                    client
2635                        .range_stream_internal(
2636                            start,
2637                            end,
2638                            limit,
2639                            batch_size,
2640                            mode,
2641                            RangeStreamReadOptions {
2642                                min_sequence_number: None,
2643                                observed_sequence: Some(observed_sequence),
2644                            },
2645                        )
2646                        .await
2647                }
2648            },
2649        )
2650        .await
2651    }
2652
2653    pub async fn range_reduce(
2654        &self,
2655        start: &Key,
2656        end: &Key,
2657        request: &DomainRangeReduceRequest,
2658    ) -> Result<Vec<Option<KvReducedValue>>, ClientError> {
2659        let seeded_client = self.client.clone();
2660        let unseeded_client = self.client.clone();
2661        let request_seeded = request.clone();
2662        let request_unseeded = request.clone();
2663        self.run_read(
2664            move |sequence| {
2665                let client = seeded_client.clone();
2666                let request = request_seeded.clone();
2667                async move {
2668                    client
2669                        .range_reduce_with_min_sequence_number(start, end, &request, sequence)
2670                        .await
2671                }
2672            },
2673            move |observed_sequence| {
2674                let client = unseeded_client.clone();
2675                let request = request_unseeded.clone();
2676                async move {
2677                    let (response, detail) = client
2678                        .range_reduce_response_internal(start, end, &request, None)
2679                        .await?;
2680                    if let Some(detail) = detail {
2681                        observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2682                    }
2683                    let decoded = proto_to_domain_reduce_response(response)
2684                        .map_err(ClientError::WireFormat)?;
2685                    if !decoded.groups.is_empty() {
2686                        return Err(ClientError::WireFormat(
2687                            "grouped range reduction response returned for scalar request"
2688                                .to_string(),
2689                        ));
2690                    }
2691                    Ok(decoded
2692                        .results
2693                        .iter()
2694                        .map(|result| result.value.clone())
2695                        .collect())
2696                }
2697            },
2698        )
2699        .await
2700    }
2701
2702    pub async fn range_reduce_response(
2703        &self,
2704        start: &Key,
2705        end: &Key,
2706        request: &DomainRangeReduceRequest,
2707    ) -> Result<exoware_proto::query::ReduceResponse, ClientError> {
2708        let seeded_client = self.client.clone();
2709        let unseeded_client = self.client.clone();
2710        let request_seeded = request.clone();
2711        let request_unseeded = request.clone();
2712        self.run_read(
2713            move |sequence| {
2714                let client = seeded_client.clone();
2715                let request = request_seeded.clone();
2716                async move {
2717                    client
2718                        .range_reduce_response_with_min_sequence_number(
2719                            start, end, &request, sequence,
2720                        )
2721                        .await
2722                }
2723            },
2724            move |observed_sequence| {
2725                let client = unseeded_client.clone();
2726                let request = request_unseeded.clone();
2727                async move {
2728                    let (response, detail) = client
2729                        .range_reduce_response_internal(start, end, &request, None)
2730                        .await?;
2731                    if let Some(detail) = detail {
2732                        observed_sequence.fetch_max(detail.sequence_number, Ordering::SeqCst);
2733                    }
2734                    Ok(response)
2735                }
2736            },
2737        )
2738        .await
2739    }
2740
    /// Dispatches a read either as a "seeded" call with a known sequence number, or as an
    /// "unseeded" call that receives the session's shared sequence tracker.
    ///
    /// - If [`Self::fixed_sequence`] already yields a sequence, `seeded_call` runs with it
    ///   immediately, without taking any lock.
    /// - Otherwise the `state.init_gate` async mutex is acquired, so unseeded reads that
    ///   share this `state` are serialized. The sequence is re-checked under the gate,
    ///   because another caller may have established it while we waited. If it is now
    ///   present, the gate is released *before* `seeded_call` runs.
    /// - If it is still absent, `unseeded_call` runs *while the gate is held*, receiving a
    ///   clone of `state.sequence`. The gate is released only after that call completes,
    ///   whether it succeeded or failed.
    ///
    /// NOTE(review): this assumes `fixed_sequence()` reflects values that unseeded calls
    /// record into `state.sequence` (the callers in this file use `fetch_max`). Confirm this
    /// against its definition.
    async fn run_read<T, SeededCall, SeededFut, UnseededCall, UnseededFut>(
        &self,
        seeded_call: SeededCall,
        unseeded_call: UnseededCall,
    ) -> Result<T, ClientError>
    where
        SeededCall: Fn(u64) -> SeededFut,
        SeededFut: std::future::Future<Output = Result<T, ClientError>>,
        UnseededCall: Fn(Arc<AtomicU64>) -> UnseededFut,
        UnseededFut: std::future::Future<Output = Result<T, ClientError>>,
    {
        // Fast path: a sequence is already known, so no serialization is needed.
        if let Some(sequence) = self.fixed_sequence() {
            return seeded_call(sequence).await;
        }

        // Serialize unseeded reads on this state (presumably so the first one can
        // establish the sequence that later reads are seeded with).
        let gate = self.state.init_gate.lock().await;

        // Re-check under the gate: a concurrent caller may have set the sequence meanwhile.
        if let Some(sequence) = self.fixed_sequence() {
            // Release the gate before the seeded read so other callers are not blocked.
            drop(gate);
            return seeded_call(sequence).await;
        }

        // The gate stays held across the unseeded read and is released only afterwards.
        let result = unseeded_call(self.state.sequence.clone()).await;
        drop(gate);
        result
    }
2767}
2768
2769fn client_error_from_connect(err: ConnectError) -> ClientError {
2770    ClientError::Rpc(Box::new(err))
2771}
2772
2773fn is_retryable_error(err: &ConnectError) -> bool {
2774    matches!(
2775        err.code,
2776        ErrorCode::Aborted
2777            | ErrorCode::ResourceExhausted
2778            | ErrorCode::Unavailable
2779            | ErrorCode::Unknown
2780            // Retrying `internal` is a trade-off: proxies and load balancers sometimes surface
2781            // transient faults this way; idempotent reads use a small attempt budget so we do not
2782            // spin forever. Prefer interpreting `google.rpc.RetryInfo` when present (see
2783            // `retry_delay_for_error`).
2784            | ErrorCode::Internal
2785    )
2786}
2787
2788fn retry_delay_for_error(
2789    err: &ConnectError,
2790    attempt: usize,
2791    retry_config: RetryConfig,
2792) -> Duration {
2793    if let Ok(decoded) = proto_decode_connect_error(err) {
2794        if let Some(retry_info) = decoded.retry_info {
2795            if let Some(delay) = retry_info.retry_delay.as_option() {
2796                let secs = u64::try_from(delay.seconds).unwrap_or(0);
2797                let nanos = u32::try_from(delay.nanos.max(0)).unwrap_or(0);
2798                let hinted = Duration::new(secs, nanos);
2799                if !hinted.is_zero() {
2800                    return hinted.min(retry_config.max_backoff);
2801                }
2802            }
2803        }
2804    }
2805    retry_backoff_delay(attempt, retry_config)
2806}
2807
2808fn retry_backoff_delay(attempt: usize, retry_config: RetryConfig) -> Duration {
2809    let exponent = (attempt.saturating_sub(1)).min(20) as u32;
2810    let factor = 1u128 << exponent;
2811    let base_ms = retry_config.initial_backoff.as_millis();
2812    let capped_ms = base_ms
2813        .saturating_mul(factor)
2814        .min(retry_config.max_backoff.as_millis());
2815    Duration::from_millis(capped_ms.min(u64::MAX as u128) as u64)
2816}
2817
#[cfg(test)]
mod tests {
    //! Unit tests for client construction, retry-policy helpers, key-prefix rewriting, and
    //! prefixed reduce requests.

    use super::*;
    use crate::kv_codec::{KvFieldKind, KvPredicate, KvPredicateCheck, KvPredicateConstraint};
    use exoware_proto::query::TraversalMode as ProtoTraversalMode;

    /// Lowercase hex encoding round-trips through the local helpers.
    #[test]
    fn hex_round_trip() {
        let data = vec![0x00, 0x42, 0xFF, 0xAB];
        let encoded = hex_encode(&data);
        assert_eq!(encoded, "0042ffab");
        let decoded = hex_decode(&encoded).unwrap();
        assert_eq!(decoded, data);
    }

    /// `StoreClient::new` points every service URI at the same base URL.
    #[test]
    fn client_creation() {
        let client = StoreClient::new("http://localhost:10000");
        assert_eq!(client.health_url, "http://localhost:10000");
        assert_eq!(client.ingest_uri.to_string(), "http://localhost:10000/");
        assert_eq!(client.query_uri.to_string(), "http://localhost:10000/");
        assert_eq!(client.stream_uri.to_string(), "http://localhost:10000/");
    }

    /// The builder reports the first missing service URL, in a fixed order.
    #[test]
    fn builder_fails_until_all_urls_set() {
        assert!(matches!(
            StoreClient::builder().health_url("http://h").build(),
            Err(ClientBuildError::MissingIngestUrl)
        ));
        assert!(matches!(
            StoreClient::builder()
                .health_url("http://h")
                .ingest_url("http://i")
                .build(),
            Err(ClientBuildError::MissingQueryUrl)
        ));
        assert!(matches!(
            StoreClient::builder()
                .health_url("http://h")
                .ingest_url("http://i")
                .query_url("http://q")
                .build(),
            Err(ClientBuildError::MissingCompactUrl)
        ));
        assert!(matches!(
            StoreClient::builder()
                .health_url("http://h")
                .ingest_url("http://i")
                .query_url("http://q")
                .compact_url("http://c")
                .build(),
            Err(ClientBuildError::MissingStreamUrl)
        ));
    }

    /// A trailing slash on the base URL is stripped from the health URL.
    #[test]
    fn client_trims_trailing_slash() {
        let client = StoreClient::new("http://localhost:10000/");
        assert_eq!(client.health_url, "http://localhost:10000");
    }

    /// A freshly created session has no fixed sequence.
    #[test]
    fn create_session_starts_unseeded() {
        let client = StoreClient::new("http://localhost:10000/");
        let session = client.create_session();
        assert_eq!(session.fixed_sequence(), None);
    }

    /// `RangeMode` maps one-to-one onto the wire traversal enum.
    #[test]
    fn range_mode_maps_to_proto_traversal() {
        assert_eq!(
            RangeMode::Forward.to_proto(),
            ProtoTraversalMode::TRAVERSAL_MODE_FORWARD
        );
        assert_eq!(
            RangeMode::Reverse.to_proto(),
            ProtoTraversalMode::TRAVERSAL_MODE_REVERSE
        );
    }

    /// Pins the documented defaults of `RetryConfig::standard`.
    #[test]
    fn retry_config_standard_defaults_match_expected() {
        let config = RetryConfig::standard();
        assert_eq!(config.max_attempts, 3);
        assert_eq!(config.initial_backoff, Duration::from_millis(100));
        assert_eq!(config.max_backoff, Duration::from_millis(2_000));
    }

    /// `sanitized` raises zero attempts to one and lifts `max_backoff` up to
    /// `initial_backoff`.
    #[test]
    fn retry_config_clamps_attempts_and_backoff_bounds() {
        let config = RetryConfig::standard()
            .with_max_attempts(0)
            .with_initial_backoff(Duration::from_millis(250))
            .with_max_backoff(Duration::from_millis(50))
            .sanitized();
        assert_eq!(config.max_attempts, 1);
        assert_eq!(config.initial_backoff, Duration::from_millis(250));
        assert_eq!(config.max_backoff, Duration::from_millis(250));
    }

    /// Transient Connect codes are retryable; client errors are not.
    #[test]
    fn retryable_codes_include_connect_transients() {
        assert!(is_retryable_error(&ConnectError::aborted("retry")));
        assert!(is_retryable_error(&ConnectError::resource_exhausted(
            "retry"
        )));
        assert!(is_retryable_error(&ConnectError::unavailable("retry")));
        assert!(is_retryable_error(&ConnectError::internal("retry")));
        assert!(!is_retryable_error(&ConnectError::invalid_argument(
            "no retry"
        )));
    }

    /// Backoff doubles per attempt, is capped at `max_backoff`, and its exponent handling
    /// is safe at both ends of the `attempt` range.
    #[test]
    fn retry_backoff_delay_is_exponential_and_capped() {
        let config = RetryConfig::standard()
            .with_initial_backoff(Duration::from_millis(100))
            .with_max_backoff(Duration::from_millis(250));
        assert_eq!(retry_backoff_delay(1, config), Duration::from_millis(100));
        assert_eq!(retry_backoff_delay(2, config), Duration::from_millis(200));
        assert_eq!(retry_backoff_delay(3, config), Duration::from_millis(250));
        assert_eq!(retry_backoff_delay(4, config), Duration::from_millis(250));
        // `attempt` 0 behaves like the first attempt (saturating subtraction).
        assert_eq!(retry_backoff_delay(0, config), Duration::from_millis(100));
        // Huge attempt counts are clamped and capped rather than overflowing.
        assert_eq!(retry_backoff_delay(usize::MAX, config), Duration::from_millis(250));
    }

    /// Without a `RetryInfo` detail, the delay follows the exponential backoff schedule.
    #[test]
    fn retry_delay_without_retry_info_falls_back_to_backoff() {
        let config = RetryConfig::standard()
            .with_initial_backoff(Duration::from_millis(100))
            .with_max_backoff(Duration::from_millis(250));
        let err = ConnectError::unavailable("retry");
        assert_eq!(retry_delay_for_error(&err, 1, config), Duration::from_millis(100));
        assert_eq!(retry_delay_for_error(&err, 3, config), Duration::from_millis(250));
    }

    /// An explicit sequence pins the session's fixed sequence.
    #[test]
    fn create_session_with_sequence_pins_explicit_floor() {
        let client = StoreClient::new("http://localhost:10000/");
        let session = client.create_session_with_sequence(27);
        assert_eq!(session.fixed_sequence(), Some(27));
    }

    /// Encoding a logical key under a prefix, then decoding it, yields the original key.
    #[test]
    fn store_key_prefix_round_trips_logical_keys() {
        let prefix = StoreKeyPrefix::new(4, 0xA).unwrap();
        let logical = Bytes::from_static(b"hello");
        let physical = prefix.encode_key(&logical).unwrap();
        assert!(prefix.codec.matches(&physical));
        assert_eq!(prefix.decode_key(&physical).unwrap(), logical);
    }

    /// A logical upper bound already at `MAX_KEY_LEN` is clamped to fit after prefixing.
    #[test]
    fn store_key_prefix_clamps_long_logical_range_upper_bound() {
        let prefix = StoreKeyPrefix::new(4, 0x2).unwrap();
        let logical_codec = KeyCodec::new(4, 0x7);
        let (logical_start, logical_end) = logical_codec.prefix_bounds();
        assert_eq!(logical_end.len(), MAX_KEY_LEN);

        let (physical_start, physical_end) =
            prefix.encode_range(&logical_start, &logical_end).unwrap();
        assert!(prefix.codec.matches(&physical_start));
        assert!(prefix.codec.matches(&physical_end));
        assert_eq!(physical_end.len(), MAX_KEY_LEN);
        assert_eq!(prefix.decode_key(&physical_start).unwrap(), logical_start);
    }

    /// Prefixing a match key prepends the store prefix bits to the key family.
    #[test]
    fn store_key_prefix_rewrites_match_key_family() {
        let prefix = StoreKeyPrefix::new(3, 0b101).unwrap();
        let logical = crate::match_key::MatchKey {
            reserved_bits: 4,
            prefix: 0b0110,
            payload_regex: crate::kv_codec::Utf8::from("(?s).*"),
        };
        let physical = prefix.prefix_match_key(&logical).unwrap();
        assert_eq!(physical.reserved_bits, 7);
        assert_eq!(physical.prefix, 0b101_0110);
        assert_eq!(physical.payload_regex, logical.payload_regex);
    }

    /// Stream match keys also get their payload regex broadened to byte (non-Unicode) mode.
    #[test]
    fn store_key_prefix_broadens_stream_match_key_payload_regex() {
        let prefix = StoreKeyPrefix::new(3, 0b101).unwrap();
        let logical = crate::match_key::MatchKey {
            reserved_bits: 4,
            prefix: 0b0110,
            payload_regex: crate::kv_codec::Utf8::from("(?s).*"),
        };
        let physical = prefix.prefix_stream_match_key(&logical).unwrap();
        assert_eq!(physical.reserved_bits, 7);
        assert_eq!(physical.prefix, 0b101_0110);
        assert_eq!(&*physical.payload_regex, "(?s-u).*");
    }

    /// Key-field bit offsets in reducers and group-by expressions shift by the prefix width.
    #[test]
    fn prefixed_reduce_request_shifts_key_field_offsets() {
        let client = StoreClient::builder()
            .url("http://localhost:10000")
            .key_prefix(StoreKeyPrefix::new(5, 0b10101).unwrap())
            .build()
            .unwrap();
        let request = DomainRangeReduceRequest {
            reducers: vec![crate::RangeReducerSpec {
                op: crate::RangeReduceOp::SumField,
                expr: Some(KvExpr::Field(KvFieldRef::Key {
                    bit_offset: 9,
                    kind: KvFieldKind::UInt64,
                })),
            }],
            group_by: vec![KvExpr::Field(KvFieldRef::ZOrderKey {
                bit_offset: 12,
                field_position: 0,
                field_widths: vec![8],
                kind: KvFieldKind::UInt64,
            })],
            filter: Some(KvPredicate {
                checks: vec![KvPredicateCheck {
                    field: KvFieldRef::Value {
                        index: 0,
                        kind: KvFieldKind::UInt64,
                        nullable: false,
                    },
                    constraint: KvPredicateConstraint::UInt64Range {
                        min: Some(1),
                        max: Some(9),
                    },
                }],
                contradiction: false,
            }),
        };

        let shifted = client.prefix_reduce_request(&request).unwrap();
        let Some(KvExpr::Field(KvFieldRef::Key { bit_offset, .. })) =
            shifted.reducers[0].expr.as_ref()
        else {
            panic!("expected key field reducer");
        };
        assert_eq!(*bit_offset, 14);
        let KvExpr::Field(KvFieldRef::ZOrderKey { bit_offset, .. }) = &shifted.group_by[0] else {
            panic!("expected z-order group field");
        };
        assert_eq!(*bit_offset, 17);
    }

    /// Each batch entry is encoded with the key prefix of the client it was pushed through.
    #[test]
    fn store_write_batch_uses_each_clients_prefix() {
        let base = StoreClient::new("http://localhost:10000");
        let a = base.with_key_prefix(StoreKeyPrefix::new(4, 1).unwrap());
        let b = base.with_key_prefix(StoreKeyPrefix::new(4, 2).unwrap());
        let key_a = Bytes::from_static(b"a");
        let key_b = Bytes::from_static(b"b");

        let mut batch = StoreWriteBatch::new();
        batch.push(&a, &key_a, b"va").unwrap();
        batch.push(&b, &key_b, b"vb").unwrap();

        assert_eq!(
            batch.entries[0].0,
            a.key_prefix().unwrap().encode_key(&key_a).unwrap()
        );
        assert_eq!(
            batch.entries[1].0,
            b.key_prefix().unwrap().encode_key(&key_b).unwrap()
        );
    }

    /// Hex-encodes `data` via the `hex` crate.
    fn hex_encode(data: &[u8]) -> String {
        hex::encode(data)
    }

    /// Hex-decodes `s`, returning `None` on malformed input.
    fn hex_decode(s: &str) -> Option<Vec<u8>> {
        hex::decode(s).ok()
    }
}