// pond/rowmap.rs

//! Memory-mapped, process-shareable per-message meta map keyed by stable
//! `row_id`. Resolves FTS/vector `_rowid`s to `(session_id, message_id)` and
//! hydrates hit meta (`role`, `project`, `source_agent`, `timestamp`,
//! `search_text`) in memory, with a `take_rows` miss-fallback for rows appended
//! since the build. Also carries a per-session aggregate: message count and
//! max message timestamp.
//!
//! `session_id`/`project`/`source_agent`/`role` are dictionary-encoded (each
//! distinct value stored once, referenced by `u32` index); `search_text` is
//! block-compressed ([`BLOCK_ROWS`] rows per zstd block). On the real 2M-message
//! corpus that takes the map from ~655 MB (flat) to ~270 MB.
//!
//! Built via temp file + atomic rename and `mmap`'d read-only, so N pond
//! processes on the box share one physical copy in the OS page cache and a
//! restart re-`open`s instantly. Stable row ids (`enable_stable_row_ids`) keep a
//! built map valid across compaction; it only rebuilds when the dataset version
//! advances.
//!
//! Layout: `Header | [Record; count] | [SessionEntry] | [DictEntry; project] |
//! [DictEntry; agent] | [DictEntry; role] | [BlockEntry] | blob`. Records are
//! sorted by `row_id` (binary search); a row's block is `record_index /
//! BLOCK_ROWS`. The blob holds the compressed `search_text` blocks, then per-row
//! `{36-byte header + message_id}`, then per-session `session_id` bytes, then the
//! dict value bytes. Each session entry also carries its max message timestamp,
//! the watermark the `pond sync` skip oracle compares against the source.

25use std::collections::HashMap;
26use std::fs::File;
27use std::io::Write;
28use std::mem::size_of;
29use std::path::{Path, PathBuf};
30
31use anyhow::{Context, Result, ensure};
32use bytemuck::{Pod, Zeroable};
33use memmap2::Mmap;
34
/// File magic; `RowMetaMap::open` rejects any file whose header differs.
const MAGIC: [u8; 8] = *b"PONDRMM5";
/// Rows per zstd-compressed `search_text` block: record `i` lives in block
/// `i / BLOCK_ROWS`.
const BLOCK_ROWS: usize = 256;
/// zstd level passed to `zstd::bulk::compress` when building a segment.
const ZSTD_LEVEL: i32 = 3;

/// Size of the fixed per-row blob header. It holds an `i64` LE
/// `timestamp_micros`, then seven LE `u32`s:
/// - the session, project, source_agent and role indices;
/// - the `message_id` byte length;
/// - the `search_text` offset and length inside the row's decompressed block.
const ROW_HEADER_LEN: usize = size_of::<i64>() + 7 * size_of::<u32>();

