1use std::collections::HashMap;
26use std::fs::File;
27use std::io::Write;
28use std::mem::size_of;
29use std::path::{Path, PathBuf};
30
31use anyhow::{Context, Result, ensure};
32use bytemuck::{Pod, Zeroable};
33use memmap2::Mmap;
34
/// File signature written at the start of every row meta map by `build`;
/// `open` rejects files whose first eight bytes differ.
const MAGIC: [u8; 8] = *b"PONDRMM5";
/// Number of consecutive rows (in record order) whose `search_text` bytes are
/// concatenated and compressed together as one block.
const BLOCK_ROWS: usize = 256;
/// Compression level passed to `zstd::bulk::compress` when building blocks.
const ZSTD_LEVEL: i32 = 3;

/// Size in bytes of the fixed prefix of each row in the blob: an `i64`
/// timestamp (8 bytes) followed by seven little-endian `u32` fields (session,
/// project, agent and role indices, message-id length, text offset, text length).
const ROW_HEADER_LEN: usize = 8 + 7 * 4;

/// Caller-held cache of the most recently decompressed text block:
/// `(block index, decompressed bytes)`, or `None` when nothing is cached yet.
type BlockCache = Option<(usize, Vec<u8>)>;
49
/// Fixed-size header at byte 0 of the file.
///
/// `build` writes it first, followed by the record table, the session table,
/// the project/agent/role dictionaries, the block table and finally the blob.
/// `open` recomputes the section offsets from the counts below and checks the
/// result against `blob_offset`.
#[repr(C)]
#[derive(Clone, Copy, Pod, Zeroable)]
struct Header {
    /// Must equal `MAGIC`.
    magic: [u8; 8],
    /// Version number passed to `build`.
    version: u64,
    /// Number of `Record` entries (rows).
    count: u64,
    /// Number of `SessionEntry` entries.
    session_count: u64,
    /// Number of `DictEntry` entries in the project dictionary.
    project_count: u64,
    /// Number of `DictEntry` entries in the agent dictionary.
    agent_count: u64,
    /// Number of `DictEntry` entries in the role dictionary.
    role_count: u64,
    /// Number of `BlockEntry` entries.
    block_count: u64,
    /// Absolute byte offset of the blob; all table offsets are relative to it.
    blob_offset: u64,
}
63
/// One entry of the record table; `build` writes these sorted by `row_id` so
/// `locate` can binary-search them.
#[repr(C)]
#[derive(Clone, Copy, Pod, Zeroable)]
struct Record {
    /// Row identifier used as the lookup key.
    row_id: u64,
    /// Blob-relative offset of the row's fixed header (`ROW_HEADER_LEN` bytes,
    /// followed by the message-id bytes).
    blob_off: u64,
}
70
/// One entry of the session table; `build` writes these sorted by session id
/// so lookups can binary-search by string.
#[repr(C)]
#[derive(Clone, Copy, Pod, Zeroable)]
struct SessionEntry {
    /// Blob-relative offset of the session id bytes.
    sid_off: u64,
    /// Largest `timestamp_micros` among this session's rows in this file.
    max_ts_micros: i64,
    /// Length in bytes of the session id.
    sid_len: u32,
    /// Number of rows in this file that belong to the session.
    count: u32,
}
79
/// One entry of a string dictionary (projects, agents or roles); rows refer
/// to entries by index.
#[repr(C)]
#[derive(Clone, Copy, Pod, Zeroable)]
struct DictEntry {
    /// Blob-relative offset of the string bytes.
    off: u64,
    /// Length in bytes of the string.
    len: u32,
    /// Always written as 0; presumably explicit padding so the struct has no
    /// implicit padding bytes (which `Pod` does not allow).
    _pad: u32,
}
87
/// One entry of the block table, describing a zstd-compressed block holding
/// the concatenated `search_text` of up to `BLOCK_ROWS` consecutive rows.
#[repr(C)]
#[derive(Clone, Copy, Pod, Zeroable)]
struct BlockEntry {
    /// Blob-relative offset of the compressed bytes.
    comp_off: u64,
    /// Length in bytes of the compressed data.
    comp_len: u32,
    /// Length in bytes of the data before compression; 0 for an all-empty block.
    decomp_len: u32,
}
95
/// Owned metadata for a single row: the input to [`RowMetaMap::build`] and the
/// output of the bulk read paths (`entries`, `hydrate`, `merged_entries`).
///
/// Derives `Debug`, `PartialEq` and `Eq` so values can be logged and compared
/// directly in assertions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RowMetaEntry {
    /// Row identifier; the lookup key inside a map.
    pub row_id: u64,
    /// Session the row belongs to.
    pub session_id: String,
    /// Message identifier stored inline with the row.
    pub message_id: String,
    /// Role string (dictionary-encoded on disk).
    pub role: String,
    /// Project string (dictionary-encoded on disk).
    pub project: String,
    /// Source agent string (dictionary-encoded on disk).
    pub source_agent: String,
    /// Row timestamp in microseconds.
    pub timestamp_micros: i64,
    /// Search text (stored zstd-compressed in blocks on disk).
    pub search_text: String,
}
108
/// Metadata for one row as returned by `RowMetaMap::lookup_meta`.
///
/// The `&'a str` fields borrow from the map they were read from; only
/// `search_text` is owned, because it is copied out of a decompressed block.
pub struct RowMeta<'a> {
    /// Session the row belongs to.
    pub session_id: &'a str,
    /// Message identifier stored inline with the row.
    pub message_id: &'a str,
    /// Role string resolved through the role dictionary.
    pub role: &'a str,
    /// Project string resolved through the project dictionary.
    pub project: &'a str,
    /// Source agent string resolved through the agent dictionary.
    pub source_agent: &'a str,
    /// Row timestamp in microseconds.
    pub timestamp_micros: i64,
    /// Decompressed search text for the row.
    pub search_text: String,
}
120
/// Read-only view over one memory-mapped row meta map file.
///
/// All fields other than `mmap` are copied or derived from the file header in
/// `open`; the `*_off` fields are absolute byte offsets into the mapping.
pub struct RowMetaMap {
    /// Mapping of the whole file.
    mmap: Mmap,
    /// Version stored in the header.
    version: u64,
    /// Number of rows (record table entries).
    count: usize,
    /// Number of session table entries.
    session_count: usize,
    /// Number of project dictionary entries.
    project_count: usize,
    /// Number of agent dictionary entries.
    agent_count: usize,
    /// Number of role dictionary entries.
    role_count: usize,
    /// Number of block table entries.
    block_count: usize,
    /// Start of the session table (end of the record table).
    sessions_off: usize,
    /// Start of the project dictionary.
    projects_off: usize,
    /// Start of the agent dictionary.
    agents_off: usize,
    /// Start of the role dictionary.
    roles_off: usize,
    /// Start of the block table.
    blocks_off: usize,
    /// Start of the blob; table offsets are relative to this.
    blob_offset: usize,
}
139
140impl std::fmt::Debug for RowMetaMap {
141 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 formatter
143 .debug_struct("RowMetaMap")
144 .field("version", &self.version)
145 .field("count", &self.count)
146 .field("session_count", &self.session_count)
147 .finish_non_exhaustive()
148 }
149}
150
151impl RowMetaMap {
152 pub fn path_for(cache_dir: &Path, store_key: &str, version: u64) -> PathBuf {
154 cache_dir.join(format!("rowmetamap-{store_key}-v{version}.rmm"))
155 }
156
157 pub fn delta_path(cache_dir: &Path, store_key: &str, version: u64) -> PathBuf {
160 cache_dir.join(format!("rowmetamap-{store_key}-d{version}.rmm"))
161 }
162
163 pub fn build(path: &Path, version: u64, mut entries: Vec<RowMetaEntry>) -> Result<()> {
164 entries.sort_unstable_by_key(|entry| entry.row_id);
165
166 let mut session_agg: HashMap<&str, (u32, i64)> = HashMap::new();
171 for entry in &entries {
172 let agg = session_agg
173 .entry(entry.session_id.as_str())
174 .or_insert((0, i64::MIN));
175 agg.0 += 1;
176 agg.1 = agg.1.max(entry.timestamp_micros);
177 }
178 let mut sessions: Vec<(&str, u32, i64)> = session_agg
179 .into_iter()
180 .map(|(sid, (count, max_ts))| (sid, count, max_ts))
181 .collect();
182 sessions.sort_unstable_by(|left, right| left.0.cmp(right.0));
183 let session_index = index_of(sessions.iter().map(|(value, _, _)| *value));
184
185 let projects = distinct_sorted(entries.iter().map(|entry| entry.project.as_str()));
186 let project_index = index_of(projects.iter().copied());
187 let agents = distinct_sorted(entries.iter().map(|entry| entry.source_agent.as_str()));
188 let agent_index = index_of(agents.iter().copied());
189 let roles = distinct_sorted(entries.iter().map(|entry| entry.role.as_str()));
190 let role_index = index_of(roles.iter().copied());
191
192 let mut blob: Vec<u8> = Vec::new();
193
194 let mut block_entries = Vec::with_capacity(entries.len().div_ceil(BLOCK_ROWS));
197 let mut spans: Vec<(u32, u32)> = Vec::with_capacity(entries.len());
198 for chunk in entries.chunks(BLOCK_ROWS) {
199 let mut plain = Vec::new();
200 for entry in chunk {
201 let off = u32::try_from(plain.len()).context("block too large")?;
202 let len = u32::try_from(entry.search_text.len()).context("search_text too long")?;
203 plain.extend_from_slice(entry.search_text.as_bytes());
204 spans.push((off, len));
205 }
206 let compressed = zstd::bulk::compress(&plain, ZSTD_LEVEL).context("zstd compress")?;
207 block_entries.push(BlockEntry {
208 comp_off: blob.len() as u64,
209 comp_len: u32::try_from(compressed.len()).context("compressed block too large")?,
210 decomp_len: u32::try_from(plain.len()).context("block too large")?,
211 });
212 blob.extend_from_slice(&compressed);
213 }
214
215 let mut records = Vec::with_capacity(entries.len());
216 for (entry, (text_off, text_len)) in entries.iter().zip(&spans) {
217 let blob_off = blob.len() as u64;
218 blob.extend_from_slice(&entry.timestamp_micros.to_le_bytes());
219 blob.extend_from_slice(&session_index[entry.session_id.as_str()].to_le_bytes());
220 blob.extend_from_slice(&project_index[entry.project.as_str()].to_le_bytes());
221 blob.extend_from_slice(&agent_index[entry.source_agent.as_str()].to_le_bytes());
222 blob.extend_from_slice(&role_index[entry.role.as_str()].to_le_bytes());
223 let mid_len = u32::try_from(entry.message_id.len()).context("message_id too long")?;
224 blob.extend_from_slice(&mid_len.to_le_bytes());
225 blob.extend_from_slice(&text_off.to_le_bytes());
226 blob.extend_from_slice(&text_len.to_le_bytes());
227 blob.extend_from_slice(entry.message_id.as_bytes());
228 records.push(Record {
229 row_id: entry.row_id,
230 blob_off,
231 });
232 }
233
234 let session_entries = sessions
235 .iter()
236 .map(|(sid, count, max_ts_micros)| {
237 let off = blob.len() as u64;
238 blob.extend_from_slice(sid.as_bytes());
239 Ok(SessionEntry {
240 sid_off: off,
241 max_ts_micros: *max_ts_micros,
242 sid_len: u32::try_from(sid.len()).context("session_id too long")?,
243 count: *count,
244 })
245 })
246 .collect::<Result<Vec<_>>>()?;
247 let project_entries = dict_entries(&mut blob, &projects)?;
248 let agent_entries = dict_entries(&mut blob, &agents)?;
249 let role_entries = dict_entries(&mut blob, &roles)?;
250
251 let blob_offset = (size_of::<Header>()
252 + records.len() * size_of::<Record>()
253 + session_entries.len() * size_of::<SessionEntry>()
254 + (project_entries.len() + agent_entries.len() + role_entries.len())
255 * size_of::<DictEntry>()
256 + block_entries.len() * size_of::<BlockEntry>()) as u64;
257 let header = Header {
258 magic: MAGIC,
259 version,
260 count: records.len() as u64,
261 session_count: session_entries.len() as u64,
262 project_count: project_entries.len() as u64,
263 agent_count: agent_entries.len() as u64,
264 role_count: role_entries.len() as u64,
265 block_count: block_entries.len() as u64,
266 blob_offset,
267 };
268
269 let tmp = path.with_extension(format!(
273 "tmp-{}-{:016x}",
274 std::process::id(),
275 fastrand::u64(..)
276 ));
277 {
278 let mut file = File::create(&tmp)
279 .with_context(|| format!("create row meta map temp {}", tmp.display()))?;
280 file.write_all(bytemuck::bytes_of(&header))?;
281 file.write_all(bytemuck::cast_slice(&records))?;
282 file.write_all(bytemuck::cast_slice(&session_entries))?;
283 file.write_all(bytemuck::cast_slice(&project_entries))?;
284 file.write_all(bytemuck::cast_slice(&agent_entries))?;
285 file.write_all(bytemuck::cast_slice(&role_entries))?;
286 file.write_all(bytemuck::cast_slice(&block_entries))?;
287 file.write_all(&blob)?;
288 file.sync_all()?;
289 }
290 std::fs::rename(&tmp, path)
291 .with_context(|| format!("rename row meta map into place {}", path.display()))?;
292 Ok(())
293 }
294
295 pub fn open(path: &Path) -> Result<Self> {
296 let file =
297 File::open(path).with_context(|| format!("open row meta map {}", path.display()))?;
298 #[allow(unsafe_code)]
301 let mmap = unsafe { Mmap::map(&file)? };
302 ensure!(
303 mmap.len() >= size_of::<Header>(),
304 "row meta map {} too small for header",
305 path.display()
306 );
307 let header: Header = *bytemuck::from_bytes(&mmap[..size_of::<Header>()]);
308 ensure!(
309 header.magic == MAGIC,
310 "row meta map {} bad magic",
311 path.display()
312 );
313 let count = usize::try_from(header.count).context("count overflow")?;
314 let session_count = usize::try_from(header.session_count).context("session_count")?;
315 let project_count = usize::try_from(header.project_count).context("project_count")?;
316 let agent_count = usize::try_from(header.agent_count).context("agent_count")?;
317 let role_count = usize::try_from(header.role_count).context("role_count")?;
318 let block_count = usize::try_from(header.block_count).context("block_count")?;
319 let blob_offset = usize::try_from(header.blob_offset).context("blob_offset overflow")?;
320
321 let sessions_off = size_of::<Header>() + count * size_of::<Record>();
322 let projects_off = sessions_off + session_count * size_of::<SessionEntry>();
323 let agents_off = projects_off + project_count * size_of::<DictEntry>();
324 let roles_off = agents_off + agent_count * size_of::<DictEntry>();
325 let blocks_off = roles_off + role_count * size_of::<DictEntry>();
326 let blob_offset_expected = blocks_off + block_count * size_of::<BlockEntry>();
327 ensure!(
328 blob_offset == blob_offset_expected && mmap.len() >= blob_offset,
329 "row meta map {} layout mismatch",
330 path.display()
331 );
332 Ok(Self {
333 mmap,
334 version: header.version,
335 count,
336 session_count,
337 project_count,
338 agent_count,
339 role_count,
340 block_count,
341 sessions_off,
342 projects_off,
343 agents_off,
344 roles_off,
345 blocks_off,
346 blob_offset,
347 })
348 }
349
    /// Returns the version recorded in the file header.
    pub fn version(&self) -> u64 {
        self.version
    }
353
    /// Returns the number of rows (record table entries) in this file.
    pub fn len(&self) -> usize {
        self.count
    }
357
    /// Returns `true` when the file holds no rows.
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }
361
    /// Returns the `row_id` of the last record, or `None` for an empty map.
    ///
    /// This is the largest row id because `build` writes the record table
    /// sorted by `row_id`.
    pub fn max_row_id(&self) -> Option<u64> {
        self.records().last().map(|record| record.row_id)
    }
