1use std::collections::HashMap;
26use std::fs::File;
27use std::io::Write;
28use std::mem::size_of;
29use std::path::{Path, PathBuf};
30
31use anyhow::{Context, Result, ensure};
32use bytemuck::{Pod, Zeroable};
33use memmap2::Mmap;
34
35const MAGIC: [u8; 8] = *b"PONDRMM5";
36const BLOCK_ROWS: usize = 256;
37const ZSTD_LEVEL: i32 = 3;
38
39const ROW_HEADER_LEN: usize = 8 + 7 * 4;
44
45type BlockCache = Option<(usize, Vec<u8>)>;
49
50#[repr(C)]
51#[derive(Clone, Copy, Pod, Zeroable)]
52struct Header {
53 magic: [u8; 8],
54 version: u64,
55 count: u64,
56 session_count: u64,
57 project_count: u64,
58 agent_count: u64,
59 role_count: u64,
60 block_count: u64,
61 blob_offset: u64,
62}
63
64#[repr(C)]
65#[derive(Clone, Copy, Pod, Zeroable)]
66struct Record {
67 row_id: u64,
68 blob_off: u64,
69}
70
71#[repr(C)]
72#[derive(Clone, Copy, Pod, Zeroable)]
73struct SessionEntry {
74 sid_off: u64,
75 max_ts_micros: i64,
76 sid_len: u32,
77 count: u32,
78}
79
80#[repr(C)]
81#[derive(Clone, Copy, Pod, Zeroable)]
82struct DictEntry {
83 off: u64,
84 len: u32,
85 _pad: u32,
86}
87
88#[repr(C)]
89#[derive(Clone, Copy, Pod, Zeroable)]
90struct BlockEntry {
91 comp_off: u64,
92 comp_len: u32,
93 decomp_len: u32,
94}
95
96#[derive(Clone)]
98pub struct RowMetaEntry {
99 pub row_id: u64,
100 pub session_id: String,
101 pub message_id: String,
102 pub role: String,
103 pub project: String,
104 pub source_agent: String,
105 pub timestamp_micros: i64,
106 pub search_text: String,
107}
108
109pub struct RowMeta<'a> {
112 pub session_id: &'a str,
113 pub message_id: &'a str,
114 pub role: &'a str,
115 pub project: &'a str,
116 pub source_agent: &'a str,
117 pub timestamp_micros: i64,
118 pub search_text: String,
119}
120
121pub struct RowMetaMap {
124 mmap: Mmap,
125 version: u64,
126 count: usize,
127 session_count: usize,
128 project_count: usize,
129 agent_count: usize,
130 role_count: usize,
131 block_count: usize,
132 sessions_off: usize,
133 projects_off: usize,
134 agents_off: usize,
135 roles_off: usize,
136 blocks_off: usize,
137 blob_offset: usize,
138}
139
140impl std::fmt::Debug for RowMetaMap {
141 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 formatter
143 .debug_struct("RowMetaMap")
144 .field("version", &self.version)
145 .field("count", &self.count)
146 .field("session_count", &self.session_count)
147 .finish_non_exhaustive()
148 }
149}
150
151impl RowMetaMap {
152 pub fn path_for(cache_dir: &Path, store_key: &str, version: u64) -> PathBuf {
154 cache_dir.join(format!("rowmetamap-{store_key}-v{version}.rmm"))
155 }
156
157 pub fn delta_path(cache_dir: &Path, store_key: &str, version: u64) -> PathBuf {
160 cache_dir.join(format!("rowmetamap-{store_key}-d{version}.rmm"))
161 }
162
163 pub fn build(path: &Path, version: u64, mut entries: Vec<RowMetaEntry>) -> Result<()> {
164 entries.sort_unstable_by_key(|entry| entry.row_id);
165
166 let mut session_agg: HashMap<&str, (u32, i64)> = HashMap::new();
171 for entry in &entries {
172 let agg = session_agg
173 .entry(entry.session_id.as_str())
174 .or_insert((0, i64::MIN));
175 agg.0 += 1;
176 agg.1 = agg.1.max(entry.timestamp_micros);
177 }
178 let mut sessions: Vec<(&str, u32, i64)> = session_agg
179 .into_iter()
180 .map(|(sid, (count, max_ts))| (sid, count, max_ts))
181 .collect();
182 sessions.sort_unstable_by(|left, right| left.0.cmp(right.0));
183 let session_index = index_of(sessions.iter().map(|(value, _, _)| *value));
184
185 let projects = distinct_sorted(entries.iter().map(|entry| entry.project.as_str()));
186 let project_index = index_of(projects.iter().copied());
187 let agents = distinct_sorted(entries.iter().map(|entry| entry.source_agent.as_str()));
188 let agent_index = index_of(agents.iter().copied());
189 let roles = distinct_sorted(entries.iter().map(|entry| entry.role.as_str()));
190 let role_index = index_of(roles.iter().copied());
191
192 let mut blob: Vec<u8> = Vec::new();
193
194 let mut block_entries = Vec::with_capacity(entries.len().div_ceil(BLOCK_ROWS));
197 let mut spans: Vec<(u32, u32)> = Vec::with_capacity(entries.len());
198 for chunk in entries.chunks(BLOCK_ROWS) {
199 let mut plain = Vec::new();
200 for entry in chunk {
201 let off = u32::try_from(plain.len()).context("block too large")?;
202 let len = u32::try_from(entry.search_text.len()).context("search_text too long")?;
203 plain.extend_from_slice(entry.search_text.as_bytes());
204 spans.push((off, len));
205 }
206 let compressed = zstd::bulk::compress(&plain, ZSTD_LEVEL).context("zstd compress")?;
207 block_entries.push(BlockEntry {
208 comp_off: blob.len() as u64,
209 comp_len: u32::try_from(compressed.len()).context("compressed block too large")?,
210 decomp_len: u32::try_from(plain.len()).context("block too large")?,
211 });
212 blob.extend_from_slice(&compressed);
213 }
214
215 let mut records = Vec::with_capacity(entries.len());
216 for (entry, (text_off, text_len)) in entries.iter().zip(&spans) {
217 let blob_off = blob.len() as u64;
218 blob.extend_from_slice(&entry.timestamp_micros.to_le_bytes());
219 blob.extend_from_slice(&session_index[entry.session_id.as_str()].to_le_bytes());
220 blob.extend_from_slice(&project_index[entry.project.as_str()].to_le_bytes());
221 blob.extend_from_slice(&agent_index[entry.source_agent.as_str()].to_le_bytes());
222 blob.extend_from_slice(&role_index[entry.role.as_str()].to_le_bytes());
223 let mid_len = u32::try_from(entry.message_id.len()).context("message_id too long")?;
224 blob.extend_from_slice(&mid_len.to_le_bytes());
225 blob.extend_from_slice(&text_off.to_le_bytes());
226 blob.extend_from_slice(&text_len.to_le_bytes());
227 blob.extend_from_slice(entry.message_id.as_bytes());
228 records.push(Record {
229 row_id: entry.row_id,
230 blob_off,
231 });
232 }
233
234 let session_entries = sessions
235 .iter()
236 .map(|(sid, count, max_ts_micros)| {
237 let off = blob.len() as u64;
238 blob.extend_from_slice(sid.as_bytes());
239 Ok(SessionEntry {
240 sid_off: off,
241 max_ts_micros: *max_ts_micros,
242 sid_len: u32::try_from(sid.len()).context("session_id too long")?,
243 count: *count,
244 })
245 })
246 .collect::<Result<Vec<_>>>()?;
247 let project_entries = dict_entries(&mut blob, &projects)?;
248 let agent_entries = dict_entries(&mut blob, &agents)?;
249 let role_entries = dict_entries(&mut blob, &roles)?;
250
251 let blob_offset = (size_of::<Header>()
252 + records.len() * size_of::<Record>()
253 + session_entries.len() * size_of::<SessionEntry>()
254 + (project_entries.len() + agent_entries.len() + role_entries.len())
255 * size_of::<DictEntry>()
256 + block_entries.len() * size_of::<BlockEntry>()) as u64;
257 let header = Header {
258 magic: MAGIC,
259 version,
260 count: records.len() as u64,
261 session_count: session_entries.len() as u64,
262 project_count: project_entries.len() as u64,
263 agent_count: agent_entries.len() as u64,
264 role_count: role_entries.len() as u64,
265 block_count: block_entries.len() as u64,
266 blob_offset,
267 };
268
269 let tmp = path.with_extension(format!(
273 "tmp-{}-{:016x}",
274 std::process::id(),
275 fastrand::u64(..)
276 ));
277 {
278 let mut file = File::create(&tmp)
279 .with_context(|| format!("create row meta map temp {}", tmp.display()))?;
280 file.write_all(bytemuck::bytes_of(&header))?;
281 file.write_all(bytemuck::cast_slice(&records))?;
282 file.write_all(bytemuck::cast_slice(&session_entries))?;
283 file.write_all(bytemuck::cast_slice(&project_entries))?;
284 file.write_all(bytemuck::cast_slice(&agent_entries))?;
285 file.write_all(bytemuck::cast_slice(&role_entries))?;
286 file.write_all(bytemuck::cast_slice(&block_entries))?;
287 file.write_all(&blob)?;
288 file.sync_all()?;
289 }
290 std::fs::rename(&tmp, path)
291 .with_context(|| format!("rename row meta map into place {}", path.display()))?;
292 Ok(())
293 }
294
295 pub fn open(path: &Path) -> Result<Self> {
296 let file =
297 File::open(path).with_context(|| format!("open row meta map {}", path.display()))?;
298 #[allow(unsafe_code)]
301 let mmap = unsafe { Mmap::map(&file)? };
302 ensure!(
303 mmap.len() >= size_of::<Header>(),
304 "row meta map {} too small for header",
305 path.display()
306 );
307 let header: Header = *bytemuck::from_bytes(&mmap[..size_of::<Header>()]);
308 ensure!(
309 header.magic == MAGIC,
310 "row meta map {} bad magic",
311 path.display()
312 );
313 let count = usize::try_from(header.count).context("count overflow")?;
314 let session_count = usize::try_from(header.session_count).context("session_count")?;
315 let project_count = usize::try_from(header.project_count).context("project_count")?;
316 let agent_count = usize::try_from(header.agent_count).context("agent_count")?;
317 let role_count = usize::try_from(header.role_count).context("role_count")?;
318 let block_count = usize::try_from(header.block_count).context("block_count")?;
319 let blob_offset = usize::try_from(header.blob_offset).context("blob_offset overflow")?;
320
321 let sessions_off = size_of::<Header>() + count * size_of::<Record>();
322 let projects_off = sessions_off + session_count * size_of::<SessionEntry>();
323 let agents_off = projects_off + project_count * size_of::<DictEntry>();
324 let roles_off = agents_off + agent_count * size_of::<DictEntry>();
325 let blocks_off = roles_off + role_count * size_of::<DictEntry>();
326 let blob_offset_expected = blocks_off + block_count * size_of::<BlockEntry>();
327 ensure!(
328 blob_offset == blob_offset_expected && mmap.len() >= blob_offset,
329 "row meta map {} layout mismatch",
330 path.display()
331 );
332 Ok(Self {
333 mmap,
334 version: header.version,
335 count,
336 session_count,
337 project_count,
338 agent_count,
339 role_count,
340 block_count,
341 sessions_off,
342 projects_off,
343 agents_off,
344 roles_off,
345 blocks_off,
346 blob_offset,
347 })
348 }
349
350 pub fn version(&self) -> u64 {
351 self.version
352 }
353
354 pub fn len(&self) -> usize {
355 self.count
356 }
357
358 pub fn is_empty(&self) -> bool {
359 self.count == 0
360 }
361
362 fn records(&self) -> &[Record] {
363 let start = size_of::<Header>();
364 let end = start + self.count * size_of::<Record>();
365 bytemuck::cast_slice(&self.mmap[start..end])
366 }
367
368 fn session_entries(&self) -> &[SessionEntry] {
369 let end = self.sessions_off + self.session_count * size_of::<SessionEntry>();
370 bytemuck::cast_slice(&self.mmap[self.sessions_off..end])
371 }
372
373 fn dict_entries(&self, start: usize, count: usize) -> &[DictEntry] {
374 let end = start + count * size_of::<DictEntry>();
375 bytemuck::cast_slice(&self.mmap[start..end])
376 }
377
378 fn block_entries(&self) -> &[BlockEntry] {
379 let end = self.blocks_off + self.block_count * size_of::<BlockEntry>();
380 bytemuck::cast_slice(&self.mmap[self.blocks_off..end])
381 }
382
383 fn blob_str(&self, off: u64, len: u32) -> &str {
385 let base = self.blob_offset.saturating_add(off as usize);
386 let end = base.saturating_add(len as usize);
387 self.mmap
388 .get(base..end)
389 .and_then(|bytes| std::str::from_utf8(bytes).ok())
390 .unwrap_or_default()
391 }
392
393 fn session_str(&self, index: usize) -> &str {
394 match self.session_entries().get(index) {
395 Some(entry) => self.blob_str(entry.sid_off, entry.sid_len),
396 None => "",
397 }
398 }
399
400 fn dict_str(&self, start: usize, count: usize, index: usize) -> &str {
401 match self.dict_entries(start, count).get(index) {
402 Some(entry) => self.blob_str(entry.off, entry.len),
403 None => "",
404 }
405 }
406
407 fn locate(&self, row_id: u64) -> Option<(usize, usize)> {
409 let records = self.records();
410 let idx = records
411 .binary_search_by_key(&row_id, |record| record.row_id)
412 .ok()?;
413 let base = self
414 .blob_offset
415 .checked_add(usize::try_from(records[idx].blob_off).ok()?)?;
416 Some((idx, base))
417 }
418
419 pub fn lookup(&self, row_id: u64) -> Option<(&str, &str)> {
423 let (_, base) = self.locate(row_id)?;
424 let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
425 let session_idx = read_u32(header, 8)?;
426 let mid_len = read_u32(header, 24)?;
427 let mut at = base + ROW_HEADER_LEN;
428 let mid = self.slice_str(&mut at, mid_len)?;
429 Some((self.session_str(session_idx), mid))
430 }
431
432 pub fn lookup_meta(&self, row_id: u64, cache: &mut BlockCache) -> Option<RowMeta<'_>> {
436 let (idx, base) = self.locate(row_id)?;
437 let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
438 let timestamp_micros = i64::from_le_bytes(header.get(0..8)?.try_into().ok()?);
439 let session_idx = read_u32(header, 8)?;
440 let project_idx = read_u32(header, 12)?;
441 let agent_idx = read_u32(header, 16)?;
442 let role_idx = read_u32(header, 20)?;
443 let mid_len = read_u32(header, 24)?;
444 let text_off = read_u32(header, 28)?;
445 let text_len = read_u32(header, 32)?;
446 let mut at = base + ROW_HEADER_LEN;
447 let message_id = self.slice_str(&mut at, mid_len)?;
448 let search_text = self.decompress_text(idx, text_off, text_len, cache)?;
449 Some(RowMeta {
450 session_id: self.session_str(session_idx),
451 message_id,
452 role: self.dict_str(self.roles_off, self.role_count, role_idx),
453 project: self.dict_str(self.projects_off, self.project_count, project_idx),
454 source_agent: self.dict_str(self.agents_off, self.agent_count, agent_idx),
455 timestamp_micros,
456 search_text,
457 })
458 }
459
460 fn decompress_block(&self, block_idx: usize) -> Option<Vec<u8>> {
461 let block = self.block_entries().get(block_idx)?;
462 if block.decomp_len == 0 {
463 return Some(Vec::new());
464 }
465 let comp_base = self.blob_offset.checked_add(block.comp_off as usize)?;
466 let comp = self
467 .mmap
468 .get(comp_base..comp_base.checked_add(block.comp_len as usize)?)?;
469 zstd::bulk::decompress(comp, block.decomp_len as usize).ok()
470 }
471
472 fn decompress_text(
473 &self,
474 idx: usize,
475 text_off: usize,
476 text_len: usize,
477 cache: &mut BlockCache,
478 ) -> Option<String> {
479 if text_len == 0 {
480 return Some(String::new());
481 }
482 let block_idx = idx / BLOCK_ROWS;
483 if cache.as_ref().map(|(block, _)| *block) != Some(block_idx) {
484 *cache = Some((block_idx, self.decompress_block(block_idx)?));
485 }
486 let plain = &cache.as_ref()?.1;
487 let value = plain.get(text_off..text_off.checked_add(text_len)?)?;
488 String::from_utf8(value.to_vec()).ok()
489 }
490
491 pub fn entries(&self) -> Vec<RowMetaEntry> {
495 let records = self.records();
496 let mut out = Vec::with_capacity(records.len());
497 let mut current_block = usize::MAX;
498 let mut plain: Vec<u8> = Vec::new();
499 for (idx, record) in records.iter().enumerate() {
500 let block_idx = idx / BLOCK_ROWS;
501 if block_idx != current_block {
502 plain = self.decompress_block(block_idx).unwrap_or_default();
503 current_block = block_idx;
504 }
505 let Some(base) = self.blob_offset.checked_add(record.blob_off as usize) else {
506 continue;
507 };
508 if let Some(entry) = self.entry_at(record.row_id, base, &plain) {
509 out.push(entry);
510 }
511 }
512 out
513 }
514
515 fn entry_at(&self, row_id: u64, base: usize, block_plain: &[u8]) -> Option<RowMetaEntry> {
516 let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
517 let timestamp_micros = i64::from_le_bytes(header.get(0..8)?.try_into().ok()?);
518 let session_idx = read_u32(header, 8)?;
519 let project_idx = read_u32(header, 12)?;
520 let agent_idx = read_u32(header, 16)?;
521 let role_idx = read_u32(header, 20)?;
522 let mid_len = read_u32(header, 24)?;
523 let text_off = read_u32(header, 28)?;
524 let text_len = read_u32(header, 32)?;
525 let mut at = base + ROW_HEADER_LEN;
526 let message_id = self.slice_str(&mut at, mid_len)?.to_owned();
527 let search_text = if text_len == 0 {
528 String::new()
529 } else {
530 let bytes = block_plain.get(text_off..text_off.checked_add(text_len)?)?;
531 String::from_utf8(bytes.to_vec()).ok()?
532 };
533 Some(RowMetaEntry {
534 row_id,
535 session_id: self.session_str(session_idx).to_owned(),
536 message_id,
537 role: self
538 .dict_str(self.roles_off, self.role_count, role_idx)
539 .to_owned(),
540 project: self
541 .dict_str(self.projects_off, self.project_count, project_idx)
542 .to_owned(),
543 source_agent: self
544 .dict_str(self.agents_off, self.agent_count, agent_idx)
545 .to_owned(),
546 timestamp_micros,
547 search_text,
548 })
549 }
550
551 pub fn lookup_count(&self, session_id: &str) -> Option<usize> {
554 let entries = self.session_entries();
555 let idx = entries
556 .binary_search_by(|entry| self.blob_str(entry.sid_off, entry.sid_len).cmp(session_id))
557 .ok()?;
558 Some(entries[idx].count as usize)
559 }
560
561 pub fn lookup_max_ts(&self, session_id: &str) -> Option<i64> {
565 let entries = self.session_entries();
566 let idx = entries
567 .binary_search_by(|entry| self.blob_str(entry.sid_off, entry.sid_len).cmp(session_id))
568 .ok()?;
569 Some(entries[idx].max_ts_micros)
570 }
571
572 fn slice_str(&self, at: &mut usize, len: usize) -> Option<&str> {
575 let end = at.checked_add(len)?;
576 let bytes = self.mmap.get(*at..end)?;
577 *at = end;
578 std::str::from_utf8(bytes).ok()
579 }
580}
581
582pub struct ChainPaths {
585 pub base: PathBuf,
586 pub base_version: u64,
587 pub deltas: Vec<(u64, PathBuf)>,
588}
589
590impl ChainPaths {
591 pub fn version(&self) -> u64 {
593 self.deltas
594 .last()
595 .map(|(version, _)| *version)
596 .unwrap_or(self.base_version)
597 }
598}
599
600pub fn discover_chain(cache_dir: &Path, store_key: &str) -> Option<ChainPaths> {
604 let prefix = format!("rowmetamap-{store_key}-");
605 let mut bases: Vec<(u64, PathBuf)> = Vec::new();
606 let mut deltas: Vec<(u64, PathBuf)> = Vec::new();
607 for entry in std::fs::read_dir(cache_dir).ok()?.flatten() {
608 let name = entry.file_name();
609 let Some(rest) = name
610 .to_str()
611 .and_then(|name| name.strip_prefix(&prefix))
612 .and_then(|rest| rest.strip_suffix(".rmm"))
613 else {
614 continue;
615 };
616 if let Some(version) = rest.strip_prefix('v').and_then(|d| d.parse::<u64>().ok()) {
617 bases.push((version, entry.path()));
618 } else if let Some(version) = rest.strip_prefix('d').and_then(|d| d.parse::<u64>().ok()) {
619 deltas.push((version, entry.path()));
620 }
621 }
622 let (base_version, base) = bases.into_iter().max_by_key(|(version, _)| *version)?;
623 let mut deltas: Vec<(u64, PathBuf)> = deltas
624 .into_iter()
625 .filter(|(version, _)| *version > base_version)
626 .collect();
627 deltas.sort_by_key(|(version, _)| *version);
628 Some(ChainPaths {
629 base,
630 base_version,
631 deltas,
632 })
633}
634
635pub struct RowMetaSet {
640 segments: Vec<RowMetaMap>,
641}
642
643impl std::fmt::Debug for RowMetaSet {
644 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
645 formatter
646 .debug_struct("RowMetaSet")
647 .field("segments", &self.segments.len())
648 .field("version", &self.version())
649 .finish()
650 }
651}
652
653impl RowMetaSet {
654 pub fn open(paths: &ChainPaths) -> Result<Self> {
656 let mut segments = Vec::with_capacity(1 + paths.deltas.len());
657 segments.push(RowMetaMap::open(&paths.base)?);
658 for (_, delta) in &paths.deltas {
659 segments.push(RowMetaMap::open(delta)?);
660 }
661 Ok(Self { segments })
662 }
663
664 pub fn version(&self) -> u64 {
665 self.segments
666 .iter()
667 .map(RowMetaMap::version)
668 .max()
669 .unwrap_or(0)
670 }
671
672 pub fn delta_count(&self) -> usize {
674 self.segments.len().saturating_sub(1)
675 }
676
677 pub fn is_empty(&self) -> bool {
680 self.segments.iter().all(RowMetaMap::is_empty)
681 }
682
683 pub fn lookup(&self, row_id: u64) -> Option<(&str, &str)> {
685 self.segments
686 .iter()
687 .rev()
688 .find_map(|seg| seg.lookup(row_id))
689 }
690
691 pub fn hydrate(&self, rowids: &[u64]) -> (Vec<RowMetaEntry>, Vec<u64>) {
697 let mut sorted = rowids.to_vec();
698 sorted.sort_unstable();
699 let mut caches: Vec<BlockCache> = vec![None; self.segments.len()];
700 let mut hits = Vec::with_capacity(sorted.len());
701 let mut misses = Vec::new();
702 for row_id in sorted {
703 let hit = self
704 .segments
705 .iter()
706 .enumerate()
707 .rev()
708 .find_map(|(segment, map)| {
709 let meta = map.lookup_meta(row_id, &mut caches[segment])?;
710 Some(RowMetaEntry {
711 row_id,
712 session_id: meta.session_id.to_owned(),
713 message_id: meta.message_id.to_owned(),
714 role: meta.role.to_owned(),
715 project: meta.project.to_owned(),
716 source_agent: meta.source_agent.to_owned(),
717 timestamp_micros: meta.timestamp_micros,
718 search_text: meta.search_text,
719 })
720 });
721 match hit {
722 Some(entry) => hits.push(entry),
723 None => misses.push(row_id),
724 }
725 }
726 (hits, misses)
727 }
728
729 pub fn lookup_count(&self, session_id: &str) -> Option<usize> {
731 let mut total = 0;
732 let mut found = false;
733 for seg in &self.segments {
734 if let Some(count) = seg.lookup_count(session_id) {
735 total += count;
736 found = true;
737 }
738 }
739 found.then_some(total)
740 }
741
742 pub fn lookup_max_ts(&self, session_id: &str) -> Option<i64> {
746 self.segments
747 .iter()
748 .filter_map(|seg| seg.lookup_max_ts(session_id))
749 .max()
750 }
751
752 pub fn merged_entries(&self) -> Vec<RowMetaEntry> {
755 let mut by_row: HashMap<u64, RowMetaEntry> = HashMap::new();
756 for seg in &self.segments {
757 for entry in seg.entries() {
758 by_row.insert(entry.row_id, entry);
759 }
760 }
761 by_row.into_values().collect()
762 }
763}
764
765fn distinct_sorted<'a>(values: impl Iterator<Item = &'a str>) -> Vec<&'a str> {
766 let mut distinct: Vec<&str> = values.collect();
767 distinct.sort_unstable();
768 distinct.dedup();
769 distinct
770}
771
772fn index_of<'a>(values: impl Iterator<Item = &'a str>) -> HashMap<&'a str, u32> {
773 values
774 .enumerate()
775 .map(|(index, value)| (value, index as u32))
776 .collect()
777}
778
779fn dict_entries(blob: &mut Vec<u8>, values: &[&str]) -> Result<Vec<DictEntry>> {
780 values
781 .iter()
782 .map(|value| {
783 let off = blob.len() as u64;
784 blob.extend_from_slice(value.as_bytes());
785 Ok(DictEntry {
786 off,
787 len: u32::try_from(value.len()).context("dictionary value too long")?,
788 _pad: 0,
789 })
790 })
791 .collect()
792}
793
794fn read_u32(bytes: &[u8], at: usize) -> Option<usize> {
795 let slice = bytes.get(at..at.checked_add(4)?)?;
796 Some(u32::from_le_bytes(slice.try_into().ok()?) as usize)
797}
798
799#[cfg(test)]
800mod tests {
801 #![allow(clippy::expect_used, clippy::unwrap_used)]
802 use super::*;
803
804 fn entry(
805 row_id: u64,
806 session_id: &str,
807 message_id: &str,
808 timestamp_micros: i64,
809 search_text: &str,
810 ) -> RowMetaEntry {
811 RowMetaEntry {
812 row_id,
813 session_id: session_id.to_owned(),
814 message_id: message_id.to_owned(),
815 role: "user".to_owned(),
816 project: "/proj".to_owned(),
817 source_agent: "claude-code".to_owned(),
818 timestamp_micros,
819 search_text: search_text.to_owned(),
820 }
821 }
822
823 #[test]
824 fn build_open_lookup_roundtrip() {
825 let dir = tempfile::tempdir().unwrap();
826 let path = RowMetaMap::path_for(dir.path(), "teststore", 7);
827 let mut three = entry(99, "sess-a", "msg-3", 3_000, "third");
828 three.role = "assistant".to_owned();
829 three.project = "/other".to_owned();
830 let entries = vec![
831 entry(10, "sess-a", "msg-1", 1_000, "first message text"),
832 entry(3, "sess-b/agent-x", "msg-2", 2_000, ""),
833 three,
834 ];
835 RowMetaMap::build(&path, 7, entries).unwrap();
836
837 let map = RowMetaMap::open(&path).unwrap();
838 assert_eq!(map.version(), 7);
839 assert_eq!(map.len(), 3);
840 assert_eq!(map.lookup(10), Some(("sess-a", "msg-1")));
841 assert_eq!(map.lookup(3), Some(("sess-b/agent-x", "msg-2")));
842 assert_eq!(map.lookup(99), Some(("sess-a", "msg-3")));
843 assert_eq!(map.lookup(42), None);
844
845 let meta = map.lookup_meta(10, &mut None).expect("row 10 present");
846 assert_eq!(meta.session_id, "sess-a");
847 assert_eq!(meta.message_id, "msg-1");
848 assert_eq!(meta.role, "user");
849 assert_eq!(meta.project, "/proj");
850 assert_eq!(meta.source_agent, "claude-code");
851 assert_eq!(meta.timestamp_micros, 1_000);
852 assert_eq!(meta.search_text, "first message text");
853
854 let assistant = map.lookup_meta(99, &mut None).expect("row 99 present");
855 assert_eq!(assistant.role, "assistant");
856 assert_eq!(assistant.project, "/other");
857 assert_eq!(assistant.search_text, "third");
858
859 let empty_text = map.lookup_meta(3, &mut None).expect("row 3 present");
860 assert_eq!(empty_text.search_text, "");
861 assert!(map.lookup_meta(42, &mut None).is_none());
862
863 assert_eq!(map.lookup_count("sess-a"), Some(2));
864 assert_eq!(map.lookup_count("sess-b/agent-x"), Some(1));
865 assert_eq!(map.lookup_count("missing"), None);
866
867 assert_eq!(map.lookup_max_ts("sess-a"), Some(3_000));
869 assert_eq!(map.lookup_max_ts("sess-b/agent-x"), Some(2_000));
870 assert_eq!(map.lookup_max_ts("missing"), None);
871 }
872
873 #[test]
874 fn max_ts_is_the_session_high_water_mark() {
875 let dir = tempfile::tempdir().unwrap();
876 let path = RowMetaMap::path_for(dir.path(), "ts", 1);
877 let entries = vec![
879 entry(1, "s", "msg-a", 5_000, "a"),
880 entry(2, "s", "msg-b", 9_000, "b"),
881 entry(3, "s", "msg-c", 7_000, "c"),
882 ];
883 RowMetaMap::build(&path, 1, entries).unwrap();
884 let map = RowMetaMap::open(&path).unwrap();
885 assert_eq!(map.lookup_max_ts("s"), Some(9_000));
886 }
887
888 #[test]
889 fn many_blocks_roundtrip() {
890 let dir = tempfile::tempdir().unwrap();
891 let path = RowMetaMap::path_for(dir.path(), "blocks", 1);
892 let entries: Vec<RowMetaEntry> = (0..(BLOCK_ROWS as u64 * 2 + 5))
893 .map(|i| {
894 entry(
895 i,
896 "sess",
897 &format!("msg-{i}"),
898 i as i64,
899 &format!("text body {i}"),
900 )
901 })
902 .collect();
903 RowMetaMap::build(&path, 1, entries).unwrap();
904
905 let map = RowMetaMap::open(&path).unwrap();
906 let mut cache = None;
910 for i in [0u64, 1, 255, 256, 257, 511, 512, 516] {
911 let meta = map.lookup_meta(i, &mut cache).expect("row present");
912 assert_eq!(meta.message_id, format!("msg-{i}"));
913 assert_eq!(meta.search_text, format!("text body {i}"));
914 }
915 }
916
917 #[test]
918 fn lsm_set_layers_delta_over_base() {
919 let dir = tempfile::tempdir().unwrap();
920 let base = vec![
921 entry(10, "sess-a", "m10", 1, "base ten"),
922 entry(11, "sess-a", "m11", 2, "base eleven"),
923 entry(12, "sess-b", "m12", 3, "base twelve"),
924 ];
925 RowMetaMap::build(&RowMetaMap::path_for(dir.path(), "k", 1), 1, base).unwrap();
926 let delta = vec![
927 entry(20, "sess-a", "m20", 4, "delta twenty"),
928 entry(21, "sess-c", "m21", 5, "delta twentyone"),
929 ];
930 RowMetaMap::build(&RowMetaMap::delta_path(dir.path(), "k", 2), 2, delta).unwrap();
931
932 let chain = discover_chain(dir.path(), "k").expect("chain present");
933 assert_eq!(chain.base_version, 1);
934 assert_eq!(chain.deltas.len(), 1);
935 assert_eq!(chain.version(), 2);
936
937 let set = RowMetaSet::open(&chain).unwrap();
938 assert_eq!(set.version(), 2);
939 assert_eq!(set.delta_count(), 1);
940
941 assert_eq!(set.lookup(10), Some(("sess-a", "m10")));
942 assert_eq!(set.lookup(20), Some(("sess-a", "m20")));
943 assert_eq!(set.lookup(99), None);
944
945 let (mut hits, misses) = set.hydrate(&[21, 10, 99]);
947 assert_eq!(misses, vec![99]);
948 hits.sort_by_key(|entry| entry.row_id);
949 assert_eq!(hits.len(), 2);
950 assert_eq!(hits[0].search_text, "base ten");
951 assert_eq!(hits[1].search_text, "delta twentyone");
952
953 assert_eq!(set.lookup_count("sess-a"), Some(3));
955 assert_eq!(set.lookup_count("sess-b"), Some(1));
956 assert_eq!(set.lookup_count("sess-c"), Some(1));
957 assert_eq!(set.lookup_count("missing"), None);
958
959 assert_eq!(set.lookup_max_ts("sess-a"), Some(4));
961 assert_eq!(set.lookup_max_ts("sess-b"), Some(3));
962 assert_eq!(set.lookup_max_ts("sess-c"), Some(5));
963 assert_eq!(set.lookup_max_ts("missing"), None);
964
965 let mut merged = set.merged_entries();
967 merged.sort_by_key(|entry| entry.row_id);
968 assert_eq!(merged.len(), 5);
969 assert_eq!(merged[0].row_id, 10);
970 assert_eq!(merged[4].row_id, 21);
971 assert_eq!(merged[4].search_text, "delta twentyone");
972 }
973
974 #[test]
975 fn empty_map_roundtrips() {
976 let dir = tempfile::tempdir().unwrap();
977 let path = RowMetaMap::path_for(dir.path(), "empty", 1);
978 RowMetaMap::build(&path, 1, Vec::new()).unwrap();
979 let map = RowMetaMap::open(&path).unwrap();
980 assert!(map.is_empty());
981 assert_eq!(map.lookup(0), None);
982 assert!(map.lookup_meta(0, &mut None).is_none());
983 assert_eq!(map.lookup_count("anything"), None);
984 assert_eq!(map.lookup_max_ts("anything"), None);
985 }
986}