/// Single-slot cache of one decompressed block: `(block_index, plaintext)`.
/// Passing it through consecutive `lookup_meta` calls lets neighbouring rows
/// that share a block pay for decompression only once.
type BlockCache = Option<(usize, Vec<u8>)>;
49
50#[repr(C)]
51#[derive(Clone, Copy, Pod, Zeroable)]
52struct Header {
53    magic: [u8; 8],
54    version: u64,
55    count: u64,
56    session_count: u64,
57    project_count: u64,
58    agent_count: u64,
59    role_count: u64,
60    block_count: u64,
61    blob_offset: u64,
62}
63
64#[repr(C)]
65#[derive(Clone, Copy, Pod, Zeroable)]
66struct Record {
67    row_id: u64,
68    blob_off: u64,
69}
70
71#[repr(C)]
72#[derive(Clone, Copy, Pod, Zeroable)]
73struct SessionEntry {
74    sid_off: u64,
75    max_ts_micros: i64,
76    sid_len: u32,
77    count: u32,
78}
79
80#[repr(C)]
81#[derive(Clone, Copy, Pod, Zeroable)]
82struct DictEntry {
83    off: u64,
84    len: u32,
85    _pad: u32,
86}
87
88#[repr(C)]
89#[derive(Clone, Copy, Pod, Zeroable)]
90struct BlockEntry {
91    comp_off: u64,
92    comp_len: u32,
93    decomp_len: u32,
94}
95
/// Owned input row for [`RowMetaMap::build`], and the owned output of
/// hydration and segment merging.
///
/// Derives `Debug`/`PartialEq`/`Eq` so rows can be logged and compared
/// (e.g. in tests or when reconciling hydrated rows).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RowMetaEntry {
    /// Stable row id; the map's lookup key.
    pub row_id: u64,
    pub session_id: String,
    pub message_id: String,
    pub role: String,
    pub project: String,
    pub source_agent: String,
    /// Message timestamp in microseconds.
    pub timestamp_micros: i64,
    /// Text stored block-compressed in the segment.
    pub search_text: String,
}
108
/// Borrowed view of one row's meta.
///
/// The dictionary-encoded fields borrow the mmap. `search_text` is owned,
/// because it is decompressed from its block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowMeta<'a> {
    pub session_id: &'a str,
    pub message_id: &'a str,
    pub role: &'a str,
    pub project: &'a str,
    pub source_agent: &'a str,
    /// Message timestamp in microseconds.
    pub timestamp_micros: i64,
    pub search_text: String,
}
120
121/// An open, memory-mapped row meta map. `lookup`, `lookup_meta`, and
122/// `lookup_count` are lock-free and reentrant.
123pub struct RowMetaMap {
124    mmap: Mmap,
125    version: u64,
126    count: usize,
127    session_count: usize,
128    project_count: usize,
129    agent_count: usize,
130    role_count: usize,
131    block_count: usize,
132    sessions_off: usize,
133    projects_off: usize,
134    agents_off: usize,
135    roles_off: usize,
136    blocks_off: usize,
137    blob_offset: usize,
138}
139
140impl std::fmt::Debug for RowMetaMap {
141    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        formatter
143            .debug_struct("RowMetaMap")
144            .field("version", &self.version)
145            .field("count", &self.count)
146            .field("session_count", &self.session_count)
147            .finish_non_exhaustive()
148    }
149}
150
151impl RowMetaMap {
152    /// Base segment path (`-v{version}`): the foot of the LSM chain.
153    pub fn path_for(cache_dir: &Path, store_key: &str, version: u64) -> PathBuf {
154        cache_dir.join(format!("rowmetamap-{store_key}-v{version}.rmm"))
155    }
156
157    /// Delta segment path (`-d{version}`): rows appended since the previous
158    /// segment, layered over the base.
159    pub fn delta_path(cache_dir: &Path, store_key: &str, version: u64) -> PathBuf {
160        cache_dir.join(format!("rowmetamap-{store_key}-d{version}.rmm"))
161    }
162
163    pub fn build(path: &Path, version: u64, mut entries: Vec<RowMetaEntry>) -> Result<()> {
164        entries.sort_unstable_by_key(|entry| entry.row_id);
165
166        // Per session: message count plus the max message timestamp - the
167        // watermark the sync skip oracle compares against the source's latest
168        // message timestamp (spec.md#adapters; deterministic, rebuilt from the
169        // store, no local cursor).
170        let mut session_agg: HashMap<&str, (u32, i64)> = HashMap::new();
171        for entry in &entries {
172            let agg = session_agg
173                .entry(entry.session_id.as_str())
174                .or_insert((0, i64::MIN));
175            agg.0 += 1;
176            agg.1 = agg.1.max(entry.timestamp_micros);
177        }
178        let mut sessions: Vec<(&str, u32, i64)> = session_agg
179            .into_iter()
180            .map(|(sid, (count, max_ts))| (sid, count, max_ts))
181            .collect();
182        sessions.sort_unstable_by(|left, right| left.0.cmp(right.0));
183        let session_index = index_of(sessions.iter().map(|(value, _, _)| *value));
184
185        let projects = distinct_sorted(entries.iter().map(|entry| entry.project.as_str()));
186        let project_index = index_of(projects.iter().copied());
187        let agents = distinct_sorted(entries.iter().map(|entry| entry.source_agent.as_str()));
188        let agent_index = index_of(agents.iter().copied());
189        let roles = distinct_sorted(entries.iter().map(|entry| entry.role.as_str()));
190        let role_index = index_of(roles.iter().copied());
191
192        let mut blob: Vec<u8> = Vec::new();
193
194        // Compressed search_text blocks first; spans record each row's
195        // offset+length within its decompressed block.
196        let mut block_entries = Vec::with_capacity(entries.len().div_ceil(BLOCK_ROWS));
197        let mut spans: Vec<(u32, u32)> = Vec::with_capacity(entries.len());
198        for chunk in entries.chunks(BLOCK_ROWS) {
199            let mut plain = Vec::new();
200            for entry in chunk {
201                let off = u32::try_from(plain.len()).context("block too large")?;
202                let len = u32::try_from(entry.search_text.len()).context("search_text too long")?;
203                plain.extend_from_slice(entry.search_text.as_bytes());
204                spans.push((off, len));
205            }
206            let compressed = zstd::bulk::compress(&plain, ZSTD_LEVEL).context("zstd compress")?;
207            block_entries.push(BlockEntry {
208                comp_off: blob.len() as u64,
209                comp_len: u32::try_from(compressed.len()).context("compressed block too large")?,
210                decomp_len: u32::try_from(plain.len()).context("block too large")?,
211            });
212            blob.extend_from_slice(&compressed);
213        }
214
215        let mut records = Vec::with_capacity(entries.len());
216        for (entry, (text_off, text_len)) in entries.iter().zip(&spans) {
217            let blob_off = blob.len() as u64;
218            blob.extend_from_slice(&entry.timestamp_micros.to_le_bytes());
219            blob.extend_from_slice(&session_index[entry.session_id.as_str()].to_le_bytes());
220            blob.extend_from_slice(&project_index[entry.project.as_str()].to_le_bytes());
221            blob.extend_from_slice(&agent_index[entry.source_agent.as_str()].to_le_bytes());
222            blob.extend_from_slice(&role_index[entry.role.as_str()].to_le_bytes());
223            let mid_len = u32::try_from(entry.message_id.len()).context("message_id too long")?;
224            blob.extend_from_slice(&mid_len.to_le_bytes());
225            blob.extend_from_slice(&text_off.to_le_bytes());
226            blob.extend_from_slice(&text_len.to_le_bytes());
227            blob.extend_from_slice(entry.message_id.as_bytes());
228            records.push(Record {
229                row_id: entry.row_id,
230                blob_off,
231            });
232        }
233
234        let session_entries = sessions
235            .iter()
236            .map(|(sid, count, max_ts_micros)| {
237                let off = blob.len() as u64;
238                blob.extend_from_slice(sid.as_bytes());
239                Ok(SessionEntry {
240                    sid_off: off,
241                    max_ts_micros: *max_ts_micros,
242                    sid_len: u32::try_from(sid.len()).context("session_id too long")?,
243                    count: *count,
244                })
245            })
246            .collect::<Result<Vec<_>>>()?;
247        let project_entries = dict_entries(&mut blob, &projects)?;
248        let agent_entries = dict_entries(&mut blob, &agents)?;
249        let role_entries = dict_entries(&mut blob, &roles)?;
250
251        let blob_offset = (size_of::<Header>()
252            + records.len() * size_of::<Record>()
253            + session_entries.len() * size_of::<SessionEntry>()
254            + (project_entries.len() + agent_entries.len() + role_entries.len())
255                * size_of::<DictEntry>()
256            + block_entries.len() * size_of::<BlockEntry>()) as u64;
257        let header = Header {
258            magic: MAGIC,
259            version,
260            count: records.len() as u64,
261            session_count: session_entries.len() as u64,
262            project_count: project_entries.len() as u64,
263            agent_count: agent_entries.len() as u64,
264            role_count: role_entries.len() as u64,
265            block_count: block_entries.len() as u64,
266            blob_offset,
267        };
268
269        // Unique temp name per builder (pid + nonce): two processes prewarming
270        // the same store+version must not share one temp inode, or the second's
271        // create would mutate the file the first is mapping.
272        let tmp = path.with_extension(format!(
273            "tmp-{}-{:016x}",
274            std::process::id(),
275            fastrand::u64(..)
276        ));
277        {
278            let mut file = File::create(&tmp)
279                .with_context(|| format!("create row meta map temp {}", tmp.display()))?;
280            file.write_all(bytemuck::bytes_of(&header))?;
281            file.write_all(bytemuck::cast_slice(&records))?;
282            file.write_all(bytemuck::cast_slice(&session_entries))?;
283            file.write_all(bytemuck::cast_slice(&project_entries))?;
284            file.write_all(bytemuck::cast_slice(&agent_entries))?;
285            file.write_all(bytemuck::cast_slice(&role_entries))?;
286            file.write_all(bytemuck::cast_slice(&block_entries))?;
287            file.write_all(&blob)?;
288            file.sync_all()?;
289        }
290        std::fs::rename(&tmp, path)
291            .with_context(|| format!("rename row meta map into place {}", path.display()))?;
292        Ok(())
293    }
294
295    pub fn open(path: &Path) -> Result<Self> {
296        let file =
297            File::open(path).with_context(|| format!("open row meta map {}", path.display()))?;
298        // SAFETY: the file is immutable once renamed into place, so the mapping
299        // never sees concurrent truncation/mutation.
300        #[allow(unsafe_code)]
301        let mmap = unsafe { Mmap::map(&file)? };
302        ensure!(
303            mmap.len() >= size_of::<Header>(),
304            "row meta map {} too small for header",
305            path.display()
306        );
307        let header: Header = *bytemuck::from_bytes(&mmap[..size_of::<Header>()]);
308        ensure!(
309            header.magic == MAGIC,
310            "row meta map {} bad magic",
311            path.display()
312        );
313        let count = usize::try_from(header.count).context("count overflow")?;
314        let session_count = usize::try_from(header.session_count).context("session_count")?;
315        let project_count = usize::try_from(header.project_count).context("project_count")?;
316        let agent_count = usize::try_from(header.agent_count).context("agent_count")?;
317        let role_count = usize::try_from(header.role_count).context("role_count")?;
318        let block_count = usize::try_from(header.block_count).context("block_count")?;
319        let blob_offset = usize::try_from(header.blob_offset).context("blob_offset overflow")?;
320
321        let sessions_off = size_of::<Header>() + count * size_of::<Record>();
322        let projects_off = sessions_off + session_count * size_of::<SessionEntry>();
323        let agents_off = projects_off + project_count * size_of::<DictEntry>();
324        let roles_off = agents_off + agent_count * size_of::<DictEntry>();
325        let blocks_off = roles_off + role_count * size_of::<DictEntry>();
326        let blob_offset_expected = blocks_off + block_count * size_of::<BlockEntry>();
327        ensure!(
328            blob_offset == blob_offset_expected && mmap.len() >= blob_offset,
329            "row meta map {} layout mismatch",
330            path.display()
331        );
332        Ok(Self {
333            mmap,
334            version: header.version,
335            count,
336            session_count,
337            project_count,
338            agent_count,
339            role_count,
340            block_count,
341            sessions_off,
342            projects_off,
343            agents_off,
344            roles_off,
345            blocks_off,
346            blob_offset,
347        })
348    }
349
350    pub fn version(&self) -> u64 {
351        self.version
352    }
353
354    pub fn len(&self) -> usize {
355        self.count
356    }
357
358    pub fn is_empty(&self) -> bool {
359        self.count == 0
360    }
361
362    /// Highest `row_id` in this segment (records are row_id-sorted), or `None`
363    /// when empty. With stable row ids this is the append high-water mark.
364    pub fn max_row_id(&self) -> Option<u64> {
365        self.records().last().map(|record| record.row_id)
366    }
367
368    fn records(&self) -> &[Record] {
369        let start = size_of::<Header>();
370        let end = start + self.count * size_of::<Record>();
371        bytemuck::cast_slice(&self.mmap[start..end])
372    }
373
374    fn session_entries(&self) -> &[SessionEntry] {
375        let end = self.sessions_off + self.session_count * size_of::<SessionEntry>();
376        bytemuck::cast_slice(&self.mmap[self.sessions_off..end])
377    }
378
379    fn dict_entries(&self, start: usize, count: usize) -> &[DictEntry] {
380        let end = start + count * size_of::<DictEntry>();
381        bytemuck::cast_slice(&self.mmap[start..end])
382    }
383
384    fn block_entries(&self) -> &[BlockEntry] {
385        let end = self.blocks_off + self.block_count * size_of::<BlockEntry>();
386        bytemuck::cast_slice(&self.mmap[self.blocks_off..end])
387    }
388
389    /// `""` on a corrupt/truncated extent - treated as a miss, never a panic.
390    fn blob_str(&self, off: u64, len: u32) -> &str {
391        let base = self.blob_offset.saturating_add(off as usize);
392        let end = base.saturating_add(len as usize);
393        self.mmap
394            .get(base..end)
395            .and_then(|bytes| std::str::from_utf8(bytes).ok())
396            .unwrap_or_default()
397    }
398
399    fn session_str(&self, index: usize) -> &str {
400        match self.session_entries().get(index) {
401            Some(entry) => self.blob_str(entry.sid_off, entry.sid_len),
402            None => "",
403        }
404    }
405
406    fn dict_str(&self, start: usize, count: usize, index: usize) -> &str {
407        match self.dict_entries(start, count).get(index) {
408            Some(entry) => self.blob_str(entry.off, entry.len),
409            None => "",
410        }
411    }
412
413    /// `(record index, blob header offset)` for `row_id`, `None` if unmapped.
414    fn locate(&self, row_id: u64) -> Option<(usize, usize)> {
415        let records = self.records();
416        let idx = records
417            .binary_search_by_key(&row_id, |record| record.row_id)
418            .ok()?;
419        let base = self
420            .blob_offset
421            .checked_add(usize::try_from(records[idx].blob_off).ok()?)?;
422        Some((idx, base))
423    }
424
425    /// Resolve a `row_id` to `(session_id, message_id)` - the cheap arm-
426    /// resolution path. Never decompresses a block. `None` for rows appended
427    /// after the build (caller falls back to a data take).
428    pub fn lookup(&self, row_id: u64) -> Option<(&str, &str)> {
429        let (_, base) = self.locate(row_id)?;
430        let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
431        let session_idx = read_u32(header, 8)?;
432        let mid_len = read_u32(header, 24)?;
433        let mut at = base + ROW_HEADER_LEN;
434        let mid = self.slice_str(&mut at, mid_len)?;
435        Some((self.session_str(session_idx), mid))
436    }
437
438    /// Resolve a `row_id` to its full hydration meta, decompressing the row's
439    /// `search_text` block (reusing `cache` if it already holds that block).
440    /// `None` if unmapped (caller falls back to take_rows).
441    pub fn lookup_meta(&self, row_id: u64, cache: &mut BlockCache) -> Option<RowMeta<'_>> {
442        let (idx, base) = self.locate(row_id)?;
443        let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
444        let timestamp_micros = i64::from_le_bytes(header.get(0..8)?.try_into().ok()?);
445        let session_idx = read_u32(header, 8)?;
446        let project_idx = read_u32(header, 12)?;
447        let agent_idx = read_u32(header, 16)?;
448        let role_idx = read_u32(header, 20)?;
449        let mid_len = read_u32(header, 24)?;
450        let text_off = read_u32(header, 28)?;
451        let text_len = read_u32(header, 32)?;
452        let mut at = base + ROW_HEADER_LEN;
453        let message_id = self.slice_str(&mut at, mid_len)?;
454        let search_text = self.decompress_text(idx, text_off, text_len, cache)?;
455        Some(RowMeta {
456            session_id: self.session_str(session_idx),
457            message_id,
458            role: self.dict_str(self.roles_off, self.role_count, role_idx),
459            project: self.dict_str(self.projects_off, self.project_count, project_idx),
460            source_agent: self.dict_str(self.agents_off, self.agent_count, agent_idx),
461            timestamp_micros,
462            search_text,
463        })
464    }
465
466    fn decompress_block(&self, block_idx: usize) -> Option<Vec<u8>> {
467        let block = self.block_entries().get(block_idx)?;
468        if block.decomp_len == 0 {
469            return Some(Vec::new());
470        }
471        let comp_base = self.blob_offset.checked_add(block.comp_off as usize)?;
472        let comp = self
473            .mmap
474            .get(comp_base..comp_base.checked_add(block.comp_len as usize)?)?;
475        zstd::bulk::decompress(comp, block.decomp_len as usize).ok()
476    }
477
478    fn decompress_text(
479        &self,
480        idx: usize,
481        text_off: usize,
482        text_len: usize,
483        cache: &mut BlockCache,
484    ) -> Option<String> {
485        if text_len == 0 {
486            return Some(String::new());
487        }
488        let block_idx = idx / BLOCK_ROWS;
489        if cache.as_ref().map(|(block, _)| *block) != Some(block_idx) {
490            *cache = Some((block_idx, self.decompress_block(block_idx)?));
491        }
492        let plain = &cache.as_ref()?.1;
493        let value = plain.get(text_off..text_off.checked_add(text_len)?)?;
494        String::from_utf8(value.to_vec()).ok()
495    }
496
497    /// Reconstruct every row as an owned [`RowMetaEntry`], decompressing each
498    /// block once. Used to merge segments into a fresh base at compaction
499    /// without re-reading the store.
500    pub fn entries(&self) -> Vec<RowMetaEntry> {
501        let records = self.records();
502        let mut out = Vec::with_capacity(records.len());
503        let mut current_block = usize::MAX;
504        let mut plain: Vec<u8> = Vec::new();
505        for (idx, record) in records.iter().enumerate() {
506            let block_idx = idx / BLOCK_ROWS;
507            if block_idx != current_block {
508                plain = self.decompress_block(block_idx).unwrap_or_default();
509                current_block = block_idx;
510            }
511            let Some(base) = self.blob_offset.checked_add(record.blob_off as usize) else {
512                continue;
513            };
514            if let Some(entry) = self.entry_at(record.row_id, base, &plain) {
515                out.push(entry);
516            }
517        }
518        out
519    }
520
521    fn entry_at(&self, row_id: u64, base: usize, block_plain: &[u8]) -> Option<RowMetaEntry> {
522        let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
523        let timestamp_micros = i64::from_le_bytes(header.get(0..8)?.try_into().ok()?);
524        let session_idx = read_u32(header, 8)?;
525        let project_idx = read_u32(header, 12)?;
526        let agent_idx = read_u32(header, 16)?;
527        let role_idx = read_u32(header, 20)?;
528        let mid_len = read_u32(header, 24)?;
529        let text_off = read_u32(header, 28)?;
530        let text_len = read_u32(header, 32)?;
531        let mut at = base + ROW_HEADER_LEN;
532        let message_id = self.slice_str(&mut at, mid_len)?.to_owned();
533        let search_text = if text_len == 0 {
534            String::new()
535        } else {
536            let bytes = block_plain.get(text_off..text_off.checked_add(text_len)?)?;
537            String::from_utf8(bytes.to_vec()).ok()?
538        };
539        Some(RowMetaEntry {
540            row_id,
541            session_id: self.session_str(session_idx).to_owned(),
542            message_id,
543            role: self
544                .dict_str(self.roles_off, self.role_count, role_idx)
545                .to_owned(),
546            project: self
547                .dict_str(self.projects_off, self.project_count, project_idx)
548                .to_owned(),
549            source_agent: self
550                .dict_str(self.agents_off, self.agent_count, agent_idx)
551                .to_owned(),
552            timestamp_micros,
553            search_text,
554        })
555    }
556
557    /// Whole-session message count for `session_id`. `None` if the session is
558    /// not in this map (caller falls back to the `session_id IN (...)` scan).
559    pub fn lookup_count(&self, session_id: &str) -> Option<usize> {
560        let entries = self.session_entries();
561        let idx = entries
562            .binary_search_by(|entry| self.blob_str(entry.sid_off, entry.sid_len).cmp(session_id))
563            .ok()?;
564        Some(entries[idx].count as usize)
565    }
566
567    /// Max message timestamp (micros) stored for `session_id` - the watermark the
568    /// sync skip oracle compares against the source's latest message timestamp.
569    /// `None` if the session is not in this map.
570    pub fn lookup_max_ts(&self, session_id: &str) -> Option<i64> {
571        let entries = self.session_entries();
572        let idx = entries
573            .binary_search_by(|entry| self.blob_str(entry.sid_off, entry.sid_len).cmp(session_id))
574            .ok()?;
575        Some(entries[idx].max_ts_micros)
576    }
577
578    /// Slice `len` UTF-8 bytes at `*at`, advancing `*at`. Checked so a corrupt
579    /// map yields `None` (-> take fallback), not a panic.
580    fn slice_str(&self, at: &mut usize, len: usize) -> Option<&str> {
581        let end = at.checked_add(len)?;
582        let bytes = self.mmap.get(*at..end)?;
583        *at = end;
584        std::str::from_utf8(bytes).ok()
585    }
586}
587
/// The on-disk LSM chain for a store: the highest-version base plus every
/// delta layered above it, ascending.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so discovered chains can be
/// logged and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChainPaths {
    /// Path of the base (`-v{V}`) segment.
    pub base: PathBuf,
    /// Version `V` parsed from the base file name.
    pub base_version: u64,
    /// `(version, path)` of each delta above the base, ascending by version.
    pub deltas: Vec<(u64, PathBuf)>,
}
595
596impl ChainPaths {
597    /// Version the chain covers - the newest segment's version.
598    pub fn version(&self) -> u64 {
599        self.deltas
600            .last()
601            .map(|(version, _)| *version)
602            .unwrap_or(self.base_version)
603    }
604}
605
606/// Discover the chain under `cache_dir` for `store_key`: the highest-version
607/// base (`-v{V}`) plus every delta (`-d{V}`) above it, ascending. `None` if no
608/// base exists yet.
609pub fn discover_chain(cache_dir: &Path, store_key: &str) -> Option<ChainPaths> {
610    let prefix = format!("rowmetamap-{store_key}-");
611    let mut bases: Vec<(u64, PathBuf)> = Vec::new();
612    let mut deltas: Vec<(u64, PathBuf)> = Vec::new();
613    for entry in std::fs::read_dir(cache_dir).ok()?.flatten() {
614        let name = entry.file_name();
615        let Some(rest) = name
616            .to_str()
617            .and_then(|name| name.strip_prefix(&prefix))
618            .and_then(|rest| rest.strip_suffix(".rmm"))
619        else {
620            continue;
621        };
622        if let Some(version) = rest.strip_prefix('v').and_then(|d| d.parse::<u64>().ok()) {
623            bases.push((version, entry.path()));
624        } else if let Some(version) = rest.strip_prefix('d').and_then(|d| d.parse::<u64>().ok()) {
625            deltas.push((version, entry.path()));
626        }
627    }
628    let (base_version, base) = bases.into_iter().max_by_key(|(version, _)| *version)?;
629    let mut deltas: Vec<(u64, PathBuf)> = deltas
630        .into_iter()
631        .filter(|(version, _)| *version > base_version)
632        .collect();
633    deltas.sort_by_key(|(version, _)| *version);
634    Some(ChainPaths {
635        base,
636        base_version,
637        deltas,
638    })
639}
640
641/// An LSM chain of immutable segment maps (base + ascending deltas) viewed as
642/// one logical map. Rows are partitioned across segments by `row_id` (append is
643/// disjoint; compaction rebuilds the base), so key/meta lookups take the newest
644/// hit and counts sum across segments.
645pub struct RowMetaSet {
646    segments: Vec<RowMetaMap>,
647}
648
649impl std::fmt::Debug for RowMetaSet {
650    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
651        formatter
652            .debug_struct("RowMetaSet")
653            .field("segments", &self.segments.len())
654            .field("version", &self.version())
655            .finish()
656    }
657}
658
659impl RowMetaSet {
660    /// Open every segment in `paths` (base first, then deltas ascending).
661    pub fn open(paths: &ChainPaths) -> Result<Self> {
662        let mut segments = Vec::with_capacity(1 + paths.deltas.len());
663        segments.push(RowMetaMap::open(&paths.base)?);
664        for (_, delta) in &paths.deltas {
665            segments.push(RowMetaMap::open(delta)?);
666        }
667        Ok(Self { segments })
668    }
669
670    pub fn version(&self) -> u64 {
671        self.segments
672            .iter()
673            .map(RowMetaMap::version)
674            .max()
675            .unwrap_or(0)
676    }
677
678    /// Number of delta segments layered on the base.
679    pub fn delta_count(&self) -> usize {
680        self.segments.len().saturating_sub(1)
681    }
682
683    /// No rows in any segment - the first-ingest hint that lets the sync oracle
684    /// short-circuit the per-session source last-id read.
685    pub fn is_empty(&self) -> bool {
686        self.segments.iter().all(RowMetaMap::is_empty)
687    }
688
689    /// Total row entries across every segment. Rows are disjoint across segments
690    /// (append-only deltas), so this sums - the live row count the base covers.
691    pub fn len(&self) -> usize {
692        self.segments.iter().map(RowMetaMap::len).sum()
693    }
694
695    /// Highest `row_id` across all segments - the append high-water mark a delta
696    /// extends past. Stable row ids keep it monotonic under fragment churn.
697    pub fn max_row_id(&self) -> Option<u64> {
698        self.segments
699            .iter()
700            .filter_map(RowMetaMap::max_row_id)
701            .max()
702    }
703
704    /// Newest segment wins (a row lives in exactly one segment).
705    pub fn lookup(&self, row_id: u64) -> Option<(&str, &str)> {
706        self.segments
707            .iter()
708            .rev()
709            .find_map(|seg| seg.lookup(row_id))
710    }
711
712    /// Hydrate `rowids` to owned metas, splitting out the ones no segment holds
713    /// (appended since the build) for the caller's take_rows fallback. Output
714    /// order is unspecified (caller indexes by key). Rowids are visited in
715    /// sorted order with a per-segment block cache, so the common case - many
716    /// hits from a few row-id-adjacent sessions - decompresses each block once.
717    pub fn hydrate(&self, rowids: &[u64]) -> (Vec<RowMetaEntry>, Vec<u64>) {
718        let mut sorted = rowids.to_vec();
719        sorted.sort_unstable();
720        let mut caches: Vec<BlockCache> = vec![None; self.segments.len()];
721        let mut hits = Vec::with_capacity(sorted.len());
722        let mut misses = Vec::new();
723        for row_id in sorted {
724            let hit = self
725                .segments
726                .iter()
727                .enumerate()
728                .rev()
729                .find_map(|(segment, map)| {
730                    let meta = map.lookup_meta(row_id, &mut caches[segment])?;
731                    Some(RowMetaEntry {
732                        row_id,
733                        session_id: meta.session_id.to_owned(),
734                        message_id: meta.message_id.to_owned(),
735                        role: meta.role.to_owned(),
736                        project: meta.project.to_owned(),
737                        source_agent: meta.source_agent.to_owned(),
738                        timestamp_micros: meta.timestamp_micros,
739                        search_text: meta.search_text,
740                    })
741                });
742            match hit {
743                Some(entry) => hits.push(entry),
744                None => misses.push(row_id),
745            }
746        }
747        (hits, misses)
748    }
749
750    /// A session's rows are split across segments, so its count is the sum.
751    pub fn lookup_count(&self, session_id: &str) -> Option<usize> {
752        let mut total = 0;
753        let mut found = false;
754        for seg in &self.segments {
755            if let Some(count) = seg.lookup_count(session_id) {
756                total += count;
757                found = true;
758            }
759        }
760        found.then_some(total)
761    }
762
763    /// Max message timestamp (micros) for `session_id` across the chain - the max
764    /// over every segment that holds it (a session's rows can be split across
765    /// base and deltas). `None` if no segment has it.
766    pub fn lookup_max_ts(&self, session_id: &str) -> Option<i64> {
767        self.segments
768            .iter()
769            .filter_map(|seg| seg.lookup_max_ts(session_id))
770            .max()
771    }
772
773    /// Every row across all segments, newest-segment-wins on `row_id`
774    /// collision - the input to a base rebuild at compaction.
775    pub fn merged_entries(&self) -> Vec<RowMetaEntry> {
776        let mut by_row: HashMap<u64, RowMetaEntry> = HashMap::new();
777        for seg in &self.segments {
778            for entry in seg.entries() {
779                by_row.insert(entry.row_id, entry);
780            }
781        }
782        by_row.into_values().collect()
783    }
784}
785
/// Sorted, duplicate-free copy of `values`, suitable as a dictionary's value
/// list. The returned slices borrow from the same source as the input.
fn distinct_sorted<'a>(values: impl Iterator<Item = &'a str>) -> Vec<&'a str> {
    let mut unique = values.collect::<Vec<_>>();
    unique.sort_unstable();
    // Sorting makes equal values adjacent, so `dedup` drops every repeat.
    unique.dedup();
    unique
}
792
/// Map each value to its position in `values`, as a `u32` dictionary code.
/// A value that occurs more than once keeps its last position.
fn index_of<'a>(values: impl Iterator<Item = &'a str>) -> HashMap<&'a str, u32> {
    let mut positions = HashMap::with_capacity(values.size_hint().0);
    for (position, value) in values.enumerate() {
        // NOTE(review): `as` silently truncates past u32::MAX entries; callers
        // presumably pass dictionaries far smaller than that - confirm.
        positions.insert(value, position as u32);
    }
    positions
}
799
800fn dict_entries(blob: &mut Vec<u8>, values: &[&str]) -> Result<Vec<DictEntry>> {
801    values
802        .iter()
803        .map(|value| {
804            let off = blob.len() as u64;
805            blob.extend_from_slice(value.as_bytes());
806            Ok(DictEntry {
807                off,
808                len: u32::try_from(value.len()).context("dictionary value too long")?,
809                _pad: 0,
810            })
811        })
812        .collect()
813}
814
/// Read a little-endian `u32` starting at byte `at`, widened to `usize`.
/// Returns `None` if the four bytes do not fit inside `bytes`, including when
/// `at + 4` would overflow.
fn read_u32(bytes: &[u8], at: usize) -> Option<usize> {
    let end = at.checked_add(4)?;
    let word: [u8; 4] = bytes.get(at..end)?.try_into().ok()?;
    Some(u32::from_le_bytes(word) as usize)
}
819
// Round-trip tests: each one builds a map (or a base + delta chain) into a temp
// dir, reopens it, and checks the lookup / hydrate / count / watermark APIs
// against the input rows.
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    use super::*;

    // Fixture: a `RowMetaEntry` with the given ids, timestamp and text, and
    // fixed role/project/source_agent values that individual tests override
    // where they need variety.
    fn entry(
        row_id: u64,
        session_id: &str,
        message_id: &str,
        timestamp_micros: i64,
        search_text: &str,
    ) -> RowMetaEntry {
        RowMetaEntry {
            row_id,
            session_id: session_id.to_owned(),
            message_id: message_id.to_owned(),
            role: "user".to_owned(),
            project: "/proj".to_owned(),
            source_agent: "claude-code".to_owned(),
            timestamp_micros,
            search_text: search_text.to_owned(),
        }
    }

    // Single map with three rows handed to `build` out of row_id order
    // (10, 3, 99), covering two sessions. One row has a non-default
    // role/project and one has empty search text.
    #[test]
    fn build_open_lookup_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "teststore", 7);
        let mut three = entry(99, "sess-a", "msg-3", 3_000, "third");
        three.role = "assistant".to_owned();
        three.project = "/other".to_owned();
        let entries = vec![
            entry(10, "sess-a", "msg-1", 1_000, "first message text"),
            entry(3, "sess-b/agent-x", "msg-2", 2_000, ""),
            three,
        ];
        RowMetaMap::build(&path, 7, entries).unwrap();

        let map = RowMetaMap::open(&path).unwrap();
        assert_eq!(map.version(), 7);
        assert_eq!(map.len(), 3);
        assert_eq!(map.lookup(10), Some(("sess-a", "msg-1")));
        assert_eq!(map.lookup(3), Some(("sess-b/agent-x", "msg-2")));
        assert_eq!(map.lookup(99), Some(("sess-a", "msg-3")));
        assert_eq!(map.lookup(42), None);

        let meta = map.lookup_meta(10, &mut None).expect("row 10 present");
        assert_eq!(meta.session_id, "sess-a");
        assert_eq!(meta.message_id, "msg-1");
        assert_eq!(meta.role, "user");
        assert_eq!(meta.project, "/proj");
        assert_eq!(meta.source_agent, "claude-code");
        assert_eq!(meta.timestamp_micros, 1_000);
        assert_eq!(meta.search_text, "first message text");

        // Dictionary-encoded fields must decode to the per-row value, not the
        // fixture default.
        let assistant = map.lookup_meta(99, &mut None).expect("row 99 present");
        assert_eq!(assistant.role, "assistant");
        assert_eq!(assistant.project, "/other");
        assert_eq!(assistant.search_text, "third");

        let empty_text = map.lookup_meta(3, &mut None).expect("row 3 present");
        assert_eq!(empty_text.search_text, "");
        assert!(map.lookup_meta(42, &mut None).is_none());

        assert_eq!(map.lookup_count("sess-a"), Some(2));
        assert_eq!(map.lookup_count("sess-b/agent-x"), Some(1));
        assert_eq!(map.lookup_count("missing"), None);

        // Watermark = max timestamp: sess-a's msg-3 (ts 3000) over msg-1.
        assert_eq!(map.lookup_max_ts("sess-a"), Some(3_000));
        assert_eq!(map.lookup_max_ts("sess-b/agent-x"), Some(2_000));
        assert_eq!(map.lookup_max_ts("missing"), None);
    }

    // The per-session watermark is the maximum timestamp, not the timestamp of
    // the last row by row_id.
    #[test]
    fn max_ts_is_the_session_high_water_mark() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "ts", 1);
        // Out-of-row-order timestamps: the max wins regardless of row order.
        let entries = vec![
            entry(1, "s", "msg-a", 5_000, "a"),
            entry(2, "s", "msg-b", 9_000, "b"),
            entry(3, "s", "msg-c", 7_000, "c"),
        ];
        RowMetaMap::build(&path, 1, entries).unwrap();
        let map = RowMetaMap::open(&path).unwrap();
        assert_eq!(map.lookup_max_ts("s"), Some(9_000));
    }

    // 2 * BLOCK_ROWS + 5 rows: two full search_text blocks plus a partial
    // third. The probed row ids straddle each block boundary.
    #[test]
    fn many_blocks_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "blocks", 1);
        let entries: Vec<RowMetaEntry> = (0..(BLOCK_ROWS as u64 * 2 + 5))
            .map(|i| {
                entry(
                    i,
                    "sess",
                    &format!("msg-{i}"),
                    i as i64,
                    &format!("text body {i}"),
                )
            })
            .collect();
        RowMetaMap::build(&path, 1, entries).unwrap();

        let map = RowMetaMap::open(&path).unwrap();
        // One cache reused across rows spanning several blocks - same-block hits
        // must reuse it, block crossings must refill it, both yielding the right
        // text.
        let mut cache = None;
        for i in [0u64, 1, 255, 256, 257, 511, 512, 516] {
            let meta = map.lookup_meta(i, &mut cache).expect("row present");
            assert_eq!(meta.message_id, format!("msg-{i}"));
            assert_eq!(meta.search_text, format!("text body {i}"));
        }
    }

    // A base (version 1) plus one delta (version 2), discovered as a chain and
    // opened as a `RowMetaSet`. Checks lookups in either segment, the
    // hit/miss split in `hydrate`, count summing, cross-segment max-ts, and
    // `merged_entries`.
    // NOTE(review): the base and delta row_ids are disjoint (10-12 vs 20-21),
    // so newest-segment-wins shadowing on a row_id collision is not exercised.
    #[test]
    fn lsm_set_layers_delta_over_base() {
        let dir = tempfile::tempdir().unwrap();
        let base = vec![
            entry(10, "sess-a", "m10", 1, "base ten"),
            entry(11, "sess-a", "m11", 2, "base eleven"),
            entry(12, "sess-b", "m12", 3, "base twelve"),
        ];
        RowMetaMap::build(&RowMetaMap::path_for(dir.path(), "k", 1), 1, base).unwrap();
        let delta = vec![
            entry(20, "sess-a", "m20", 4, "delta twenty"),
            entry(21, "sess-c", "m21", 5, "delta twentyone"),
        ];
        RowMetaMap::build(&RowMetaMap::delta_path(dir.path(), "k", 2), 2, delta).unwrap();

        let chain = discover_chain(dir.path(), "k").expect("chain present");
        assert_eq!(chain.base_version, 1);
        assert_eq!(chain.deltas.len(), 1);
        assert_eq!(chain.version(), 2);

        let set = RowMetaSet::open(&chain).unwrap();
        assert_eq!(set.version(), 2);
        assert_eq!(set.delta_count(), 1);

        assert_eq!(set.lookup(10), Some(("sess-a", "m10")));
        assert_eq!(set.lookup(20), Some(("sess-a", "m20")));
        assert_eq!(set.lookup(99), None);

        // hydrate spans both segments and splits out the absent row.
        let (mut hits, misses) = set.hydrate(&[21, 10, 99]);
        assert_eq!(misses, vec![99]);
        hits.sort_by_key(|entry| entry.row_id);
        assert_eq!(hits.len(), 2);
        assert_eq!(hits[0].search_text, "base ten");
        assert_eq!(hits[1].search_text, "delta twentyone");

        // Counts sum across base + delta: sess-a = 2 + 1.
        assert_eq!(set.lookup_count("sess-a"), Some(3));
        assert_eq!(set.lookup_count("sess-b"), Some(1));
        assert_eq!(set.lookup_count("sess-c"), Some(1));
        assert_eq!(set.lookup_count("missing"), None);

        // Max timestamp across segments: sess-a spans base (ts<=2) + delta (ts 4).
        assert_eq!(set.lookup_max_ts("sess-a"), Some(4));
        assert_eq!(set.lookup_max_ts("sess-b"), Some(3));
        assert_eq!(set.lookup_max_ts("sess-c"), Some(5));
        assert_eq!(set.lookup_max_ts("missing"), None);

        // Compaction input: all 5 distinct rows reconstructed.
        let mut merged = set.merged_entries();
        merged.sort_by_key(|entry| entry.row_id);
        assert_eq!(merged.len(), 5);
        assert_eq!(merged[0].row_id, 10);
        assert_eq!(merged[4].row_id, 21);
        assert_eq!(merged[4].search_text, "delta twentyone");
    }

    // Zero rows must still build and open. Every lookup then reports absence
    // rather than erroring or panicking.
    #[test]
    fn empty_map_roundtrips() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "empty", 1);
        RowMetaMap::build(&path, 1, Vec::new()).unwrap();
        let map = RowMetaMap::open(&path).unwrap();
        assert!(map.is_empty());
        assert_eq!(map.lookup(0), None);
        assert!(map.lookup_meta(0, &mut None).is_none());
        assert_eq!(map.lookup_count("anything"), None);
        assert_eq!(map.lookup_max_ts("anything"), None);
    }
}