1#![deny(unsafe_code)]
34
35pub mod arena;
36pub mod client;
37pub mod config;
38pub mod constants;
39pub mod envelope;
40pub mod http_transport;
41pub mod lifecycle;
42pub mod runtime_tpc;
43pub mod tcp_transport;
44pub use arena::{
45 arena_path, ArenaError, ArenaReader, ArenaWriter, ARENA_HEADER_BYTES, DEFAULT_ARENA_BYTES,
46};
47pub use client::{
48 ClientOptions, RemoteError, RemoteGetOutcome, RemoteHitTier, RemoteKvStoreClient,
49 DEFAULT_CALL_TIMEOUT,
50};
51pub use config::DaemonConfig;
52pub use lifecycle::{cleanup_prefix_segments, ClientHeartbeat, HeartbeatMonitor, ReopenReason};
53
54use std::time::Duration;
55
56use bytes::Bytes;
57use myelon::codec::{Codec, CodecError};
58use myelon::transport::AlignedFixedFrame;
59use myelon::typed_transport::{TypedConsumer, TypedProducer};
60use myelon::MyelonWaitStrategy;
61
62use rkyv::rancor::Error as RkyvError;
63use rkyv::util::AlignedVec;
64use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
65
66pub const FRAME_DATA_BYTES: usize = 4 * 1024 * 1024 - 16;
107pub const DEFAULT_RING_DEPTH: usize = 16;
110
111pub type ShmFrame = AlignedFixedFrame<FRAME_DATA_BYTES>;
113
114pub const REQ_CONSUMER_ID: &str = "dn";
118pub const RESP_CONSUMER_ID: &str = "cn";
120
121pub const ATTACH_TIMEOUT: Duration = Duration::from_secs(30);
127
128#[must_use]
133pub fn effective_attach_timeout() -> Duration {
134 std::env::var("WMBT_KV_DAEMON_SHM_ATTACH_TIMEOUT_SECS")
135 .ok()
136 .and_then(|s| s.parse::<u64>().ok())
137 .filter(|n| *n >= 1)
138 .map_or(ATTACH_TIMEOUT, Duration::from_secs)
139}
140
141pub mod op {
155 pub const PING: u8 = 1;
156 pub const PUT: u8 = 2;
157 pub const GET: u8 = 3;
158 pub const EXISTS: u8 = 4;
159 pub const STATS: u8 = 5;
160 pub const CLEAR: u8 = 6;
161 pub const RESTORE: u8 = 7;
162 pub const CLOSE: u8 = 8;
163 pub const GET_MANY: u8 = 9;
164 pub const LIST: u8 = 10;
165 pub const LOOKUP_BLOCK_PREFIX: u8 = 11;
168 pub const GET_KV_BLOCKS_BATCH: u8 = 12;
171 pub const PUT_KV_BLOCKS_BATCH: u8 = 13;
176
177 #[must_use]
178 pub fn name(code: u8) -> &'static str {
179 match code {
180 PING => "ping",
181 PUT => "put",
182 GET => "get",
183 EXISTS => "exists",
184 STATS => "stats",
185 CLEAR => "clear",
186 RESTORE => "restore",
187 CLOSE => "close",
188 GET_MANY => "get_many",
189 LIST => "list",
190 LOOKUP_BLOCK_PREFIX => "lookup_block_prefix",
191 GET_KV_BLOCKS_BATCH => "get_kv_blocks_batch",
192 PUT_KV_BLOCKS_BATCH => "put_kv_blocks_batch",
193 _ => "unknown",
194 }
195 }
196}
197
198pub mod status {
200 pub const OK: u8 = 0;
201 pub const MISS: u8 = 1;
202 pub const TOO_LARGE: u8 = 2;
203 pub const ERROR: u8 = 3;
204}
205
206const KEY_BATCH_MAGIC: &[u8] = b"WMBT_KV_KEYS_V1\0";
207const BYTES_BATCH_MAGIC: &[u8] = b"WMBT_KV_BYTES_V1\0";
208
209const LOOKUP_BLOCK_PREFIX_REQ_MAGIC: &[u8] = b"WMBT_LBP_REQ\0\0\0\0";
218const LOOKUP_BLOCK_PREFIX_RESP_MAGIC: &[u8] = b"WMBT_LBP_RES\0\0\0\0";
219const GET_KV_BLOCKS_BATCH_REQ_MAGIC: &[u8] = b"WMBT_GBB_REQ\0\0\0\0";
220const GET_KV_BLOCKS_BATCH_RESP_MAGIC: &[u8] = b"WMBT_GBB_RES\0\0\0\0";
221const PUT_KV_BLOCKS_BATCH_REQ_MAGIC: &[u8] = b"WMBT_PBB_REQ\0\0\0\0";
222const PUT_KV_BLOCKS_BATCH_RESP_MAGIC: &[u8] = b"WMBT_PBB_RES\0\0\0\0";
223
224#[must_use]
226pub fn encode_key_batch(keys: &[String]) -> Vec<u8> {
227 let total = KEY_BATCH_MAGIC.len() + 4 + keys.iter().map(|key| 4 + key.len()).sum::<usize>();
228 let mut out = Vec::with_capacity(total);
229 out.extend_from_slice(KEY_BATCH_MAGIC);
230 out.extend_from_slice(&(keys.len() as u32).to_le_bytes());
231 for key in keys {
232 let bytes = key.as_bytes();
233 out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
234 out.extend_from_slice(bytes);
235 }
236 out
237}
238
239#[derive(Debug, thiserror::Error)]
248pub enum WireCodecError {
249 #[error("{what} truncated: need {needed} bytes at offset {at}, payload is {got}")]
252 Truncated { what: &'static str, needed: usize, at: usize, got: usize },
253
254 #[error("{op}: bad magic; expected {expected:?} got first {got_len} bytes {got:?}")]
256 BadMagic { op: &'static str, expected: &'static [u8], got_len: usize, got: Vec<u8> },
257
258 #[error("{what}: length overflow")]
260 LengthOverflow { what: &'static str },
261
262 #[error("{what}: trailing bytes ({extra} unread)")]
264 TrailingBytes { what: &'static str, extra: usize },
265
266 #[error("{what}: body length mismatch (claimed={claimed}, actual={actual})")]
269 BodyLengthMismatch { what: &'static str, claimed: usize, actual: usize },
270
271 #[error("{what}: utf8: {source}")]
273 Utf8 {
274 what: &'static str,
275 #[source]
276 source: std::str::Utf8Error,
277 },
278
279 #[error("{op}: rkyv: {source}")]
281 Rkyv {
282 op: &'static str,
283 #[source]
284 source: rkyv::rancor::Error,
285 },
286
287 #[error("{0}")]
290 SegmentNameBudget(String),
291}
292
293pub fn decode_key_batch(payload: &[u8]) -> Result<Vec<String>, WireCodecError> {
295 if !payload.starts_with(KEY_BATCH_MAGIC) {
296 let got_len = KEY_BATCH_MAGIC.len().min(payload.len());
297 return Err(WireCodecError::BadMagic {
298 op: "key_batch",
299 expected: KEY_BATCH_MAGIC,
300 got_len,
301 got: payload[..got_len].to_vec(),
302 });
303 }
304 let mut cursor = KEY_BATCH_MAGIC.len();
305 let count = read_u32(payload, &mut cursor)? as usize;
306 let mut keys = Vec::with_capacity(count);
307 for _ in 0..count {
308 let len = read_u32(payload, &mut cursor)? as usize;
309 let end =
310 cursor.checked_add(len).ok_or(WireCodecError::LengthOverflow { what: "key_batch" })?;
311 if end > payload.len() {
312 return Err(WireCodecError::Truncated {
313 what: "key_batch",
314 needed: len,
315 at: cursor,
316 got: payload.len(),
317 });
318 }
319 let key = std::str::from_utf8(&payload[cursor..end])
320 .map_err(|err| WireCodecError::Utf8 { what: "key_batch", source: err })?
321 .to_string();
322 keys.push(key);
323 cursor = end;
324 }
325 if cursor != payload.len() {
326 return Err(WireCodecError::TrailingBytes {
327 what: "key_batch",
328 extra: payload.len() - cursor,
329 });
330 }
331 Ok(keys)
332}
333
334#[must_use]
336pub fn encode_bytes_batch(items: &[Bytes]) -> Vec<u8> {
337 let total = BYTES_BATCH_MAGIC.len()
338 + 4
339 + (items.len() * 4)
340 + items.iter().map(Bytes::len).sum::<usize>();
341 let mut out = Vec::with_capacity(total);
342 out.extend_from_slice(BYTES_BATCH_MAGIC);
343 out.extend_from_slice(&(items.len() as u32).to_le_bytes());
344 for item in items {
345 out.extend_from_slice(&(item.len() as u32).to_le_bytes());
346 }
347 for item in items {
348 out.extend_from_slice(item);
349 }
350 out
351}
352
353pub fn decode_bytes_batch(batch: Bytes) -> Result<Vec<Bytes>, WireCodecError> {
355 if !batch.starts_with(BYTES_BATCH_MAGIC) {
356 let got_len = BYTES_BATCH_MAGIC.len().min(batch.len());
357 return Err(WireCodecError::BadMagic {
358 op: "bytes_batch",
359 expected: BYTES_BATCH_MAGIC,
360 got_len,
361 got: batch[..got_len].to_vec(),
362 });
363 }
364 let data = batch.as_ref();
365 let mut cursor = BYTES_BATCH_MAGIC.len();
366 let count = read_u32(data, &mut cursor)? as usize;
367 let mut lengths = Vec::with_capacity(count);
368 for _ in 0..count {
369 lengths.push(read_u32(data, &mut cursor)? as usize);
370 }
371
372 let body_start = cursor;
373 let body_len = lengths
374 .iter()
375 .try_fold(0usize, |acc, len| acc.checked_add(*len))
376 .ok_or(WireCodecError::LengthOverflow { what: "bytes_batch" })?;
377 let body_end = body_start
378 .checked_add(body_len)
379 .ok_or(WireCodecError::LengthOverflow { what: "bytes_batch" })?;
380 if body_end != data.len() {
381 return Err(WireCodecError::BodyLengthMismatch {
382 what: "bytes_batch",
383 claimed: body_end,
384 actual: data.len(),
385 });
386 }
387
388 let mut offset = body_start;
389 let mut out = Vec::with_capacity(count);
390 for len in lengths {
391 let end = offset + len;
392 out.push(batch.slice(offset..end));
393 offset = end;
394 }
395 Ok(out)
396}
397
398fn read_u32(payload: &[u8], cursor: &mut usize) -> Result<u32, WireCodecError> {
399 let end = cursor.checked_add(4).ok_or(WireCodecError::LengthOverflow { what: "u32_cursor" })?;
400 if end > payload.len() {
401 return Err(WireCodecError::Truncated {
402 what: "u32",
403 needed: 4,
404 at: *cursor,
405 got: payload.len(),
406 });
407 }
408 let value =
409 u32::from_le_bytes(payload[*cursor..end].try_into().expect("slice length checked above"));
410 *cursor = end;
411 Ok(value)
412}
413
414#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone)]
416pub struct WireRequest {
417 pub id: u64,
418 pub op: u8,
419 pub namespace: String,
420 pub key: String,
421 pub payload: Vec<u8>,
422}
423
424#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone)]
426pub struct WireResponse {
427 pub id: u64,
428 pub status: u8,
429 pub op: u8,
430 pub payload: Vec<u8>,
431 pub message: String,
432}
433
434impl Codec for WireRequest {
435 type Encoded = AlignedVec;
436
437 fn encode(&self) -> Result<Self::Encoded, CodecError> {
438 rkyv::to_bytes::<RkyvError>(self).map_err(CodecError::encode)
439 }
440
441 fn decode(bytes: &[u8]) -> Result<Self, CodecError> {
442 let archived = rkyv::access::<<WireRequest as Archive>::Archived, RkyvError>(bytes)
443 .map_err(CodecError::decode)?;
444 rkyv::deserialize::<Self, RkyvError>(archived).map_err(CodecError::decode)
445 }
446}
447
448impl Codec for WireResponse {
449 type Encoded = AlignedVec;
450
451 fn encode(&self) -> Result<Self::Encoded, CodecError> {
452 rkyv::to_bytes::<RkyvError>(self).map_err(CodecError::encode)
453 }
454
455 fn decode(bytes: &[u8]) -> Result<Self, CodecError> {
456 let archived = rkyv::access::<<WireResponse as Archive>::Archived, RkyvError>(bytes)
457 .map_err(CodecError::decode)?;
458 rkyv::deserialize::<Self, RkyvError>(archived).map_err(CodecError::decode)
459 }
460}
461
462#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
489pub struct LookupBlockPrefixReq {
490 pub namespace: String,
491 pub block_hashes_hex: Vec<String>,
493}
494
495#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
504pub struct LookupBlockPrefixResp {
505 pub matched_count: u32,
506 pub error: Option<String>,
507}
508
509#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
516pub struct GetKvBlocksBatchReq {
517 pub namespace: String,
518 pub block_hashes_hex: Vec<String>,
519}
520
521#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
528pub struct GetKvBlocksBatchResp {
529 pub payloads: Option<Vec<Vec<u8>>>,
532 pub error: Option<String>,
533}
534
535#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
542pub struct PutKvBlocksBatchReq {
543 pub namespace: String,
544 pub block_hashes_hex: Vec<String>,
545 pub payloads: Vec<Vec<u8>>,
546}
547
548#[derive(Archive, RkyvSerialize, RkyvDeserialize, Debug, Clone, PartialEq, Eq)]
555pub struct PutKvBlocksBatchResp {
556 pub total_bytes: u64,
557 pub error: Option<String>,
558}
559
560fn expect_magic<'a>(
570 payload: &'a [u8],
571 magic: &'static [u8],
572 op_name: &'static str,
573) -> Result<&'a [u8], WireCodecError> {
574 if !payload.starts_with(magic) {
575 let got_len = magic.len().min(payload.len());
576 return Err(WireCodecError::BadMagic {
577 op: op_name,
578 expected: magic,
579 got_len,
580 got: payload[..got_len].to_vec(),
581 });
582 }
583 Ok(&payload[magic.len()..])
584}
585
586fn rkyv_encode_with_magic<T>(
591 magic: &'static [u8],
592 op_name: &'static str,
593 value: &T,
594) -> Result<Vec<u8>, WireCodecError>
595where
596 T: for<'a> rkyv::Serialize<
597 rkyv::api::high::HighSerializer<
598 rkyv::util::AlignedVec,
599 rkyv::ser::allocator::ArenaHandle<'a>,
600 RkyvError,
601 >,
602 >,
603{
604 let body = rkyv::to_bytes::<RkyvError>(value)
605 .map_err(|e| WireCodecError::Rkyv { op: op_name, source: e })?;
606 let mut out = Vec::with_capacity(magic.len() + body.len());
607 out.extend_from_slice(magic);
608 out.extend_from_slice(body.as_slice());
609 Ok(out)
610}
611
612fn rkyv_decode_with_magic<T>(
619 payload: &[u8],
620 magic: &'static [u8],
621 op_name: &'static str,
622) -> Result<T, WireCodecError>
623where
624 T: rkyv::Archive,
625 T::Archived: for<'a> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'a, RkyvError>>
626 + rkyv::Deserialize<T, rkyv::api::high::HighDeserializer<RkyvError>>,
627{
628 let body = expect_magic(payload, magic, op_name)?;
629 let mut aligned: rkyv::util::AlignedVec<16> = rkyv::util::AlignedVec::with_capacity(body.len());
630 aligned.extend_from_slice(body);
631 rkyv::from_bytes::<T, RkyvError>(&aligned[..])
632 .map_err(|e| WireCodecError::Rkyv { op: op_name, source: e })
633}
634
635pub fn encode_lookup_block_prefix_req(
643 req: &LookupBlockPrefixReq,
644) -> Result<Vec<u8>, WireCodecError> {
645 rkyv_encode_with_magic(LOOKUP_BLOCK_PREFIX_REQ_MAGIC, "lookup_block_prefix_req", req)
646}
647
648pub fn decode_lookup_block_prefix_req(
649 bytes: &[u8],
650) -> Result<LookupBlockPrefixReq, WireCodecError> {
651 rkyv_decode_with_magic::<LookupBlockPrefixReq>(
652 bytes,
653 LOOKUP_BLOCK_PREFIX_REQ_MAGIC,
654 "lookup_block_prefix_req",
655 )
656}
657
658pub fn encode_lookup_block_prefix_resp(
660 resp: &LookupBlockPrefixResp,
661) -> Result<Vec<u8>, WireCodecError> {
662 rkyv_encode_with_magic(LOOKUP_BLOCK_PREFIX_RESP_MAGIC, "lookup_block_prefix_resp", resp)
663}
664
665pub fn decode_lookup_block_prefix_resp(
666 bytes: &[u8],
667) -> Result<LookupBlockPrefixResp, WireCodecError> {
668 rkyv_decode_with_magic::<LookupBlockPrefixResp>(
669 bytes,
670 LOOKUP_BLOCK_PREFIX_RESP_MAGIC,
671 "lookup_block_prefix_resp",
672 )
673}
674
675pub fn encode_get_kv_blocks_batch_req(
680 req: &GetKvBlocksBatchReq,
681) -> Result<Vec<u8>, WireCodecError> {
682 rkyv_encode_with_magic(GET_KV_BLOCKS_BATCH_REQ_MAGIC, "get_kv_blocks_batch_req", req)
683}
684
685pub fn decode_get_kv_blocks_batch_req(bytes: &[u8]) -> Result<GetKvBlocksBatchReq, WireCodecError> {
686 rkyv_decode_with_magic::<GetKvBlocksBatchReq>(
687 bytes,
688 GET_KV_BLOCKS_BATCH_REQ_MAGIC,
689 "get_kv_blocks_batch_req",
690 )
691}
692
693pub fn encode_get_kv_blocks_batch_resp(
694 resp: &GetKvBlocksBatchResp,
695) -> Result<Vec<u8>, WireCodecError> {
696 rkyv_encode_with_magic(GET_KV_BLOCKS_BATCH_RESP_MAGIC, "get_kv_blocks_batch_resp", resp)
697}
698
699pub fn decode_get_kv_blocks_batch_resp(
700 bytes: &[u8],
701) -> Result<GetKvBlocksBatchResp, WireCodecError> {
702 rkyv_decode_with_magic::<GetKvBlocksBatchResp>(
703 bytes,
704 GET_KV_BLOCKS_BATCH_RESP_MAGIC,
705 "get_kv_blocks_batch_resp",
706 )
707}
708
709pub fn encode_put_kv_blocks_batch_req(
714 req: &PutKvBlocksBatchReq,
715) -> Result<Vec<u8>, WireCodecError> {
716 if req.block_hashes_hex.len() != req.payloads.len() {
721 return Err(WireCodecError::BodyLengthMismatch {
722 what: "put_kv_blocks_batch_req",
723 claimed: req.block_hashes_hex.len(),
724 actual: req.payloads.len(),
725 });
726 }
727 rkyv_encode_with_magic(PUT_KV_BLOCKS_BATCH_REQ_MAGIC, "put_kv_blocks_batch_req", req)
728}
729
730pub fn decode_put_kv_blocks_batch_req(bytes: &[u8]) -> Result<PutKvBlocksBatchReq, WireCodecError> {
731 rkyv_decode_with_magic::<PutKvBlocksBatchReq>(
732 bytes,
733 PUT_KV_BLOCKS_BATCH_REQ_MAGIC,
734 "put_kv_blocks_batch_req",
735 )
736}
737
738pub fn encode_put_kv_blocks_batch_resp(
739 resp: &PutKvBlocksBatchResp,
740) -> Result<Vec<u8>, WireCodecError> {
741 rkyv_encode_with_magic(PUT_KV_BLOCKS_BATCH_RESP_MAGIC, "put_kv_blocks_batch_resp", resp)
742}
743
744pub fn decode_put_kv_blocks_batch_resp(
745 bytes: &[u8],
746) -> Result<PutKvBlocksBatchResp, WireCodecError> {
747 rkyv_decode_with_magic::<PutKvBlocksBatchResp>(
748 bytes,
749 PUT_KV_BLOCKS_BATCH_RESP_MAGIC,
750 "put_kv_blocks_batch_resp",
751 )
752}
753
754const SHM_PREFIX: &str = "wk";
766
767const ROLE_REQ: char = 'r';
770const ROLE_RESP: char = 's';
771
772const MAX_DISRUPTOR_INTERNAL_SUFFIX_LEN: usize = 13;
780
781pub fn segment_names(prefix: &str) -> (String, String) {
787 (format!("{SHM_PREFIX}{prefix}{ROLE_REQ}"), format!("{SHM_PREFIX}{prefix}{ROLE_RESP}"))
788}
789
790const SHM_SEGMENT_NAME_MAX_LEN_MACOS: usize = 30;
793
794pub fn validate_segment_name_budget(prefix: &str) -> Result<(), WireCodecError> {
814 let (req, resp) = segment_names(prefix);
815 for base in [&req, &resp] {
816 let derived_len = base.len() + MAX_DISRUPTOR_INTERNAL_SUFFIX_LEN;
820 if derived_len > SHM_SEGMENT_NAME_MAX_LEN_MACOS {
821 let fixed = SHM_PREFIX.len() + 1 + MAX_DISRUPTOR_INTERNAL_SUFFIX_LEN;
824 let max_prefix = SHM_SEGMENT_NAME_MAX_LEN_MACOS.saturating_sub(fixed);
825 let derived_name = format!("{base}_producer_seq");
826 return Err(WireCodecError::SegmentNameBudget(format!(
827 "wombatkv: SHM segment '{derived_name}' ({derived_len} chars) \
828 exceeds the macOS POSIX-SHM budget of {SHM_SEGMENT_NAME_MAX_LEN_MACOS} \
829 chars. Shorten the daemon prefix '{prefix}' ({} chars) to at \
830 most {max_prefix} chars. The internal disruptor-mp segments \
831 add up to {MAX_DISRUPTOR_INTERNAL_SUFFIX_LEN} chars of suffix \
832 ('_producer_seq' is the longest); the wombatkv wrapper adds \
833 '{SHM_PREFIX}' + 1-char role. For strict cross-platform \
834 portability (Linux, FreeBSD), see \
835 `myelon::portable_shm_segment_name`, recommends total \
836 name ≤ {} chars.",
837 prefix.len(),
838 myelon::PORTABLE_SHM_SEGMENT_NAME_MAX_LEN,
839 )));
840 }
841 }
842 Ok(())
843}
844
845pub fn open_daemon(
851 req_seg: &str,
852 resp_seg: &str,
853 depth: usize,
854) -> Result<(TypedConsumer<ShmFrame>, TypedProducer<ShmFrame>), Box<dyn std::error::Error>> {
855 let req_consumer = wait_for_consumer(req_seg, depth, REQ_CONSUMER_ID)?;
856 let resp_producer = TypedProducer::<ShmFrame>::create_with_consumers(resp_seg, depth, 1)?;
857 Ok((req_consumer, resp_producer))
858}
859
860pub fn open_client(
862 req_seg: &str,
863 resp_seg: &str,
864 depth: usize,
865) -> Result<(TypedProducer<ShmFrame>, TypedConsumer<ShmFrame>), Box<dyn std::error::Error>> {
866 let req_producer = TypedProducer::<ShmFrame>::create_with_consumers(req_seg, depth, 1)?;
867 let resp_consumer = wait_for_consumer(resp_seg, depth, RESP_CONSUMER_ID)?;
868 Ok((req_producer, resp_consumer))
869}
870
871fn wait_strategy_from_env() -> MyelonWaitStrategy {
878 match std::env::var("WMBT_KV_DAEMON_SHM_WAIT_STRATEGY").ok().as_deref().map(str::trim) {
879 Some("busyspin" | "BusySpin" | "spin") => MyelonWaitStrategy::BusySpin,
880 Some("block" | "Block" | "park") => MyelonWaitStrategy::Block,
881 Some(other) if !other.is_empty() => {
882 eprintln!(
883 "WombatKV: unknown WMBT_KV_DAEMON_SHM_WAIT_STRATEGY={other:?}, \
884 falling back to default 'block'"
885 );
886 MyelonWaitStrategy::Block
887 }
888 _ => MyelonWaitStrategy::Block,
889 }
890}
891
892fn wait_for_consumer(
893 segment: &str,
894 depth: usize,
895 consumer_id: &str,
896) -> Result<TypedConsumer<ShmFrame>, Box<dyn std::error::Error>> {
897 use std::time::Instant;
898 let deadline = Instant::now() + effective_attach_timeout();
899 let mut tries: u64 = 0;
900 let wait_strategy = wait_strategy_from_env();
901 loop {
902 tries += 1;
903 match TypedConsumer::<ShmFrame>::attach_with_consumer_id(
904 segment,
905 depth,
906 consumer_id,
907 wait_strategy,
908 ) {
909 Ok(consumer) => return Ok(consumer),
910 Err(error) => {
911 if Instant::now() < deadline {
912 myelon::perform_default_discovery_poll_wait();
916 continue;
917 }
918 return Err(format!("attach {segment}: {error} (after {tries} retries)").into());
919 }
920 }
921 }
922}
923
924#[must_use]
927pub fn fits_one_frame(len: usize) -> bool {
928 len.saturating_add(256) <= FRAME_DATA_BYTES
929}
930
931#[cfg(test)]
932mod tests {
933 use super::*;
934
935 #[test]
936 fn ping_request_round_trips() {
937 let req = WireRequest {
938 id: 42,
939 op: op::PING,
940 namespace: String::new(),
941 key: String::new(),
942 payload: Vec::new(),
943 };
944 let encoded = req.encode().expect("encode");
945 let decoded = WireRequest::decode(encoded.as_ref()).expect("decode");
946 assert_eq!(decoded.id, 42);
947 assert_eq!(decoded.op, op::PING);
948 }
949
950 #[test]
951 fn put_response_round_trips_with_payload() {
952 let resp = WireResponse {
953 id: 7,
954 status: status::OK,
955 op: op::PUT,
956 payload: vec![0xAB; 4096],
957 message: String::new(),
958 };
959 let encoded = resp.encode().expect("encode");
960 let decoded = WireResponse::decode(encoded.as_ref()).expect("decode");
961 assert_eq!(decoded.id, 7);
962 assert_eq!(decoded.status, status::OK);
963 assert_eq!(decoded.payload.len(), 4096);
964 assert_eq!(decoded.payload[0], 0xAB);
965 }
966
967 #[test]
968 fn frame_capacity_check() {
969 assert!(fits_one_frame(0));
970 assert!(fits_one_frame(FRAME_DATA_BYTES - 256));
971 assert!(!fits_one_frame(FRAME_DATA_BYTES));
972 assert!(!fits_one_frame(FRAME_DATA_BYTES * 2));
973 }
974
975 #[test]
976 fn key_batch_round_trips() {
977 let keys = vec!["a".to_string(), "nested/key".to_string()];
978 let encoded = encode_key_batch(&keys);
979 assert_eq!(decode_key_batch(&encoded).expect("decode keys"), keys);
980 }
981
982 #[test]
983 fn bytes_batch_round_trips_without_copying_items() {
984 let items = vec![Bytes::from_static(b"alpha"), Bytes::new(), Bytes::from_static(b"gamma")];
985 let encoded = Bytes::from(encode_bytes_batch(&items));
986 let decoded = decode_bytes_batch(encoded).expect("decode bytes");
987 assert_eq!(decoded, items);
988 }
989
990 #[test]
991 fn segment_names_are_unique_per_prefix() {
992 let (req, resp) = segment_names("smoke");
993 assert_eq!(req, "wksmoker");
998 assert_eq!(resp, "wksmokes");
999 assert_ne!(req, resp);
1000 }
1001
1002 #[test]
1003 fn validate_segment_name_budget_accepts_short_prefix() {
1004 validate_segment_name_budget("drts999").expect("dst-sweep prefix");
1010 validate_segment_name_budget("drts42").expect("dst-sweep prefix");
1011 validate_segment_name_budget("dmcoreds").expect("typical");
1012 }
1013
1014 #[test]
1015 fn validate_segment_name_budget_rejects_too_long_prefix() {
1016 let too_long = "a".repeat(15);
1019 let err = validate_segment_name_budget(&too_long)
1020 .expect_err("15-char prefix must exceed macOS budget");
1021 assert!(err.to_string().contains("exceeds the macOS"));
1022 validate_segment_name_budget(&"a".repeat(14)).expect("14 chars at the edge");
1024 }
1025
1026 #[test]
1027 fn block_opcodes_are_distinct_and_contiguous() {
1028 assert_eq!(op::LOOKUP_BLOCK_PREFIX, 11);
1033 assert_eq!(op::GET_KV_BLOCKS_BATCH, 12);
1034 assert_eq!(op::PUT_KV_BLOCKS_BATCH, 13);
1035 assert_eq!(op::name(op::LOOKUP_BLOCK_PREFIX), "lookup_block_prefix");
1036 assert_eq!(op::name(op::GET_KV_BLOCKS_BATCH), "get_kv_blocks_batch");
1037 assert_eq!(op::name(op::PUT_KV_BLOCKS_BATCH), "put_kv_blocks_batch");
1038 }
1039
1040 #[test]
1041 fn lookup_block_prefix_req_resp_round_trip() {
1042 let req = LookupBlockPrefixReq {
1043 namespace: "ns/alpha".to_string(),
1044 block_hashes_hex: vec!["aa".repeat(32), "bb".repeat(32)],
1045 };
1046 let bytes = encode_lookup_block_prefix_req(&req).expect("encode req");
1047 let decoded = decode_lookup_block_prefix_req(&bytes).expect("decode req");
1048 assert_eq!(decoded.namespace, req.namespace);
1049 assert_eq!(decoded.block_hashes_hex, req.block_hashes_hex);
1050
1051 let resp_ok = LookupBlockPrefixResp { matched_count: 7, error: None };
1052 let bytes_ok = encode_lookup_block_prefix_resp(&resp_ok).expect("encode resp ok");
1053 let decoded_ok = decode_lookup_block_prefix_resp(&bytes_ok).expect("decode resp ok");
1054 assert_eq!(decoded_ok.matched_count, 7);
1055 assert!(decoded_ok.error.is_none());
1056
1057 let resp_err =
1058 LookupBlockPrefixResp { matched_count: 0, error: Some("bad hex at pos 3".to_string()) };
1059 let bytes_err = encode_lookup_block_prefix_resp(&resp_err).expect("encode resp err");
1060 let decoded_err = decode_lookup_block_prefix_resp(&bytes_err).expect("decode resp err");
1061 assert_eq!(decoded_err.matched_count, 0);
1062 assert_eq!(decoded_err.error.as_deref(), Some("bad hex at pos 3"));
1063 }
1064
1065 #[test]
1066 fn get_kv_blocks_batch_req_resp_round_trip() {
1067 let req = GetKvBlocksBatchReq {
1068 namespace: "ns/beta".to_string(),
1069 block_hashes_hex: vec!["11".repeat(32), "22".repeat(32), "33".repeat(32)],
1070 };
1071 let bytes = encode_get_kv_blocks_batch_req(&req).expect("encode req");
1072 let decoded = decode_get_kv_blocks_batch_req(&bytes).expect("decode req");
1073 assert_eq!(decoded.namespace, req.namespace);
1074 assert_eq!(decoded.block_hashes_hex, req.block_hashes_hex);
1075
1076 let payloads = vec![vec![0xAAu8; 4096], vec![0xBBu8; 1024], vec![0xCCu8; 2048]];
1077 let resp_hit = GetKvBlocksBatchResp { payloads: Some(payloads.clone()), error: None };
1078 let bytes_hit = encode_get_kv_blocks_batch_resp(&resp_hit).expect("encode resp hit");
1079 let decoded_hit = decode_get_kv_blocks_batch_resp(&bytes_hit).expect("decode resp hit");
1080 assert!(decoded_hit.error.is_none());
1081 let got = decoded_hit.payloads.expect("payloads present");
1082 assert_eq!(got.len(), 3);
1083 assert_eq!(got[0].len(), 4096);
1084 assert_eq!(got[1].len(), 1024);
1085 assert_eq!(got[2].len(), 2048);
1086 assert_eq!(got[0][0], 0xAA);
1087
1088 let resp_miss = GetKvBlocksBatchResp { payloads: None, error: None };
1089 let bytes_miss = encode_get_kv_blocks_batch_resp(&resp_miss).expect("encode resp miss");
1090 let decoded_miss = decode_get_kv_blocks_batch_resp(&bytes_miss).expect("decode resp miss");
1091 assert!(decoded_miss.payloads.is_none());
1092 }
1093
1094 #[test]
1095 fn put_kv_blocks_batch_req_resp_round_trip() {
1096 let req = PutKvBlocksBatchReq {
1097 namespace: "ns/gamma".to_string(),
1098 block_hashes_hex: vec!["77".repeat(32), "88".repeat(32)],
1099 payloads: vec![vec![0x77u8; 512], vec![0x88u8; 1024]],
1100 };
1101 let bytes = encode_put_kv_blocks_batch_req(&req).expect("encode req");
1102 let decoded = decode_put_kv_blocks_batch_req(&bytes).expect("decode req");
1103 assert_eq!(decoded.namespace, req.namespace);
1104 assert_eq!(decoded.block_hashes_hex, req.block_hashes_hex);
1105 assert_eq!(decoded.payloads.len(), 2);
1106 assert_eq!(decoded.payloads[0], req.payloads[0]);
1107 assert_eq!(decoded.payloads[1].len(), 1024);
1108
1109 let resp_ok = PutKvBlocksBatchResp { total_bytes: 1536, error: None };
1110 let bytes_ok = encode_put_kv_blocks_batch_resp(&resp_ok).expect("encode resp ok");
1111 let decoded_ok = decode_put_kv_blocks_batch_resp(&bytes_ok).expect("decode resp ok");
1112 assert_eq!(decoded_ok.total_bytes, 1536);
1113 assert!(decoded_ok.error.is_none());
1114
1115 let resp_err =
1116 PutKvBlocksBatchResp { total_bytes: 0, error: Some("backend put failure".to_string()) };
1117 let bytes_err = encode_put_kv_blocks_batch_resp(&resp_err).expect("encode resp err");
1118 let decoded_err = decode_put_kv_blocks_batch_resp(&bytes_err).expect("decode resp err");
1119 assert_eq!(decoded_err.total_bytes, 0);
1120 assert_eq!(decoded_err.error.as_deref(), Some("backend put failure"));
1121 }
1122
1123 #[test]
1124 fn block_payload_magic_headers_are_stable() {
1125 assert_eq!(LOOKUP_BLOCK_PREFIX_REQ_MAGIC, b"WMBT_LBP_REQ\0\0\0\0");
1129 assert_eq!(LOOKUP_BLOCK_PREFIX_RESP_MAGIC, b"WMBT_LBP_RES\0\0\0\0");
1130 assert_eq!(GET_KV_BLOCKS_BATCH_REQ_MAGIC, b"WMBT_GBB_REQ\0\0\0\0");
1131 assert_eq!(GET_KV_BLOCKS_BATCH_RESP_MAGIC, b"WMBT_GBB_RES\0\0\0\0");
1132 assert_eq!(PUT_KV_BLOCKS_BATCH_REQ_MAGIC, b"WMBT_PBB_REQ\0\0\0\0");
1133 assert_eq!(PUT_KV_BLOCKS_BATCH_RESP_MAGIC, b"WMBT_PBB_RES\0\0\0\0");
1134
1135 let lbp_req = encode_lookup_block_prefix_req(&LookupBlockPrefixReq {
1137 namespace: "ns".to_string(),
1138 block_hashes_hex: vec!["aa".repeat(32)],
1139 })
1140 .expect("encode");
1141 assert!(lbp_req.starts_with(LOOKUP_BLOCK_PREFIX_REQ_MAGIC));
1142
1143 let gbb_resp = encode_get_kv_blocks_batch_resp(&GetKvBlocksBatchResp {
1144 payloads: Some(vec![vec![0u8; 4]]),
1145 error: None,
1146 })
1147 .expect("encode");
1148 assert!(gbb_resp.starts_with(GET_KV_BLOCKS_BATCH_RESP_MAGIC));
1149
1150 let pbb_resp =
1151 encode_put_kv_blocks_batch_resp(&PutKvBlocksBatchResp { total_bytes: 42, error: None })
1152 .expect("encode");
1153 assert!(pbb_resp.starts_with(PUT_KV_BLOCKS_BATCH_RESP_MAGIC));
1154 }
1155
1156 #[test]
1157 fn block_payload_codec_rejects_corruption() {
1158 let mut bad_magic = encode_lookup_block_prefix_req(&LookupBlockPrefixReq {
1161 namespace: "ns".to_string(),
1162 block_hashes_hex: vec!["aa".repeat(32)],
1163 })
1164 .expect("encode");
1165 bad_magic[0] = b'X';
1166 let err = decode_lookup_block_prefix_req(&bad_magic).expect_err("must fail");
1167 assert!(err.to_string().contains("lookup_block_prefix_req"));
1168 assert!(err.to_string().contains("bad magic"));
1169
1170 let full = encode_put_kv_blocks_batch_req(&PutKvBlocksBatchReq {
1173 namespace: "ns".to_string(),
1174 block_hashes_hex: vec!["aa".repeat(32), "bb".repeat(32)],
1175 payloads: vec![vec![1u8; 8], vec![2u8; 8]],
1176 })
1177 .expect("encode");
1178 let truncated = &full[..full.len() - 8];
1179 let err = decode_put_kv_blocks_batch_req(truncated).expect_err("must fail");
1180 assert!(
1181 err.to_string().contains("put_kv_blocks_batch_req"),
1182 "unexpected error message: {err}"
1183 );
1184
1185 let mut tampered = encode_get_kv_blocks_batch_resp(&GetKvBlocksBatchResp {
1190 payloads: Some(vec![vec![0xAAu8; 8]]),
1191 error: None,
1192 })
1193 .expect("encode");
1194 let mid = usize::midpoint(GET_KV_BLOCKS_BATCH_RESP_MAGIC.len(), tampered.len());
1197 tampered[mid] ^= 0xFF;
1198 let result = decode_get_kv_blocks_batch_resp(&tampered);
1199 if let Ok(decoded) = result {
1203 let _ = decoded;
1208 }
1209 }
1210
1211 #[test]
1212 fn block_payload_encodes_large_blocks_without_overflow() {
1213 let big_block = vec![0xCDu8; 1024 * 1024];
1216 let req = PutKvBlocksBatchReq {
1217 namespace: "ns".to_string(),
1218 block_hashes_hex: (0..4).map(|i| format!("{i:02x}").repeat(32)).collect(),
1219 payloads: vec![big_block.clone(); 4],
1220 };
1221 let bytes = encode_put_kv_blocks_batch_req(&req).expect("encode");
1222 let decoded = decode_put_kv_blocks_batch_req(&bytes).expect("decode");
1223 assert_eq!(decoded.payloads.len(), 4);
1224 for p in &decoded.payloads {
1225 assert_eq!(p.len(), big_block.len());
1226 assert_eq!(p[0], 0xCD);
1227 }
1228 }
1229}