Skip to main content

wombatkv_daemon/
lib.rs

1//! Shared-memory ping-pong transport for the wombatkv puffer daemon.
2//!
3//! Two POSIX SHM disruptor rings (req + resp) carry typed rkyv-encoded
4//! messages between an inference engine (client) and a long-running
5//! daemon process holding a [`wombatkv_node::embed::WombatKVKvStore`].
6//!
7//! # Design choices (v1)
8//!
9//! - **Right-sized frames plus client chunking.** `AlignedFixedFrame<DATA_BYTES>`
10//!   is sized for low-millisecond KV chunks, while `RemoteKvStoreClient`
11//!   chunks larger engine payloads behind its public `put_kv/get_kv` API.
//! - **Tiered ops.** This crate handles control + small-payload ops
//!   (PING, `PUT_SMALL`, `GET_SMALL`, EXISTS, STATS, RESTORE, CLEAR).
//!   Payloads larger than the per-frame budget are stored as chunk objects
//!   plus a tiny manifest. The daemon remains a simple key/value server.
16//! - **rkyv codec.** Zero-alloc encode (`AlignedVec`) on the producer
17//!   side; on the consumer side messages are decoded into owned types
18//!   for ergonomics. Switching to `recv_leased` for archived-view
19//!   access is a one-line change once the larger-payload path lands.
20//!
21//! # Frame budget
22//!
23//! `DATA_BYTES = 4 MiB - 16` so each ring slot fits a multi-MiB
24//! rkyv-encoded payload. With ring depth 16 that's about 64 MiB per
25//! ring, 128 MiB for the full request+response pair. Acceptable on an
26//! RTX dev host and large enough to avoid tens of thousands of tiny
27//! chunks for real KV blobs.
28
29// Crate-level lint: this crate `deny`s unsafe (overriding the workspace
30// `forbid`) so the arena module can wrap memmap2's `unsafe fn map_mut`
31// in a single audited site (see `arena.rs`). All other modules remain
32// safe-only.
33#![deny(unsafe_code)]
34
35pub mod arena;
36pub mod client;
37pub mod config;
38pub mod constants;
39pub mod envelope;
40pub mod http_transport;
41pub mod lifecycle;
42pub mod runtime_tpc;
43pub mod tcp_transport;
44pub use arena::{
45    arena_path, ArenaError, ArenaReader, ArenaWriter, ARENA_HEADER_BYTES, DEFAULT_ARENA_BYTES,
46};
47pub use client::{
48    ClientOptions, RemoteError, RemoteGetOutcome, RemoteHitTier, RemoteKvStoreClient,
49    DEFAULT_CALL_TIMEOUT,
50};
51pub use config::DaemonConfig;
52pub use lifecycle::{cleanup_prefix_segments, ClientHeartbeat, HeartbeatMonitor, ReopenReason};
53
54use std::time::Duration;
55
56use bytes::Bytes;
57use myelon::codec::{Codec, CodecError};
58use myelon::transport::AlignedFixedFrame;
59use myelon::typed_transport::{TypedConsumer, TypedProducer};
60use myelon::MyelonWaitStrategy;
61
62use rkyv::rancor::Error as RkyvError;
63use rkyv::util::AlignedVec;
64use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
65
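/// Per-frame data budget. 4 MiB minus 16-byte header.
///
/// NOTE(review): the sizing rationale and frame-size sweep below describe a
/// 4 KiB frame ("← chosen", "fragments to ~115 frames"), which does not match
/// the 4 MiB − 16 value of this constant. The text appears to predate the
/// current sizing; re-measure or mark it as historical.
///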
68/// Sizing rationale: the ring buffer itself is statically allocated at
69/// `create_with_consumers` time, but the owned (copying) APIs we use -
70/// `TypedProducer::publish` + `TypedConsumer::recv_owned`, pay 4×
71/// `DATA_BYTES` of memcpy per round-trip:
72///   - producer `T::default()`: zeroes a stack frame of `DATA_BYTES`
73///   - producer `*slot = frame`: copies the full frame into the ring slot
74///   - consumer `*event_ptr`: copies the full slot out to a stack frame
75///   - consumer's payload-bounded copy out of the slot (small)
76///
77/// So PING RTT is dominated by raw memory bandwidth on full-frame
78/// memcpys, NOT by any per-publish init cost, the ring is allocated
79/// once at create-time. (M3 Max, ~16 GB/s sustained on 4× per-RTT
80/// copies puts the floor at roughly 4 × `DATA_BYTES` / 16 GB/s, matching
81/// the empirical sweep below.)
82///
83/// Frame-size sweep, M3 Max, `MinIO` local, p50 µs:
84///   frame  | PING | EXISTS | 4K GET | 64K GET | 256K GET | 1M GET | 1.5M GET
85///   2 KiB  | 0.71 | 1.08   | 2.79   | 19.0    | 47.7     | 225    | 283
86///   4 KiB  | 2.00 | 1.54   | 2.71   | 16.7    | 50.6     | 184    | 244  ← chosen
87///   8 KiB  | 3.00 | 3.38   | 3.21   | 17.3    | 49.2     | 179    | 248
88///   16 KiB | 4.25 | 4.75   | 5.54   | 20.3    | 44.3     | 165    | 320
89///   64 KiB | 18.4 | 20.8   | 21.0   | 34.0    | 61.5     | 228    | 258
90///
91/// 4 KiB is the sweet spot: only 1.3 µs slower than 2 KiB on PING, but
92/// avoids the large-payload regression you see at 2 KiB (1.5 MiB GET
93/// goes 244 → 283 µs). Within ~10 % of the best on 256 KiB / 1 MiB
94/// payloads (16 KiB wins those by a hair, but loses badly on PING and
95/// on 1.5 MiB). Matches the macOS page size for clean TLB behavior.
96///
97/// Future work: switching to `recv_leased` would eliminate the consumer
98/// copy (it borrows directly from the ring slot), and a `publish_with`-
99/// style API that writes directly into the ring slot would eliminate
100/// the producer's stack frame too. That moves the bottleneck off
101/// memcpy entirely, at which point larger frames stop hurting.
102///
103/// Reference per-block KV sizes (Qwen3-0.6B Metal):
104///   per layer per block (32 tok × 8 `kv_h` × 128 `head_dim` × 2 bytes) = 16 KiB
105///   per block all 28 layers (K+V) ≈ 458 KiB → fragments to ~115 frames
106pub const FRAME_DATA_BYTES: usize = 4 * 1024 * 1024 - 16;
107/// Disruptor ring depth: N in-flight slots.
/// Total ring footprint per direction = depth × `FRAME_DATA_BYTES` ≈ 64 MiB.
109pub const DEFAULT_RING_DEPTH: usize = 16;
110
111/// The frame type backing both rings.
112pub type ShmFrame = AlignedFixedFrame<FRAME_DATA_BYTES>;
113
114/// Consumer id on the request ring (daemon side). Kept tiny because
115/// macOS POSIX SHM caps segment names at 31 chars; the consumer cursor
116/// segment appends this id to the ring name.
117pub const REQ_CONSUMER_ID: &str = "dn";
118/// Consumer id on the response ring (client side).
119pub const RESP_CONSUMER_ID: &str = "cn";
120
/// Default attach timeout for SHM segments (matches perf-bench).
///
/// Overridable via `WMBT_KV_DAEMON_SHM_ATTACH_TIMEOUT_SECS` env
/// for slow-startup scenarios (Docker cold start, busy CI runners). Values
/// <1 are ignored. See [`effective_attach_timeout`] for the resolver.
pub const ATTACH_TIMEOUT: Duration = Duration::from_secs(30);

/// Resolves the effective SHM attach timeout.
///
/// Reads `WMBT_KV_DAEMON_SHM_ATTACH_TIMEOUT_SECS`, trims surrounding
/// whitespace (env files and shell exports frequently leave a trailing
/// newline or space), and parses the rest as whole seconds.
///
/// Falls back to [`ATTACH_TIMEOUT`] when the variable is unset, is not
/// valid Unicode, does not parse as a `u64`, or is less than 1.
#[must_use]
pub fn effective_attach_timeout() -> Duration {
    std::env::var("WMBT_KV_DAEMON_SHM_ATTACH_TIMEOUT_SECS")
        .ok()
        // `u64::from_str` rejects any whitespace, so trim before parsing.
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .filter(|secs| *secs >= 1)
        .map_or(ATTACH_TIMEOUT, Duration::from_secs)
}
140
/// Wire op codes. Encoded as `u8` (struct fields) to side-step rkyv's
/// enum derive constraints: we want plain field traversal on the hot
/// path, not match dispatch.
///
/// # Block-shaped ops (ABI 1.5+, opcodes 11..=13)
///
/// `LOOKUP_BLOCK_PREFIX`, `GET_KV_BLOCKS_BATCH`, and `PUT_KV_BLOCKS_BATCH`
/// mirror the C ABI `wmbt_kv_lookup_block_prefix` / `_get_kv_blocks_borrowed`
/// / `_put_kv_blocks` surfaces over the daemon transport. The op-specific
/// request/response shapes are carried as **rkyv-archived** bytes inside
/// the existing `WireRequest.payload` / `WireResponse.payload` `Vec<u8>`,
/// with no envelope-level change. RFC 0008 §4e completed the migration from
/// the hand-rolled length-prefix codec (V1 magics) to rkyv (V2 magics).
/// See `LookupBlockPrefixReq` and friends.
pub mod op {
    pub const PING: u8 = 1;
    pub const PUT: u8 = 2;
    pub const GET: u8 = 3;
    pub const EXISTS: u8 = 4;
    pub const STATS: u8 = 5;
    pub const CLEAR: u8 = 6;
    pub const RESTORE: u8 = 7;
    pub const CLOSE: u8 = 8;
    pub const GET_MANY: u8 = 9;
    pub const LIST: u8 = 10;
    /// Block-shaped: count leading hashes present in the daemon's
    /// metadata index. See `LookupBlockPrefixReq`/`LookupBlockPrefixResp`.
    pub const LOOKUP_BLOCK_PREFIX: u8 = 11;
    /// Block-shaped: parallel batched GET for N content-addressed blocks.
    /// See `GetKvBlocksBatchReq`/`GetKvBlocksBatchResp`.
    pub const GET_KV_BLOCKS_BATCH: u8 = 12;
    /// Block-shaped: parallel batched PUT for N content-addressed blocks
    /// plus a server-side metadata-index update so subsequent
    /// `LOOKUP_BLOCK_PREFIX` reflects the new presence.
    /// See `PutKvBlocksBatchReq`/`PutKvBlocksBatchResp`.
    pub const PUT_KV_BLOCKS_BATCH: u8 = 13;

    /// Op code → display name pairs consulted by [`name`].
    const NAMES: &[(u8, &str)] = &[
        (PING, "ping"),
        (PUT, "put"),
        (GET, "get"),
        (EXISTS, "exists"),
        (STATS, "stats"),
        (CLEAR, "clear"),
        (RESTORE, "restore"),
        (CLOSE, "close"),
        (GET_MANY, "get_many"),
        (LIST, "list"),
        (LOOKUP_BLOCK_PREFIX, "lookup_block_prefix"),
        (GET_KV_BLOCKS_BATCH, "get_kv_blocks_batch"),
        (PUT_KV_BLOCKS_BATCH, "put_kv_blocks_batch"),
    ];

    /// Returns the lower-snake-case name of `code`, or `"unknown"` for
    /// any value that is not one of the constants above.
    #[must_use]
    pub fn name(code: u8) -> &'static str {
        NAMES
            .iter()
            .find(|&&(known, _)| known == code)
            .map_or("unknown", |&(_, label)| label)
    }
}
197
/// Wire status codes carried in `WireResponse.status` (see `op` for the
/// rationale behind plain `u8` constants instead of an enum).
///
/// NOTE(review): the per-code meanings below are read off the constant
/// names only; confirm against the daemon dispatch before relying on them.
pub mod status {
    /// The request completed successfully.
    pub const OK: u8 = 0;
    /// Presumably: the requested entry was not found.
    pub const MISS: u8 = 1;
    /// Presumably: the payload exceeded what the transport can carry.
    pub const TOO_LARGE: u8 = 2;
    /// The request failed.
    pub const ERROR: u8 = 3;
}
205
206const KEY_BATCH_MAGIC: &[u8] = b"WMBT_KV_KEYS_V1\0";
207const BYTES_BATCH_MAGIC: &[u8] = b"WMBT_KV_BYTES_V1\0";
208
// Per-op magic constants for the block-shaped payloads: 16 bytes each,
// NUL-padded, so decode failures point at the exact op that produced
// the malformed frame. (The legacy batch magics above are not uniform
// in length: `KEY_BATCH_MAGIC` is 16 bytes, `BYTES_BATCH_MAGIC` is 17.)
//
// Alpha breaking-window means there's no legacy V1 envelope to coexist
// with: any client/daemon mismatch surfaces as an explicit decode error
// rather than a silent torn read.
217const LOOKUP_BLOCK_PREFIX_REQ_MAGIC: &[u8] = b"WMBT_LBP_REQ\0\0\0\0";
218const LOOKUP_BLOCK_PREFIX_RESP_MAGIC: &[u8] = b"WMBT_LBP_RES\0\0\0\0";
219const GET_KV_BLOCKS_BATCH_REQ_MAGIC: &[u8] = b"WMBT_GBB_REQ\0\0\0\0";
220const GET_KV_BLOCKS_BATCH_RESP_MAGIC: &[u8] = b"WMBT_GBB_RES\0\0\0\0";
221const PUT_KV_BLOCKS_BATCH_REQ_MAGIC: &[u8] = b"WMBT_PBB_REQ\0\0\0\0";
222const PUT_KV_BLOCKS_BATCH_RESP_MAGIC: &[u8] = b"WMBT_PBB_RES\0\0\0\0";
223
224/// Encode an ordered key list for the daemon batched GET control path.
225#[must_use]
226pub fn encode_key_batch(keys: &[String]) -> Vec<u8> {
227    let total = KEY_BATCH_MAGIC.len() + 4 + keys.iter().map(|key| 4 + key.len()).sum::<usize>();
228    let mut out = Vec::with_capacity(total);
229    out.extend_from_slice(KEY_BATCH_MAGIC);
230    out.extend_from_slice(&(keys.len() as u32).to_le_bytes());
231    for key in keys {
232        let bytes = key.as_bytes();
233        out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
234        out.extend_from_slice(bytes);
235    }
236    out
237}
238
239/// Typed error for the daemon's wire-codec helpers (encode/decode/
240/// validate). Replaces ad-hoc `Result<T, String>` in the daemon's
241/// codec surface. Named `WireCodecError` to avoid
242/// shadowing `myelon::codec::CodecError` (which we import at the
243/// top of this file for the typed-transport surface).
244///
245/// Each variant carries enough context to be operator-actionable
246/// without grepping the source.
247#[derive(Debug, thiserror::Error)]
248pub enum WireCodecError {
249    /// A length prefix or sentinel claimed more bytes than the
250    /// payload contains.
251    #[error("{what} truncated: need {needed} bytes at offset {at}, payload is {got}")]
252    Truncated { what: &'static str, needed: usize, at: usize, got: usize },
253
254    /// First N bytes don't match the expected magic for this op.
255    #[error("{op}: bad magic; expected {expected:?} got first {got_len} bytes {got:?}")]
256    BadMagic { op: &'static str, expected: &'static [u8], got_len: usize, got: Vec<u8> },
257
258    /// Length arithmetic would overflow `usize`.
259    #[error("{what}: length overflow")]
260    LengthOverflow { what: &'static str },
261
262    /// Trailing bytes past the parsed end of the payload.
263    #[error("{what}: trailing bytes ({extra} unread)")]
264    TrailingBytes { what: &'static str, extra: usize },
265
266    /// Body length stated in the prefix doesn't match the actual
267    /// payload bytes available.
268    #[error("{what}: body length mismatch (claimed={claimed}, actual={actual})")]
269    BodyLengthMismatch { what: &'static str, claimed: usize, actual: usize },
270
271    /// UTF-8 decode failure for a key/string field.
272    #[error("{what}: utf8: {source}")]
273    Utf8 {
274        what: &'static str,
275        #[source]
276        source: std::str::Utf8Error,
277    },
278
279    /// rkyv encode/decode failure, scoped by op name.
280    #[error("{op}: rkyv: {source}")]
281    Rkyv {
282        op: &'static str,
283        #[source]
284        source: rkyv::rancor::Error,
285    },
286
287    /// SHM segment-name budget validation failure (see
288    /// `validate_segment_name_budget`).
289    #[error("{0}")]
290    SegmentNameBudget(String),
291}
292
293/// Decode an ordered key list encoded by [`encode_key_batch`].
294pub fn decode_key_batch(payload: &[u8]) -> Result<Vec<String>, WireCodecError> {
295    if !payload.starts_with(KEY_BATCH_MAGIC) {
296        let got_len = KEY_BATCH_MAGIC.len().min(payload.len());
297        return Err(WireCodecError::BadMagic {
298            op: "key_batch",
299            expected: KEY_BATCH_MAGIC,
300            got_len,
301            got: payload[..got_len].to_vec(),
302        });
303    }
304    let mut cursor = KEY_BATCH_MAGIC.len();
305    let count = read_u32(payload, &mut cursor)? as usize;
306    let mut keys = Vec::with_capacity(count);
307    for _ in 0..count {
308        let len = read_u32(payload, &mut cursor)? as usize;
309        let end =
310            cursor.checked_add(len).ok_or(WireCodecError::LengthOverflow { what: "key_batch" })?;
311        if end > payload.len() {
312            return Err(WireCodecError::Truncated {
313                what: "key_batch",
314                needed: len,
315                at: cursor,
316                got: payload.len(),
317            });
318        }
319        let key = std::str::from_utf8(&payload[cursor..end])
320            .map_err(|err| WireCodecError::Utf8 { what: "key_batch", source: err })?
321            .to_string();
322        keys.push(key);
323        cursor = end;
324    }
325    if cursor != payload.len() {
326        return Err(WireCodecError::TrailingBytes {
327            what: "key_batch",
328            extra: payload.len() - cursor,
329        });
330    }
331    Ok(keys)
332}
333
334/// Encode ordered payload bytes for a batched GET response.
335#[must_use]
336pub fn encode_bytes_batch(items: &[Bytes]) -> Vec<u8> {
337    let total = BYTES_BATCH_MAGIC.len()
338        + 4
339        + (items.len() * 4)
340        + items.iter().map(Bytes::len).sum::<usize>();
341    let mut out = Vec::with_capacity(total);
342    out.extend_from_slice(BYTES_BATCH_MAGIC);
343    out.extend_from_slice(&(items.len() as u32).to_le_bytes());
344    for item in items {
345        out.extend_from_slice(&(item.len() as u32).to_le_bytes());
346    }
347    for item in items {
348        out.extend_from_slice(item);
349    }
350    out
351}
352
353/// Decode an ordered payload batch without copying individual payloads.
354pub fn decode_bytes_batch(batch: Bytes) -> Result<Vec<Bytes>, WireCodecError> {
355    if !batch.starts_with(BYTES_BATCH_MAGIC) {
356        let got_len = BYTES_BATCH_MAGIC.len().min(batch.len());
357        return Err(WireCodecError::BadMagic {
358            op: "bytes_batch",
359            expected: BYTES_BATCH_MAGIC,
360            got_len,
361            got: batch[..got_len].to_vec(),
362        });
363    }
364    let data = batch.as_ref();
365    let mut cursor = BYTES_BATCH_MAGIC.len();
366    let count = read_u32(data, &mut cursor)? as usize;
367    let mut lengths = Vec::with_capacity(count);
368    for _ in 0..count {
369        lengths.push(read_u32(data, &mut cursor)? as usize);
370    }
371
372    let body_start = cursor;
373    let body_len = lengths
374        .iter()
375        .try_fold(0usize, |acc, len| acc.checked_add(*len))
376        .ok_or(WireCodecError::LengthOverflow { what: "bytes_batch" })?;
377    let body_end = body_start
378        .checked_add(body_len)
379        .ok_or(WireCodecError::LengthOverflow { what: "bytes_batch" })?;
380    if body_end != data.len() {
381        return Err(WireCodecError::BodyLengthMismatch {
382            what: "bytes_batch",
383            claimed: body_end,
384            actual: data.len(),
385        });
386    }
387
388    let mut offset = body_start;
389    let mut out = Vec::with_capacity(count);
390    for len in lengths {
391        let end = offset + len;
392        out.push(batch.slice(offset..end));
393        offset = end;
394    }
395    Ok(out)
396}
397
398fn read_u32(payload: &[u8], cursor: &mut usize) -> Result<u32, WireCodecError> {
399    let end = cursor.checked_add(4).ok_or(WireCodecError::LengthOverflow { what: "u32_cursor" })?;
400    if end > payload.len() {
401        return Err(WireCodecError::Truncated {
402            what: "u32",
403            needed: 4,
404            at: *cursor,
405            got: payload.len(),
406        });
407    }
408    let value =
409        u32::from_le_bytes(payload[*cursor..end].try_into().expect("slice length checked above"));
410    *cursor = end;
411    Ok(value)
412}
413
414/// rkyv-archivable wire request.
415#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone)]
416pub struct WireRequest {
417    pub id: u64,
418    pub op: u8,
419    pub namespace: String,
420    pub key: String,
421    pub payload: Vec<u8>,
422}
423
424/// rkyv-archivable wire response.
425#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone)]
426pub struct WireResponse {
427    pub id: u64,
428    pub status: u8,
429    pub op: u8,
430    pub payload: Vec<u8>,
431    pub message: String,
432}
433
434impl Codec for WireRequest {
435    type Encoded = AlignedVec;
436
437    fn encode(&self) -> Result<Self::Encoded, CodecError> {
438        rkyv::to_bytes::<RkyvError>(self).map_err(CodecError::encode)
439    }
440
441    fn decode(bytes: &[u8]) -> Result<Self, CodecError> {
442        let archived = rkyv::access::<<WireRequest as Archive>::Archived, RkyvError>(bytes)
443            .map_err(CodecError::decode)?;
444        rkyv::deserialize::<Self, RkyvError>(archived).map_err(CodecError::decode)
445    }
446}
447
448impl Codec for WireResponse {
449    type Encoded = AlignedVec;
450
451    fn encode(&self) -> Result<Self::Encoded, CodecError> {
452        rkyv::to_bytes::<RkyvError>(self).map_err(CodecError::encode)
453    }
454
455    fn decode(bytes: &[u8]) -> Result<Self, CodecError> {
456        let archived = rkyv::access::<<WireResponse as Archive>::Archived, RkyvError>(bytes)
457            .map_err(CodecError::decode)?;
458        rkyv::deserialize::<Self, RkyvError>(archived).map_err(CodecError::decode)
459    }
460}
461
462// ============================================================
463// Block-shaped op payload types (rkyv-archived, ABI 1.5 / V2)
464// ============================================================
465//
466// These structs are NOT wire envelopes, they ride INSIDE the
467// `WireRequest.payload` / `WireResponse.payload` `Vec<u8>` fields. The
468// envelope (op code, request id, namespace, status, message) stays in
469// `WireRequest`/`WireResponse`; only the per-op shape (hash lists,
470// per-block payload slices, matched counts) lives here. This keeps the
471// existing daemon ↔ client framing unchanged, adding a new block-shaped
472// op is purely additive at the opcode dispatch level.
473//
474// Encoding format (V2, RFC 0008 §4e):
//   - 16-byte ASCII magic header (per-op, NUL-padded; note the magic
//     strings themselves carry no explicit version tag)
476//   - rkyv::to_bytes::<rancor::Error>(payload).to_vec()
477//
478// The V2 magic header reuses the same naming pattern as the prior
479// length-prefix codec so decode failures still point at the exact op
480// that produced the malformed frame.
481
482/// Request payload for [`op::LOOKUP_BLOCK_PREFIX`].
483///
484/// Carries the namespace + ordered list of 64-char lower-hex blake3
485/// block hashes. The daemon resolves each hash via the in-process
486/// `metadata_index().longest_prefix(...)` and replies with the leading-
487/// hit count.
488#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
489pub struct LookupBlockPrefixReq {
490    pub namespace: String,
491    /// Each entry MUST be exactly 64 lower-hex characters (32 bytes).
492    pub block_hashes_hex: Vec<String>,
493}
494
495/// Response payload for [`op::LOOKUP_BLOCK_PREFIX`].
496///
497/// `matched_count` is the number of hashes, counted from index 0 of
498/// the request's `block_hashes_hex`, that the daemon's metadata
499/// index recognized before the first miss. `error` is `Some(msg)` only
500/// for malformed inputs (bad hex, etc.); the wire-level `WireResponse.status`
501/// also carries the OK/ERROR distinction so callers can branch without
502/// re-decoding.
503#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
504pub struct LookupBlockPrefixResp {
505    pub matched_count: u32,
506    pub error: Option<String>,
507}
508
509/// Request payload for [`op::GET_KV_BLOCKS_BATCH`].
510///
511/// Carries the namespace + ordered list of 64-char lower-hex blake3
512/// block hashes. The daemon resolves each hash to a key under
513/// `wombatkv/v1/block/b3=<hex>`, parallel-fetches via `get_kv`, and replies
514/// with all-or-nothing payload bytes.
515#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
516pub struct GetKvBlocksBatchReq {
517    pub namespace: String,
518    pub block_hashes_hex: Vec<String>,
519}
520
521/// Response payload for [`op::GET_KV_BLOCKS_BATCH`].
522///
523/// On full hit `payloads` is `Some(per-block bytes in input order)`; on
524/// any miss it is `None` (matches the cabi `get_kv_blocks_borrowed`
525/// all-or-nothing semantics). `error` is `Some(msg)` for backend errors
526/// distinct from a miss.
527#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
528pub struct GetKvBlocksBatchResp {
529    /// `None` => at least one block was missing (caller treats as miss).
530    /// `Some(items)` => `items.len() == request.block_hashes_hex.len()`.
531    pub payloads: Option<Vec<Vec<u8>>>,
532    pub error: Option<String>,
533}
534
535/// Request payload for [`op::PUT_KV_BLOCKS_BATCH`].
536///
537/// Carries the namespace, ordered list of 64-char lower-hex blake3
538/// hashes, and matching ordered payloads. The daemon writes each block
539/// under `wombatkv/v1/block/b3=<hex>` and updates its in-process metadata
540/// index so a subsequent `LOOKUP_BLOCK_PREFIX` sees the new presence.
541#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
542pub struct PutKvBlocksBatchReq {
543    pub namespace: String,
544    pub block_hashes_hex: Vec<String>,
545    pub payloads: Vec<Vec<u8>>,
546}
547
548/// Response payload for [`op::PUT_KV_BLOCKS_BATCH`].
549///
550/// `total_bytes` is the sum of all `payloads[i].len()` on success.
551/// `error` is `Some(msg)` if any per-block PUT failed; in that case the
552/// metadata index is left unchanged for the failed batch (server-side
553/// transaction: index updates only run after ALL per-block PUTs succeed).
554#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
555pub struct PutKvBlocksBatchResp {
556    pub total_bytes: u64,
557    pub error: Option<String>,
558}
559
560// ------------------------------------------------------------
561// Internal helpers for the block-shaped rkyv V2 codecs (RFC 0008 §4e).
562//
// Each wire payload is `MAGIC (16 bytes) || rkyv::to_bytes(value)`. The
// magic identifies the op and role (`WMBT_{op}_{REQ|RES}`, NUL-padded);
// the magic strings carry no version tag, so the V2 (rkyv) format is
// implied rather than encoded. Without the magic a stray frame would
// still bytecheck because the body is well-aligned, and we would lose
// the per-op decode-error trace.
567// ------------------------------------------------------------
568
569fn expect_magic<'a>(
570    payload: &'a [u8],
571    magic: &'static [u8],
572    op_name: &'static str,
573) -> Result<&'a [u8], WireCodecError> {
574    if !payload.starts_with(magic) {
575        let got_len = magic.len().min(payload.len());
576        return Err(WireCodecError::BadMagic {
577            op: op_name,
578            expected: magic,
579            got_len,
580            got: payload[..got_len].to_vec(),
581        });
582    }
583    Ok(&payload[magic.len()..])
584}
585
586/// Generic rkyv encode: prepend a per-op magic header, then archive
587/// the value. The body is allocator-aligned by rkyv but slicing past
588/// the 16-byte prefix preserves 16-byte alignment for free (16 % 16 = 0)
589/// so the decoder can `access` the body in-place without copying.
590fn rkyv_encode_with_magic<T>(
591    magic: &'static [u8],
592    op_name: &'static str,
593    value: &T,
594) -> Result<Vec<u8>, WireCodecError>
595where
596    T: for<'a> rkyv::Serialize<
597        rkyv::api::high::HighSerializer<
598            rkyv::util::AlignedVec,
599            rkyv::ser::allocator::ArenaHandle<'a>,
600            RkyvError,
601        >,
602    >,
603{
604    let body = rkyv::to_bytes::<RkyvError>(value)
605        .map_err(|e| WireCodecError::Rkyv { op: op_name, source: e })?;
606    let mut out = Vec::with_capacity(magic.len() + body.len());
607    out.extend_from_slice(magic);
608    out.extend_from_slice(body.as_slice());
609    Ok(out)
610}
611
612/// Generic rkyv decode: strip the magic, copy the body into an
613/// `AlignedVec<16>` (the body lives at +16 from the caller's slice,
614/// preserving alignment of the underlying buffer, but we still copy
615/// for safety because the `Vec<u8>` payload carrier inside
616/// `WireRequest`/`WireResponse` may have been resliced through code
617/// paths we don't control), then run `from_bytes`.
618fn rkyv_decode_with_magic<T>(
619    payload: &[u8],
620    magic: &'static [u8],
621    op_name: &'static str,
622) -> Result<T, WireCodecError>
623where
624    T: rkyv::Archive,
625    T::Archived: for<'a> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'a, RkyvError>>
626        + rkyv::Deserialize<T, rkyv::api::high::HighDeserializer<RkyvError>>,
627{
628    let body = expect_magic(payload, magic, op_name)?;
629    let mut aligned: rkyv::util::AlignedVec<16> = rkyv::util::AlignedVec::with_capacity(body.len());
630    aligned.extend_from_slice(body);
631    rkyv::from_bytes::<T, RkyvError>(&aligned[..])
632        .map_err(|e| WireCodecError::Rkyv { op: op_name, source: e })
633}
634
635// ------------------------------------------------------------
636// LookupBlockPrefix Req/Resp codec.
637// ------------------------------------------------------------
638
639/// rkyv-encode a [`LookupBlockPrefixReq`] for `WireRequest.payload`.
640///
641/// Layout: `magic(16) | rkyv::to_bytes(req)`
642pub fn encode_lookup_block_prefix_req(
643    req: &LookupBlockPrefixReq,
644) -> Result<Vec<u8>, WireCodecError> {
645    rkyv_encode_with_magic(LOOKUP_BLOCK_PREFIX_REQ_MAGIC, "lookup_block_prefix_req", req)
646}
647
648pub fn decode_lookup_block_prefix_req(
649    bytes: &[u8],
650) -> Result<LookupBlockPrefixReq, WireCodecError> {
651    rkyv_decode_with_magic::<LookupBlockPrefixReq>(
652        bytes,
653        LOOKUP_BLOCK_PREFIX_REQ_MAGIC,
654        "lookup_block_prefix_req",
655    )
656}
657
658/// rkyv-encode a [`LookupBlockPrefixResp`] for `WireResponse.payload`.
659pub fn encode_lookup_block_prefix_resp(
660    resp: &LookupBlockPrefixResp,
661) -> Result<Vec<u8>, WireCodecError> {
662    rkyv_encode_with_magic(LOOKUP_BLOCK_PREFIX_RESP_MAGIC, "lookup_block_prefix_resp", resp)
663}
664
665pub fn decode_lookup_block_prefix_resp(
666    bytes: &[u8],
667) -> Result<LookupBlockPrefixResp, WireCodecError> {
668    rkyv_decode_with_magic::<LookupBlockPrefixResp>(
669        bytes,
670        LOOKUP_BLOCK_PREFIX_RESP_MAGIC,
671        "lookup_block_prefix_resp",
672    )
673}
674
675// ------------------------------------------------------------
676// GetKvBlocksBatch Req/Resp codec.
677// ------------------------------------------------------------
678
679pub fn encode_get_kv_blocks_batch_req(
680    req: &GetKvBlocksBatchReq,
681) -> Result<Vec<u8>, WireCodecError> {
682    rkyv_encode_with_magic(GET_KV_BLOCKS_BATCH_REQ_MAGIC, "get_kv_blocks_batch_req", req)
683}
684
685pub fn decode_get_kv_blocks_batch_req(bytes: &[u8]) -> Result<GetKvBlocksBatchReq, WireCodecError> {
686    rkyv_decode_with_magic::<GetKvBlocksBatchReq>(
687        bytes,
688        GET_KV_BLOCKS_BATCH_REQ_MAGIC,
689        "get_kv_blocks_batch_req",
690    )
691}
692
693pub fn encode_get_kv_blocks_batch_resp(
694    resp: &GetKvBlocksBatchResp,
695) -> Result<Vec<u8>, WireCodecError> {
696    rkyv_encode_with_magic(GET_KV_BLOCKS_BATCH_RESP_MAGIC, "get_kv_blocks_batch_resp", resp)
697}
698
699pub fn decode_get_kv_blocks_batch_resp(
700    bytes: &[u8],
701) -> Result<GetKvBlocksBatchResp, WireCodecError> {
702    rkyv_decode_with_magic::<GetKvBlocksBatchResp>(
703        bytes,
704        GET_KV_BLOCKS_BATCH_RESP_MAGIC,
705        "get_kv_blocks_batch_resp",
706    )
707}
708
709// ------------------------------------------------------------
710// PutKvBlocksBatch Req/Resp codec.
711// ------------------------------------------------------------
712
713pub fn encode_put_kv_blocks_batch_req(
714    req: &PutKvBlocksBatchReq,
715) -> Result<Vec<u8>, WireCodecError> {
716    // Mirror the prior pre-flight check so callers get a precise error
717    // rather than a downstream daemon panic. rkyv would still encode a
718    // shape-mismatched record but the daemon would loop over hashes
719    // assuming payloads[i] exists for each i.
720    if req.block_hashes_hex.len() != req.payloads.len() {
721        return Err(WireCodecError::BodyLengthMismatch {
722            what: "put_kv_blocks_batch_req",
723            claimed: req.block_hashes_hex.len(),
724            actual: req.payloads.len(),
725        });
726    }
727    rkyv_encode_with_magic(PUT_KV_BLOCKS_BATCH_REQ_MAGIC, "put_kv_blocks_batch_req", req)
728}
729
730pub fn decode_put_kv_blocks_batch_req(bytes: &[u8]) -> Result<PutKvBlocksBatchReq, WireCodecError> {
731    rkyv_decode_with_magic::<PutKvBlocksBatchReq>(
732        bytes,
733        PUT_KV_BLOCKS_BATCH_REQ_MAGIC,
734        "put_kv_blocks_batch_req",
735    )
736}
737
738pub fn encode_put_kv_blocks_batch_resp(
739    resp: &PutKvBlocksBatchResp,
740) -> Result<Vec<u8>, WireCodecError> {
741    rkyv_encode_with_magic(PUT_KV_BLOCKS_BATCH_RESP_MAGIC, "put_kv_blocks_batch_resp", resp)
742}
743
744pub fn decode_put_kv_blocks_batch_resp(
745    bytes: &[u8],
746) -> Result<PutKvBlocksBatchResp, WireCodecError> {
747    rkyv_decode_with_magic::<PutKvBlocksBatchResp>(
748        bytes,
749        PUT_KV_BLOCKS_BATCH_RESP_MAGIC,
750        "put_kv_blocks_batch_resp",
751    )
752}
753
/// Internal SHM prefix prepended to every segment the daemon creates.
/// Short (2 chars, followed by the user prefix and a 1-char role) so the
/// disruptor-mp auxiliary suffixes (`_producer_seq` = 13 chars, `_cr`,
/// `_ci`, `_<consumer_id>_seq`) fit within the macOS POSIX-SHM 31-char
/// budget for reasonable user prefixes.
///
/// The historical `wmbt_kv_<prefix>_<role>` shape spent 13 chars of the
/// budget on wrapper text (`wmbt_kv_` plus a role suffix such as `_resp`);
/// under disruptor-mp's `_producer_seq` suffix that gave
/// 30 − 13 − 13 = 4 chars for the prefix, which made any meaningful daemon
/// prefix illegal on macOS (the seed=999 DST failure on Mac). Shrinking
/// the wrapper to `wk<prefix><role>` (3 wrapper chars) recovers the budget
/// to 30 − 3 − 13 = 14 chars of user prefix on macOS, which is the cap
/// `validate_segment_name_budget` enforces.
const SHM_PREFIX: &str = "wk";
766
/// Role character for the request ring (client→daemon): `r`.
/// Single-char to stay tight on the macOS SHM name budget.
const ROLE_REQ: char = 'r';
/// Role character for the response ring (daemon→client): `s`.
/// Single-char for the same budget reason as [`ROLE_REQ`].
const ROLE_RESP: char = 's';
771
/// Length of the longest auxiliary suffix disruptor-mp appends per ring
/// segment: `_producer_seq` (13 chars including the leading `_`). The
/// other suffixes (`_cr`, `_ci`, and `_<consumer_id>_seq`, which is
/// 7 chars with our 2-char `dn`/`cn` consumer ids) are shorter and do not
/// bind the budget.
///
/// This value mirrors disruptor-mp's internal naming by hand: if that
/// naming ever grows past 13 chars, `validate_segment_name_budget` stays
/// correct only after this constant is updated to match.
const MAX_DISRUPTOR_INTERNAL_SUFFIX_LEN: usize = 13;
780
781/// Build the request and response SHM segment names for a daemon prefix.
782/// The format is `wk<prefix>r` and `wk<prefix>s` (no underscores) so the
783/// names stay short enough that disruptor-mp's per-ring auxiliary
784/// segments (notably `<base>_producer_seq`) still fit the macOS
785/// POSIX-SHM 31-char budget.
786pub fn segment_names(prefix: &str) -> (String, String) {
787    (format!("{SHM_PREFIX}{prefix}{ROLE_REQ}"), format!("{SHM_PREFIX}{prefix}{ROLE_RESP}"))
788}
789
/// Name-length budget applied to every derived SHM segment name.
///
/// macOS `POSIX_SHM_NAME_MAX` = 31 chars (including null). We use 30 for
/// the user-visible name to leave the kernel one byte of slack.
const SHM_SEGMENT_NAME_MAX_LEN_MACOS: usize = 30;
793
794/// Validate that the SHM segment names derived from `prefix`, including
795/// the longest disruptor-mp auxiliary segment (`<base>_producer_seq`) -
796/// will fit the macOS POSIX-SHM 31-char budget. Returns a clear,
797/// actionable error so daemon and client both fail loud at startup
798/// instead of letting the OS surface a cryptic `ENAMETOOLONG (errno 63)`
799/// (or worse, a `Shared segment not found` after thousands of retries -
800/// see RFC 0011 P10 / the 2026-05-18 macOS DST-sweep regression).
801///
802/// The math:
803///
804/// ```text
805///     longest name = wk<prefix><role>_producer_seq
806///                  = 2 + prefix.len() + 1 + 13
807///                  = 16 + prefix.len()
808/// ```
809///
810/// At the 30-char macOS budget the prefix is capped at **14 chars**.
811/// On Linux SHM names are filesystem paths (effectively unbounded);
812/// the validator is a no-op there beyond a sanity check.
813pub fn validate_segment_name_budget(prefix: &str) -> Result<(), WireCodecError> {
814    let (req, resp) = segment_names(prefix);
815    for base in [&req, &resp] {
816        // Real binding constraint: base name + the longest disruptor
817        // suffix. The suffix lives in the SHM namespace too, so it has
818        // to fit the same 30-char budget.
819        let derived_len = base.len() + MAX_DISRUPTOR_INTERNAL_SUFFIX_LEN;
820        if derived_len > SHM_SEGMENT_NAME_MAX_LEN_MACOS {
821            // Compute the largest acceptable prefix length: budget −
822            // (wk prefix + role + producer_seq).
823            let fixed = SHM_PREFIX.len() + 1 + MAX_DISRUPTOR_INTERNAL_SUFFIX_LEN;
824            let max_prefix = SHM_SEGMENT_NAME_MAX_LEN_MACOS.saturating_sub(fixed);
825            let derived_name = format!("{base}_producer_seq");
826            return Err(WireCodecError::SegmentNameBudget(format!(
827                "wombatkv: SHM segment '{derived_name}' ({derived_len} chars) \
828                 exceeds the macOS POSIX-SHM budget of {SHM_SEGMENT_NAME_MAX_LEN_MACOS} \
829                 chars. Shorten the daemon prefix '{prefix}' ({} chars) to at \
830                 most {max_prefix} chars. The internal disruptor-mp segments \
831                 add up to {MAX_DISRUPTOR_INTERNAL_SUFFIX_LEN} chars of suffix \
832                 ('_producer_seq' is the longest); the wombatkv wrapper adds \
833                 '{SHM_PREFIX}' + 1-char role. For strict cross-platform \
834                 portability (Linux, FreeBSD), see \
835                 `myelon::portable_shm_segment_name`, recommends total \
836                 name ≤ {} chars.",
837                prefix.len(),
838                myelon::PORTABLE_SHM_SEGMENT_NAME_MAX_LEN,
839            )));
840        }
841    }
842    Ok(())
843}
844
845/// Open the daemon side of the ring pair: req-consumer + resp-producer.
846///
847/// Daemon must call this AFTER both segments exist. The `req_seg` is
848/// created by the client before connecting; the daemon attaches.
849/// The `resp_seg` is created by the daemon; the client attaches.
850pub fn open_daemon(
851    req_seg: &str,
852    resp_seg: &str,
853    depth: usize,
854) -> Result<(TypedConsumer<ShmFrame>, TypedProducer<ShmFrame>), Box<dyn std::error::Error>> {
855    let req_consumer = wait_for_consumer(req_seg, depth, REQ_CONSUMER_ID)?;
856    let resp_producer = TypedProducer::<ShmFrame>::create_with_consumers(resp_seg, depth, 1)?;
857    Ok((req_consumer, resp_producer))
858}
859
860/// Open the client side of the ring pair: req-producer + resp-consumer.
861pub fn open_client(
862    req_seg: &str,
863    resp_seg: &str,
864    depth: usize,
865) -> Result<(TypedProducer<ShmFrame>, TypedConsumer<ShmFrame>), Box<dyn std::error::Error>> {
866    let req_producer = TypedProducer::<ShmFrame>::create_with_consumers(req_seg, depth, 1)?;
867    let resp_consumer = wait_for_consumer(resp_seg, depth, RESP_CONSUMER_ID)?;
868    Ok((req_producer, resp_consumer))
869}
870
871/// Read `WMBT_KV_DAEMON_SHM_WAIT_STRATEGY` env. Default is `Block`
872/// (signal-driven park), which saves ~100% CPU per ring at idle vs
873/// the previous `BusySpin` default. Wake latency in Block mode is ~µs,
874/// invisible against our actual workload (ms-scale S3 GET + Metal
875/// decode dominates). Set to `busyspin` for ultra-low-latency RPC
876/// scenarios where wake-µs matter and CPU is free.
877fn wait_strategy_from_env() -> MyelonWaitStrategy {
878    match std::env::var("WMBT_KV_DAEMON_SHM_WAIT_STRATEGY").ok().as_deref().map(str::trim) {
879        Some("busyspin" | "BusySpin" | "spin") => MyelonWaitStrategy::BusySpin,
880        Some("block" | "Block" | "park") => MyelonWaitStrategy::Block,
881        Some(other) if !other.is_empty() => {
882            eprintln!(
883                "WombatKV: unknown WMBT_KV_DAEMON_SHM_WAIT_STRATEGY={other:?}, \
884                 falling back to default 'block'"
885            );
886            MyelonWaitStrategy::Block
887        }
888        _ => MyelonWaitStrategy::Block,
889    }
890}
891
892fn wait_for_consumer(
893    segment: &str,
894    depth: usize,
895    consumer_id: &str,
896) -> Result<TypedConsumer<ShmFrame>, Box<dyn std::error::Error>> {
897    use std::time::Instant;
898    let deadline = Instant::now() + effective_attach_timeout();
899    let mut tries: u64 = 0;
900    let wait_strategy = wait_strategy_from_env();
901    loop {
902        tries += 1;
903        match TypedConsumer::<ShmFrame>::attach_with_consumer_id(
904            segment,
905            depth,
906            consumer_id,
907            wait_strategy,
908        ) {
909            Ok(consumer) => return Ok(consumer),
910            Err(error) => {
911                if Instant::now() < deadline {
912                    // Use myelon's discovery-poll primitive instead
913                    // of an ad-hoc sleep. Same env-var control surface as the
914                    // rest of the myelon transport stack.
915                    myelon::perform_default_discovery_poll_wait();
916                    continue;
917                }
918                return Err(format!("attach {segment}: {error} (after {tries} retries)").into());
919            }
920        }
921    }
922}
923
924/// Returns true when a payload of `len` bytes fits in a single frame
925/// (after rkyv envelope overhead, uses a conservative 256-byte budget).
926#[must_use]
927pub fn fits_one_frame(len: usize) -> bool {
928    len.saturating_add(256) <= FRAME_DATA_BYTES
929}
930
// Unit tests for the wire codecs, the SHM segment naming/budget helpers
// and the frame-capacity check defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal PING request survives an encode/decode round trip.
    #[test]
    fn ping_request_round_trips() {
        let req = WireRequest {
            id: 42,
            op: op::PING,
            namespace: String::new(),
            key: String::new(),
            payload: Vec::new(),
        };
        let encoded = req.encode().expect("encode");
        let decoded = WireRequest::decode(encoded.as_ref()).expect("decode");
        assert_eq!(decoded.id, 42);
        assert_eq!(decoded.op, op::PING);
    }

    // A response carrying a 4 KiB payload round-trips with its id, status
    // and payload bytes intact.
    #[test]
    fn put_response_round_trips_with_payload() {
        let resp = WireResponse {
            id: 7,
            status: status::OK,
            op: op::PUT,
            payload: vec![0xAB; 4096],
            message: String::new(),
        };
        let encoded = resp.encode().expect("encode");
        let decoded = WireResponse::decode(encoded.as_ref()).expect("decode");
        assert_eq!(decoded.id, 7);
        assert_eq!(decoded.status, status::OK);
        assert_eq!(decoded.payload.len(), 4096);
        assert_eq!(decoded.payload[0], 0xAB);
    }

    // `fits_one_frame` reserves 256 bytes: the largest accepted payload is
    // `FRAME_DATA_BYTES - 256`; a full-frame payload is rejected.
    #[test]
    fn frame_capacity_check() {
        assert!(fits_one_frame(0));
        assert!(fits_one_frame(FRAME_DATA_BYTES - 256));
        assert!(!fits_one_frame(FRAME_DATA_BYTES));
        assert!(!fits_one_frame(FRAME_DATA_BYTES * 2));
    }

    // Key batches round-trip, including keys that contain `/`.
    #[test]
    fn key_batch_round_trips() {
        let keys = vec!["a".to_string(), "nested/key".to_string()];
        let encoded = encode_key_batch(&keys);
        assert_eq!(decode_key_batch(&encoded).expect("decode keys"), keys);
    }

    // Byte batches round-trip, including an empty item in the middle.
    #[test]
    fn bytes_batch_round_trips_without_copying_items() {
        let items = vec![Bytes::from_static(b"alpha"), Bytes::new(), Bytes::from_static(b"gamma")];
        let encoded = Bytes::from(encode_bytes_batch(&items));
        let decoded = decode_bytes_batch(encoded).expect("decode bytes");
        assert_eq!(decoded, items);
    }

    #[test]
    fn segment_names_are_unique_per_prefix() {
        let (req, resp) = segment_names("smoke");
        // New short format `wk<prefix>r` / `wk<prefix>s` (see SHM_PREFIX
        // + ROLE_REQ/ROLE_RESP constants). Keeps disruptor-mp's
        // `_producer_seq` suffix (+13 chars) within the macOS POSIX-SHM
        // 31-char budget for user prefixes up to 14 chars.
        assert_eq!(req, "wksmoker");
        assert_eq!(resp, "wksmokes");
        assert_ne!(req, resp);
    }

    #[test]
    fn validate_segment_name_budget_accepts_short_prefix() {
        // Prefix that produced ENAMETOOLONG under the old wmbt_kv_<>_<>
        // shape: `wmbt_kv_drts999_resp_producer_seq` = 33 chars vs 30
        // budget, must now pass under the new `wk<>r`/`wk<>s` shape
        // (`wkdrts999s_producer_seq` = 23 chars). This was the 2026-05-18
        // macOS DST sweep regression.
        validate_segment_name_budget("drts999").expect("dst-sweep prefix");
        validate_segment_name_budget("drts42").expect("dst-sweep prefix");
        validate_segment_name_budget("dmcoreds").expect("typical");
    }

    #[test]
    fn validate_segment_name_budget_rejects_too_long_prefix() {
        // Max user prefix on macOS: 30 − len("wk") − 1 (role) − 13
        // (_producer_seq) = 14. 15 chars must fail.
        let too_long = "a".repeat(15);
        let err = validate_segment_name_budget(&too_long)
            .expect_err("15-char prefix must exceed macOS budget");
        assert!(err.to_string().contains("exceeds the macOS"));
        // 14-char prefix is the hard edge, must accept.
        validate_segment_name_budget(&"a".repeat(14)).expect("14 chars at the edge");
    }

    #[test]
    fn block_opcodes_are_distinct_and_contiguous() {
        // Block-shaped opcodes were added in ABI 1.5 as a contiguous
        // block AFTER the original 10 op codes. Lock the assignment so
        // an accidental reorder/collision fails this test, not at
        // runtime against an existing daemon build.
        assert_eq!(op::LOOKUP_BLOCK_PREFIX, 11);
        assert_eq!(op::GET_KV_BLOCKS_BATCH, 12);
        assert_eq!(op::PUT_KV_BLOCKS_BATCH, 13);
        assert_eq!(op::name(op::LOOKUP_BLOCK_PREFIX), "lookup_block_prefix");
        assert_eq!(op::name(op::GET_KV_BLOCKS_BATCH), "get_kv_blocks_batch");
        assert_eq!(op::name(op::PUT_KV_BLOCKS_BATCH), "put_kv_blocks_batch");
    }

    // Round-trips the lookup request plus both response shapes
    // (success with a match count, failure with an error string).
    #[test]
    fn lookup_block_prefix_req_resp_round_trip() {
        let req = LookupBlockPrefixReq {
            namespace: "ns/alpha".to_string(),
            block_hashes_hex: vec!["aa".repeat(32), "bb".repeat(32)],
        };
        let bytes = encode_lookup_block_prefix_req(&req).expect("encode req");
        let decoded = decode_lookup_block_prefix_req(&bytes).expect("decode req");
        assert_eq!(decoded.namespace, req.namespace);
        assert_eq!(decoded.block_hashes_hex, req.block_hashes_hex);

        let resp_ok = LookupBlockPrefixResp { matched_count: 7, error: None };
        let bytes_ok = encode_lookup_block_prefix_resp(&resp_ok).expect("encode resp ok");
        let decoded_ok = decode_lookup_block_prefix_resp(&bytes_ok).expect("decode resp ok");
        assert_eq!(decoded_ok.matched_count, 7);
        assert!(decoded_ok.error.is_none());

        let resp_err =
            LookupBlockPrefixResp { matched_count: 0, error: Some("bad hex at pos 3".to_string()) };
        let bytes_err = encode_lookup_block_prefix_resp(&resp_err).expect("encode resp err");
        let decoded_err = decode_lookup_block_prefix_resp(&bytes_err).expect("decode resp err");
        assert_eq!(decoded_err.matched_count, 0);
        assert_eq!(decoded_err.error.as_deref(), Some("bad hex at pos 3"));
    }

    // Round-trips the batch-get request plus a hit response (payloads of
    // different sizes) and a miss response (`payloads: None`).
    #[test]
    fn get_kv_blocks_batch_req_resp_round_trip() {
        let req = GetKvBlocksBatchReq {
            namespace: "ns/beta".to_string(),
            block_hashes_hex: vec!["11".repeat(32), "22".repeat(32), "33".repeat(32)],
        };
        let bytes = encode_get_kv_blocks_batch_req(&req).expect("encode req");
        let decoded = decode_get_kv_blocks_batch_req(&bytes).expect("decode req");
        assert_eq!(decoded.namespace, req.namespace);
        assert_eq!(decoded.block_hashes_hex, req.block_hashes_hex);

        let payloads = vec![vec![0xAAu8; 4096], vec![0xBBu8; 1024], vec![0xCCu8; 2048]];
        let resp_hit = GetKvBlocksBatchResp { payloads: Some(payloads.clone()), error: None };
        let bytes_hit = encode_get_kv_blocks_batch_resp(&resp_hit).expect("encode resp hit");
        let decoded_hit = decode_get_kv_blocks_batch_resp(&bytes_hit).expect("decode resp hit");
        assert!(decoded_hit.error.is_none());
        let got = decoded_hit.payloads.expect("payloads present");
        assert_eq!(got.len(), 3);
        assert_eq!(got[0].len(), 4096);
        assert_eq!(got[1].len(), 1024);
        assert_eq!(got[2].len(), 2048);
        assert_eq!(got[0][0], 0xAA);

        let resp_miss = GetKvBlocksBatchResp { payloads: None, error: None };
        let bytes_miss = encode_get_kv_blocks_batch_resp(&resp_miss).expect("encode resp miss");
        let decoded_miss = decode_get_kv_blocks_batch_resp(&bytes_miss).expect("decode resp miss");
        assert!(decoded_miss.payloads.is_none());
    }

    // Round-trips the batch-put request (matching hash/payload counts)
    // plus a success response and an error response.
    #[test]
    fn put_kv_blocks_batch_req_resp_round_trip() {
        let req = PutKvBlocksBatchReq {
            namespace: "ns/gamma".to_string(),
            block_hashes_hex: vec!["77".repeat(32), "88".repeat(32)],
            payloads: vec![vec![0x77u8; 512], vec![0x88u8; 1024]],
        };
        let bytes = encode_put_kv_blocks_batch_req(&req).expect("encode req");
        let decoded = decode_put_kv_blocks_batch_req(&bytes).expect("decode req");
        assert_eq!(decoded.namespace, req.namespace);
        assert_eq!(decoded.block_hashes_hex, req.block_hashes_hex);
        assert_eq!(decoded.payloads.len(), 2);
        assert_eq!(decoded.payloads[0], req.payloads[0]);
        assert_eq!(decoded.payloads[1].len(), 1024);

        let resp_ok = PutKvBlocksBatchResp { total_bytes: 1536, error: None };
        let bytes_ok = encode_put_kv_blocks_batch_resp(&resp_ok).expect("encode resp ok");
        let decoded_ok = decode_put_kv_blocks_batch_resp(&bytes_ok).expect("decode resp ok");
        assert_eq!(decoded_ok.total_bytes, 1536);
        assert!(decoded_ok.error.is_none());

        let resp_err =
            PutKvBlocksBatchResp { total_bytes: 0, error: Some("backend put failure".to_string()) };
        let bytes_err = encode_put_kv_blocks_batch_resp(&resp_err).expect("encode resp err");
        let decoded_err = decode_put_kv_blocks_batch_resp(&bytes_err).expect("decode resp err");
        assert_eq!(decoded_err.total_bytes, 0);
        assert_eq!(decoded_err.error.as_deref(), Some("backend put failure"));
    }

    #[test]
    fn block_payload_magic_headers_are_stable() {
        // Locks the on-wire magic bytes so an accidental rename of a
        // codec constant doesn't silently break compatibility with a
        // running daemon built from an older revision.
        assert_eq!(LOOKUP_BLOCK_PREFIX_REQ_MAGIC, b"WMBT_LBP_REQ\0\0\0\0");
        assert_eq!(LOOKUP_BLOCK_PREFIX_RESP_MAGIC, b"WMBT_LBP_RES\0\0\0\0");
        assert_eq!(GET_KV_BLOCKS_BATCH_REQ_MAGIC, b"WMBT_GBB_REQ\0\0\0\0");
        assert_eq!(GET_KV_BLOCKS_BATCH_RESP_MAGIC, b"WMBT_GBB_RES\0\0\0\0");
        assert_eq!(PUT_KV_BLOCKS_BATCH_REQ_MAGIC, b"WMBT_PBB_REQ\0\0\0\0");
        assert_eq!(PUT_KV_BLOCKS_BATCH_RESP_MAGIC, b"WMBT_PBB_RES\0\0\0\0");

        // Every encode_* produces a payload that starts with its magic.
        let lbp_req = encode_lookup_block_prefix_req(&LookupBlockPrefixReq {
            namespace: "ns".to_string(),
            block_hashes_hex: vec!["aa".repeat(32)],
        })
        .expect("encode");
        assert!(lbp_req.starts_with(LOOKUP_BLOCK_PREFIX_REQ_MAGIC));

        let gbb_resp = encode_get_kv_blocks_batch_resp(&GetKvBlocksBatchResp {
            payloads: Some(vec![vec![0u8; 4]]),
            error: None,
        })
        .expect("encode");
        assert!(gbb_resp.starts_with(GET_KV_BLOCKS_BATCH_RESP_MAGIC));

        let pbb_resp =
            encode_put_kv_blocks_batch_resp(&PutKvBlocksBatchResp { total_bytes: 42, error: None })
                .expect("encode");
        assert!(pbb_resp.starts_with(PUT_KV_BLOCKS_BATCH_RESP_MAGIC));
    }

    #[test]
    fn block_payload_codec_rejects_corruption() {
        // Magic mismatch should fail loudly with a recognizable error
        // that identifies the op, not panic, not silently succeed.
        let mut bad_magic = encode_lookup_block_prefix_req(&LookupBlockPrefixReq {
            namespace: "ns".to_string(),
            block_hashes_hex: vec!["aa".repeat(32)],
        })
        .expect("encode");
        bad_magic[0] = b'X';
        let err = decode_lookup_block_prefix_req(&bad_magic).expect_err("must fail");
        assert!(err.to_string().contains("lookup_block_prefix_req"));
        assert!(err.to_string().contains("bad magic"));

        // Truncated payload (chop off the last few archived bytes)
        // should fail at rkyv bytecheck, not panic.
        let full = encode_put_kv_blocks_batch_req(&PutKvBlocksBatchReq {
            namespace: "ns".to_string(),
            block_hashes_hex: vec!["aa".repeat(32), "bb".repeat(32)],
            payloads: vec![vec![1u8; 8], vec![2u8; 8]],
        })
        .expect("encode");
        let truncated = &full[..full.len() - 8];
        let err = decode_put_kv_blocks_batch_req(truncated).expect_err("must fail");
        assert!(
            err.to_string().contains("put_kv_blocks_batch_req"),
            "unexpected error message: {err}"
        );

        // Corrupting a byte in the middle of the archived body should
        // make rkyv bytecheck refuse the decode (or, less commonly,
        // produce a Vec/String layout error). The only hard requirement
        // checked below is that the decode does not panic.
        let mut tampered = encode_get_kv_blocks_batch_resp(&GetKvBlocksBatchResp {
            payloads: Some(vec![vec![0xAAu8; 8]]),
            error: None,
        })
        .expect("encode");
        // Hit a byte ~halfway through the body so we land inside the
        // rkyv archive's metadata rather than the trailing payload.
        let mid = usize::midpoint(GET_KV_BLOCKS_BATCH_RESP_MAGIC.len(), tampered.len());
        tampered[mid] ^= 0xFF;
        let result = decode_get_kv_blocks_batch_resp(&tampered);
        // We accept either Err or a structurally-different Ok, but
        // never a panic. In practice rkyv catches this at the layout
        // level and returns Err with op_name prefixed.
        if let Ok(decoded) = result {
            // If bytecheck happened to accept the tampered bytes
            // (which is unlikely but possible for byte positions that
            // land in a payload byte), the decoded value should still
            // be structurally well-formed. Nothing further is asserted
            // about its contents.
            let _ = decoded;
        }
    }

    #[test]
    fn block_payload_encodes_large_blocks_without_overflow() {
        // 1 MiB blocks × 4, exercises the u32 length prefix path and
        // ensures the capacity-estimator + writer agree on the total.
        let big_block = vec![0xCDu8; 1024 * 1024];
        let req = PutKvBlocksBatchReq {
            namespace: "ns".to_string(),
            block_hashes_hex: (0..4).map(|i| format!("{i:02x}").repeat(32)).collect(),
            payloads: vec![big_block.clone(); 4],
        };
        let bytes = encode_put_kv_blocks_batch_req(&req).expect("encode");
        let decoded = decode_put_kv_blocks_batch_req(&bytes).expect("decode");
        assert_eq!(decoded.payloads.len(), 4);
        for p in &decoded.payloads {
            assert_eq!(p.len(), big_block.len());
            assert_eq!(p[0], 0xCD);
        }
    }
}