Skip to main content

ringline_momento/
lib.rs

1//! ringline-native Momento cache client for use inside the ringline async runtime.
2//!
3//! This client uses the Momento protosocket protocol (length-delimited protobuf
4//! over TLS) for high-performance cache operations. It is **fully multiplexed**:
5//! multiple requests can be in-flight on a single connection, correlated by
6//! message ID.
7//!
8//! # Example
9//!
10//! ```no_run
11//! use ringline::ConnCtx;
12//! use ringline_momento::{Client, Credential};
13//!
14//! async fn example() -> Result<(), ringline_momento::Error> {
15//!     let credential = Credential::from_env()?;
16//!     let mut client = Client::connect(&credential).await?;
17//!
18//!     // Sequential convenience API
19//!     client.set("my-cache", b"key", b"value", 60_000).await?;
20//!     let value = client.get("my-cache", b"key").await?;
21//!
22//!     // Multiplexed fire/recv API
23//!     let id1 = client.fire_get("my-cache", b"key1", 0)?;
24//!     let id2 = client.fire_get("my-cache", b"key2", 0)?;
25//!     let op1 = client.recv().await?;
26//!     let op2 = client.recv().await?;
27//!
28//!     Ok(())
29//! }
30//! ```
31//!
32//! # Copy Semantics
33//!
34//! | Path | Copies | Mechanism |
35//! |------|--------|-----------|
36//! | **Recv (values)** | **0** | `with_bytes()` + `CacheResponse::decode_bytes()`. Values are `Bytes::slice()` references into the accumulator — zero allocation, O(1) refcount. |
//! | **Send (requests)** | **3** | `fire_*` first copies the key (and, for `fire_set`, the value) into owned `Bytes`. A single-pass `encode_length_delimited_into()` then writes all protobuf nesting levels directly into one reusable buffer, and `send_nowait()` copies that buffer into the send pool. Namespace is O(1) clone when pre-set via `set_namespace()` / `ClientBuilder::namespace()`. |
38//!
39//! All Momento connections use TLS, which adds encryption copies on the send
40//! path regardless of the encoding strategy. `SendGuard` (zero-copy send)
41//! cannot help here since TLS must read plaintext and write ciphertext.
42
43pub mod credential;
44pub mod error;
45pub mod pool;
46pub mod proto;
47
48pub use credential::Credential;
49pub use error::Error;
50pub use pool::{Pool, PoolConfig};
51
52use std::collections::HashMap;
53use std::net::SocketAddr;
54use std::time::Instant;
55
56use bytes::Bytes;
57use ringline::{ConnCtx, ParseResult};
58
59use crate::proto::{
60    CacheCommand, CacheResponse, CacheResponseResult, DecodedMessage, StatusCode, UnaryCommand,
61    decode_length_delimited_message_bytes,
62};
63
/// Upper bound on how many back-to-back responses carrying an unrecognised
/// `message_id` the `recv` and `authenticate` loops tolerate before they
/// poison the connection.
///
/// Tolerating a stray stale or duplicate frame keeps one bad response from
/// failing every other in-flight op; bounding the count keeps a misbehaving
/// server from starving those ops forever by streaming frames nobody asked
/// for. 256 consecutive misses is far beyond any plausible retransmit or
/// race window.
pub const MAX_RECV_SKIPS: usize = 1 << 8;
72
73// ── Request tracking ────────────────────────────────────────────────────
74
/// Handle for a request issued through one of the `fire_*` methods.
///
/// Wraps the protocol `message_id` the request was sent with, which is also
/// the key the client uses to correlate the server's response.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RequestId(u64);