367
368 fn records(&self) -> &[Record] {
369 let start = size_of::<Header>();
370 let end = start + self.count * size_of::<Record>();
371 bytemuck::cast_slice(&self.mmap[start..end])
372 }
373
374 fn session_entries(&self) -> &[SessionEntry] {
375 let end = self.sessions_off + self.session_count * size_of::<SessionEntry>();
376 bytemuck::cast_slice(&self.mmap[self.sessions_off..end])
377 }
378
379 fn dict_entries(&self, start: usize, count: usize) -> &[DictEntry] {
380 let end = start + count * size_of::<DictEntry>();
381 bytemuck::cast_slice(&self.mmap[start..end])
382 }
383
384 fn block_entries(&self) -> &[BlockEntry] {
385 let end = self.blocks_off + self.block_count * size_of::<BlockEntry>();
386 bytemuck::cast_slice(&self.mmap[self.blocks_off..end])
387 }
388
389 fn blob_str(&self, off: u64, len: u32) -> &str {
391 let base = self.blob_offset.saturating_add(off as usize);
392 let end = base.saturating_add(len as usize);
393 self.mmap
394 .get(base..end)
395 .and_then(|bytes| std::str::from_utf8(bytes).ok())
396 .unwrap_or_default()
397 }
398
399 fn session_str(&self, index: usize) -> &str {
400 match self.session_entries().get(index) {
401 Some(entry) => self.blob_str(entry.sid_off, entry.sid_len),
402 None => "",
403 }
404 }
405
406 fn dict_str(&self, start: usize, count: usize, index: usize) -> &str {
407 match self.dict_entries(start, count).get(index) {
408 Some(entry) => self.blob_str(entry.off, entry.len),
409 None => "",
410 }
411 }
412
413 fn locate(&self, row_id: u64) -> Option<(usize, usize)> {
415 let records = self.records();
416 let idx = records
417 .binary_search_by_key(&row_id, |record| record.row_id)
418 .ok()?;
419 let base = self
420 .blob_offset
421 .checked_add(usize::try_from(records[idx].blob_off).ok()?)?;
422 Some((idx, base))
423 }
424
425 pub fn lookup(&self, row_id: u64) -> Option<(&str, &str)> {
429 let (_, base) = self.locate(row_id)?;
430 let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
431 let session_idx = read_u32(header, 8)?;
432 let mid_len = read_u32(header, 24)?;
433 let mut at = base + ROW_HEADER_LEN;
434 let mid = self.slice_str(&mut at, mid_len)?;
435 Some((self.session_str(session_idx), mid))
436 }
437
438 pub fn lookup_meta(&self, row_id: u64, cache: &mut BlockCache) -> Option<RowMeta<'_>> {
442 let (idx, base) = self.locate(row_id)?;
443 let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
444 let timestamp_micros = i64::from_le_bytes(header.get(0..8)?.try_into().ok()?);
445 let session_idx = read_u32(header, 8)?;
446 let project_idx = read_u32(header, 12)?;
447 let agent_idx = read_u32(header, 16)?;
448 let role_idx = read_u32(header, 20)?;
449 let mid_len = read_u32(header, 24)?;
450 let text_off = read_u32(header, 28)?;
451 let text_len = read_u32(header, 32)?;
452 let mut at = base + ROW_HEADER_LEN;
453 let message_id = self.slice_str(&mut at, mid_len)?;
454 let search_text = self.decompress_text(idx, text_off, text_len, cache)?;
455 Some(RowMeta {
456 session_id: self.session_str(session_idx),
457 message_id,
458 role: self.dict_str(self.roles_off, self.role_count, role_idx),
459 project: self.dict_str(self.projects_off, self.project_count, project_idx),
460 source_agent: self.dict_str(self.agents_off, self.agent_count, agent_idx),
461 timestamp_micros,
462 search_text,
463 })
464 }
465
466 fn decompress_block(&self, block_idx: usize) -> Option<Vec<u8>> {
467 let block = self.block_entries().get(block_idx)?;
468 if block.decomp_len == 0 {
469 return Some(Vec::new());
470 }
471 let comp_base = self.blob_offset.checked_add(block.comp_off as usize)?;
472 let comp = self
473 .mmap
474 .get(comp_base..comp_base.checked_add(block.comp_len as usize)?)?;
475 zstd::bulk::decompress(comp, block.decomp_len as usize).ok()
476 }
477
478 fn decompress_text(
479 &self,
480 idx: usize,
481 text_off: usize,
482 text_len: usize,
483 cache: &mut BlockCache,
484 ) -> Option<String> {
485 if text_len == 0 {
486 return Some(String::new());
487 }
488 let block_idx = idx / BLOCK_ROWS;
489 if cache.as_ref().map(|(block, _)| *block) != Some(block_idx) {
490 *cache = Some((block_idx, self.decompress_block(block_idx)?));
491 }
492 let plain = &cache.as_ref()?.1;
493 let value = plain.get(text_off..text_off.checked_add(text_len)?)?;
494 String::from_utf8(value.to_vec()).ok()
495 }
496
497 pub fn entries(&self) -> Vec<RowMetaEntry> {
501 let records = self.records();
502 let mut out = Vec::with_capacity(records.len());
503 let mut current_block = usize::MAX;
504 let mut plain: Vec<u8> = Vec::new();
505 for (idx, record) in records.iter().enumerate() {
506 let block_idx = idx / BLOCK_ROWS;
507 if block_idx != current_block {
508 plain = self.decompress_block(block_idx).unwrap_or_default();
509 current_block = block_idx;
510 }
511 let Some(base) = self.blob_offset.checked_add(record.blob_off as usize) else {
512 continue;
513 };
514 if let Some(entry) = self.entry_at(record.row_id, base, &plain) {
515 out.push(entry);
516 }
517 }
518 out
519 }
520
521 fn entry_at(&self, row_id: u64, base: usize, block_plain: &[u8]) -> Option<RowMetaEntry> {
522 let header = self.mmap.get(base..base.checked_add(ROW_HEADER_LEN)?)?;
523 let timestamp_micros = i64::from_le_bytes(header.get(0..8)?.try_into().ok()?);
524 let session_idx = read_u32(header, 8)?;
525 let project_idx = read_u32(header, 12)?;
526 let agent_idx = read_u32(header, 16)?;
527 let role_idx = read_u32(header, 20)?;
528 let mid_len = read_u32(header, 24)?;
529 let text_off = read_u32(header, 28)?;
530 let text_len = read_u32(header, 32)?;
531 let mut at = base + ROW_HEADER_LEN;
532 let message_id = self.slice_str(&mut at, mid_len)?.to_owned();
533 let search_text = if text_len == 0 {
534 String::new()
535 } else {
536 let bytes = block_plain.get(text_off..text_off.checked_add(text_len)?)?;
537 String::from_utf8(bytes.to_vec()).ok()?
538 };
539 Some(RowMetaEntry {
540 row_id,
541 session_id: self.session_str(session_idx).to_owned(),
542 message_id,
543 role: self
544 .dict_str(self.roles_off, self.role_count, role_idx)
545 .to_owned(),
546 project: self
547 .dict_str(self.projects_off, self.project_count, project_idx)
548 .to_owned(),
549 source_agent: self
550 .dict_str(self.agents_off, self.agent_count, agent_idx)
551 .to_owned(),
552 timestamp_micros,
553 search_text,
554 })
555 }
556
557 pub fn lookup_count(&self, session_id: &str) -> Option<usize> {
560 let entries = self.session_entries();
561 let idx = entries
562 .binary_search_by(|entry| self.blob_str(entry.sid_off, entry.sid_len).cmp(session_id))
563 .ok()?;
564 Some(entries[idx].count as usize)
565 }
566
567 pub fn lookup_max_ts(&self, session_id: &str) -> Option<i64> {
571 let entries = self.session_entries();
572 let idx = entries
573 .binary_search_by(|entry| self.blob_str(entry.sid_off, entry.sid_len).cmp(session_id))
574 .ok()?;
575 Some(entries[idx].max_ts_micros)
576 }
577
578 fn slice_str(&self, at: &mut usize, len: usize) -> Option<&str> {
581 let end = at.checked_add(len)?;
582 let bytes = self.mmap.get(*at..end)?;
583 *at = end;
584 std::str::from_utf8(bytes).ok()
585 }
586}
587
/// The on-disk files that make up one store's chain: a base file plus the
/// delta files layered on top of it.
pub struct ChainPaths {
    /// Path of the base file.
    pub base: PathBuf,
    /// Version parsed from the base file's name.
    pub base_version: u64,
    /// `(version, path)` of each delta; `discover_chain` returns them in
    /// ascending version order.
    pub deltas: Vec<(u64, PathBuf)>,
}

