Skip to main content

pond/
rowmap.rs

1//! Memory-mapped, process-shareable per-message meta map keyed by stable
2//! `row_id`. Resolves FTS/vector `_rowid`s to `(session_id, message_id)` and
3//! hydrates hit meta (`role`, `project`, `source_agent`, `timestamp`,
4//! `search_text`) in memory, with a `take_rows` miss-fallback for rows appended
5//! since the build. Also carries a `session_id -> message count` aggregate.
6//!
7//! `session_id`/`project`/`source_agent`/`role` are dictionary-encoded (each
8//! distinct value stored once, referenced by `u32` index); `search_text` is
9//! block-compressed ([`BLOCK_ROWS`] rows per zstd block). On the real 2M-message
10//! corpus that takes the map from ~655 MB (flat) to ~270 MB.
11//!
12//! Built via temp + atomic rename and `mmap`'d read-only, so N pond processes on
13//! the box share one physical copy in the OS page cache and a restart re-`open`s
14//! instantly. Stable row ids (`enable_stable_row_ids`) keep a built map valid
15//! across compaction; it only rebuilds when the dataset version advances.
16//!
//! Layout: `Header | [Record; count] | [SessionEntry] | [DictEntry; project] |
//! [DictEntry; agent] | [DictEntry; role] | [BlockEntry] | blob`. Records are
//! sorted by `row_id` (binary search); a row's block is `record_index /
//! BLOCK_ROWS`. The blob holds the compressed `search_text` blocks, then per-row
//! `{36-byte header (ROW_HEADER_LEN) + message_id}`, then per-session
//! `session_id` bytes, then the dict value bytes. Session entries are sorted by
//! `session_id` (binary search) and each also carries its max message
//! timestamp, the watermark the `pond sync` skip oracle compares against the
//! source.
24
25use std::collections::HashMap;
26use std::fs::File;
27use std::io::Write;
28use std::mem::size_of;
29use std::path::{Path, PathBuf};
30
31use anyhow::{Context, Result, ensure};
32use bytemuck::{Pod, Zeroable};
33use memmap2::Mmap;
34
/// Leading bytes of every map file; `open` rejects a file whose header does
/// not start with exactly these bytes.
const MAGIC: [u8; 8] = *b"PONDRMM5";
/// Rows per zstd-compressed `search_text` block: record `i` lives in block
/// `i / BLOCK_ROWS`.
const BLOCK_ROWS: usize = 256;
/// zstd level used when compressing `search_text` blocks at build time.
const ZSTD_LEVEL: i32 = 3;

/// Size of the fixed per-row blob header (36 bytes):
/// `timestamp_micros` as an i64 LE, followed by seven u32 LE fields. Those are
/// the session, project, source_agent and role indices, the `message_id` byte
/// length, and the `search_text` offset and length inside the row's
/// decompressed block.
const ROW_HEADER_LEN: usize = size_of::<i64>() + 7 * size_of::<u32>();