impl RequestId {
    /// The underlying `message_id` as a plain integer.
    pub fn value(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }
}
85
/// Which kind of cache command a completed-command report refers to.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandType {
    /// A `get` (read) command.
    Get,
    /// A `set` (write) command.
    Set,
    /// A `delete` command.
    Delete,
}
94
95/// A completed cache operation.
96#[derive(Debug)]
97#[non_exhaustive]
98pub enum CompletedOp {
99    /// Get operation completed.
100    Get {
101        id: RequestId,
102        key: Bytes,
103        result: Result<Option<Bytes>, Error>,
104        user_data: u64,
105        latency_ns: u64,
106    },
107    /// Set operation completed.
108    Set {
109        id: RequestId,
110        key: Bytes,
111        result: Result<(), Error>,
112        user_data: u64,
113        latency_ns: u64,
114    },
115    /// Delete operation completed.
116    Delete {
117        id: RequestId,
118        key: Bytes,
119        result: Result<(), Error>,
120        user_data: u64,
121        latency_ns: u64,
122    },
123}
124
125impl CompletedOp {
126    fn set_latency(self, latency_ns: u64) -> Self {
127        match self {
128            Self::Get {
129                id,
130                key,
131                result,
132                user_data,
133                ..
134            } => Self::Get {
135                id,
136                key,
137                result,
138                user_data,
139                latency_ns,
140            },
141            Self::Set {
142                id,
143                key,
144                result,
145                user_data,
146                ..
147            } => Self::Set {
148                id,
149                key,
150                result,
151                user_data,
152                latency_ns,
153            },
154            Self::Delete {
155                id,
156                key,
157                result,
158                user_data,
159                ..
160            } => Self::Delete {
161                id,
162                key,
163                result,
164                user_data,
165                latency_ns,
166            },
167        }
168    }
169}
170
171/// Result metadata for a completed command, passed to the `on_result` callback.
172#[derive(Debug, Clone)]
173pub struct CommandResult {
174    /// The command type.
175    pub command: CommandType,
176    /// Latency in nanoseconds (send → response parsed).
177    pub latency_ns: u64,
178    /// Whether the command succeeded.
179    pub success: bool,
180    /// Time-to-first-byte in nanoseconds (not available in sequential mode).
181    pub ttfb_ns: Option<u64>,
182    /// Bytes transmitted for this command (protobuf-encoded request size).
183    pub tx_bytes: u32,
184    /// Bytes received for this command (protobuf-encoded response size).
185    pub rx_bytes: u32,
186}
187
188/// Callback type for per-request result notifications.
189type ResultCallback = Box<dyn Fn(&CommandResult)>;
190
191// ── Pending operation state ─────────────────────────────────────────────
192
/// Discriminates which command an entry in the pending map is waiting on.
enum PendingOpKind {
    /// Awaiting a `get` response.
    Get,
    /// Awaiting a `set` response.
    Set,
    /// Awaiting a `delete` response.
    Delete,
}
199
200/// State of a pending operation.
201struct PendingOp {
202    kind: PendingOpKind,
203    key: Bytes,
204    send_ts: u64,
205    start: Option<Instant>,
206    user_data: u64,
207    tx_bytes: u32,
208}
209
210// ── ClientMetrics ───────────────────────────────────────────────────────
211
/// Histogram-backed counters maintained by the client itself; compiled only
/// with the `metrics` feature.
#[cfg(feature = "metrics")]
pub struct ClientMetrics {
    /// Distribution of per-command latency, in nanoseconds.
    pub latency: histogram::Histogram,
    /// Number of commands recorded so far.
    pub requests: u64,
    /// Number of recorded commands that reported failure.
    pub errors: u64,
}
223
#[cfg(feature = "metrics")]
impl ClientMetrics {
    /// Create empty metrics: zeroed counters and an empty latency histogram.
    fn new() -> Self {
        Self {
            // Grouping power 7 with a max value power of 64 is a fixed,
            // valid configuration, so construction cannot fail at runtime.
            latency: histogram::Histogram::new(7, 64)
                .expect("histogram config (grouping 7, max power 64) is statically valid"),
            requests: 0,
            errors: 0,
        }
    }