impl ChainPaths {
    /// Returns the version of the last delta, or the base version when there
    /// are no deltas.
    pub fn version(&self) -> u64 {
        match self.deltas.last() {
            Some((version, _)) => *version,
            None => self.base_version,
        }
    }
}
605
606pub fn discover_chain(cache_dir: &Path, store_key: &str) -> Option<ChainPaths> {
610 let prefix = format!("rowmetamap-{store_key}-");
611 let mut bases: Vec<(u64, PathBuf)> = Vec::new();
612 let mut deltas: Vec<(u64, PathBuf)> = Vec::new();
613 for entry in std::fs::read_dir(cache_dir).ok()?.flatten() {
614 let name = entry.file_name();
615 let Some(rest) = name
616 .to_str()
617 .and_then(|name| name.strip_prefix(&prefix))
618 .and_then(|rest| rest.strip_suffix(".rmm"))
619 else {
620 continue;
621 };
622 if let Some(version) = rest.strip_prefix('v').and_then(|d| d.parse::<u64>().ok()) {
623 bases.push((version, entry.path()));
624 } else if let Some(version) = rest.strip_prefix('d').and_then(|d| d.parse::<u64>().ok()) {
625 deltas.push((version, entry.path()));
626 }
627 }
628 let (base_version, base) = bases.into_iter().max_by_key(|(version, _)| *version)?;
629 let mut deltas: Vec<(u64, PathBuf)> = deltas
630 .into_iter()
631 .filter(|(version, _)| *version > base_version)
632 .collect();
633 deltas.sort_by_key(|(version, _)| *version);
634 Some(ChainPaths {
635 base,
636 base_version,
637 deltas,
638 })
639}
640
/// A base map plus its deltas, queried as one logical map.
pub struct RowMetaSet {
    /// Opened segments: the base first, then the deltas in the order given
    /// by `ChainPaths::deltas`. Point lookups consult them newest-first.
    segments: Vec<RowMetaMap>,
}
648
649impl std::fmt::Debug for RowMetaSet {
650 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
651 formatter
652 .debug_struct("RowMetaSet")
653 .field("segments", &self.segments.len())
654 .field("version", &self.version())
655 .finish()
656 }
657}
658
659impl RowMetaSet {
660 pub fn open(paths: &ChainPaths) -> Result<Self> {
662 let mut segments = Vec::with_capacity(1 + paths.deltas.len());
663 segments.push(RowMetaMap::open(&paths.base)?);
664 for (_, delta) in &paths.deltas {
665 segments.push(RowMetaMap::open(delta)?);
666 }
667 Ok(Self { segments })
668 }
669
670 pub fn version(&self) -> u64 {
671 self.segments
672 .iter()
673 .map(RowMetaMap::version)
674 .max()
675 .unwrap_or(0)
676 }
677
    /// Returns the number of delta segments, i.e. every segment except the
    /// base; saturates at 0 for an empty set.
    pub fn delta_count(&self) -> usize {
        self.segments.len().saturating_sub(1)
    }
682
683 pub fn is_empty(&self) -> bool {
686 self.segments.iter().all(RowMetaMap::is_empty)
687 }
688
689 pub fn len(&self) -> usize {
692 self.segments.iter().map(RowMetaMap::len).sum()
693 }
694
695 pub fn max_row_id(&self) -> Option<u64> {
698 self.segments
699 .iter()
700 .filter_map(RowMetaMap::max_row_id)
701 .max()
702 }
703
704 pub fn lookup(&self, row_id: u64) -> Option<(&str, &str)> {
706 self.segments
707 .iter()
708 .rev()
709 .find_map(|seg| seg.lookup(row_id))
710 }
711
712 pub fn hydrate(&self, rowids: &[u64]) -> (Vec<RowMetaEntry>, Vec<u64>) {
718 let mut sorted = rowids.to_vec();
719 sorted.sort_unstable();
720 let mut caches: Vec<BlockCache> = vec![None; self.segments.len()];
721 let mut hits = Vec::with_capacity(sorted.len());
722 let mut misses = Vec::new();
723 for row_id in sorted {
724 let hit = self
725 .segments
726 .iter()
727 .enumerate()
728 .rev()
729 .find_map(|(segment, map)| {
730 let meta = map.lookup_meta(row_id, &mut caches[segment])?;
731 Some(RowMetaEntry {
732 row_id,
733 session_id: meta.session_id.to_owned(),
734 message_id: meta.message_id.to_owned(),
735 role: meta.role.to_owned(),
736 project: meta.project.to_owned(),
737 source_agent: meta.source_agent.to_owned(),
738 timestamp_micros: meta.timestamp_micros,
739 search_text: meta.search_text,
740 })
741 });
742 match hit {
743 Some(entry) => hits.push(entry),
744 None => misses.push(row_id),
745 }
746 }
747 (hits, misses)
748 }
749
750 pub fn lookup_count(&self, session_id: &str) -> Option<usize> {
752 let mut total = 0;
753 let mut found = false;
754 for seg in &self.segments {
755 if let Some(count) = seg.lookup_count(session_id) {
756 total += count;
757 found = true;
758 }
759 }
760 found.then_some(total)
761 }
762
763 pub fn lookup_max_ts(&self, session_id: &str) -> Option<i64> {
767 self.segments
768 .iter()
769 .filter_map(|seg| seg.lookup_max_ts(session_id))
770 .max()
771 }
772
773 pub fn merged_entries(&self) -> Vec<RowMetaEntry> {
776 let mut by_row: HashMap<u64, RowMetaEntry> = HashMap::new();
777 for seg in &self.segments {
778 for entry in seg.entries() {
779 by_row.insert(entry.row_id, entry);
780 }
781 }
782 by_row.into_values().collect()
783 }
784}
785
/// Collects `values` into an ascending, duplicate-free list.
fn distinct_sorted<'a>(values: impl Iterator<Item = &'a str>) -> Vec<&'a str> {
    // An ordered set sorts and de-duplicates in one step.
    let unique: std::collections::BTreeSet<&'a str> = values.collect();
    unique.into_iter().collect()
}
792
/// Maps each value to its position in the iteration order; when a value
/// repeats, its last position is kept.
fn index_of<'a>(values: impl Iterator<Item = &'a str>) -> HashMap<&'a str, u32> {
    let mut positions = HashMap::new();
    for (position, value) in values.enumerate() {
        positions.insert(value, position as u32);
    }
    positions
}
799
800fn dict_entries(blob: &mut Vec<u8>, values: &[&str]) -> Result<Vec<DictEntry>> {
801 values
802 .iter()
803 .map(|value| {
804 let off = blob.len() as u64;
805 blob.extend_from_slice(value.as_bytes());
806 Ok(DictEntry {
807 off,
808 len: u32::try_from(value.len()).context("dictionary value too long")?,
809 _pad: 0,
810 })
811 })
812 .collect()
813}
814
/// Reads a little-endian `u32` at byte offset `at` of `bytes` and widens it
/// to `usize`; returns `None` when the four bytes are not all in range.
fn read_u32(bytes: &[u8], at: usize) -> Option<usize> {
    let end = at.checked_add(4)?;
    let raw: [u8; 4] = bytes.get(at..end)?.try_into().ok()?;
    Some(u32::from_le_bytes(raw) as usize)
}
819
820#[cfg(test)]
821mod tests {
822 #![allow(clippy::expect_used, clippy::unwrap_used)]
823 use super::*;
824
    /// Builds a test row with fixed role (`"user"`), project (`"/proj"`) and
    /// source agent (`"claude-code"`); the remaining fields come from the
    /// arguments.
    fn entry(
        row_id: u64,
        session_id: &str,
        message_id: &str,
        timestamp_micros: i64,
        search_text: &str,
    ) -> RowMetaEntry {
        RowMetaEntry {
            row_id,
            session_id: session_id.to_owned(),
            message_id: message_id.to_owned(),
            role: "user".to_owned(),
            project: "/proj".to_owned(),
            source_agent: "claude-code".to_owned(),
            timestamp_micros,
            search_text: search_text.to_owned(),
        }
    }
843
    /// Builds a three-row map and checks every read path of a single file:
    /// `lookup`, `lookup_meta`, `lookup_count` and `lookup_max_ts`.
    #[test]
    fn build_open_lookup_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "teststore", 7);
        // Row 99 overrides role and project so both dictionaries hold two values.
        let mut three = entry(99, "sess-a", "msg-3", 3_000, "third");
        three.role = "assistant".to_owned();
        three.project = "/other".to_owned();
        // Supplied out of row-id order on purpose; `build` sorts them.
        let entries = vec![
            entry(10, "sess-a", "msg-1", 1_000, "first message text"),
            entry(3, "sess-b/agent-x", "msg-2", 2_000, ""),
            three,
        ];
        RowMetaMap::build(&path, 7, entries).unwrap();

        let map = RowMetaMap::open(&path).unwrap();
        assert_eq!(map.version(), 7);
        assert_eq!(map.len(), 3);
        assert_eq!(map.lookup(10), Some(("sess-a", "msg-1")));
        assert_eq!(map.lookup(3), Some(("sess-b/agent-x", "msg-2")));
        assert_eq!(map.lookup(99), Some(("sess-a", "msg-3")));
        assert_eq!(map.lookup(42), None);

        let meta = map.lookup_meta(10, &mut None).expect("row 10 present");
        assert_eq!(meta.session_id, "sess-a");
        assert_eq!(meta.message_id, "msg-1");
        assert_eq!(meta.role, "user");
        assert_eq!(meta.project, "/proj");
        assert_eq!(meta.source_agent, "claude-code");
        assert_eq!(meta.timestamp_micros, 1_000);
        assert_eq!(meta.search_text, "first message text");

        let assistant = map.lookup_meta(99, &mut None).expect("row 99 present");
        assert_eq!(assistant.role, "assistant");
        assert_eq!(assistant.project, "/other");
        assert_eq!(assistant.search_text, "third");

        // A row with empty search text still resolves.
        let empty_text = map.lookup_meta(3, &mut None).expect("row 3 present");
        assert_eq!(empty_text.search_text, "");
        assert!(map.lookup_meta(42, &mut None).is_none());

        assert_eq!(map.lookup_count("sess-a"), Some(2));
        assert_eq!(map.lookup_count("sess-b/agent-x"), Some(1));
        assert_eq!(map.lookup_count("missing"), None);

        assert_eq!(map.lookup_max_ts("sess-a"), Some(3_000));
        assert_eq!(map.lookup_max_ts("sess-b/agent-x"), Some(2_000));
        assert_eq!(map.lookup_max_ts("missing"), None);
    }