/// Single-entry cache of one decompressed block, as `(block_index, plaintext)`.
/// Callers pass it across consecutive `lookup_meta` calls. Rows that share a
/// block (a session's hits have adjacent row ids) then pay for decompression
/// once instead of once per row.
type BlockCache = Option<(usize, Vec<u8>)>;
49
50#[repr(C)]
51#[derive(Clone, Copy, Pod, Zeroable)]
52struct Header {
53    magic: [u8; 8],
54    version: u64,
55    count: u64,
56    session_count: u64,
57    project_count: u64,
58    agent_count: u64,
59    role_count: u64,
60    block_count: u64,
61    blob_offset: u64,
62}
63
64#[repr(C)]
65#[derive(Clone, Copy, Pod, Zeroable)]
66struct Record {
67    row_id: u64,
68    blob_off: u64,
69}
70
71#[repr(C)]
72#[derive(Clone, Copy, Pod, Zeroable)]
73struct SessionEntry {
74    sid_off: u64,
75    max_ts_micros: i64,
76    sid_len: u32,
77    count: u32,
78}
79
80#[repr(C)]
81#[derive(Clone, Copy, Pod, Zeroable)]
82struct DictEntry {
83    off: u64,
84    len: u32,
85    _pad: u32,
86}
87
88#[repr(C)]
89#[derive(Clone, Copy, Pod, Zeroable)]
90struct BlockEntry {
91    comp_off: u64,
92    comp_len: u32,
93    decomp_len: u32,
94}
95
/// Owned input row for [`RowMetaMap::build`]. It is also the owned output of
/// `RowMetaMap::entries` and `RowMetaSet::hydrate`.
#[derive(Clone)]
pub struct RowMetaEntry {
    /// Stable row id (the map's lookup key).
    pub row_id: u64,
    /// Owning session; dictionary-encoded on disk.
    pub session_id: String,
    /// Message id; stored inline after the row header.
    pub message_id: String,
    /// Message role; dictionary-encoded on disk.
    pub role: String,
    /// Project; dictionary-encoded on disk.
    pub project: String,
    /// Source agent; dictionary-encoded on disk.
    pub source_agent: String,
    /// Message timestamp in microseconds.
    pub timestamp_micros: i64,
    /// Searchable text; block-compressed on disk.
    pub search_text: String,
}
108
/// One row's meta, borrowed from a [`RowMetaMap`].
///
/// Every string field except `search_text` points into the mmap.
/// `search_text` is owned because it had to be decompressed out of its block.
pub struct RowMeta<'a> {
    /// Owning session.
    pub session_id: &'a str,
    /// Message id.
    pub message_id: &'a str,
    /// Message role.
    pub role: &'a str,
    /// Project.
    pub project: &'a str,
    /// Source agent.
    pub source_agent: &'a str,
    /// Message timestamp in microseconds.
    pub timestamp_micros: i64,
    /// Decompressed searchable text.
    pub search_text: String,
}
120
121/// An open, memory-mapped row meta map. `lookup`, `lookup_meta`, and
122/// `lookup_count` are lock-free and reentrant.
123pub struct RowMetaMap {
124    mmap: Mmap,
125    version: u64,
126    count: usize,
127    session_count: usize,
128    project_count: usize,
129    agent_count: usize,
130    role_count: usize,
131    block_count: usize,
132    sessions_off: usize,
133    projects_off: usize,
134    agents_off: usize,
135    roles_off: usize,
136    blocks_off: usize,
137    blob_offset: usize,
138}
139
140impl std::fmt::Debug for RowMetaMap {
141    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        formatter
143            .debug_struct("RowMetaMap")
144            .field("version", &self.version)
145            .field("count", &self.count)
146            .field("session_count", &self.session_count)
147            .finish_non_exhaustive()
148    }
149}
150
151impl RowMetaMap {
152    /// Base segment path (`-v{version}`): the foot of the LSM chain.
153    pub fn path_for(cache_dir: &Path, store_key: &str, version: u64) -> PathBuf {
154        cache_dir.join(format!("rowmetamap-{store_key}-v{version}.rmm"))
155    }
156
157    /// Delta segment path (`-d{version}`): rows appended since the previous
158    /// segment, layered over the base.
159    pub fn delta_path(cache_dir: &Path, store_key: &str, version: u64) -> PathBuf {
160        cache_dir.join(format!("rowmetamap-{store_key}-d{version}.rmm"))
161    }
162
163    pub fn build(path: &Path, version: u64, mut entries: Vec<RowMetaEntry>) -> Result<()> {
164        entries.sort_unstable_by_key(|entry| entry.row_id);
165
166        // Per session: message count plus the max message timestamp - the
167        // watermark the sync skip oracle compares against the source's latest
168        // message timestamp (spec.md#adapters; deterministic, rebuilt from the
169        // store, no local cursor).
170        let mut session_agg: HashMap<&str, (u32, i64)> = HashMap::new();
171        for entry in &entries {
172            let agg = session_agg
173                .entry(entry.session_id.as_str())
174                .or_insert((0, i64::MIN));
175            agg.0 += 1;
176            agg.1 = agg.1.max(entry.timestamp_micros);
177        }
178        let mut sessions: Vec<(&str, u32, i64)> = session_agg
179            .into_iter()
180            .map(|(sid, (count, max_ts))| (sid, count, max_ts))
181            .collect();
182        sessions.sort_unstable_by(|left, right| left.0.cmp(right.0));
183        let session_index = index_of(sessions.iter().map(|(value, _, _)| *value));
184
185        let projects = distinct_sorted(entries.iter().map(|entry| entry.project.as_str()));
186        let project_index = index_of(projects.iter().copied());
187        let agents = distinct_sorted(entries.iter().map(|entry| entry.source_agent.as_str()));
188        let agent_index = index_of(agents.iter().copied());
189        let roles = distinct_sorted(entries.iter().map(|entry| entry.role.as_str()));
190        let role_index = index_of(roles.iter().copied());
191
192        let mut blob: Vec<u8> = Vec::new();
193
194        // Compressed search_text blocks first; spans record each row's
195        // offset+length within its decompressed block.
196        let mut block_entries = Vec::with_capacity(entries.len().div_ceil(BLOCK_ROWS));
197        let mut spans: Vec<(u32, u32)> = Vec::with_capacity(entries.len());
198        for chunk in entries.chunks(BLOCK_ROWS) {
199            let mut plain = Vec::new();
200            for entry in chunk {
201                let off = u32::try_from(plain.len()).context("block too large")?;
202                let len = u32::try_from(entry.search_text.len()).context("search_text too long")?;
203                plain.extend_from_slice(entry.search_text.as_bytes());
204                spans.push((off, len));
205            }
206            let compressed = zstd::bulk::compress(&plain, ZSTD_LEVEL).context("zstd compress")?;
207            block_entries.push(BlockEntry {
208                comp_off: blob.len() as u64,
209                comp_len: u32::try_from(compressed.len()).context("compressed block too large")?,
210                decomp_len: u32::try_from(plain.len()).context("block too large")?,
211            });
212            blob.extend_from_slice(&compressed);
213        }
214
215        let mut records = Vec::with_capacity(entries.len());
216        for (entry, (text_off, text_len)) in entries.iter().zip(&spans) {
217            let blob_off = blob.len() as u64;
218            blob.extend_from_slice(&entry.timestamp_micros.to_le_bytes());
219            blob.extend_from_slice(&session_index[entry.session_id.as_str()].to_le_bytes());
220            blob.extend_from_slice(&project_index[entry.project.as_str()].to_le_bytes());
221            blob.extend_from_slice(&agent_index[entry.source_agent.as_str()].to_le_bytes());
222            blob.extend_from_slice(&role_index[entry.role.as_str()].to_le_bytes());
223            let mid_len = u32::try_from(entry.message_id.len()).context("message_id too long")?;
224            blob.extend_from_slice(&mid_len.to_le_bytes());
225            blob.extend_from_slice(&text_off.to_le_bytes());
226            blob.extend_from_slice(&text_len.to_le_bytes());
227            blob.extend_from_slice(entry.message_id.as_bytes());
228            records.push(Record {
229                row_id: entry.row_id,
230                blob_off,
231            });
232        }
233
234        let session_entries = sessions
235            .iter()
236            .map(|(sid, count, max_ts_micros)| {
237                let off = blob.len() as u64;
238                blob.extend_from_slice(sid.as_bytes());
239                Ok(SessionEntry {
240                    sid_off: off,
241                    max_ts_micros: *max_ts_micros,
242                    sid_len: u32::try_from(sid.len()).context("session_id too long")?,
243                    count: *count,
244                })
245            })
246            .collect::<Result<Vec<_>>>()?;
247        let project_entries = dict_entries(&mut blob, &projects)?;
248        let agent_entries = dict_entries(&mut blob, &agents)?;
249        let role_entries = dict_entries(&mut blob, &roles)?;
250
251        let blob_offset = (size_of::<Header>()
252            + records.len() * size_of::<Record>()
253            + session_entries.len() * size_of::<SessionEntry>()
254            + (project_entries.len() + agent_entries.len() + role_entries.len())
255                * size_of::<DictEntry>()
256            + block_entries.len() * size_of::<BlockEntry>()) as u64;
257        let header = Header {
258            magic: MAGIC,
259            version,
260            count: records.len() as u64,
261            session_count: session_entries.len() as u64,
262            project_count: project_entries.len() as u64,
263            agent_count: agent_entries.len() as u64,
264            role_count: role_entries.len() as u64,
265            block_count: block_entries.len() as u64,
266            blob_offset,
267        };
268
269        // Unique temp name per builder (pid + nonce): two processes prewarming
270        // the same store+version must not share one temp inode, or the second's
271        // create would mutate the file the first is mapping.
272        let tmp = path.with_extension(format!(
273            "tmp-{}-{:016x}",
274            std::process::id(),
275            fastrand::u64(..)
276        ));
277        {
278            let mut file = File::create(&tmp)
279                .with_context(|| format!("create row meta map temp {}", tmp.display()))?;
280            file.write_all(bytemuck::bytes_of(&header))?;
281            file.write_all(bytemuck::cast_slice(&records))?;
282            file.write_all(bytemuck::cast_slice(&session_entries))?;
283            file.write_all(bytemuck::cast_slice(&project_entries))?;
284            file.write_all(bytemuck::cast_slice(&agent_entries))?;
285            file.write_all(bytemuck::cast_slice(&role_entries))?;
286            file.write_all(bytemuck::cast_slice(&block_entries))?;
287            file.write_all(&blob)?;
288            file.sync_all()?;
289        }
290        std::fs::rename(&tmp, path)
291            .with_context(|| format!("rename row meta map into place {}", path.display()))?;
292        Ok(())
293    }
294
295    pub fn open(path: &Path) -> Result<Self> {
296        let file =
297            File::open(path).with_context(|| format!("open row meta map {}", path.display()))?;
298        // SAFETY: the file is immutable once renamed into place, so the mapping
299        // never sees concurrent truncation/mutation.
300        #[allow(unsafe_code)]
301        let mmap = unsafe { Mmap::map(&file)? };
302        ensure!(
303            mmap.len() >= size_of::<Header>(),
304            "row meta map {} too small for header",
305            path.display()
306        );
307        let header: Header = *bytemuck::from_bytes(&mmap[..size_of::<Header>()]);
308        ensure!(
309            header.magic == MAGIC,
310            "row meta map {} bad magic",
311            path.display()
312        );
313        let count = usize::try_from(header.count).context("count overflow")?;
314        let session_count = usize::try_from(header.session_count).context("session_count")?;
315        let project_count = usize::try_from(header.project_count).context("project_count")?;
316        let agent_count = usize::try_from(header.agent_count).context("agent_count")?;
317        let role_count = usize::try_from(header.role_count).context("role_count")?;
318        let block_count = usize::try_from(header.block_count).context("block_count")?;
319        let blob_offset = usize::try_from(header.blob_offset).context("blob_offset overflow")?;
320
321        let sessions_off = size_of::<Header>() + count * size_of::<Record>();
322        let projects_off = sessions_off + session_count * size_of::<SessionEntry>();
323        let agents_off = projects_off + project_count * size_of::<DictEntry>();
324        let roles_off = agents_off + agent_count * size_of::<DictEntry>();
325        let blocks_off = roles_off + role_count * size_of::<DictEntry>();
326        let blob_offset_expected = blocks_off + block_count * size_of::<BlockEntry>();
327        ensure!(
328            blob_offset == blob_offset_expected && mmap.len() >= blob_offset,
329            "row meta map {} layout mismatch",
330            path.display()
331        );
332        Ok(Self {
333            mmap,
334            version: header.version,
335            count,
336            session_count,
337            project_count,
338            agent_count,
339            role_count,
340            block_count,
341            sessions_off,
342            projects_off,
343            agents_off,
344            roles_off,
345            blocks_off,
346            blob_offset,
347        })
348    }
349
350    pub fn version(&self) -> u64 {
351        self.version
352    }
353
354    pub fn len(&self) -> usize {
355        self.count
356    }
357
358    pub fn is_empty(&self) -> bool {
359        self.count == 0
360    }
361
362    fn records(&self) -> &[Record] {
363        let start = size_of::<Header>();
364        let end = start + self.count * size_of::<Record>();
365        bytemuck::cast_slice(&self.mmap[start..end])
366    }
367
368    fn session_entries(&self) -> &[SessionEntry] {
369        let end = self.sessions_off + self.session_count * size_of::<SessionEntry>();
370        bytemuck::cast_slice(&self.mmap[self.sessions_off..end])
371    }
372
373    fn dict_entries(&self, start: usize, count: usize) -> &[DictEntry] {
374        let end = start + count * size_of::<DictEntry>();
375        bytemuck::cast_slice(&self.mmap[start..end])
376    }
377
378    fn block_entries(&self) -> &[BlockEntry] {
379        let end = self.blocks_off + self.block_count * size_of::<BlockEntry>();
380        bytemuck::cast_slice(&self.mmap[self.blocks_off..end])
381    }
382
383    /// `""` on a corrupt/truncated extent - treated as a miss, never a panic.
384    fn blob_str(&self, off: u64, len: u32) -> &str {
385        let base = self.blob_offset.saturating_add(off as usize);
386        let end = base.saturating_add(len as usize);
387        self.mmap
388            .get(base..end)
389            .and_then(|bytes| std::str::from_utf8(bytes).ok())
390            .unwrap_or_default()
391    }
392
393    fn session_str(&self, index: usize) -> &str {
394        match self.session_entries().get(index) {
395            Some(entry) => self.blob_str(entry.sid_off, entry.sid_len),
396            None => "",
397        }
398    }
399
400    fn dict_str(&self, start: usize, count: usize, index: usize) -> &str {
401        match self.dict_entries(start, count).get(index) {
402            Some(entry) => self.blob_str(entry.off, entry.len),
403            None => "",
404        }
405    }
406
407    /// `(record index, blob header offset)` for `row_id`, `None` if unmapped.
408    fn locate(&self, row_id: u64) -> Option<(usize, usize)> {
409        let records = self.records();
410        let idx = records
411            .binary_search_by_key(&row_id, |record| record.row_id)
412            .ok()?;
413        let base = self
414            .blob_offset
415            .checked_add(usize::try_from(records[idx].blob_off).ok()?)?;
416        Some((idx, base))
417    }
418
419    /// Resolve a `row_id` to `(session_id, message_id)` - the cheap arm-
420    /// resolution path. Never decompresses a block. `None` for rows appended
421    /// after the build (caller falls back to a data take).
422    pub fn lookup(&self, row_id: u64) -> Option<(&str, &str)> {
423        let (_, base) = self.locate(row_id)?;
424        let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
425        let session_idx = read_u32(header, 8)?;
426        let mid_len = read_u32(header, 24)?;
427        let mut at = base + ROW_HEADER_LEN;
428        let mid = self.slice_str(&mut at, mid_len)?;
429        Some((self.session_str(session_idx), mid))
430    }
431
432    /// Resolve a `row_id` to its full hydration meta, decompressing the row's
433    /// `search_text` block (reusing `cache` if it already holds that block).
434    /// `None` if unmapped (caller falls back to take_rows).
435    pub fn lookup_meta(&self, row_id: u64, cache: &mut BlockCache) -> Option<RowMeta<'_>> {
436        let (idx, base) = self.locate(row_id)?;
437        let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
438        let timestamp_micros = i64::from_le_bytes(header.get(0..8)?.try_into().ok()?);
439        let session_idx = read_u32(header, 8)?;
440        let project_idx = read_u32(header, 12)?;
441        let agent_idx = read_u32(header, 16)?;
442        let role_idx = read_u32(header, 20)?;
443        let mid_len = read_u32(header, 24)?;
444        let text_off = read_u32(header, 28)?;
445        let text_len = read_u32(header, 32)?;
446        let mut at = base + ROW_HEADER_LEN;
447        let message_id = self.slice_str(&mut at, mid_len)?;
448        let search_text = self.decompress_text(idx, text_off, text_len, cache)?;
449        Some(RowMeta {
450            session_id: self.session_str(session_idx),
451            message_id,
452            role: self.dict_str(self.roles_off, self.role_count, role_idx),
453            project: self.dict_str(self.projects_off, self.project_count, project_idx),
454            source_agent: self.dict_str(self.agents_off, self.agent_count, agent_idx),
455            timestamp_micros,
456            search_text,
457        })
458    }
459
460    fn decompress_block(&self, block_idx: usize) -> Option<Vec<u8>> {
461        let block = self.block_entries().get(block_idx)?;
462        if block.decomp_len == 0 {
463            return Some(Vec::new());
464        }
465        let comp_base = self.blob_offset.checked_add(block.comp_off as usize)?;
466        let comp = self
467            .mmap
468            .get(comp_base..comp_base.checked_add(block.comp_len as usize)?)?;
469        zstd::bulk::decompress(comp, block.decomp_len as usize).ok()
470    }
471
472    fn decompress_text(
473        &self,
474        idx: usize,
475        text_off: usize,
476        text_len: usize,
477        cache: &mut BlockCache,
478    ) -> Option<String> {
479        if text_len == 0 {
480            return Some(String::new());
481        }
482        let block_idx = idx / BLOCK_ROWS;
483        if cache.as_ref().map(|(block, _)| *block) != Some(block_idx) {
484            *cache = Some((block_idx, self.decompress_block(block_idx)?));
485        }
486        let plain = &cache.as_ref()?.1;
487        let value = plain.get(text_off..text_off.checked_add(text_len)?)?;
488        String::from_utf8(value.to_vec()).ok()
489    }
490
491    /// Reconstruct every row as an owned [`RowMetaEntry`], decompressing each
492    /// block once. Used to merge segments into a fresh base at compaction
493    /// without re-reading the store.
494    pub fn entries(&self) -> Vec<RowMetaEntry> {
495        let records = self.records();
496        let mut out = Vec::with_capacity(records.len());
497        let mut current_block = usize::MAX;
498        let mut plain: Vec<u8> = Vec::new();
499        for (idx, record) in records.iter().enumerate() {
500            let block_idx = idx / BLOCK_ROWS;
501            if block_idx != current_block {
502                plain = self.decompress_block(block_idx).unwrap_or_default();
503                current_block = block_idx;
504            }
505            let Some(base) = self.blob_offset.checked_add(record.blob_off as usize) else {
506                continue;
507            };
508            if let Some(entry) = self.entry_at(record.row_id, base, &plain) {
509                out.push(entry);
510            }
511        }
512        out
513    }
514
515    fn entry_at(&self, row_id: u64, base: usize, block_plain: &[u8]) -> Option<RowMetaEntry> {
516        let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
517        let timestamp_micros = i64::from_le_bytes(header.get(0..8)?.try_into().ok()?);
518        let session_idx = read_u32(header, 8)?;
519        let project_idx = read_u32(header, 12)?;
520        let agent_idx = read_u32(header, 16)?;
521        let role_idx = read_u32(header, 20)?;
522        let mid_len = read_u32(header, 24)?;
523        let text_off = read_u32(header, 28)?;
524        let text_len = read_u32(header, 32)?;
525        let mut at = base + ROW_HEADER_LEN;
526        let message_id = self.slice_str(&mut at, mid_len)?.to_owned();
527        let search_text = if text_len == 0 {
528            String::new()
529        } else {
530            let bytes = block_plain.get(text_off..text_off.checked_add(text_len)?)?;
531            String::from_utf8(bytes.to_vec()).ok()?
532        };
533        Some(RowMetaEntry {
534            row_id,
535            session_id: self.session_str(session_idx).to_owned(),
536            message_id,
537            role: self
538                .dict_str(self.roles_off, self.role_count, role_idx)
539                .to_owned(),
540            project: self
541                .dict_str(self.projects_off, self.project_count, project_idx)
542                .to_owned(),
543            source_agent: self
544                .dict_str(self.agents_off, self.agent_count, agent_idx)
545                .to_owned(),
546            timestamp_micros,
547            search_text,
548        })
549    }
550
551    /// Whole-session message count for `session_id`. `None` if the session is
552    /// not in this map (caller falls back to the `session_id IN (...)` scan).
553    pub fn lookup_count(&self, session_id: &str) -> Option<usize> {
554        let entries = self.session_entries();
555        let idx = entries
556            .binary_search_by(|entry| self.blob_str(entry.sid_off, entry.sid_len).cmp(session_id))
557            .ok()?;
558        Some(entries[idx].count as usize)
559    }
560
561    /// Max message timestamp (micros) stored for `session_id` - the watermark the
562    /// sync skip oracle compares against the source's latest message timestamp.
563    /// `None` if the session is not in this map.
564    pub fn lookup_max_ts(&self, session_id: &str) -> Option<i64> {
565        let entries = self.session_entries();
566        let idx = entries
567            .binary_search_by(|entry| self.blob_str(entry.sid_off, entry.sid_len).cmp(session_id))
568            .ok()?;
569        Some(entries[idx].max_ts_micros)
570    }
571
572    /// Slice `len` UTF-8 bytes at `*at`, advancing `*at`. Checked so a corrupt
573    /// map yields `None` (-> take fallback), not a panic.
574    fn slice_str(&self, at: &mut usize, len: usize) -> Option<&str> {
575        let end = at.checked_add(len)?;
576        let bytes = self.mmap.get(*at..end)?;
577        *at = end;
578        std::str::from_utf8(bytes).ok()
579    }
580}
581
/// Files making up one store's LSM chain on disk: the highest-version base
/// and every newer delta, deltas in ascending version order.
pub struct ChainPaths {
    /// Path of the base segment (`-v{version}`).
    pub base: PathBuf,
    /// Version encoded in the base segment's file name.
    pub base_version: u64,
    /// `(version, path)` of each delta (`-d{version}`) newer than the base.
    pub deltas: Vec<(u64, PathBuf)>,
}
589
590impl ChainPaths {
591    /// Version the chain covers - the newest segment's version.
592    pub fn version(&self) -> u64 {
593        self.deltas
594            .last()
595            .map(|(version, _)| *version)
596            .unwrap_or(self.base_version)
597    }
598}
599
600/// Discover the chain under `cache_dir` for `store_key`: the highest-version
601/// base (`-v{V}`) plus every delta (`-d{V}`) above it, ascending. `None` if no
602/// base exists yet.
603pub fn discover_chain(cache_dir: &Path, store_key: &str) -> Option<ChainPaths> {
604    let prefix = format!("rowmetamap-{store_key}-");
605    let mut bases: Vec<(u64, PathBuf)> = Vec::new();
606    let mut deltas: Vec<(u64, PathBuf)> = Vec::new();
607    for entry in std::fs::read_dir(cache_dir).ok()?.flatten() {
608        let name = entry.file_name();
609        let Some(rest) = name
610            .to_str()
611            .and_then(|name| name.strip_prefix(&prefix))
612            .and_then(|rest| rest.strip_suffix(".rmm"))
613        else {
614            continue;
615        };
616        if let Some(version) = rest.strip_prefix('v').and_then(|d| d.parse::<u64>().ok()) {
617            bases.push((version, entry.path()));
618        } else if let Some(version) = rest.strip_prefix('d').and_then(|d| d.parse::<u64>().ok()) {
619            deltas.push((version, entry.path()));
620        }
621    }
622    let (base_version, base) = bases.into_iter().max_by_key(|(version, _)| *version)?;
623    let mut deltas: Vec<(u64, PathBuf)> = deltas
624        .into_iter()
625        .filter(|(version, _)| *version > base_version)
626        .collect();
627    deltas.sort_by_key(|(version, _)| *version);
628    Some(ChainPaths {
629        base,
630        base_version,
631        deltas,
632    })
633}
634
635/// An LSM chain of immutable segment maps (base + ascending deltas) viewed as
636/// one logical map. Rows are partitioned across segments by `row_id` (append is
637/// disjoint; compaction rebuilds the base), so key/meta lookups take the newest
638/// hit and counts sum across segments.
639pub struct RowMetaSet {
640    segments: Vec<RowMetaMap>,
641}
642
643impl std::fmt::Debug for RowMetaSet {
644    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
645        formatter
646            .debug_struct("RowMetaSet")
647            .field("segments", &self.segments.len())
648            .field("version", &self.version())
649            .finish()
650    }
651}
652
653impl RowMetaSet {
654    /// Open every segment in `paths` (base first, then deltas ascending).
655    pub fn open(paths: &ChainPaths) -> Result<Self> {
656        let mut segments = Vec::with_capacity(1 + paths.deltas.len());
657        segments.push(RowMetaMap::open(&paths.base)?);
658        for (_, delta) in &paths.deltas {
659            segments.push(RowMetaMap::open(delta)?);
660        }
661        Ok(Self { segments })
662    }
663
664    pub fn version(&self) -> u64 {
665        self.segments
666            .iter()
667            .map(RowMetaMap::version)
668            .max()
669            .unwrap_or(0)
670    }
671
672    /// Number of delta segments layered on the base.
673    pub fn delta_count(&self) -> usize {
674        self.segments.len().saturating_sub(1)
675    }
676
677    /// No rows in any segment - the first-ingest hint that lets the sync oracle
678    /// short-circuit the per-session source last-id read.
679    pub fn is_empty(&self) -> bool {
680        self.segments.iter().all(RowMetaMap::is_empty)
681    }
682
683    /// Newest segment wins (a row lives in exactly one segment).
684    pub fn lookup(&self, row_id: u64) -> Option<(&str, &str)> {
685        self.segments
686            .iter()
687            .rev()
688            .find_map(|seg| seg.lookup(row_id))
689    }
690
691    /// Hydrate `rowids` to owned metas, splitting out the ones no segment holds
692    /// (appended since the build) for the caller's take_rows fallback. Output
693    /// order is unspecified (caller indexes by key). Rowids are visited in
694    /// sorted order with a per-segment block cache, so the common case - many
695    /// hits from a few row-id-adjacent sessions - decompresses each block once.
696    pub fn hydrate(&self, rowids: &[u64]) -> (Vec<RowMetaEntry>, Vec<u64>) {
697        let mut sorted = rowids.to_vec();
698        sorted.sort_unstable();
699        let mut caches: Vec<BlockCache> = vec![None; self.segments.len()];
700        let mut hits = Vec::with_capacity(sorted.len());
701        let mut misses = Vec::new();
702        for row_id in sorted {
703            let hit = self
704                .segments
705                .iter()
706                .enumerate()
707                .rev()
708                .find_map(|(segment, map)| {
709                    let meta = map.lookup_meta(row_id, &mut caches[segment])?;
710                    Some(RowMetaEntry {
711                        row_id,
712                        session_id: meta.session_id.to_owned(),
713                        message_id: meta.message_id.to_owned(),
714                        role: meta.role.to_owned(),
715                        project: meta.project.to_owned(),
716                        source_agent: meta.source_agent.to_owned(),
717                        timestamp_micros: meta.timestamp_micros,
718                        search_text: meta.search_text,
719                    })
720                });
721            match hit {
722                Some(entry) => hits.push(entry),
723                None => misses.push(row_id),
724            }
725        }
726        (hits, misses)
727    }
728
729    /// A session's rows are split across segments, so its count is the sum.
730    pub fn lookup_count(&self, session_id: &str) -> Option<usize> {
731        let mut total = 0;
732        let mut found = false;
733        for seg in &self.segments {
734            if let Some(count) = seg.lookup_count(session_id) {
735                total += count;
736                found = true;
737            }
738        }
739        found.then_some(total)
740    }
741
742    /// Max message timestamp (micros) for `session_id` across the chain - the max
743    /// over every segment that holds it (a session's rows can be split across
744    /// base and deltas). `None` if no segment has it.
745    pub fn lookup_max_ts(&self, session_id: &str) -> Option<i64> {
746        self.segments
747            .iter()
748            .filter_map(|seg| seg.lookup_max_ts(session_id))
749            .max()
750    }
751
752    /// Every row across all segments, newest-segment-wins on `row_id`
753    /// collision - the input to a base rebuild at compaction.
754    pub fn merged_entries(&self) -> Vec<RowMetaEntry> {
755        let mut by_row: HashMap<u64, RowMetaEntry> = HashMap::new();
756        for seg in &self.segments {
757            for entry in seg.entries() {
758                by_row.insert(entry.row_id, entry);
759            }
760        }
761        by_row.into_values().collect()
762    }
763}
764
/// Collects `values` into an ascending list with duplicates removed.
///
/// Used to build the dictionary tables; a value's position in the returned
/// list becomes its dictionary index.
fn distinct_sorted<'a>(values: impl Iterator<Item = &'a str>) -> Vec<&'a str> {
    let mut unique = Vec::from_iter(values);
    // Sorting first makes equal values adjacent, so `dedup` removes them all.
    unique.sort_unstable();
    unique.dedup();
    unique
}
771
/// Maps each value to its position in `values`.
///
/// This is the reverse lookup for a dictionary table. If a value repeats, the
/// position of its last occurrence is kept. Positions are narrowed to `u32` with
/// `as`, so the caller must keep `values` shorter than `u32::MAX` entries.
fn index_of<'a>(values: impl Iterator<Item = &'a str>) -> HashMap<&'a str, u32> {
    let mut positions = HashMap::new();
    for (position, value) in values.enumerate() {
        positions.insert(value, position as u32);
    }
    positions
}
778
779fn dict_entries(blob: &mut Vec<u8>, values: &[&str]) -> Result<Vec<DictEntry>> {
780    values
781        .iter()
782        .map(|value| {
783            let off = blob.len() as u64;
784            blob.extend_from_slice(value.as_bytes());
785            Ok(DictEntry {
786                off,
787                len: u32::try_from(value.len()).context("dictionary value too long")?,
788                _pad: 0,
789            })
790        })
791        .collect()
792}
793
/// Reads a little-endian `u32` starting at byte offset `at` and widens it to
/// `usize`.
///
/// Returns `None` when the four bytes at `at` are not all inside `bytes`,
/// including when `at + 4` would overflow `usize`.
fn read_u32(bytes: &[u8], at: usize) -> Option<usize> {
    let end = at.checked_add(4)?;
    let word: [u8; 4] = bytes.get(at..end)?.try_into().ok()?;
    Some(u32::from_le_bytes(word) as usize)
}
798
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    use super::*;

    /// Test fixture: a row with fixed `role`/`project`/`source_agent` values;
    /// individual tests override fields as needed.
    fn entry(
        row_id: u64,
        session_id: &str,
        message_id: &str,
        timestamp_micros: i64,
        search_text: &str,
    ) -> RowMetaEntry {
        RowMetaEntry {
            row_id,
            session_id: session_id.to_owned(),
            message_id: message_id.to_owned(),
            role: "user".to_owned(),
            project: "/proj".to_owned(),
            source_agent: "claude-code".to_owned(),
            timestamp_micros,
            search_text: search_text.to_owned(),
        }
    }

    #[test]
    fn build_open_lookup_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "teststore", 7);
        let mut three = entry(99, "sess-a", "msg-3", 3_000, "third");
        three.role = "assistant".to_owned();
        three.project = "/other".to_owned();
        let entries = vec![
            entry(10, "sess-a", "msg-1", 1_000, "first message text"),
            entry(3, "sess-b/agent-x", "msg-2", 2_000, ""),
            three,
        ];
        RowMetaMap::build(&path, 7, entries).unwrap();

        let map = RowMetaMap::open(&path).unwrap();
        assert_eq!(map.version(), 7);
        assert_eq!(map.len(), 3);
        assert_eq!(map.lookup(10), Some(("sess-a", "msg-1")));
        assert_eq!(map.lookup(3), Some(("sess-b/agent-x", "msg-2")));
        assert_eq!(map.lookup(99), Some(("sess-a", "msg-3")));
        assert_eq!(map.lookup(42), None);

        let meta = map.lookup_meta(10, &mut None).expect("row 10 present");
        assert_eq!(meta.session_id, "sess-a");
        assert_eq!(meta.message_id, "msg-1");
        assert_eq!(meta.role, "user");
        assert_eq!(meta.project, "/proj");
        assert_eq!(meta.source_agent, "claude-code");
        assert_eq!(meta.timestamp_micros, 1_000);
        assert_eq!(meta.search_text, "first message text");

        let assistant = map.lookup_meta(99, &mut None).expect("row 99 present");
        assert_eq!(assistant.role, "assistant");
        assert_eq!(assistant.project, "/other");
        assert_eq!(assistant.search_text, "third");

        let empty_text = map.lookup_meta(3, &mut None).expect("row 3 present");
        assert_eq!(empty_text.search_text, "");
        assert!(map.lookup_meta(42, &mut None).is_none());

        assert_eq!(map.lookup_count("sess-a"), Some(2));
        assert_eq!(map.lookup_count("sess-b/agent-x"), Some(1));
        assert_eq!(map.lookup_count("missing"), None);

        // Watermark = max timestamp: sess-a's msg-3 (ts 3000) over msg-1.
        assert_eq!(map.lookup_max_ts("sess-a"), Some(3_000));
        assert_eq!(map.lookup_max_ts("sess-b/agent-x"), Some(2_000));
        assert_eq!(map.lookup_max_ts("missing"), None);
    }

    #[test]
    fn max_ts_is_the_session_high_water_mark() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "ts", 1);
        // Out-of-row-order timestamps: the max wins regardless of row order.
        let entries = vec![
            entry(1, "s", "msg-a", 5_000, "a"),
            entry(2, "s", "msg-b", 9_000, "b"),
            entry(3, "s", "msg-c", 7_000, "c"),
        ];
        RowMetaMap::build(&path, 1, entries).unwrap();
        let map = RowMetaMap::open(&path).unwrap();
        assert_eq!(map.lookup_max_ts("s"), Some(9_000));
    }

    #[test]
    fn many_blocks_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "blocks", 1);
        let entries: Vec<RowMetaEntry> = (0..(BLOCK_ROWS as u64 * 2 + 5))
            .map(|i| {
                entry(
                    i,
                    "sess",
                    &format!("msg-{i}"),
                    i as i64,
                    &format!("text body {i}"),
                )
            })
            .collect();
        RowMetaMap::build(&path, 1, entries).unwrap();

        let map = RowMetaMap::open(&path).unwrap();
        // One cache reused across rows spanning several blocks - same-block hits
        // must reuse it, block crossings must refill it, both yielding the right
        // text.
        let mut cache = None;
        for i in [0u64, 1, 255, 256, 257, 511, 512, 516] {
            let meta = map.lookup_meta(i, &mut cache).expect("row present");
            assert_eq!(meta.message_id, format!("msg-{i}"));
            assert_eq!(meta.search_text, format!("text body {i}"));
        }
    }

    #[test]
    fn lsm_set_layers_delta_over_base() {
        let dir = tempfile::tempdir().unwrap();
        let base = vec![
            entry(10, "sess-a", "m10", 1, "base ten"),
            entry(11, "sess-a", "m11", 2, "base eleven"),
            entry(12, "sess-b", "m12", 3, "base twelve"),
        ];
        RowMetaMap::build(&RowMetaMap::path_for(dir.path(), "k", 1), 1, base).unwrap();
        let delta = vec![
            entry(20, "sess-a", "m20", 4, "delta twenty"),
            entry(21, "sess-c", "m21", 5, "delta twentyone"),
        ];
        RowMetaMap::build(&RowMetaMap::delta_path(dir.path(), "k", 2), 2, delta).unwrap();

        let chain = discover_chain(dir.path(), "k").expect("chain present");
        assert_eq!(chain.base_version, 1);
        assert_eq!(chain.deltas.len(), 1);
        assert_eq!(chain.version(), 2);

        let set = RowMetaSet::open(&chain).unwrap();
        assert_eq!(set.version(), 2);
        assert_eq!(set.delta_count(), 1);

        assert_eq!(set.lookup(10), Some(("sess-a", "m10")));
        assert_eq!(set.lookup(20), Some(("sess-a", "m20")));
        assert_eq!(set.lookup(99), None);

        // hydrate spans both segments and splits out the absent row.
        let (mut hits, misses) = set.hydrate(&[21, 10, 99]);
        assert_eq!(misses, vec![99]);
        hits.sort_by_key(|entry| entry.row_id);
        assert_eq!(hits.len(), 2);
        assert_eq!(hits[0].search_text, "base ten");
        assert_eq!(hits[1].search_text, "delta twentyone");

        // Counts sum across base + delta: sess-a = 2 + 1.
        assert_eq!(set.lookup_count("sess-a"), Some(3));
        assert_eq!(set.lookup_count("sess-b"), Some(1));
        assert_eq!(set.lookup_count("sess-c"), Some(1));
        assert_eq!(set.lookup_count("missing"), None);

        // Max timestamp across segments: sess-a spans base (ts<=2) + delta (ts 4).
        assert_eq!(set.lookup_max_ts("sess-a"), Some(4));
        assert_eq!(set.lookup_max_ts("sess-b"), Some(3));
        assert_eq!(set.lookup_max_ts("sess-c"), Some(5));
        assert_eq!(set.lookup_max_ts("missing"), None);

        // Compaction input: all 5 distinct rows reconstructed.
        let mut merged = set.merged_entries();
        merged.sort_by_key(|entry| entry.row_id);
        assert_eq!(merged.len(), 5);
        assert_eq!(merged[0].row_id, 10);
        assert_eq!(merged[4].row_id, 21);
        assert_eq!(merged[4].search_text, "delta twentyone");
    }

    /// A `row_id` present in both the base and a delta must resolve to the
    /// delta's copy, both when hydrating hits and when assembling compaction
    /// input. The layering test above only uses disjoint row ids, so it never
    /// exercises this rule.
    #[test]
    fn newest_segment_wins_on_row_id_collision() {
        let dir = tempfile::tempdir().unwrap();
        let base = vec![
            entry(10, "sess-a", "m10", 1, "stale base text"),
            entry(11, "sess-a", "m11", 2, "base eleven"),
        ];
        RowMetaMap::build(&RowMetaMap::path_for(dir.path(), "k", 1), 1, base).unwrap();
        // Row 10 is re-emitted by the delta with newer content.
        let delta = vec![entry(10, "sess-a", "m10", 3, "fresh delta text")];
        RowMetaMap::build(&RowMetaMap::delta_path(dir.path(), "k", 2), 2, delta).unwrap();

        let chain = discover_chain(dir.path(), "k").expect("chain present");
        let set = RowMetaSet::open(&chain).unwrap();

        // hydrate consults segments newest-first, so the delta's copy is returned.
        let (hits, misses) = set.hydrate(&[10, 11]);
        assert!(misses.is_empty());
        let ten = hits
            .iter()
            .find(|entry| entry.row_id == 10)
            .expect("row 10 hydrated");
        assert_eq!(ten.search_text, "fresh delta text");
        assert_eq!(ten.timestamp_micros, 3);

        // The watermark is the max over both segments' copies of the session.
        assert_eq!(set.lookup_max_ts("sess-a"), Some(3));

        // Compaction input keeps exactly one copy of row 10 - the delta's.
        let mut merged = set.merged_entries();
        merged.sort_by_key(|entry| entry.row_id);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].row_id, 10);
        assert_eq!(merged[0].search_text, "fresh delta text");
        assert_eq!(merged[1].row_id, 11);
        assert_eq!(merged[1].search_text, "base eleven");
    }

    #[test]
    fn empty_map_roundtrips() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "empty", 1);
        RowMetaMap::build(&path, 1, Vec::new()).unwrap();
        let map = RowMetaMap::open(&path).unwrap();
        assert!(map.is_empty());
        assert_eq!(map.lookup(0), None);
        assert!(map.lookup_meta(0, &mut None).is_none());
        assert_eq!(map.lookup_count("anything"), None);
        assert_eq!(map.lookup_max_ts("anything"), None);
    }
}