    /// Fold one completed command into the counters and latency histogram.
    fn record(&mut self, result: &CommandResult) {
        self.requests += 1;
        // Best-effort: a sample the histogram rejects is dropped rather than
        // failing the request that produced it.
        let _ = self.latency.increment(result.latency_ns);

        if !result.success {
            self.errors += 1;
        }
    }
}
243
244// ── ClientBuilder ───────────────────────────────────────────────────────
245
246/// Builder for creating a [`Client`] with per-request callbacks and metrics.
247pub struct ClientBuilder {
248    conn: ConnCtx,
249    on_result: Option<ResultCallback>,
250    namespace: Bytes,
251    max_in_flight: usize,
252    #[cfg(feature = "timestamps")]
253    use_kernel_ts: bool,
254    #[cfg(feature = "metrics")]
255    with_metrics: bool,
256}
257
258impl ClientBuilder {
259    pub(crate) fn new(conn: ConnCtx) -> Self {
260        Self {
261            conn,
262            on_result: None,
263            namespace: Bytes::new(),
264            max_in_flight: usize::MAX,
265            #[cfg(feature = "timestamps")]
266            use_kernel_ts: false,
267            #[cfg(feature = "metrics")]
268            with_metrics: false,
269        }
270    }
271
272    /// Maximum number of in-flight `fire_*` requests (depth of the
273    /// pending map). `fire_*` returns [`Error::TooManyInFlight`] past it.
274    /// Defaults to `usize::MAX` (unbounded).
275    pub fn max_in_flight(mut self, n: usize) -> Self {
276        self.max_in_flight = n;
277        self
278    }
279
280    /// Pre-set the cache namespace. When set, `fire_*` methods use an O(1)
281    /// refcount clone instead of allocating a new `Bytes` per request.
282    pub fn namespace(mut self, ns: impl AsRef<[u8]>) -> Self {
283        self.namespace = Bytes::copy_from_slice(ns.as_ref());
284        self
285    }
286
287    /// Register a callback invoked after each command completes.
288    pub fn on_result<F: Fn(&CommandResult) + 'static>(mut self, f: F) -> Self {
289        self.on_result = Some(Box::new(f));
290        self
291    }
292
293    /// Enable kernel SO_TIMESTAMPING for latency measurement (requires `timestamps` feature).
294    #[cfg(feature = "timestamps")]
295    pub fn kernel_timestamps(mut self, enabled: bool) -> Self {
296        self.use_kernel_ts = enabled;
297        self
298    }
299
300    /// Enable built-in histogram tracking (requires `metrics` feature).
301    #[cfg(feature = "metrics")]
302    pub fn with_metrics(mut self) -> Self {
303        self.with_metrics = true;
304        self
305    }
306
307    /// Build the client (already authenticated).
308    pub fn build(self) -> Client {
309        Client {
310            conn: self.conn,
311            next_message_id: 1,
312            pending: HashMap::new(),
313            send_buf: Vec::with_capacity(4096),
314            on_result: self.on_result,
315            namespace: self.namespace,
316            max_in_flight: self.max_in_flight,
317            #[cfg(feature = "timestamps")]
318            use_kernel_ts: self.use_kernel_ts,
319            #[cfg(feature = "metrics")]
320            metrics: if self.with_metrics {
321                Some(ClientMetrics::new())
322            } else {
323                None
324            },
325        }
326    }
327}
328
329// ── Client ──────────────────────────────────────────────────────────────
330
331/// A ringline-native Momento cache client wrapping a single connection.
332///
333/// Supports both multiplexed (fire/recv) and sequential (get/set/delete)
334/// operation modes. Use [`Client::connect`] to establish an authenticated
335/// connection, or [`Client::builder`] after manual authentication.
336pub struct Client {
337    conn: ConnCtx,
338    next_message_id: u64,
339    pending: HashMap<u64, PendingOp>,
340    send_buf: Vec<u8>,
341    on_result: Option<ResultCallback>,
342    namespace: Bytes,
343    /// Cap on `pending.len()`; `fire_*` returns `Error::TooManyInFlight`
344    /// past it. `usize::MAX` (default) disables.
345    max_in_flight: usize,
346    #[cfg(feature = "timestamps")]
347    use_kernel_ts: bool,
348    #[cfg(feature = "metrics")]
349    metrics: Option<ClientMetrics>,
350}
351
352impl Client {
353    /// Connect to Momento, authenticate, and return a ready client.
354    pub async fn connect(credential: &Credential) -> Result<Self, Error> {
355        let host = credential.host();
356        let port = credential.port();
357        let addr: SocketAddr = Self::resolve_addr(host, port)?;
358        let tls_host = credential.tls_host();
359
360        let conn = ringline::connect_tls(addr, tls_host)?.await?;
361
362        let mut client = Self {
363            conn,
364            next_message_id: 1,
365            pending: HashMap::new(),
366            send_buf: Vec::with_capacity(4096),
367            on_result: None,
368            namespace: Bytes::new(),
369            max_in_flight: usize::MAX,
370            #[cfg(feature = "timestamps")]
371            use_kernel_ts: false,
372            #[cfg(feature = "metrics")]
373            metrics: None,
374        };
375
376        client.authenticate(credential.token()).await?;
377        Ok(client)
378    }
379
380    /// Connect with a timeout.
381    pub async fn connect_with_timeout(
382        credential: &Credential,
383        timeout_ms: u64,
384    ) -> Result<Self, Error> {
385        let host = credential.host();
386        let port = credential.port();
387        let addr: SocketAddr = Self::resolve_addr(host, port)?;
388        let tls_host = credential.tls_host();
389
390        let conn = ringline::connect_tls_with_timeout(addr, tls_host, timeout_ms)?.await?;
391
392        let mut client = Self {
393            conn,
394            next_message_id: 1,
395            pending: HashMap::new(),
396            send_buf: Vec::with_capacity(4096),
397            on_result: None,
398            namespace: Bytes::new(),
399            max_in_flight: usize::MAX,
400            #[cfg(feature = "timestamps")]
401            use_kernel_ts: false,
402            #[cfg(feature = "metrics")]
403            metrics: None,
404        };
405
406        client.authenticate(credential.token()).await?;
407        Ok(client)
408    }
409
410    /// Create a builder for a client with per-request callbacks.
411    ///
412    /// The connection must already be authenticated.
413    pub fn builder(conn: ConnCtx) -> ClientBuilder {
414        ClientBuilder::new(conn)
415    }
416
417    /// Returns the underlying connection context.
418    pub fn conn(&self) -> ConnCtx {
419        self.conn
420    }
421
422    /// Returns a reference to the built-in metrics, if enabled.
423    #[cfg(feature = "metrics")]
424    pub fn metrics(&self) -> Option<&ClientMetrics> {
425        self.metrics.as_ref()
426    }
427
428    /// Returns a mutable reference to the built-in metrics, if enabled.
429    #[cfg(feature = "metrics")]
430    pub fn metrics_mut(&mut self) -> Option<&mut ClientMetrics> {
431        self.metrics.as_mut()
432    }
433
434    /// Pre-set the cache namespace. When set, `fire_*` methods use an O(1)
435    /// refcount clone instead of allocating a new `Bytes` per request.
436    pub fn set_namespace(&mut self, ns: impl AsRef<[u8]>) {
437        self.namespace = Bytes::copy_from_slice(ns.as_ref());
438    }
439
440    /// Number of in-flight requests.
441    pub fn pending_count(&self) -> usize {
442        self.pending.len()
443    }
444
445    /// Mark the client as terminal: clear the pending map and close the
446    /// underlying connection. Called from every irrecoverable error path
447    /// in `recv()` and `authenticate()` so a stream-misaligned
448    /// accumulator can't leak through to a subsequent `fire_*`. Idempotent
449    /// — `ConnCtx::close` is a no-op for an already-closed conn.
450    fn poison(&mut self) {
451        self.pending.clear();
452        self.conn.close();
453    }
454
455    // ── Multiplexed fire API ────────────────────────────────────────────
456
457    /// Returns `Err(TooManyInFlight)` if the pending map has hit
458    /// `max_in_flight`. Called at the start of every `fire_*` to bail
459    /// before doing any encode / send work.
460    #[inline]
461    fn check_in_flight(&self) -> Result<(), Error> {
462        if self.pending.len() >= self.max_in_flight {
463            Err(Error::TooManyInFlight)
464        } else {
465            Ok(())
466        }
467    }
468
469    /// Fire a GET request. Returns immediately with a RequestId.
470    pub fn fire_get(
471        &mut self,
472        cache: &str,
473        key: &[u8],
474        user_data: u64,
475    ) -> Result<RequestId, Error> {
476        self.check_in_flight()?;
477        let message_id = self.next_id();
478        let ns = self.namespace_for(cache);
479        let key = Bytes::copy_from_slice(key);
480        let cmd = CacheCommand::new(
481            message_id,
482            UnaryCommand::Get {
483                namespace: ns,
484                key: key.clone(),
485            },
486        );
487
488        self.send_command(&cmd)?;
489        let tx_bytes = self.send_buf.len() as u32;
490
491        let (send_ts, start) = self.timing_start();
492        self.pending.insert(
493            message_id,
494            PendingOp {
495                kind: PendingOpKind::Get,
496                key,
497                send_ts,
498                start,
499                user_data,
500                tx_bytes,
501            },
502        );
503
504        Ok(RequestId(message_id))
505    }
506
507    /// Fire a SET request. Returns immediately with a RequestId.
508    pub fn fire_set(
509        &mut self,
510        cache: &str,
511        key: &[u8],
512        value: &[u8],
513        ttl_ms: u64,
514        user_data: u64,
515    ) -> Result<RequestId, Error> {
516        self.check_in_flight()?;
517        let message_id = self.next_id();
518        let ns = self.namespace_for(cache);
519        let key = Bytes::copy_from_slice(key);
520        let cmd = CacheCommand::new(
521            message_id,
522            UnaryCommand::Set {
523                namespace: ns,
524                key: key.clone(),
525                value: Bytes::copy_from_slice(value),
526                ttl_millis: ttl_ms,
527            },
528        );
529
530        self.send_command(&cmd)?;
531        let tx_bytes = self.send_buf.len() as u32;
532
533        let (send_ts, start) = self.timing_start();
534        self.pending.insert(
535            message_id,
536            PendingOp {
537                kind: PendingOpKind::Set,
538                key,
539                send_ts,
540                start,
541                user_data,
542                tx_bytes,
543            },
544        );
545
546        Ok(RequestId(message_id))
547    }
548
549    /// Fire a DELETE request. Returns immediately with a RequestId.
550    pub fn fire_delete(
551        &mut self,
552        cache: &str,
553        key: &[u8],
554        user_data: u64,
555    ) -> Result<RequestId, Error> {
556        self.check_in_flight()?;
557        let message_id = self.next_id();
558        let ns = self.namespace_for(cache);
559        let key = Bytes::copy_from_slice(key);
560        let cmd = CacheCommand::new(
561            message_id,
562            UnaryCommand::Delete {
563                namespace: ns,
564                key: key.clone(),
565            },
566        );
567
568        self.send_command(&cmd)?;
569        let tx_bytes = self.send_buf.len() as u32;
570
571        let (send_ts, start) = self.timing_start();
572        self.pending.insert(
573            message_id,
574            PendingOp {
575                kind: PendingOpKind::Delete,
576                key,
577                send_ts,
578                start,
579                user_data,
580                tx_bytes,
581            },
582        );
583
584        Ok(RequestId(message_id))
585    }
586
587    // ── Multiplexed recv API ────────────────────────────────────────────
588
589    /// Await the next completed operation. Reads from the connection,
590    /// decodes the response, and correlates by message_id.
591    ///
592    /// Uses zero-copy parsing via `with_bytes()`: response values are
593    /// `Bytes::slice()` references into the accumulator buffer.
594    ///
595    /// Responses whose `message_id` doesn't match any pending op are
596    /// skipped (a stale or duplicate frame doesn't drop every other
597    /// in-flight op). Skipping is capped at [`MAX_RECV_SKIPS`]
598    /// consecutive misses: past that, the connection is poisoned (the
599    /// pending map cleared and the conn closed) and [`Error::Protocol`]
600    /// is returned. Without the cap a misbehaving server could starve
601    /// in-flight ops by streaming unmatched frames forever.
602    ///
603    /// On any poison path (`Oversize`, malformed protobuf, `n == 0`, or
604    /// skip-cap exceeded) the underlying connection is closed so a
605    /// stream-misaligned accumulator can't be reused for fresh
606    /// `fire_*` calls.
607    pub async fn recv(&mut self) -> Result<CompletedOp, Error> {
608        if self.pending.is_empty() {
609            return Err(Error::NoPending);
610        }
611
612        let mut skips = 0usize;
613        let (dr, total_bytes) = loop {
614            let pending = &mut self.pending;
615            let mut dispatch_result: Option<DispatchResult> = None;
616            let mut malformed = false;
617            let mut oversize = false;
618
619            let n = self
620                .conn
621                .with_bytes(
622                    |bytes| match decode_length_delimited_message_bytes(&bytes) {
623                        DecodedMessage::Message(consumed, msg_bytes) => {
624                            if let Some(response) = CacheResponse::decode_bytes(msg_bytes) {
625                                dispatch_result = dispatch_response(response, pending);
626                            } else {
627                                malformed = true;
628                            }
629                            ParseResult::Consumed(consumed)
630                        }
631                        DecodedMessage::Incomplete => ParseResult::Consumed(0),
632                        DecodedMessage::Oversize => {
633                            oversize = true;
634                            ParseResult::Consumed(0)
635                        }
636                    },
637                )
638                .await;
639
640            if oversize {
641                // Adversarial / broken peer — declared length exceeds the
642                // per-message cap. The accumulator can't progress past this
643                // message, so the connection is unusable from here on.
644                self.poison();
645                return Err(Error::Protocol(
646                    "inbound message exceeded MAX_MESSAGE_SIZE".into(),
647                ));
648            }
649            if n == 0 {
650                // Connection broken — clear pending so subsequent recv()
651                // returns NoPending instead of reading stale data. The
652                // conn close is harmless here (already dead) but keeps
653                // the poison path uniform.
654                self.poison();
655                return Err(Error::ConnectionClosed);
656            }
657            if malformed {
658                // Decoded a length-prefix but the inner protobuf was
659                // gibberish. Can't re-sync the stream — poison.
660                self.poison();
661                return Err(Error::Protocol("failed to decode response".into()));
662            }
663            if let Some(dr) = dispatch_result {
664                break (dr, n);
665            }
666            // Unknown / unmatched message_id: response was decoded fine,
667            // but no pending op claimed it. Skip and wait for the next.
668            // If the pending queue is now empty (e.g. caller raced),
669            // bail out distinctly.
670            if self.pending.is_empty() {
671                return Err(Error::NoPending);
672            }
673            skips += 1;
674            if skips > MAX_RECV_SKIPS {
675                // Server is streaming responses we can't match — either a
676                // protocol drift or hostile traffic. Poison so the next
677                // recv() / fire_* sees a clean error.
678                self.poison();
679                return Err(Error::Protocol(
680                    "too many consecutive unmatched responses".into(),
681                ));
682            }
683        };
684        let n = total_bytes;
685
686        // Read recv_timestamp once, after with_bytes completes, so the
687        // kernel timestamp reflects the CQE that delivered this response.
688        // Used for both TTFB and latency to avoid redundant reads and
689        // stale-timestamp risk when with_bytes had to yield.
690        let recv_ts = self.capture_recv_ts();
691        let rx_bytes = n as u32;
692        let ttfb_ns = Self::ttfb_from_timestamps(recv_ts, dr.send_ts);
693        let latency_ns = if self.is_instrumented() {
694            let latency_ns = self.finish_timing(recv_ts, dr.send_ts, dr.start);
695            self.record(&CommandResult {
696                command: dr.cmd_type,
697                latency_ns,
698                success: dr.success,
699                ttfb_ns,
700                tx_bytes: dr.tx_bytes,
701                rx_bytes,
702            });
703            latency_ns
704        } else {
705            0
706        };
707
708        Ok(dr.op.set_latency(latency_ns))
709    }
710
711    // ── Sequential convenience API ──────────────────────────────────────
712
713    /// Reject sequential API calls while `fire_*` ops are still in
714    /// flight. Momento is multiplexed and `recv()` returns whatever
715    /// `message_id` arrives next; the convenience APIs discard the
716    /// id/key and would otherwise silently complete with data for the
717    /// wrong request.
718    #[inline]
719    fn check_no_pending(&self) -> Result<(), Error> {
720        if self.pending.is_empty() {
721            Ok(())
722        } else {
723            Err(Error::PendingOpsInFlight)
724        }
725    }
726
727    /// Sequential get: fire + recv.
728    ///
729    /// Returns [`Error::PendingOpsInFlight`] if any `fire_*` ops are
730    /// still in flight — without that guard, `recv()` could resolve the
731    /// other request's response and we'd return its value as if it
732    /// belonged to `key`.
733    pub async fn get(&mut self, cache: &str, key: &[u8]) -> Result<Option<Bytes>, Error> {
734        self.check_no_pending()?;
735        let _id = self.fire_get(cache, key, 0)?;
736        match self.recv().await? {
737            CompletedOp::Get { result, .. } => result,
738            _ => Err(Error::Protocol("unexpected response type".into())),
739        }
740    }
741
742    /// Sequential set. See [`Self::get`] for the in-flight-ops guard.
743    pub async fn set(
744        &mut self,
745        cache: &str,
746        key: &[u8],
747        value: &[u8],
748        ttl_ms: u64,
749    ) -> Result<(), Error> {
750        self.check_no_pending()?;
751        let _id = self.fire_set(cache, key, value, ttl_ms, 0)?;
752        match self.recv().await? {
753            CompletedOp::Set { result, .. } => result,
754            _ => Err(Error::Protocol("unexpected response type".into())),
755        }
756    }
757
758    /// Sequential delete. See [`Self::get`] for the in-flight-ops guard.
759    pub async fn delete(&mut self, cache: &str, key: &[u8]) -> Result<(), Error> {
760        self.check_no_pending()?;
761        let _id = self.fire_delete(cache, key, 0)?;
762        match self.recv().await? {
763            CompletedOp::Delete { result, .. } => result,
764            _ => Err(Error::Protocol("unexpected response type".into())),
765        }
766    }
767
768    // ── Internal helpers ────────────────────────────────────────────────
769
770    /// Return the namespace `Bytes` — O(1) clone if pre-set, otherwise allocates.
771    #[inline]
772    fn namespace_for(&self, cache: &str) -> Bytes {
773        if self.namespace.is_empty() {
774            Bytes::copy_from_slice(cache.as_bytes())
775        } else {
776            self.namespace.clone()
777        }
778    }
779
780    fn next_id(&mut self) -> u64 {
781        let id = self.next_message_id;
782        self.next_message_id += 1;
783        id
784    }
785
786    fn send_command(&mut self, cmd: &CacheCommand) -> Result<(), Error> {
787        self.send_buf.clear();
788        cmd.encode_length_delimited_into(&mut self.send_buf);
789        self.conn.send_nowait(&self.send_buf)?;
790        Ok(())
791    }
792
793    /// Send an `Authenticate` command and wait for the matching response.
794    ///
795    /// Loops over inbound messages — like [`Self::recv`], a message
796    /// whose `message_id` doesn't match the one we issued is skipped
797    /// rather than treated as a hard failure (previously a single
798    /// non-matching frame ate the auth response and produced a generic
799    /// "failed to decode" error). Skipping is bounded by
800    /// [`MAX_RECV_SKIPS`]; past the cap the connection is poisoned and
801    /// [`Error::Protocol`] is returned. On `Oversize` / malformed /
802    /// `n == 0` the conn is closed before returning so callers can't
803    /// reuse a desynced accumulator.
804    async fn authenticate(&mut self, token: &str) -> Result<(), Error> {
805        let message_id = self.next_id();
806        let cmd = CacheCommand::new(
807            message_id,
808            UnaryCommand::Authenticate {
809                auth_token: token.to_string(),
810            },
811        );
812
813        self.send_buf.clear();
814        cmd.encode_length_delimited_into(&mut self.send_buf);
815        if let Err(e) = self.conn.send_nowait(&self.send_buf) {
816            self.conn.close();
817            return Err(Error::Io(e));
818        }
819
820        let mut skips = 0usize;
821        loop {
822            let mut auth_result: Option<Result<(), Error>> = None;
823            let mut malformed = false;
824            let mut oversize = false;
825
826            let n = self
827                .conn
828                .with_bytes(
829                    |bytes| match decode_length_delimited_message_bytes(&bytes) {
830                        DecodedMessage::Message(consumed, msg_bytes) => {
831                            if let Some(response) = CacheResponse::decode_bytes(msg_bytes) {
832                                if response.message_id == message_id {
833                                    match response.result {
834                                        CacheResponseResult::Authenticate => {
835                                            auth_result = Some(Ok(()));
836                                        }
837                                        CacheResponseResult::Error(err) => {
838                                            auth_result = Some(Err(Error::AuthFailed(err.message)));
839                                        }
840                                        _ => {
841                                            auth_result = Some(Err(Error::Protocol(
842                                                "unexpected auth response type".into(),
843                                            )));
844                                        }
845                                    }
846                                }
847                            } else {
848                                malformed = true;
849                            }
850                            ParseResult::Consumed(consumed)
851                        }
852                        DecodedMessage::Incomplete => ParseResult::Consumed(0),
853                        DecodedMessage::Oversize => {
854                            oversize = true;
855                            ParseResult::Consumed(0)
856                        }
857                    },
858                )
859                .await;
860
861            if oversize {
862                self.conn.close();
863                return Err(Error::Protocol(
864                    "auth response exceeded MAX_MESSAGE_SIZE".into(),
865                ));
866            }
867            if n == 0 {
868                self.conn.close();
869                return Err(Error::ConnectionClosed);
870            }
871            if malformed {
872                self.conn.close();
873                return Err(Error::Protocol("failed to decode auth response".into()));
874            }
875            if let Some(r) = auth_result {
876                return r;
877            }
878            // Decoded a well-formed message but it's not the auth response
879            // (mismatched message_id). Skip and wait for the next, with a
880            // hard cap mirroring recv() — otherwise a chatty / hostile
881            // server could deny us the auth response indefinitely.
882            skips += 1;
883            if skips > MAX_RECV_SKIPS {
884                self.conn.close();
885                return Err(Error::Protocol(
886                    "too many unmatched messages before auth response".into(),
887                ));
888            }
889        }
890    }
891
892    fn resolve_addr(host: &str, port: u16) -> Result<SocketAddr, Error> {
893        use std::net::ToSocketAddrs;
894        let addr_str = format!("{}:{}", host, port);
895        addr_str
896            .to_socket_addrs()
897            .map_err(|e| Error::Config(format!("failed to resolve {}: {}", addr_str, e)))?
898            .next()
899            .ok_or_else(|| Error::Config(format!("no addresses found for {}", addr_str)))
900    }
901
902    // ── Timing helpers ──────────────────────────────────────────────────
903
904    #[inline]
905    fn is_instrumented(&self) -> bool {
906        if self.on_result.is_some() {
907            return true;
908        }
909        #[cfg(feature = "metrics")]
910        if self.metrics.is_some() {
911            return true;
912        }
913        false
914    }
915
916    #[inline]
917    fn timing_start(&self) -> (u64, Option<Instant>) {
918        if self.is_instrumented() {
919            (self.send_timestamp(), Some(Instant::now()))
920        } else {
921            (0, None)
922        }
923    }
924
925    #[cfg(feature = "timestamps")]
926    #[inline]
927    fn capture_recv_ts(&self) -> u64 {
928        if self.use_kernel_ts {
929            self.conn.recv_timestamp()
930        } else {
931            0
932        }
933    }
934
935    #[cfg(not(feature = "timestamps"))]
936    #[inline]
937    fn capture_recv_ts(&self) -> u64 {
938        0
939    }
940
941    #[inline]
942    fn ttfb_from_timestamps(recv_ts: u64, send_ts: u64) -> Option<u64> {
943        if recv_ts > 0 && recv_ts > send_ts {
944            Some(recv_ts - send_ts)
945        } else {
946            None
947        }
948    }
949
950    #[cfg(feature = "timestamps")]
951    #[inline]
952    fn send_timestamp(&self) -> u64 {
953        if self.use_kernel_ts {
954            now_realtime_ns()
955        } else {
956            0
957        }
958    }
959
960    #[cfg(not(feature = "timestamps"))]
961    #[inline]
962    fn send_timestamp(&self) -> u64 {
963        0
964    }
965
966    #[inline]
967    fn finish_timing(&self, recv_ts: u64, send_ts: u64, start: Option<Instant>) -> u64 {
968        if recv_ts > 0 && recv_ts > send_ts {
969            return recv_ts - send_ts;
970        }
971        start.map_or(0, |s| s.elapsed().as_nanos() as u64)
972    }
973
974    fn record(&mut self, result: &CommandResult) {
975        if let Some(ref cb) = self.on_result {
976            cb(result);
977        }
978        #[cfg(feature = "metrics")]
979        if let Some(ref mut m) = self.metrics {
980            m.record(result);
981        }
982    }
983}
984
985// ── Response dispatch ───────────────────────────────────────────────────
986
987/// Result of dispatching a response to a pending operation.
988struct DispatchResult {
989    op: CompletedOp,
990    cmd_type: CommandType,
991    success: bool,
992    send_ts: u64,
993    start: Option<Instant>,
994    tx_bytes: u32,
995}
996
997/// Dispatch a decoded CacheResponse to the appropriate pending operation.
998fn dispatch_response(
999    response: CacheResponse,
1000    pending: &mut HashMap<u64, PendingOp>,
1001) -> Option<DispatchResult> {
1002    let message_id = response.message_id;
1003    let id = RequestId(message_id);
1004
1005    let op = pending.remove(&message_id)?;
1006    let send_ts = op.send_ts;
1007    let start = op.start;
1008    let user_data = op.user_data;
1009    let tx_bytes = op.tx_bytes;
1010
1011    match op.kind {
1012        PendingOpKind::Get => {
1013            let result = match response.result {
1014                CacheResponseResult::Get { value } => Ok(value),
1015                CacheResponseResult::Error(ref err) if err.code == StatusCode::NotFound => Ok(None),
1016                CacheResponseResult::Error(err) => Err(Error::Protocol(format!(
1017                    "{}: {}",
1018                    err.code as u32, err.message
1019                ))),
1020                _ => Err(Error::Protocol("unexpected response type for get".into())),
1021            };
1022            let success = result.is_ok();
1023            Some(DispatchResult {
1024                op: CompletedOp::Get {
1025                    id,
1026                    key: op.key,
1027                    result,
1028                    user_data,
1029                    latency_ns: 0,
1030                },
1031                cmd_type: CommandType::Get,
1032                success,
1033                send_ts,
1034                start,
1035                tx_bytes,
1036            })
1037        }
1038        PendingOpKind::Set => {
1039            let result = match response.result {
1040                CacheResponseResult::Set => Ok(()),
1041                CacheResponseResult::Error(err) => Err(Error::Protocol(format!(
1042                    "{}: {}",
1043                    err.code as u32, err.message
1044                ))),
1045                _ => Err(Error::Protocol("unexpected response type for set".into())),
1046            };
1047            let success = result.is_ok();
1048            Some(DispatchResult {
1049                op: CompletedOp::Set {
1050                    id,
1051                    key: op.key,
1052                    result,
1053                    user_data,
1054                    latency_ns: 0,
1055                },
1056                cmd_type: CommandType::Set,
1057                success,
1058                send_ts,
1059                start,
1060                tx_bytes,
1061            })
1062        }
1063        PendingOpKind::Delete => {
1064            let result = match response.result {
1065                CacheResponseResult::Delete => Ok(()),
1066                CacheResponseResult::Error(err) => Err(Error::Protocol(format!(
1067                    "{}: {}",
1068                    err.code as u32, err.message
1069                ))),
1070                _ => Err(Error::Protocol(
1071                    "unexpected response type for delete".into(),
1072                )),
1073            };
1074            let success = result.is_ok();
1075            Some(DispatchResult {
1076                op: CompletedOp::Delete {
1077                    id,
1078                    key: op.key,
1079                    result,
1080                    user_data,
1081                    latency_ns: 0,
1082                },
1083                cmd_type: CommandType::Delete,
1084                success,
1085                send_ts,
1086                start,
1087                tx_bytes,
1088            })
1089        }
1090    }
1091}
1092
1093// ── Kernel timestamp helper ─────────────────────────────────────────────
1094
/// Current realtime (wall-clock) time in nanoseconds since the Unix epoch.
///
/// This is the send-side counterpart of the receive timestamp. `SystemTime`
/// reads the realtime clock (`clock_gettime(CLOCK_REALTIME)` on Unix), so no
/// `unsafe` FFI is needed.
///
/// Returns 0 if the clock reads earlier than the Unix epoch, instead of
/// wrapping a negative seconds value. Saturates at `u64::MAX`, roughly 584
/// years after the epoch.
#[cfg(feature = "timestamps")]
fn now_realtime_ns() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
        })
}
1106
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::CacheResponse;

    /// Build a pending op of `kind` for `key` with recognisable metadata
    /// (`user_data = 42`, zeroed timing) so that dispatch tests can check the
    /// metadata is carried through unchanged.
    fn make_pending(kind: PendingOpKind, key: &[u8]) -> PendingOp {
        PendingOp {
            kind,
            key: Bytes::copy_from_slice(key),
            send_ts: 0,
            start: None,
            user_data: 42,
            tx_bytes: 0,
        }
    }

    fn make_pending_get(key: &[u8]) -> PendingOp {
        make_pending(PendingOpKind::Get, key)
    }

    #[test]
    fn max_recv_skips_is_positive() {
        // Cap must be > 0; otherwise the very first unmatched message
        // would poison and `recv()` would never recover from a single
        // duplicate frame.
        const { assert!(MAX_RECV_SKIPS > 0) };
    }

    #[test]
    fn pending_ops_in_flight_display() {
        let msg = format!("{}", Error::PendingOpsInFlight);
        assert!(
            msg.contains("fire_*"),
            "PendingOpsInFlight display should mention fire_*, got: {msg}"
        );
    }

    #[test]
    fn dispatch_response_returns_some_on_match() {
        let mut pending: HashMap<u64, PendingOp> = HashMap::new();
        pending.insert(7, make_pending_get(b"k"));
        let response = CacheResponse::get_hit(7, Bytes::from_static(b"v"));
        let dr = dispatch_response(response, &mut pending).expect("expected matched dispatch");
        assert!(pending.is_empty(), "matched op should be drained");
        assert!(dr.success, "a get hit should be reported as success");
        assert!(matches!(dr.cmd_type, CommandType::Get));
        match dr.op {
            CompletedOp::Get {
                key,
                result,
                user_data,
                ..
            } => {
                assert_eq!(&key[..], b"k", "key must be carried through");
                assert_eq!(user_data, 42, "user_data must be carried through");
                match result {
                    Ok(Some(value)) => assert_eq!(&value[..], b"v"),
                    _ => panic!("expected a hit carrying the stored value"),
                }
            }
            _ => panic!("expected Get op kind preserved"),
        }
    }

    #[test]
    fn dispatch_response_returns_none_on_unmatched_id() {
        let mut pending: HashMap<u64, PendingOp> = HashMap::new();
        pending.insert(1, make_pending_get(b"k"));
        // Response carries a message_id that isn't in pending.
        let response = CacheResponse::get_hit(99, Bytes::from_static(b"v"));
        let dr = dispatch_response(response, &mut pending);
        assert!(dr.is_none(), "unmatched id must return None");
        assert_eq!(pending.len(), 1, "pending must be unchanged on miss");
    }

    #[test]
    fn dispatch_response_get_returns_protocol_error_on_wrong_kind() {
        let mut pending: HashMap<u64, PendingOp> = HashMap::new();
        pending.insert(3, make_pending_get(b"k"));
        // Pending op is Get but server sent a Set response — kind mismatch.
        let response = CacheResponse::set_ok(3);
        let dr = dispatch_response(response, &mut pending).expect("matched id");
        assert!(pending.is_empty());
        assert!(!dr.success, "kind mismatch should mark op as failed");
        match dr.op {
            CompletedOp::Get { result, .. } => assert!(result.is_err()),
            _ => panic!("expected Get op kind preserved"),
        }
    }

    #[test]
    fn dispatch_response_set_ok_completes_set() {
        let mut pending: HashMap<u64, PendingOp> = HashMap::new();
        pending.insert(4, make_pending(PendingOpKind::Set, b"k"));
        let dr = dispatch_response(CacheResponse::set_ok(4), &mut pending).expect("matched id");
        assert!(pending.is_empty(), "matched op should be drained");
        assert!(dr.success, "set ack should be reported as success");
        assert!(matches!(dr.cmd_type, CommandType::Set));
        match dr.op {
            CompletedOp::Set { key, result, .. } => {
                assert!(result.is_ok());
                assert_eq!(&key[..], b"k");
            }
            _ => panic!("expected Set op kind preserved"),
        }
    }

    #[test]
    fn dispatch_response_set_returns_protocol_error_on_wrong_kind() {
        let mut pending: HashMap<u64, PendingOp> = HashMap::new();
        pending.insert(5, make_pending(PendingOpKind::Set, b"k"));
        // Pending op is Set but server sent a Get response — kind mismatch.
        let response = CacheResponse::get_hit(5, Bytes::from_static(b"v"));
        let dr = dispatch_response(response, &mut pending).expect("matched id");
        assert!(pending.is_empty());
        assert!(!dr.success, "kind mismatch should mark op as failed");
        match dr.op {
            CompletedOp::Set { result, .. } => assert!(result.is_err()),
            _ => panic!("expected Set op kind preserved"),
        }
    }
}