893
    /// The per-session timestamp is the maximum over the session's rows, not
    /// the timestamp of the row with the highest row id.
    #[test]
    fn max_ts_is_the_session_high_water_mark() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "ts", 1);
        // The newest timestamp (9_000) sits on the middle row.
        let entries = vec![
            entry(1, "s", "msg-a", 5_000, "a"),
            entry(2, "s", "msg-b", 9_000, "b"),
            entry(3, "s", "msg-c", 7_000, "c"),
        ];
        RowMetaMap::build(&path, 1, entries).unwrap();
        let map = RowMetaMap::open(&path).unwrap();
        assert_eq!(map.lookup_max_ts("s"), Some(9_000));
    }
908
    /// Builds a map spanning three text blocks (two full, one partial) and
    /// reads rows on both sides of each block boundary through one shared cache.
    #[test]
    fn many_blocks_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "blocks", 1);
        let entries: Vec<RowMetaEntry> = (0..(BLOCK_ROWS as u64 * 2 + 5))
            .map(|i| {
                entry(
                    i,
                    "sess",
                    &format!("msg-{i}"),
                    i as i64,
                    &format!("text body {i}"),
                )
            })
            .collect();
        RowMetaMap::build(&path, 1, entries).unwrap();

        let map = RowMetaMap::open(&path).unwrap();
        let mut cache = None;
        // First/last rows of each block, plus the very last row (516).
        for i in [0u64, 1, 255, 256, 257, 511, 512, 516] {
            let meta = map.lookup_meta(i, &mut cache).expect("row present");
            assert_eq!(meta.message_id, format!("msg-{i}"));
            assert_eq!(meta.search_text, format!("text body {i}"));
        }
    }
937
    /// Builds a base (version 1) and a delta (version 2), discovers them as a
    /// chain and checks that `RowMetaSet` answers across both segments.
    #[test]
    fn lsm_set_layers_delta_over_base() {
        let dir = tempfile::tempdir().unwrap();
        let base = vec![
            entry(10, "sess-a", "m10", 1, "base ten"),
            entry(11, "sess-a", "m11", 2, "base eleven"),
            entry(12, "sess-b", "m12", 3, "base twelve"),
        ];
        RowMetaMap::build(&RowMetaMap::path_for(dir.path(), "k", 1), 1, base).unwrap();
        let delta = vec![
            entry(20, "sess-a", "m20", 4, "delta twenty"),
            entry(21, "sess-c", "m21", 5, "delta twentyone"),
        ];
        RowMetaMap::build(&RowMetaMap::delta_path(dir.path(), "k", 2), 2, delta).unwrap();

        let chain = discover_chain(dir.path(), "k").expect("chain present");
        assert_eq!(chain.base_version, 1);
        assert_eq!(chain.deltas.len(), 1);
        assert_eq!(chain.version(), 2);

        let set = RowMetaSet::open(&chain).unwrap();
        assert_eq!(set.version(), 2);
        assert_eq!(set.delta_count(), 1);

        assert_eq!(set.lookup(10), Some(("sess-a", "m10")));
        assert_eq!(set.lookup(20), Some(("sess-a", "m20")));
        assert_eq!(set.lookup(99), None);

        // One id from the delta, one from the base, one unknown.
        let (mut hits, misses) = set.hydrate(&[21, 10, 99]);
        assert_eq!(misses, vec![99]);
        hits.sort_by_key(|entry| entry.row_id);
        assert_eq!(hits.len(), 2);
        assert_eq!(hits[0].search_text, "base ten");
        assert_eq!(hits[1].search_text, "delta twentyone");

        // sess-a has two rows in the base and one in the delta.
        assert_eq!(set.lookup_count("sess-a"), Some(3));
        assert_eq!(set.lookup_count("sess-b"), Some(1));
        assert_eq!(set.lookup_count("sess-c"), Some(1));
        assert_eq!(set.lookup_count("missing"), None);

        // sess-a's newest timestamp comes from the delta.
        assert_eq!(set.lookup_max_ts("sess-a"), Some(4));
        assert_eq!(set.lookup_max_ts("sess-b"), Some(3));
        assert_eq!(set.lookup_max_ts("sess-c"), Some(5));
        assert_eq!(set.lookup_max_ts("missing"), None);

        // `merged_entries` has no defined order, so sort before asserting.
        let mut merged = set.merged_entries();
        merged.sort_by_key(|entry| entry.row_id);
        assert_eq!(merged.len(), 5);
        assert_eq!(merged[0].row_id, 10);
        assert_eq!(merged[4].row_id, 21);
        assert_eq!(merged[4].search_text, "delta twentyone");
    }
994
    /// A map built from no entries can be opened, and every lookup on it
    /// reports "not found".
    #[test]
    fn empty_map_roundtrips() {
        let dir = tempfile::tempdir().unwrap();
        let path = RowMetaMap::path_for(dir.path(), "empty", 1);
        RowMetaMap::build(&path, 1, Vec::new()).unwrap();
        let map = RowMetaMap::open(&path).unwrap();
        assert!(map.is_empty());
        assert_eq!(map.lookup(0), None);
        assert!(map.lookup_meta(0, &mut None).is_none());
        assert_eq!(map.lookup_count("anything"), None);
        assert_eq!(map.lookup_max_ts("anything"), None);
    }
1007}