1use crate::config::Config;
4use crate::error::{Error, Result};
5use crate::session::{Session, SessionEntry, SessionHeader};
6use fs4::fs_std::FileExt;
7use serde::Deserialize;
8use sqlmodel_core::Value;
9use sqlmodel_sqlite::{OpenFlags, SqliteConfig, SqliteConnection};
10use std::borrow::Borrow;
11use std::fs::{self, File};
12use std::io::{BufRead, BufReader, Read};
13use std::path::{Path, PathBuf};
14use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
15
16const MAX_JSONL_LINE_BYTES: usize = 100 * 1024 * 1024;
17
18#[derive(Debug, Clone)]
19pub struct SessionMeta {
20 pub path: String,
21 pub id: String,
22 pub cwd: String,
23 pub timestamp: String,
24 pub message_count: u64,
25 pub last_modified_ms: i64,
26 pub size_bytes: u64,
27 pub name: Option<String>,
28}
29
30#[derive(Debug, Clone)]
31pub struct SessionIndex {
32 db_path: PathBuf,
33 lock_path: PathBuf,
34}
35
36impl SessionIndex {
37 pub fn new() -> Self {
38 let root = Config::sessions_dir();
39 Self::for_sessions_root(&root)
40 }
41
42 pub fn for_sessions_root(root: &Path) -> Self {
43 Self {
44 db_path: root.join("session-index.sqlite"),
45 lock_path: root.join("session-index.lock"),
46 }
47 }
48
49 pub fn index_session(&self, session: &Session) -> Result<()> {
50 let Some(path) = session.path.as_ref() else {
51 return Ok(());
52 };
53
54 let meta = build_meta(path, &session.header, &session.entries)?;
55 self.upsert_meta(meta)
56 }
57
58 pub fn index_session_snapshot(
63 &self,
64 path: &Path,
65 header: &SessionHeader,
66 message_count: u64,
67 name: Option<String>,
68 ) -> Result<()> {
69 let (last_modified_ms, size_bytes) = session_file_stats(path)?;
70 let meta = SessionMeta {
71 path: path.display().to_string(),
72 id: header.id.clone(),
73 cwd: header.cwd.clone(),
74 timestamp: header.timestamp.clone(),
75 message_count,
76 last_modified_ms,
77 size_bytes,
78 name,
79 };
80 self.upsert_meta(meta)
81 }
82
83 pub(crate) fn upsert_session_meta(&self, meta: SessionMeta) -> Result<()> {
84 self.upsert_meta(meta)
85 }
86
87 fn upsert_meta(&self, meta: SessionMeta) -> Result<()> {
88 self.with_lock(|conn| {
89 init_schema(conn)?;
90
91 conn.execute_raw("BEGIN IMMEDIATE")
92 .map_err(|e| Error::session(format!("BEGIN failed: {e}")))?;
93
94 let result = (|| -> Result<()> {
95 let message_count = sqlite_i64_from_u64("message_count", meta.message_count)?;
96 let size_bytes = sqlite_i64_from_u64("size_bytes", meta.size_bytes)?;
97 conn.execute_sync(
98 "INSERT INTO sessions (path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name)
99 VALUES (?1,?2,?3,?4,?5,?6,?7,?8)
100 ON CONFLICT(path) DO UPDATE SET
101 id=excluded.id,
102 cwd=excluded.cwd,
103 timestamp=excluded.timestamp,
104 message_count=excluded.message_count,
105 last_modified_ms=excluded.last_modified_ms,
106 size_bytes=excluded.size_bytes,
107 name=excluded.name",
108 &[
109 Value::Text(meta.path),
110 Value::Text(meta.id),
111 Value::Text(meta.cwd),
112 Value::Text(meta.timestamp),
113 Value::BigInt(message_count),
114 Value::BigInt(meta.last_modified_ms),
115 Value::BigInt(size_bytes),
116 meta.name.map_or(Value::Null, Value::Text),
117 ],
118 ).map_err(|e| Error::session(format!("Insert failed: {e}")))?;
119
120 conn.execute_sync(
121 "INSERT INTO meta (key,value) VALUES ('last_sync_epoch_ms', ?1)
122 ON CONFLICT(key) DO UPDATE SET value=excluded.value",
123 &[Value::Text(current_epoch_ms())],
124 ).map_err(|e| Error::session(format!("Meta update failed: {e}")))?;
125 Ok(())
126 })();
127
128 match result {
129 Ok(()) => {
130 conn.execute_raw("COMMIT")
131 .map_err(|e| Error::session(format!("COMMIT failed: {e}")))?;
132 Ok(())
133 }
134 Err(e) => {
135 let _ = conn.execute_raw("ROLLBACK");
136 Err(e)
137 }
138 }
139 })
140 }
141
142 pub fn list_sessions(&self, cwd: Option<&str>) -> Result<Vec<SessionMeta>> {
143 self.with_lock(|conn| {
144 init_schema(conn)?;
145
146 let (sql, params): (&str, Vec<Value>) = cwd.map_or_else(
147 || {
148 (
149 "SELECT path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name
150 FROM sessions ORDER BY last_modified_ms DESC",
151 vec![],
152 )
153 },
154 |cwd| {
155 (
156 "SELECT path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name
157 FROM sessions WHERE cwd=?1 ORDER BY last_modified_ms DESC",
158 vec![Value::Text(cwd.to_string())],
159 )
160 },
161 );
162
163 let rows = conn
164 .query_sync(sql, ¶ms)
165 .map_err(|e| Error::session(format!("Query failed: {e}")))?;
166
167 let mut result = Vec::new();
168 for row in rows {
169 result.push(row_to_meta(&row)?);
170 }
171 Ok(result)
172 })
173 }
174
175 pub fn delete_session_path(&self, path: &Path) -> Result<()> {
176 let path = path.to_string_lossy().to_string();
177 self.with_lock(|conn| {
178 init_schema(conn)?;
179
180 conn.execute_raw("BEGIN IMMEDIATE")
181 .map_err(|e| Error::session(format!("BEGIN failed: {e}")))?;
182
183 let result = (|| -> Result<()> {
184 conn.execute_sync("DELETE FROM sessions WHERE path=?1", &[Value::Text(path)])
185 .map_err(|e| Error::session(format!("Delete failed: {e}")))?;
186
187 conn.execute_sync(
188 "INSERT INTO meta (key,value) VALUES ('last_sync_epoch_ms', ?1)
189 ON CONFLICT(key) DO UPDATE SET value=excluded.value",
190 &[Value::Text(current_epoch_ms())],
191 )
192 .map_err(|e| Error::session(format!("Meta update failed: {e}")))?;
193 Ok(())
194 })();
195
196 match result {
197 Ok(()) => {
198 conn.execute_raw("COMMIT")
199 .map_err(|e| Error::session(format!("COMMIT failed: {e}")))?;
200 Ok(())
201 }
202 Err(e) => {
203 let _ = conn.execute_raw("ROLLBACK");
204 Err(e)
205 }
206 }
207 })
208 }
209
210 pub fn reindex_all(&self) -> Result<()> {
211 let sessions_root = self.sessions_root();
212 if !sessions_root.exists() {
213 return Ok(());
214 }
215
216 let mut metas = Vec::new();
217 for entry in walk_sessions(sessions_root) {
218 let Ok(path) = entry else { continue };
219 if let Ok(meta) = build_meta_from_file(&path) {
220 metas.push(meta);
221 }
222 }
223
224 self.with_lock(|conn| {
225 init_schema(conn)?;
226
227 conn.execute_raw("BEGIN IMMEDIATE")
228 .map_err(|e| Error::session(format!("BEGIN failed: {e}")))?;
229
230 let result = (|| -> Result<()> {
231 conn.execute_sync("DELETE FROM sessions", &[])
232 .map_err(|e| Error::session(format!("Delete failed: {e}")))?;
233
234 for meta in metas {
235 let message_count = sqlite_i64_from_u64("message_count", meta.message_count)?;
236 let size_bytes = sqlite_i64_from_u64("size_bytes", meta.size_bytes)?;
237 conn.execute_sync(
238 "INSERT INTO sessions (path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name)
239 VALUES (?1,?2,?3,?4,?5,?6,?7,?8)",
240 &[
241 Value::Text(meta.path),
242 Value::Text(meta.id),
243 Value::Text(meta.cwd),
244 Value::Text(meta.timestamp),
245 Value::BigInt(message_count),
246 Value::BigInt(meta.last_modified_ms),
247 Value::BigInt(size_bytes),
248 meta.name.map_or(Value::Null, Value::Text),
249 ],
250 ).map_err(|e| Error::session(format!("Insert failed: {e}")))?;
251 }
252
253 conn.execute_sync(
254 "INSERT INTO meta (key,value) VALUES ('last_sync_epoch_ms', ?1)
255 ON CONFLICT(key) DO UPDATE SET value=excluded.value",
256 &[Value::Text(current_epoch_ms())],
257 ).map_err(|e| Error::session(format!("Meta update failed: {e}")))?;
258
259 Ok(())
260 })();
261
262 match result {
263 Ok(()) => {
264 conn.execute_raw("COMMIT")
265 .map_err(|e| Error::session(format!("COMMIT failed: {e}")))?;
266 Ok(())
267 }
268 Err(e) => {
269 let _ = conn.execute_raw("ROLLBACK");
270 Err(e)
271 }
272 }
273 })
274 }
275
276 pub fn should_reindex(&self, max_age: Duration) -> bool {
278 if !self.db_path.exists() {
279 return true;
280 }
281 if let Ok(Some(last_sync_epoch_ms)) = self.with_lock(load_last_sync_epoch_ms) {
285 return epoch_ms_is_stale(last_sync_epoch_ms, max_age);
286 }
287 let Ok(meta) = fs::metadata(&self.db_path) else {
288 return true;
289 };
290 let Ok(modified) = meta.modified() else {
291 return true;
292 };
293 let age = SystemTime::now()
294 .duration_since(modified)
295 .unwrap_or_default();
296 age > max_age
297 }
298
299 pub fn reindex_if_stale(&self, max_age: Duration) -> Result<bool> {
301 if !self.should_reindex(max_age) {
302 return Ok(false);
303 }
304 self.reindex_all()?;
305 Ok(true)
306 }
307
308 fn with_lock<T>(&self, f: impl FnOnce(&SqliteConnection) -> Result<T>) -> Result<T> {
309 if let Some(parent) = self.db_path.parent() {
310 fs::create_dir_all(parent)?;
311 }
312 let lock_file = File::options()
313 .read(true)
314 .write(true)
315 .create(true)
316 .truncate(false)
317 .open(&self.lock_path)?;
318 let _lock = lock_file_guard(&lock_file, Duration::from_secs(5))?;
319
320 let config = SqliteConfig::file(self.db_path.to_string_lossy())
321 .flags(OpenFlags::create_read_write())
322 .busy_timeout(5000);
323
324 let conn = SqliteConnection::open(&config)
325 .map_err(|e| Error::session(format!("SQLite open: {e}")))?;
326
327 conn.execute_raw("PRAGMA journal_mode = WAL")
329 .map_err(|e| Error::session(format!("PRAGMA journal_mode: {e}")))?;
330 conn.execute_raw("PRAGMA synchronous = NORMAL")
331 .map_err(|e| Error::session(format!("PRAGMA synchronous: {e}")))?;
332 conn.execute_raw("PRAGMA wal_autocheckpoint = 1000")
333 .map_err(|e| Error::session(format!("PRAGMA wal_autocheckpoint: {e}")))?;
334 conn.execute_raw("PRAGMA foreign_keys = ON")
335 .map_err(|e| Error::session(format!("PRAGMA foreign_keys: {e}")))?;
336
337 f(&conn)
338 }
339
340 fn sessions_root(&self) -> &Path {
341 self.db_path.parent().unwrap_or_else(|| Path::new("."))
342 }
343}
344
345impl Default for SessionIndex {
346 fn default() -> Self {
347 Self::new()
348 }
349}
350
351pub(crate) fn enqueue_session_index_snapshot_update(
356 sessions_root: &Path,
357 path: &Path,
358 header: &SessionHeader,
359 message_count: u64,
360 name: Option<String>,
361) {
362 let sessions_root = sessions_root.to_path_buf();
363 let path = path.to_path_buf();
364 let header = header.clone();
365
366 if let Err(err) = SessionIndex::for_sessions_root(&sessions_root).index_session_snapshot(
367 &path,
368 &header,
369 message_count,
370 name,
371 ) {
372 tracing::warn!(
373 sessions_root = %sessions_root.display(),
374 path = %path.display(),
375 error = %err,
376 "Failed to update session index snapshot"
377 );
378 }
379}
380
381fn init_schema(conn: &SqliteConnection) -> Result<()> {
382 conn.execute_raw(
383 "CREATE TABLE IF NOT EXISTS sessions (
384 path TEXT PRIMARY KEY,
385 id TEXT NOT NULL,
386 cwd TEXT NOT NULL,
387 timestamp TEXT NOT NULL,
388 message_count INTEGER NOT NULL,
389 last_modified_ms INTEGER NOT NULL,
390 size_bytes INTEGER NOT NULL,
391 name TEXT
392 )",
393 )
394 .map_err(|e| Error::session(format!("Create sessions table: {e}")))?;
395
396 conn.execute_raw(
397 "CREATE TABLE IF NOT EXISTS meta (
398 key TEXT PRIMARY KEY,
399 value TEXT NOT NULL
400 )",
401 )
402 .map_err(|e| Error::session(format!("Create meta table: {e}")))?;
403
404 Ok(())
405}
406
407fn sqlite_i64_from_u64(field: &str, value: u64) -> Result<i64> {
408 i64::try_from(value)
409 .map_err(|_| Error::session(format!("{field} exceeds SQLite INTEGER range: {value}")))
410}
411
412fn sqlite_u64_from_i64(field: &str, value: i64) -> Result<u64> {
413 u64::try_from(value).map_err(|_| {
414 Error::session(format!(
415 "{field} must be non-negative in session index: {value}"
416 ))
417 })
418}
419
420fn row_to_meta(row: &sqlmodel_core::Row) -> Result<SessionMeta> {
421 let message_count = row
422 .get_named::<i64>("message_count")
423 .map_err(|e| Error::session(format!("get message_count: {e}")))?;
424 let size_bytes = row
425 .get_named::<i64>("size_bytes")
426 .map_err(|e| Error::session(format!("get size_bytes: {e}")))?;
427
428 Ok(SessionMeta {
429 path: row
430 .get_named("path")
431 .map_err(|e| Error::session(format!("get path: {e}")))?,
432 id: row
433 .get_named("id")
434 .map_err(|e| Error::session(format!("get id: {e}")))?,
435 cwd: row
436 .get_named("cwd")
437 .map_err(|e| Error::session(format!("get cwd: {e}")))?,
438 timestamp: row
439 .get_named("timestamp")
440 .map_err(|e| Error::session(format!("get timestamp: {e}")))?,
441 message_count: sqlite_u64_from_i64("message_count", message_count)?,
442 last_modified_ms: row
443 .get_named("last_modified_ms")
444 .map_err(|e| Error::session(format!("get last_modified_ms: {e}")))?,
445 size_bytes: sqlite_u64_from_i64("size_bytes", size_bytes)?,
446 name: row
447 .get_named::<Option<String>>("name")
448 .map_err(|e| Error::session(format!("get name: {e}")))?,
449 })
450}
451
452fn build_meta(
453 path: &Path,
454 header: &SessionHeader,
455 entries: &[SessionEntry],
456) -> Result<SessionMeta> {
457 header
458 .validate()
459 .map_err(|reason| Error::session(format!("Invalid session header: {reason}")))?;
460 let (message_count, name) = session_stats(entries);
461 let (last_modified_ms, size_bytes) = session_file_stats(path)?;
462 Ok(SessionMeta {
463 path: path.display().to_string(),
464 id: header.id.clone(),
465 cwd: header.cwd.clone(),
466 timestamp: header.timestamp.clone(),
467 message_count,
468 last_modified_ms,
469 size_bytes,
470 name,
471 })
472}
473
474fn read_capped_utf8_line_with_limit<R: BufRead>(
475 reader: &mut R,
476 max_bytes: usize,
477) -> std::io::Result<Option<String>> {
478 let limit = u64::try_from(max_bytes)
479 .unwrap_or(u64::MAX.saturating_sub(2))
480 .saturating_add(2);
481 let mut bytes = Vec::new();
482 let bytes_read = reader.take(limit).read_until(b'\n', &mut bytes)?;
483 if bytes_read == 0 {
484 return Ok(None);
485 }
486
487 let content_len = bytes.strip_suffix(b"\n").map_or(bytes.len(), <[u8]>::len);
488 if content_len > max_bytes {
489 if !bytes.ends_with(b"\n") {
490 let mut discard = Vec::new();
491 loop {
492 discard.clear();
493 let discarded = reader.read_until(b'\n', &mut discard)?;
494 if discarded == 0 || discard.ends_with(b"\n") {
495 break;
496 }
497 }
498 }
499 return Err(std::io::Error::new(
500 std::io::ErrorKind::InvalidData,
501 format!("JSONL line exceeds {max_bytes} bytes"),
502 ));
503 }
504
505 String::from_utf8(bytes)
506 .map(Some)
507 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))
508}
509
510fn read_capped_utf8_line<R: BufRead>(reader: &mut R) -> std::io::Result<Option<String>> {
511 read_capped_utf8_line_with_limit(reader, MAX_JSONL_LINE_BYTES)
512}
513
514pub(crate) fn build_meta_from_file(path: &Path) -> Result<SessionMeta> {
515 match path.extension().and_then(|ext| ext.to_str()) {
516 Some("jsonl") => build_meta_from_jsonl(path),
517 #[cfg(feature = "sqlite-sessions")]
518 Some("sqlite") => build_meta_from_sqlite(path),
519 _ => build_meta_from_jsonl(path),
520 }
521}
522
523#[derive(Deserialize)]
524struct PartialEntry {
525 #[serde(default)]
526 r#type: String,
527 #[serde(default)]
528 name: Option<String>,
529}
530
531fn build_meta_from_jsonl(path: &Path) -> Result<SessionMeta> {
532 let file = File::open(path)
533 .map_err(|err| Error::session(format!("Read session file {}: {err}", path.display())))?;
534 let mut reader = BufReader::new(file);
535 let Some(header_line) = read_capped_utf8_line(&mut reader)
536 .map_err(|err| Error::session(format!("Read session header {}: {err}", path.display())))?
537 else {
538 return Err(Error::session(format!(
539 "Empty session file {}",
540 path.display()
541 )));
542 };
543
544 let header: SessionHeader = serde_json::from_str(&header_line)
545 .map_err(|err| Error::session(format!("Parse session header {}: {err}", path.display())))?;
546 header.validate().map_err(|reason| {
547 Error::session(format!(
548 "Invalid session header {}: {reason}",
549 path.display()
550 ))
551 })?;
552
553 let mut message_count = 0u64;
554 let mut name = None;
555 loop {
556 let Some(line_buf) = read_capped_utf8_line(&mut reader).map_err(|err| {
557 Error::session(format!("Read session entry line {}: {err}", path.display()))
558 })?
559 else {
560 break;
561 };
562
563 if let Ok(entry) = serde_json::from_str::<PartialEntry>(&line_buf) {
564 match entry.r#type.as_str() {
565 "message" => message_count += 1,
566 "session_info" if entry.name.is_some() => {
567 name = entry.name;
568 }
569 _ => {}
570 }
571 }
572 }
573
574 let meta = fs::metadata(path)?;
575 let size_bytes = meta.len();
576 let modified = meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
577 let millis = modified
578 .duration_since(UNIX_EPOCH)
579 .unwrap_or_default()
580 .as_millis();
581 let last_modified_ms = i64::try_from(millis).unwrap_or(i64::MAX);
582
583 Ok(SessionMeta {
584 path: path.display().to_string(),
585 id: header.id,
586 cwd: header.cwd,
587 timestamp: header.timestamp,
588 message_count,
589 last_modified_ms,
590 size_bytes,
591 name,
592 })
593}
594
595#[cfg(feature = "sqlite-sessions")]
596fn build_meta_from_sqlite(path: &Path) -> Result<SessionMeta> {
597 let meta = futures::executor::block_on(crate::session_sqlite::load_session_meta(path))?;
598 let header = meta.header;
599 header.validate().map_err(|reason| {
600 Error::session(format!(
601 "Invalid session header {}: {reason}",
602 path.display()
603 ))
604 })?;
605 let (last_modified_ms, size_bytes) = session_file_stats(path)?;
606
607 Ok(SessionMeta {
608 path: path.display().to_string(),
609 id: header.id,
610 cwd: header.cwd,
611 timestamp: header.timestamp,
612 message_count: meta.message_count,
613 last_modified_ms,
614 size_bytes,
615 name: meta.name,
616 })
617}
618
619fn session_stats<T>(entries: &[T]) -> (u64, Option<String>)
620where
621 T: Borrow<SessionEntry>,
622{
623 let mut message_count = 0u64;
624 let mut name = None;
625 for entry in entries {
626 match entry.borrow() {
627 SessionEntry::Message(_) => message_count += 1,
628 SessionEntry::SessionInfo(info) if info.name.is_some() => {
629 name.clone_from(&info.name);
630 }
631 _ => {}
632 }
633 }
634 (message_count, name)
635}
636
637#[cfg(feature = "sqlite-sessions")]
638fn sqlite_auxiliary_paths(path: &Path) -> [PathBuf; 2] {
639 ["-wal", "-shm"].map(|suffix| {
640 let mut candidate = path.as_os_str().to_os_string();
641 candidate.push(suffix);
642 PathBuf::from(candidate)
643 })
644}
645
646pub(crate) fn session_file_stats(path: &Path) -> Result<(i64, u64)> {
647 let meta = fs::metadata(path)?;
648 #[cfg(feature = "sqlite-sessions")]
649 let (size, modified) = {
650 let mut size = meta.len();
651 let mut modified = meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
652
653 if path.extension().and_then(|ext| ext.to_str()) == Some("sqlite") {
654 for auxiliary_path in sqlite_auxiliary_paths(path) {
655 let Ok(aux_meta) = fs::metadata(&auxiliary_path) else {
656 continue;
657 };
658 size = size.saturating_add(aux_meta.len());
659 let aux_modified = aux_meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
660 if aux_modified > modified {
661 modified = aux_modified;
662 }
663 }
664 }
665
666 (size, modified)
667 };
668
669 #[cfg(not(feature = "sqlite-sessions"))]
670 let (size, modified) = (
671 meta.len(),
672 meta.modified().unwrap_or(SystemTime::UNIX_EPOCH),
673 );
674
675 let millis = modified
676 .duration_since(UNIX_EPOCH)
677 .unwrap_or_default()
678 .as_millis();
679 let ms = i64::try_from(millis).unwrap_or(i64::MAX);
680 Ok((ms, size))
681}
682
683pub(crate) fn is_session_file_path(path: &Path) -> bool {
684 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
685 if name.starts_with("session-index.") {
686 return false;
687 }
688 }
689 match path.extension().and_then(|ext| ext.to_str()) {
690 Some("jsonl") => true,
691 #[cfg(feature = "sqlite-sessions")]
692 Some("sqlite") => true,
693 _ => false,
694 }
695}
696
697pub(crate) fn walk_sessions(root: &Path) -> Vec<std::io::Result<PathBuf>> {
698 let mut out = Vec::new();
699 let mut stack = vec![root.to_path_buf()];
700
701 while let Some(dir) = stack.pop() {
702 if let Ok(entries) = fs::read_dir(&dir) {
703 for entry in entries.flatten() {
704 let path = entry.path();
705 let Ok(file_type) = entry.file_type() else {
706 continue;
707 };
708
709 if file_type.is_dir() {
710 stack.push(path);
711 } else if file_type.is_symlink() {
712 if let Ok(meta) = fs::metadata(&path) {
714 if meta.is_file() && is_session_file_path(&path) {
715 out.push(Ok(path));
716 }
717 }
718 } else if is_session_file_path(&path) {
719 out.push(Ok(path));
720 }
721 }
722 }
723 }
724 out
725}
726
727fn current_epoch_ms() -> String {
728 chrono::Utc::now().timestamp_millis().to_string()
729}
730
731fn current_epoch_ms_i64() -> i64 {
732 let millis = SystemTime::now()
733 .duration_since(UNIX_EPOCH)
734 .unwrap_or_default()
735 .as_millis();
736 i64::try_from(millis).unwrap_or(i64::MAX)
737}
738
739fn epoch_ms_is_stale(epoch_ms: i64, max_age: Duration) -> bool {
740 let age_ms = current_epoch_ms_i64().saturating_sub(epoch_ms);
741 u128::try_from(age_ms).unwrap_or(u128::MAX) > max_age.as_millis()
742}
743
744fn load_last_sync_epoch_ms(conn: &SqliteConnection) -> Result<Option<i64>> {
745 let rows = conn
746 .query_sync(
747 "SELECT value FROM meta WHERE key='last_sync_epoch_ms' LIMIT 1",
748 &[],
749 )
750 .map_err(|err| Error::session(format!("Query meta failed: {err}")))?;
751 let Some(row) = rows.into_iter().next() else {
752 return Ok(None);
753 };
754 let value = row
755 .get_named::<String>("value")
756 .map_err(|err| Error::session(format!("get meta value: {err}")))?;
757 Ok(value.parse::<i64>().ok())
758}
759
760fn lock_file_guard(file: &File, timeout: Duration) -> Result<LockGuard<'_>> {
761 let start = Instant::now();
762 loop {
763 match FileExt::try_lock_exclusive(file) {
764 Ok(true) => return Ok(LockGuard { file }),
765 Ok(false) => {}
766 Err(err) => {
767 return Err(Error::session(format!(
768 "Failed to acquire session index lock: {err}"
769 )));
770 }
771 }
772
773 if start.elapsed() >= timeout {
774 return Err(Error::session(
775 "Timed out waiting for session index lock".to_string(),
776 ));
777 }
778
779 std::thread::sleep(Duration::from_millis(50));
780 }
781}
782
783#[derive(Debug)]
784struct LockGuard<'a> {
785 file: &'a File,
786}
787
788impl Drop for LockGuard<'_> {
789 fn drop(&mut self) {
790 let _ = FileExt::unlock(self.file);
791 }
792}
793
794#[cfg(test)]
795#[path = "../tests/common/mod.rs"]
796mod test_common;
797
798#[cfg(test)]
799mod tests {
800 use super::*;
801
802 use super::test_common::TestHarness;
803 use crate::model::UserContent;
804 use crate::session::{EntryBase, MessageEntry, SessionInfoEntry, SessionMessage};
805 use pretty_assertions::assert_eq;
806 use proptest::prelude::*;
807 use proptest::string::string_regex;
808 use std::collections::HashMap;
809 use std::fs;
810 #[cfg(unix)]
811 use std::process::Command;
812 use std::time::Duration;
813
814 fn write_session_jsonl(path: &Path, header: &SessionHeader, entries: &[SessionEntry]) {
815 let mut jsonl = String::new();
816 jsonl.push_str(&serde_json::to_string(header).expect("serialize session header"));
817 jsonl.push('\n');
818 for entry in entries {
819 jsonl.push_str(&serde_json::to_string(entry).expect("serialize session entry"));
820 jsonl.push('\n');
821 }
822 fs::write(path, jsonl).expect("write session jsonl");
823 }
824
825 fn make_header(id: &str, cwd: &str) -> SessionHeader {
826 let mut header = SessionHeader::new();
827 header.id = id.to_string();
828 header.cwd = cwd.to_string();
829 header
830 }
831
832 fn make_user_entry(parent_id: Option<String>, id: &str, text: &str) -> SessionEntry {
833 SessionEntry::Message(MessageEntry {
834 base: EntryBase::new(parent_id, id.to_string()),
835 message: SessionMessage::User {
836 content: UserContent::Text(text.to_string()),
837 timestamp: Some(chrono::Utc::now().timestamp_millis()),
838 },
839 })
840 }
841
842 fn make_session_info_entry(
843 parent_id: Option<String>,
844 id: &str,
845 name: Option<&str>,
846 ) -> SessionEntry {
847 SessionEntry::SessionInfo(SessionInfoEntry {
848 base: EntryBase::new(parent_id, id.to_string()),
849 name: name.map(ToString::to_string),
850 })
851 }
852
853 fn read_meta_last_sync_epoch_ms(index: &SessionIndex) -> String {
854 index
855 .with_lock(|conn| {
856 init_schema(conn)?;
857 let rows = conn
858 .query_sync(
859 "SELECT value FROM meta WHERE key='last_sync_epoch_ms' LIMIT 1",
860 &[],
861 )
862 .map_err(|err| Error::session(format!("Query meta failed: {err}")))?;
863 let row = rows
864 .into_iter()
865 .next()
866 .ok_or_else(|| Error::session("Missing meta row".to_string()))?;
867 row.get_named::<String>("value")
868 .map_err(|err| Error::session(format!("get meta value: {err}")))
869 })
870 .expect("read meta.last_sync_epoch_ms")
871 }
872
873 #[derive(Debug, Clone)]
874 struct ArbitraryMetaRow {
875 id: String,
876 cwd: String,
877 timestamp: String,
878 message_count: i64,
879 last_modified_ms: i64,
880 size_bytes: i64,
881 name: Option<String>,
882 }
883
884 fn ident_strategy() -> impl Strategy<Value = String> {
885 string_regex("[a-z0-9_-]{1,16}").expect("valid identifier regex")
886 }
887
888 fn cwd_strategy() -> impl Strategy<Value = String> {
889 prop_oneof![
890 Just("cwd-a".to_string()),
891 Just("cwd-b".to_string()),
892 string_regex("[a-z0-9_./-]{1,20}").expect("valid cwd regex"),
893 ]
894 }
895
896 fn timestamp_strategy() -> impl Strategy<Value = String> {
897 string_regex("[0-9TZ:.-]{10,32}").expect("valid timestamp regex")
898 }
899
900 fn optional_name_strategy() -> impl Strategy<Value = Option<String>> {
901 prop::option::of(string_regex("[A-Za-z0-9 _.:-]{0,32}").expect("valid name regex"))
902 }
903
904 fn arbitrary_meta_row_strategy() -> impl Strategy<Value = ArbitraryMetaRow> {
905 (
906 ident_strategy(),
907 cwd_strategy(),
908 timestamp_strategy(),
909 any::<i64>(),
910 any::<i64>(),
911 any::<i64>(),
912 optional_name_strategy(),
913 )
914 .prop_map(
915 |(id, cwd, timestamp, message_count, last_modified_ms, size_bytes, name)| {
916 ArbitraryMetaRow {
917 id,
918 cwd,
919 timestamp,
920 message_count,
921 last_modified_ms,
922 size_bytes,
923 name,
924 }
925 },
926 )
927 }
928
929 #[test]
930 fn index_session_on_in_memory_session_is_noop() {
931 let harness = TestHarness::new("index_session_on_in_memory_session_is_noop");
932 let root = harness.temp_path("sessions");
933 fs::create_dir_all(&root).expect("create root dir");
934 let index = SessionIndex::for_sessions_root(&root);
935 let session = Session::in_memory();
936
937 index
938 .index_session(&session)
939 .expect("index in-memory session");
940
941 harness
942 .log()
943 .info_ctx("verify", "No index files created", |ctx| {
944 ctx.push(("db_path".into(), index.db_path.display().to_string()));
945 ctx.push(("lock_path".into(), index.lock_path.display().to_string()));
946 });
947 assert!(!index.db_path.exists());
948 assert!(!index.lock_path.exists());
949 }
950
951 #[test]
952 fn index_session_inserts_row_and_updates_meta() {
953 let harness = TestHarness::new("index_session_inserts_row_and_updates_meta");
954 let root = harness.temp_path("sessions");
955 fs::create_dir_all(&root).expect("create root dir");
956 let index = SessionIndex::for_sessions_root(&root);
957
958 let session_path = harness.temp_path("sessions/project/a.jsonl");
959 fs::create_dir_all(session_path.parent().expect("parent")).expect("create session dir");
960 fs::write(&session_path, "hello").expect("write session file");
961
962 let mut session = Session::in_memory();
963 session.header = make_header("id-a", "cwd-a");
964 session.path = Some(session_path.clone());
965 session.entries.push(make_user_entry(None, "m1", "hi"));
966
967 index.index_session(&session).expect("index session");
968
969 let sessions = index.list_sessions(Some("cwd-a")).expect("list sessions");
970 assert_eq!(sessions.len(), 1);
971 assert_eq!(sessions[0].id, "id-a");
972 assert_eq!(sessions[0].cwd, "cwd-a");
973 assert_eq!(sessions[0].message_count, 1);
974 assert_eq!(sessions[0].path, session_path.display().to_string());
975
976 let meta_value = read_meta_last_sync_epoch_ms(&index);
977 harness
978 .log()
979 .info_ctx("verify", "meta.last_sync_epoch_ms present", |ctx| {
980 ctx.push(("value".into(), meta_value.clone()));
981 });
982 assert!(
983 meta_value.parse::<i64>().is_ok(),
984 "Expected meta value to be an integer epoch ms"
985 );
986 }
987
988 #[test]
989 fn index_session_updates_existing_row() {
990 let harness = TestHarness::new("index_session_updates_existing_row");
991 let root = harness.temp_path("sessions");
992 fs::create_dir_all(&root).expect("create root dir");
993 let index = SessionIndex::for_sessions_root(&root);
994
995 let session_path = harness.temp_path("sessions/project/update.jsonl");
996 fs::create_dir_all(session_path.parent().expect("parent")).expect("create session dir");
997 fs::write(&session_path, "first").expect("write session file");
998
999 let mut session = Session::in_memory();
1000 session.header = make_header("id-update", "cwd-update");
1001 session.path = Some(session_path.clone());
1002 session.entries.push(make_user_entry(None, "m1", "hi"));
1003
1004 index
1005 .index_session(&session)
1006 .expect("index session first time");
1007 let first_meta = index
1008 .list_sessions(Some("cwd-update"))
1009 .expect("list sessions")[0]
1010 .clone();
1011 let first_sync = read_meta_last_sync_epoch_ms(&index);
1012
1013 std::thread::sleep(Duration::from_millis(10));
1014 fs::write(&session_path, "second-longer").expect("rewrite session file");
1015 session
1016 .entries
1017 .push(make_user_entry(Some("m1".to_string()), "m2", "again"));
1018
1019 index
1020 .index_session(&session)
1021 .expect("index session second time");
1022 let second_meta = index
1023 .list_sessions(Some("cwd-update"))
1024 .expect("list sessions")[0]
1025 .clone();
1026 let second_sync = read_meta_last_sync_epoch_ms(&index);
1027
1028 harness.log().info_ctx("verify", "row updated", |ctx| {
1029 ctx.push((
1030 "first_message_count".into(),
1031 first_meta.message_count.to_string(),
1032 ));
1033 ctx.push((
1034 "second_message_count".into(),
1035 second_meta.message_count.to_string(),
1036 ));
1037 ctx.push(("first_size".into(), first_meta.size_bytes.to_string()));
1038 ctx.push(("second_size".into(), second_meta.size_bytes.to_string()));
1039 ctx.push(("first_sync".into(), first_sync.clone()));
1040 ctx.push(("second_sync".into(), second_sync.clone()));
1041 });
1042
1043 assert_eq!(second_meta.message_count, 2);
1044 assert!(second_meta.size_bytes >= first_meta.size_bytes);
1045 assert!(second_meta.last_modified_ms >= first_meta.last_modified_ms);
1046 assert!(second_sync.parse::<i64>().unwrap_or(0) >= first_sync.parse::<i64>().unwrap_or(0));
1047 }
1048
1049 #[test]
1050 fn list_sessions_orders_by_last_modified_desc() {
1051 let harness = TestHarness::new("list_sessions_orders_by_last_modified_desc");
1052 let root = harness.temp_path("sessions");
1053 fs::create_dir_all(&root).expect("create root dir");
1054 let index = SessionIndex::for_sessions_root(&root);
1055
1056 let path_a = harness.temp_path("sessions/project/a.jsonl");
1057 fs::create_dir_all(path_a.parent().expect("parent")).expect("create dirs");
1058 fs::write(&path_a, "a").expect("write file a");
1059
1060 let mut session_a = Session::in_memory();
1061 session_a.header = make_header("id-a", "cwd-a");
1062 session_a.path = Some(path_a);
1063 session_a.entries.push(make_user_entry(None, "m1", "a"));
1064 index.index_session(&session_a).expect("index a");
1065
1066 std::thread::sleep(Duration::from_millis(10));
1067
1068 let path_b = harness.temp_path("sessions/project/b.jsonl");
1069 fs::create_dir_all(path_b.parent().expect("parent")).expect("create dirs");
1070 fs::write(&path_b, "bbbbb").expect("write file b");
1071
1072 let mut session_b = Session::in_memory();
1073 session_b.header = make_header("id-b", "cwd-b");
1074 session_b.path = Some(path_b);
1075 session_b.entries.push(make_user_entry(None, "m1", "b"));
1076 index.index_session(&session_b).expect("index b");
1077
1078 let sessions = index.list_sessions(None).expect("list sessions");
1079 harness
1080 .log()
1081 .info("verify", format!("listed {} sessions", sessions.len()));
1082 assert!(sessions.len() >= 2);
1083 assert_eq!(sessions[0].id, "id-b");
1084 assert_eq!(sessions[1].id, "id-a");
1085 assert!(sessions[0].last_modified_ms >= sessions[1].last_modified_ms);
1086 }
1087
1088 #[test]
1089 fn list_sessions_filters_by_cwd() {
1090 let harness = TestHarness::new("list_sessions_filters_by_cwd");
1091 let root = harness.temp_path("sessions");
1092 fs::create_dir_all(&root).expect("create root dir");
1093 let index = SessionIndex::for_sessions_root(&root);
1094
1095 for (id, cwd) in [("id-a", "cwd-a"), ("id-b", "cwd-b")] {
1096 let path = harness.temp_path(format!("sessions/project/{id}.jsonl"));
1097 fs::create_dir_all(path.parent().expect("parent")).expect("create dirs");
1098 fs::write(&path, id).expect("write session file");
1099
1100 let mut session = Session::in_memory();
1101 session.header = make_header(id, cwd);
1102 session.path = Some(path);
1103 session.entries.push(make_user_entry(None, "m1", id));
1104 index.index_session(&session).expect("index session");
1105 }
1106
1107 let only_a = index
1108 .list_sessions(Some("cwd-a"))
1109 .expect("list sessions cwd-a");
1110 assert_eq!(only_a.len(), 1);
1111 assert_eq!(only_a[0].id, "id-a");
1112 }
1113
1114 #[test]
1115 fn reindex_all_is_noop_when_sessions_root_missing() {
1116 let harness = TestHarness::new("reindex_all_is_noop_when_sessions_root_missing");
1117 let missing_root = harness.temp_path("does-not-exist");
1118 let index = SessionIndex::for_sessions_root(&missing_root);
1119
1120 index.reindex_all().expect("reindex_all");
1121 assert!(!index.db_path.exists());
1122 assert!(!index.lock_path.exists());
1123 }
1124
1125 #[test]
1126 fn reindex_all_rebuilds_index_from_disk() {
1127 let harness = TestHarness::new("reindex_all_rebuilds_index_from_disk");
1128 let root = harness.temp_path("sessions");
1129 fs::create_dir_all(&root).expect("create root dir");
1130 let index = SessionIndex::for_sessions_root(&root);
1131
1132 let path = harness.temp_path("sessions/project/reindex.jsonl");
1133 fs::create_dir_all(path.parent().expect("parent")).expect("create dirs");
1134
1135 let header = make_header("id-reindex", "cwd-reindex");
1136 let entries = vec![
1137 make_user_entry(None, "m1", "hello"),
1138 make_session_info_entry(Some("m1".to_string()), "info1", Some("My Session")),
1139 make_user_entry(Some("info1".to_string()), "m2", "world"),
1140 ];
1141 write_session_jsonl(&path, &header, &entries);
1142
1143 index.reindex_all().expect("reindex_all");
1144
1145 let sessions = index
1146 .list_sessions(Some("cwd-reindex"))
1147 .expect("list sessions");
1148 assert_eq!(sessions.len(), 1);
1149 assert_eq!(sessions[0].id, "id-reindex");
1150 assert_eq!(sessions[0].message_count, 2);
1151 assert_eq!(sessions[0].name.as_deref(), Some("My Session"));
1152
1153 let meta_value = read_meta_last_sync_epoch_ms(&index);
1154 harness.log().info_ctx("verify", "meta updated", |ctx| {
1155 ctx.push(("value".into(), meta_value.clone()));
1156 });
1157 assert!(meta_value.parse::<i64>().unwrap_or(0) > 0);
1158 }
1159
1160 #[test]
1161 fn reindex_all_skips_invalid_jsonl_files() {
1162 let harness = TestHarness::new("reindex_all_skips_invalid_jsonl_files");
1163 let root = harness.temp_path("sessions");
1164 fs::create_dir_all(&root).expect("create root dir");
1165 let index = SessionIndex::for_sessions_root(&root);
1166
1167 let good = harness.temp_path("sessions/project/good.jsonl");
1168 fs::create_dir_all(good.parent().expect("parent")).expect("create dirs");
1169 let header = make_header("id-good", "cwd-good");
1170 let entries = vec![make_user_entry(None, "m1", "ok")];
1171 write_session_jsonl(&good, &header, &entries);
1172
1173 let bad = harness.temp_path("sessions/project/bad.jsonl");
1174 fs::write(&bad, "not-json\n{").expect("write bad jsonl");
1175
1176 index.reindex_all().expect("reindex_all should succeed");
1177 let sessions = index.list_sessions(None).expect("list sessions");
1178 assert_eq!(sessions.len(), 1);
1179 assert_eq!(sessions[0].id, "id-good");
1180 }
1181
1182 #[test]
1183 fn build_meta_from_file_returns_session_error_on_invalid_header() {
1184 let harness =
1185 TestHarness::new("build_meta_from_file_returns_session_error_on_invalid_header");
1186 let path = harness.temp_path("bad_header.jsonl");
1187 fs::write(&path, "not json\n").expect("write bad header");
1188
1189 let err = build_meta_from_file(&path).expect_err("expected error");
1190 harness.log().info("verify", format!("error: {err}"));
1191
1192 assert!(
1193 matches!(err, Error::Session(ref msg) if msg.contains("Parse session header")),
1194 "Expected Error::Session containing Parse session header, got {err:?}",
1195 );
1196 }
1197
1198 #[test]
1199 fn build_meta_from_file_rejects_semantically_invalid_header() {
1200 let harness = TestHarness::new("build_meta_from_file_rejects_semantically_invalid_header");
1201 let path = harness.temp_path("bad_semantic_header.jsonl");
1202 let header = SessionHeader {
1203 r#type: "note".to_string(),
1204 id: "bad-id".to_string(),
1205 cwd: "/tmp".to_string(),
1206 timestamp: "2026-01-01T00:00:00.000Z".to_string(),
1207 ..SessionHeader::default()
1208 };
1209 write_session_jsonl(&path, &header, &[]);
1210
1211 let err = build_meta_from_file(&path).expect_err("expected invalid header error");
1212 harness.log().info("verify", format!("error: {err}"));
1213
1214 assert!(
1215 matches!(err, Error::Session(ref msg) if msg.contains("Invalid session header")),
1216 "Expected Error::Session containing Invalid session header, got {err:?}",
1217 );
1218 }
1219
1220 #[test]
1221 fn build_meta_from_file_returns_session_error_on_empty_file() {
1222 let harness = TestHarness::new("build_meta_from_file_returns_session_error_on_empty_file");
1223 let path = harness.temp_path("empty.jsonl");
1224 fs::write(&path, "").expect("write empty");
1225
1226 let err = build_meta_from_file(&path).expect_err("expected error");
1227 if let Error::Session(msg) = &err {
1228 harness.log().info("verify", msg.clone());
1229 }
1230 assert!(
1231 matches!(err, Error::Session(ref msg) if msg.contains("Empty session file")),
1232 "Expected Error::Session containing Empty session file, got {err:?}",
1233 );
1234 }
1235
1236 #[test]
1237 fn list_sessions_returns_session_error_when_db_path_is_directory() {
1238 let harness =
1239 TestHarness::new("list_sessions_returns_session_error_when_db_path_is_directory");
1240 let root = harness.temp_path("sessions");
1241 fs::create_dir_all(&root).expect("create root dir");
1242
1243 let db_dir = root.join("session-index.sqlite");
1244 fs::create_dir_all(&db_dir).expect("create db dir to force sqlite open failure");
1245
1246 let index = SessionIndex::for_sessions_root(&root);
1247 let err = index.list_sessions(None).expect_err("expected error");
1248 if let Error::Session(msg) = &err {
1249 harness.log().info("verify", msg.clone());
1250 }
1251 assert!(
1252 matches!(err, Error::Session(ref msg) if msg.contains("SQLite open")),
1253 "Expected Error::Session containing SQLite open, got {err:?}",
1254 );
1255 }
1256
1257 #[test]
1258 fn lock_file_guard_prevents_concurrent_access() {
1259 let harness = TestHarness::new("lock_file_guard_prevents_concurrent_access");
1260 let path = harness.temp_path("lockfile.lock");
1261 fs::write(&path, "").expect("create lock file");
1262
1263 let file1 = File::options()
1264 .read(true)
1265 .write(true)
1266 .open(&path)
1267 .expect("open file1");
1268 let file2 = File::options()
1269 .read(true)
1270 .write(true)
1271 .open(&path)
1272 .expect("open file2");
1273
1274 let guard1 = lock_file_guard(&file1, Duration::from_millis(50)).expect("acquire lock");
1275 let err =
1276 lock_file_guard(&file2, Duration::from_millis(50)).expect_err("expected lock timeout");
1277 drop(guard1);
1278
1279 assert!(
1280 matches!(err, Error::Session(ref msg) if msg.contains("Timed out")),
1281 "Expected Error::Session containing Timed out, got {err:?}",
1282 );
1283
1284 let _guard2 =
1285 lock_file_guard(&file2, Duration::from_millis(50)).expect("lock after release");
1286 }
1287
1288 #[test]
1289 fn should_reindex_returns_true_when_db_missing() {
1290 let harness = TestHarness::new("should_reindex_returns_true_when_db_missing");
1291 let root = harness.temp_path("sessions");
1292 fs::create_dir_all(&root).expect("create root dir");
1293 let index = SessionIndex::for_sessions_root(&root);
1294
1295 assert!(index.should_reindex(Duration::from_secs(60)));
1296 }
1297
1298 #[test]
1301 fn session_stats_empty_entries() {
1302 let (count, name) = session_stats::<SessionEntry>(&[]);
1303 assert_eq!(count, 0);
1304 assert!(name.is_none());
1305 }
1306
1307 #[test]
1308 fn session_stats_counts_messages_only() {
1309 let entries = vec![
1310 make_user_entry(None, "m1", "hello"),
1311 make_session_info_entry(Some("m1".to_string()), "info1", None),
1312 make_user_entry(Some("info1".to_string()), "m2", "world"),
1313 ];
1314 let (count, name) = session_stats(&entries);
1315 assert_eq!(count, 2);
1316 assert!(name.is_none());
1317 }
1318
1319 #[test]
1320 fn session_stats_extracts_last_name() {
1321 let entries = vec![
1322 make_session_info_entry(None, "info1", Some("First Name")),
1323 make_user_entry(Some("info1".to_string()), "m1", "msg"),
1324 make_session_info_entry(Some("m1".to_string()), "info2", Some("Final Name")),
1325 ];
1326 let (count, name) = session_stats(&entries);
1327 assert_eq!(count, 1);
1328 assert_eq!(name.as_deref(), Some("Final Name"));
1329 }
1330
1331 #[test]
1332 fn session_stats_name_not_overwritten_by_none() {
1333 let entries = vec![
1334 make_session_info_entry(None, "info1", Some("My Session")),
1335 make_session_info_entry(Some("info1".to_string()), "info2", None),
1336 ];
1337 let (_, name) = session_stats(&entries);
1338 assert_eq!(name.as_deref(), Some("My Session"));
1340 }
1341
1342 #[test]
1345 fn file_stats_returns_size_and_mtime() {
1346 let harness = TestHarness::new("file_stats_returns_size_and_mtime");
1347 let path = harness.temp_path("test_file.txt");
1348 fs::write(&path, "hello world").expect("write");
1349
1350 let (last_modified_ms, size_bytes) = session_file_stats(&path).expect("file_stats");
1351 assert_eq!(size_bytes, 11); assert!(last_modified_ms > 0, "Expected positive modification time");
1353 }
1354
1355 #[cfg(feature = "sqlite-sessions")]
1356 #[test]
1357 fn file_stats_sqlite_includes_wal_and_shm_sizes() {
1358 let harness = TestHarness::new("file_stats_sqlite_includes_wal_and_shm_sizes");
1359 let path = harness.temp_path("test_session.sqlite");
1360 let [wal_path, shm_path] = sqlite_auxiliary_paths(&path);
1361
1362 fs::write(&path, b"db").expect("write sqlite db");
1363 fs::write(&wal_path, b"walpayload").expect("write sqlite wal");
1364 fs::write(&shm_path, b"shm!").expect("write sqlite shm");
1365
1366 let (_, size_bytes) = session_file_stats(&path).expect("file_stats");
1367 assert_eq!(size_bytes, 2 + 10 + 4);
1368 }
1369
1370 #[cfg(feature = "sqlite-sessions")]
1371 #[test]
1372 fn index_session_snapshot_uses_newest_sqlite_sidecar_mtime_and_size() {
1373 let harness =
1374 TestHarness::new("index_session_snapshot_uses_newest_sqlite_sidecar_mtime_and_size");
1375 let root = harness.temp_path("sessions");
1376 let project_dir = root.join("project");
1377 fs::create_dir_all(&project_dir).expect("create project dir");
1378
1379 let path = project_dir.join("test.sqlite");
1380 let [wal_path, _shm_path] = sqlite_auxiliary_paths(&path);
1381 fs::write(&path, b"db").expect("write sqlite db");
1382
1383 let base_millis = fs::metadata(&path)
1384 .expect("base metadata")
1385 .modified()
1386 .expect("base modified")
1387 .duration_since(UNIX_EPOCH)
1388 .expect("base since epoch")
1389 .as_millis();
1390 std::thread::sleep(Duration::from_millis(1_100));
1391 fs::write(&wal_path, b"walpayload").expect("write sqlite wal");
1392 let wal_millis = fs::metadata(&wal_path)
1393 .expect("wal metadata")
1394 .modified()
1395 .expect("wal modified")
1396 .duration_since(UNIX_EPOCH)
1397 .expect("wal since epoch")
1398 .as_millis();
1399
1400 assert!(
1401 wal_millis > base_millis,
1402 "test requires WAL sidecar mtime to be newer than base db mtime"
1403 );
1404
1405 let index = SessionIndex::for_sessions_root(&root);
1406 let header = make_header("sqlite-id", "sqlite-cwd");
1407 index
1408 .index_session_snapshot(&path, &header, 3, Some("sqlite session".to_string()))
1409 .expect("index sqlite snapshot");
1410
1411 let listed = index
1412 .list_sessions(Some("sqlite-cwd"))
1413 .expect("list sqlite session");
1414 assert_eq!(listed.len(), 1);
1415 assert_eq!(listed[0].size_bytes, 2 + 10);
1416 assert_eq!(
1417 listed[0].last_modified_ms,
1418 i64::try_from(wal_millis).expect("wal mtime fits in i64")
1419 );
1420 }
1421
1422 #[test]
1423 fn enqueue_session_index_snapshot_update_persists_row_immediately() {
1424 let harness =
1425 TestHarness::new("enqueue_session_index_snapshot_update_persists_row_immediately");
1426 let root = harness.temp_path("sessions");
1427 let project_dir = root.join("project");
1428 fs::create_dir_all(&project_dir).expect("create project dir");
1429
1430 let path = project_dir.join("session.jsonl");
1431 fs::write(&path, b"{\"type\":\"header\"}\n").expect("write session file");
1432
1433 let header = make_header("queued-id", "queued-cwd");
1434 enqueue_session_index_snapshot_update(
1435 &root,
1436 &path,
1437 &header,
1438 3,
1439 Some("Queued Session".to_string()),
1440 );
1441
1442 let index = SessionIndex::for_sessions_root(&root);
1443 let listed = index
1444 .list_sessions(Some("queued-cwd"))
1445 .expect("list queued snapshot rows");
1446 assert_eq!(listed.len(), 1);
1447 assert_eq!(listed[0].id, "queued-id");
1448 assert_eq!(listed[0].path, path.display().to_string());
1449 assert_eq!(listed[0].message_count, 3);
1450 assert_eq!(listed[0].name.as_deref(), Some("Queued Session"));
1451 }
1452
1453 #[test]
1454 fn file_stats_missing_file_returns_error() {
1455 let err = session_file_stats(Path::new("/nonexistent/file.txt"));
1456 assert!(err.is_err());
1457 }
1458
1459 #[test]
1460 fn list_sessions_errors_on_negative_message_count() {
1461 let harness = TestHarness::new("list_sessions_errors_on_negative_message_count");
1462 let root = harness.temp_path("sessions");
1463 fs::create_dir_all(&root).expect("create root dir");
1464 let index = SessionIndex::for_sessions_root(&root);
1465
1466 index
1467 .with_lock(|conn| {
1468 init_schema(conn)?;
1469 conn.execute_sync(
1470 "INSERT INTO sessions (path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name)
1471 VALUES (?1,?2,?3,?4,?5,?6,?7,?8)",
1472 &[
1473 Value::Text("/tmp/negative-message-count.jsonl".to_string()),
1474 Value::Text("id-neg".to_string()),
1475 Value::Text("cwd-neg".to_string()),
1476 Value::Text("2026-01-01T00:00:00Z".to_string()),
1477 Value::BigInt(-1),
1478 Value::BigInt(1),
1479 Value::BigInt(1),
1480 Value::Null,
1481 ],
1482 )
1483 .map_err(|err| Error::session(format!("insert negative row: {err}")))?;
1484 Ok(())
1485 })
1486 .expect("seed negative row");
1487
1488 let err = index
1489 .list_sessions(None)
1490 .expect_err("negative count should error");
1491 assert!(
1492 matches!(err, Error::Session(ref msg) if msg.contains("message_count must be non-negative")),
1493 "unexpected error: {err:?}"
1494 );
1495 }
1496
1497 #[test]
1498 fn list_sessions_errors_on_negative_size_bytes() {
1499 let harness = TestHarness::new("list_sessions_errors_on_negative_size_bytes");
1500 let root = harness.temp_path("sessions");
1501 fs::create_dir_all(&root).expect("create root dir");
1502 let index = SessionIndex::for_sessions_root(&root);
1503
1504 index
1505 .with_lock(|conn| {
1506 init_schema(conn)?;
1507 conn.execute_sync(
1508 "INSERT INTO sessions (path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name)
1509 VALUES (?1,?2,?3,?4,?5,?6,?7,?8)",
1510 &[
1511 Value::Text("/tmp/negative-size-bytes.jsonl".to_string()),
1512 Value::Text("id-neg".to_string()),
1513 Value::Text("cwd-neg".to_string()),
1514 Value::Text("2026-01-01T00:00:00Z".to_string()),
1515 Value::BigInt(1),
1516 Value::BigInt(1),
1517 Value::BigInt(-1),
1518 Value::Null,
1519 ],
1520 )
1521 .map_err(|err| Error::session(format!("insert negative row: {err}")))?;
1522 Ok(())
1523 })
1524 .expect("seed negative row");
1525
1526 let err = index
1527 .list_sessions(None)
1528 .expect_err("negative size should error");
1529 assert!(
1530 matches!(err, Error::Session(ref msg) if msg.contains("size_bytes must be non-negative")),
1531 "unexpected error: {err:?}"
1532 );
1533 }
1534
1535 #[test]
1538 fn is_session_file_path_jsonl() {
1539 assert!(is_session_file_path(Path::new("session.jsonl")));
1540 assert!(is_session_file_path(Path::new("/foo/bar/test.jsonl")));
1541 }
1542
1543 #[test]
1544 fn is_session_file_path_non_session() {
1545 assert!(!is_session_file_path(Path::new("session.txt")));
1546 assert!(!is_session_file_path(Path::new("session.json")));
1547 assert!(!is_session_file_path(Path::new("session")));
1548 }
1549
1550 #[test]
1553 fn walk_sessions_finds_jsonl_files_recursively() {
1554 let harness = TestHarness::new("walk_sessions_finds_jsonl_files_recursively");
1555 let root = harness.temp_path("sessions");
1556 fs::create_dir_all(root.join("project")).expect("create dirs");
1557
1558 fs::write(root.join("a.jsonl"), "").expect("write");
1559 fs::write(root.join("project/b.jsonl"), "").expect("write");
1560 fs::write(root.join("not_session.txt"), "").expect("write");
1561
1562 let paths = walk_sessions(&root);
1563 let ok_paths: Vec<_> = paths
1564 .into_iter()
1565 .filter_map(std::result::Result::ok)
1566 .collect();
1567 assert_eq!(ok_paths.len(), 2);
1568 assert!(ok_paths.iter().any(|p| p.ends_with("a.jsonl")));
1569 assert!(ok_paths.iter().any(|p| p.ends_with("b.jsonl")));
1570 }
1571
1572 #[test]
1573 fn walk_sessions_empty_dir() {
1574 let harness = TestHarness::new("walk_sessions_empty_dir");
1575 let root = harness.temp_path("sessions");
1576 fs::create_dir_all(&root).expect("create dirs");
1577
1578 let paths = walk_sessions(&root);
1579 assert!(paths.is_empty());
1580 }
1581
1582 #[test]
1583 fn walk_sessions_nonexistent_dir() {
1584 let paths = walk_sessions(Path::new("/nonexistent/path"));
1585 assert!(paths.is_empty());
1586 }
1587
1588 #[test]
1591 fn current_epoch_ms_is_valid_number() {
1592 let ms = current_epoch_ms();
1593 let parsed: i64 = ms.parse().expect("should be valid i64");
1594 assert!(parsed > 0, "Epoch ms should be positive");
1595 assert!(parsed > 1_577_836_800_000, "Epoch ms should be after 2020");
1597 }
1598
1599 #[test]
1602 fn delete_session_path_removes_row() {
1603 let harness = TestHarness::new("delete_session_path_removes_row");
1604 let root = harness.temp_path("sessions");
1605 fs::create_dir_all(&root).expect("create root dir");
1606 let index = SessionIndex::for_sessions_root(&root);
1607
1608 let session_path = harness.temp_path("sessions/project/del.jsonl");
1609 fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1610 fs::write(&session_path, "data").expect("write");
1611
1612 let mut session = Session::in_memory();
1613 session.header = make_header("id-del", "cwd-del");
1614 session.path = Some(session_path.clone());
1615 session.entries.push(make_user_entry(None, "m1", "hi"));
1616 index.index_session(&session).expect("index session");
1617
1618 let before = index.list_sessions(None).expect("list before");
1619 assert_eq!(before.len(), 1);
1620
1621 index
1622 .delete_session_path(&session_path)
1623 .expect("delete session path");
1624
1625 let after = index.list_sessions(None).expect("list after");
1626 assert!(after.is_empty());
1627 }
1628
1629 #[test]
1630 fn delete_session_path_noop_when_not_exists() {
1631 let harness = TestHarness::new("delete_session_path_noop_when_not_exists");
1632 let root = harness.temp_path("sessions");
1633 fs::create_dir_all(&root).expect("create root dir");
1634 let index = SessionIndex::for_sessions_root(&root);
1635
1636 index
1638 .delete_session_path(Path::new("/nonexistent/session.jsonl"))
1639 .expect("delete nonexistent should succeed");
1640 }
1641
1642 #[test]
1645 fn should_reindex_returns_false_when_db_is_fresh() {
1646 let harness = TestHarness::new("should_reindex_returns_false_when_db_is_fresh");
1647 let root = harness.temp_path("sessions");
1648 fs::create_dir_all(&root).expect("create root dir");
1649 let index = SessionIndex::for_sessions_root(&root);
1650
1651 let session_path = harness.temp_path("sessions/project/fresh.jsonl");
1653 fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1654 fs::write(&session_path, "data").expect("write");
1655
1656 let mut session = Session::in_memory();
1657 session.header = make_header("id-fresh", "cwd-fresh");
1658 session.path = Some(session_path);
1659 session.entries.push(make_user_entry(None, "m1", "hi"));
1660 index.index_session(&session).expect("index session");
1661
1662 assert!(!index.should_reindex(Duration::from_secs(3600)));
1664 }
1665
1666 #[cfg(unix)]
1667 #[test]
1668 fn should_reindex_prefers_meta_timestamp_over_stale_db_mtime() {
1669 let harness = TestHarness::new("should_reindex_prefers_meta_timestamp_over_stale_db_mtime");
1670 let root = harness.temp_path("sessions");
1671 fs::create_dir_all(&root).expect("create root dir");
1672 let index = SessionIndex::for_sessions_root(&root);
1673
1674 let session_path = harness.temp_path("sessions/project/fresh-meta.jsonl");
1675 fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1676 fs::write(&session_path, "data").expect("write");
1677
1678 let mut session = Session::in_memory();
1679 session.header = make_header("id-fresh-meta", "cwd-fresh-meta");
1680 session.path = Some(session_path);
1681 session.entries.push(make_user_entry(None, "m1", "hi"));
1682 index.index_session(&session).expect("index session");
1683
1684 let status = Command::new("touch")
1685 .args([
1686 "-t",
1687 "200001010000",
1688 index.db_path.to_str().expect("utf-8 db path"),
1689 ])
1690 .status()
1691 .expect("run touch");
1692 assert!(status.success(), "touch should succeed");
1693
1694 assert!(
1695 !index.should_reindex(Duration::from_secs(3600)),
1696 "fresh meta.last_sync_epoch_ms should outrank stale db mtime"
1697 );
1698 }
1699
1700 #[test]
1703 fn reindex_if_stale_returns_false_when_fresh() {
1704 let harness = TestHarness::new("reindex_if_stale_returns_false_when_fresh");
1705 let root = harness.temp_path("sessions");
1706 fs::create_dir_all(&root).expect("create root dir");
1707 let index = SessionIndex::for_sessions_root(&root);
1708
1709 let session_path = harness.temp_path("sessions/project/stale_test.jsonl");
1711 fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1712 let header = make_header("id-stale", "cwd-stale");
1713 let entries = vec![make_user_entry(None, "m1", "msg")];
1714 write_session_jsonl(&session_path, &header, &entries);
1715
1716 let result = index
1718 .reindex_if_stale(Duration::from_secs(3600))
1719 .expect("reindex");
1720 assert!(result, "First reindex should return true (no db)");
1721
1722 let result = index
1724 .reindex_if_stale(Duration::from_secs(3600))
1725 .expect("reindex");
1726 assert!(!result, "Second reindex should return false (fresh)");
1727 }
1728
1729 #[test]
1730 fn reindex_if_stale_returns_true_when_stale() {
1731 let harness = TestHarness::new("reindex_if_stale_returns_true_when_stale");
1732 let root = harness.temp_path("sessions");
1733 fs::create_dir_all(&root).expect("create root dir");
1734 let index = SessionIndex::for_sessions_root(&root);
1735
1736 let session_path = harness.temp_path("sessions/project/stale.jsonl");
1738 fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1739 let header = make_header("id-stale2", "cwd-stale2");
1740 let entries = vec![make_user_entry(None, "m1", "msg")];
1741 write_session_jsonl(&session_path, &header, &entries);
1742
1743 let result = index.reindex_if_stale(Duration::ZERO).expect("reindex");
1745 assert!(result, "Should reindex with zero max_age");
1746 }
1747
1748 #[test]
1751 fn build_meta_from_file_returns_correct_fields() {
1752 let harness = TestHarness::new("build_meta_from_file_returns_correct_fields");
1753 let path = harness.temp_path("test_session.jsonl");
1754 let header = make_header("id-bm", "cwd-bm");
1755 let entries = vec![
1756 make_user_entry(None, "m1", "hello"),
1757 make_user_entry(Some("m1".to_string()), "m2", "world"),
1758 make_session_info_entry(Some("m2".to_string()), "info1", Some("Named Session")),
1759 ];
1760 write_session_jsonl(&path, &header, &entries);
1761
1762 let meta = build_meta_from_file(&path).expect("build_meta_from_file");
1763 assert_eq!(meta.id, "id-bm");
1764 assert_eq!(meta.cwd, "cwd-bm");
1765 assert_eq!(meta.message_count, 2);
1766 assert_eq!(meta.name.as_deref(), Some("Named Session"));
1767 assert!(meta.size_bytes > 0);
1768 assert!(meta.last_modified_ms > 0);
1769 assert!(meta.path.contains("test_session.jsonl"));
1770 }
1771
1772 #[test]
1775 fn for_sessions_root_constructs_correct_paths() {
1776 let root = Path::new("/home/user/.pi/sessions");
1777 let index = SessionIndex::for_sessions_root(root);
1778 assert_eq!(
1779 index.db_path,
1780 PathBuf::from("/home/user/.pi/sessions/session-index.sqlite")
1781 );
1782 assert_eq!(
1783 index.lock_path,
1784 PathBuf::from("/home/user/.pi/sessions/session-index.lock")
1785 );
1786 }
1787
1788 #[test]
1791 fn sessions_root_returns_parent_of_db_path() {
1792 let root = Path::new("/home/user/.pi/sessions");
1793 let index = SessionIndex::for_sessions_root(root);
1794 assert_eq!(index.sessions_root(), root);
1795 }
1796
1797 #[test]
1800 fn reindex_all_replaces_stale_rows() {
1801 let harness = TestHarness::new("reindex_all_replaces_stale_rows");
1802 let root = harness.temp_path("sessions");
1803 fs::create_dir_all(root.join("project")).expect("create dirs");
1804
1805 let index = SessionIndex::for_sessions_root(&root);
1807
1808 let path_a = harness.temp_path("sessions/project/a.jsonl");
1809 let header_a = make_header("id-a", "cwd-a");
1810 write_session_jsonl(&path_a, &header_a, &[make_user_entry(None, "m1", "a")]);
1811
1812 let path_b = harness.temp_path("sessions/project/b.jsonl");
1813 let header_b = make_header("id-b", "cwd-b");
1814 write_session_jsonl(&path_b, &header_b, &[make_user_entry(None, "m1", "b")]);
1815
1816 index.reindex_all().expect("reindex_all");
1818 let all = index.list_sessions(None).expect("list all");
1819 assert_eq!(all.len(), 2);
1820
1821 fs::remove_file(&path_a).expect("remove file");
1823 index.reindex_all().expect("reindex_all after delete");
1824 let all = index.list_sessions(None).expect("list after reindex");
1825 assert_eq!(all.len(), 1);
1826 assert_eq!(all[0].id, "id-b");
1827 }
1828
1829 #[test]
1832 fn index_session_with_session_name() {
1833 let harness = TestHarness::new("index_session_with_session_name");
1834 let root = harness.temp_path("sessions");
1835 fs::create_dir_all(&root).expect("create root dir");
1836 let index = SessionIndex::for_sessions_root(&root);
1837
1838 let session_path = harness.temp_path("sessions/project/named.jsonl");
1839 fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1840 fs::write(&session_path, "data").expect("write");
1841
1842 let mut session = Session::in_memory();
1843 session.header = make_header("id-named", "cwd-named");
1844 session.path = Some(session_path);
1845 session.entries.push(make_user_entry(None, "m1", "hi"));
1846 session.entries.push(make_session_info_entry(
1847 Some("m1".to_string()),
1848 "info1",
1849 Some("My Project"),
1850 ));
1851
1852 index.index_session(&session).expect("index session");
1853
1854 let sessions = index.list_sessions(None).expect("list");
1855 assert_eq!(sessions.len(), 1);
1856 assert_eq!(sessions[0].name.as_deref(), Some("My Project"));
1857 }
1858
1859 #[test]
1860 fn index_session_update_clears_stale_session_name() {
1861 let harness = TestHarness::new("index_session_update_clears_stale_session_name");
1862 let root = harness.temp_path("sessions");
1863 fs::create_dir_all(&root).expect("create root dir");
1864 let index = SessionIndex::for_sessions_root(&root);
1865
1866 let session_path = harness.temp_path("sessions/project/clear-name.jsonl");
1867 fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1868 fs::write(&session_path, "first").expect("write");
1869
1870 let mut named = Session::in_memory();
1871 named.header = make_header("id-clear-name", "cwd-clear-name");
1872 named.path = Some(session_path.clone());
1873 named.entries.push(make_user_entry(None, "m1", "hi"));
1874 named.entries.push(make_session_info_entry(
1875 Some("m1".to_string()),
1876 "info1",
1877 Some("My Project"),
1878 ));
1879
1880 index.index_session(&named).expect("index named session");
1881 let first = index.list_sessions(None).expect("list named");
1882 assert_eq!(first.len(), 1);
1883 assert_eq!(first[0].name.as_deref(), Some("My Project"));
1884
1885 std::thread::sleep(Duration::from_millis(10));
1886 fs::write(&session_path, "second").expect("rewrite");
1887
1888 let mut unnamed = Session::in_memory();
1889 unnamed.header = make_header("id-clear-name", "cwd-clear-name");
1890 unnamed.path = Some(session_path);
1891 unnamed.entries.push(make_user_entry(None, "m1", "hi"));
1892
1893 index
1894 .index_session(&unnamed)
1895 .expect("index unnamed session");
1896 let second = index.list_sessions(None).expect("list unnamed");
1897 assert_eq!(second.len(), 1);
1898 assert_eq!(second[0].name, None);
1899 }
1900
1901 #[test]
1904 fn list_sessions_no_cwd_returns_all() {
1905 let harness = TestHarness::new("list_sessions_no_cwd_returns_all");
1906 let root = harness.temp_path("sessions");
1907 fs::create_dir_all(&root).expect("create root dir");
1908 let index = SessionIndex::for_sessions_root(&root);
1909
1910 for (id, cwd) in [("id-x", "cwd-x"), ("id-y", "cwd-y"), ("id-z", "cwd-z")] {
1911 let path = harness.temp_path(format!("sessions/project/{id}.jsonl"));
1912 fs::create_dir_all(path.parent().expect("parent")).expect("create dirs");
1913 fs::write(&path, id).expect("write");
1914
1915 let mut session = Session::in_memory();
1916 session.header = make_header(id, cwd);
1917 session.path = Some(path);
1918 session.entries.push(make_user_entry(None, "m1", id));
1919 index.index_session(&session).expect("index session");
1920 }
1921
1922 let all = index.list_sessions(None).expect("list all");
1923 assert_eq!(all.len(), 3);
1924 }
1925
1926 #[test]
1929 fn build_meta_from_jsonl_skips_bad_entry_lines() {
1930 let harness = TestHarness::new("build_meta_from_jsonl_skips_bad_entry_lines");
1931 let path = harness.temp_path("mixed.jsonl");
1932
1933 let header = make_header("id-mixed", "cwd-mixed");
1934 let good_entry = make_user_entry(None, "m1", "good");
1935 let mut content = serde_json::to_string(&header).expect("ser header");
1936 content.push('\n');
1937 content.push_str(&serde_json::to_string(&good_entry).expect("ser entry"));
1938 content.push('\n');
1939 content.push_str("not valid json\n");
1940 content.push_str(
1941 &serde_json::to_string(&make_user_entry(Some("m1".to_string()), "m2", "another"))
1942 .expect("ser entry"),
1943 );
1944 content.push('\n');
1945
1946 fs::write(&path, content).expect("write");
1947
1948 let meta = build_meta_from_jsonl(&path).expect("build_meta");
1949 assert_eq!(meta.message_count, 2);
1951 }
1952
1953 #[test]
1954 fn build_meta_from_jsonl_errors_on_invalid_utf8_entry_line() {
1955 let harness = TestHarness::new("build_meta_from_jsonl_errors_on_invalid_utf8_entry_line");
1956 let path = harness.temp_path("invalid_utf8.jsonl");
1957
1958 let header = make_header("id-invalid", "cwd-invalid");
1959 let mut bytes = serde_json::to_vec(&header).expect("serialize header");
1960 bytes.push(b'\n');
1961 bytes.extend_from_slice(br#"{"type":"message","message":{"role":"user","content":"ok"}}"#);
1962 bytes.push(b'\n');
1963 bytes.extend_from_slice(&[0xFF, 0xFE, b'\n']);
1964
1965 fs::write(&path, bytes).expect("write");
1966
1967 let err = build_meta_from_jsonl(&path).expect_err("invalid utf8 should error");
1968 assert!(
1969 matches!(err, Error::Session(ref msg) if msg.contains("Read session entry line")),
1970 "Expected entry line read error, got {err:?}"
1971 );
1972 }
1973
1974 #[test]
1975 fn read_capped_utf8_line_with_limit_rejects_oversized_line_without_newline() {
1976 let oversized = "x".repeat(5);
1977 let mut reader = std::io::Cursor::new(oversized.into_bytes());
1978
1979 let err = read_capped_utf8_line_with_limit(&mut reader, 4).expect_err("oversized line");
1980 assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1981 assert!(err.to_string().contains("JSONL line exceeds 4 bytes"));
1982 }
1983
1984 #[test]
1985 fn read_capped_utf8_line_with_limit_allows_exact_limit_before_newline() {
1986 let mut reader = std::io::Cursor::new(b"abcd\n".to_vec());
1987
1988 let line = read_capped_utf8_line_with_limit(&mut reader, 4)
1989 .expect("read line")
1990 .expect("line present");
1991 assert_eq!(line, "abcd\n");
1992 assert!(
1993 read_capped_utf8_line_with_limit(&mut reader, 4)
1994 .expect("read eof")
1995 .is_none()
1996 );
1997 }
1998
1999 #[test]
2000 fn read_capped_utf8_line_with_limit_drains_oversized_line_remainder() {
2001 let mut reader = std::io::Cursor::new(b"xxxxx\ny\n".to_vec());
2002
2003 let err = read_capped_utf8_line_with_limit(&mut reader, 4).expect_err("oversized line");
2004 assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
2005
2006 let next_line = read_capped_utf8_line_with_limit(&mut reader, 4)
2007 .expect("read next line")
2008 .expect("next line present");
2009 assert_eq!(next_line, "y\n");
2010 }
2011
2012 #[test]
2013 fn index_session_snapshot_rejects_message_count_over_i64_max() {
2014 let harness = TestHarness::new("index_session_snapshot_rejects_message_count_over_i64_max");
2015 let root = harness.temp_path("sessions");
2016 fs::create_dir_all(root.join("project")).expect("create project dir");
2017 let index = SessionIndex::for_sessions_root(&root);
2018
2019 let path = root.join("project").join("overflow.jsonl");
2020 fs::write(&path, "").expect("write session payload");
2021
2022 let header = make_header("id-overflow", "cwd-overflow");
2023 let err = index
2024 .index_session_snapshot(&path, &header, (i64::MAX as u64) + 1, None)
2025 .expect_err("out-of-range message_count should error");
2026 assert!(
2027 matches!(err, Error::Session(ref msg) if msg.contains("message_count exceeds SQLite INTEGER range")),
2028 "expected out-of-range message_count error, got {err:?}"
2029 );
2030 }
2031
2032 proptest! {
2033 #![proptest_config(ProptestConfig { cases: 128, .. ProptestConfig::default() })]
2034
2035 #[test]
2036 fn proptest_list_sessions_handles_arbitrary_sql_rows(
2037 rows in prop::collection::vec(arbitrary_meta_row_strategy(), 1..16)
2038 ) {
2039 let harness = TestHarness::new("proptest_list_sessions_handles_arbitrary_sql_rows");
2040 let root = harness.temp_path("sessions");
2041 fs::create_dir_all(&root).expect("create root dir");
2042 let index = SessionIndex::for_sessions_root(&root);
2043
2044 let expected_by_path: HashMap<String, ArbitraryMetaRow> = rows
2045 .iter()
2046 .cloned()
2047 .enumerate()
2048 .map(|(idx, row)| (format!("/tmp/pi-session-index-{idx}.jsonl"), row))
2049 .collect();
2050
2051 index
2052 .with_lock(|conn| {
2053 init_schema(conn)?;
2054 conn.execute_sync("DELETE FROM sessions", &[])
2055 .map_err(|err| Error::session(format!("delete sessions: {err}")))?;
2056
2057 for (idx, row) in rows.iter().enumerate() {
2058 let path = format!("/tmp/pi-session-index-{idx}.jsonl");
2059 conn.execute_sync(
2060 "INSERT INTO sessions (path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name)
2061 VALUES (?1,?2,?3,?4,?5,?6,?7,?8)",
2062 &[
2063 Value::Text(path),
2064 Value::Text(row.id.clone()),
2065 Value::Text(row.cwd.clone()),
2066 Value::Text(row.timestamp.clone()),
2067 Value::BigInt(row.message_count),
2068 Value::BigInt(row.last_modified_ms),
2069 Value::BigInt(row.size_bytes),
2070 row.name.clone().map_or(Value::Null, Value::Text),
2071 ],
2072 )
2073 .map_err(|err| Error::session(format!("insert session row {idx}: {err}")))?;
2074 }
2075
2076 Ok(())
2077 })
2078 .expect("seed session rows");
2079
2080 let has_invalid_unsigned = rows
2081 .iter()
2082 .any(|row| row.message_count < 0 || row.size_bytes < 0);
2083
2084 let listed = index.list_sessions(None);
2085 if has_invalid_unsigned {
2086 prop_assert!(listed.is_err(), "negative message_count/size_bytes should error");
2087 return Ok(());
2088 }
2089 let listed = listed.expect("list all sessions");
2090 prop_assert_eq!(listed.len(), rows.len());
2091 for pair in listed.windows(2) {
2092 prop_assert!(pair[0].last_modified_ms >= pair[1].last_modified_ms);
2093 }
2094
2095 for meta in &listed {
2096 let expected = expected_by_path
2097 .get(&meta.path)
2098 .expect("expected row should exist");
2099 prop_assert_eq!(&meta.id, &expected.id);
2100 prop_assert_eq!(&meta.cwd, &expected.cwd);
2101 prop_assert_eq!(&meta.timestamp, &expected.timestamp);
2102 prop_assert_eq!(
2103 meta.message_count,
2104 u64::try_from(expected.message_count).expect("filtered non-negative count")
2105 );
2106 prop_assert_eq!(
2107 meta.size_bytes,
2108 u64::try_from(expected.size_bytes).expect("filtered non-negative size")
2109 );
2110 prop_assert_eq!(&meta.name, &expected.name);
2111 }
2112
2113 let filtered = index
2114 .list_sessions(Some("cwd-a"))
2115 .expect("list cwd-a sessions");
2116 let expected_filtered = rows.iter().filter(|row| row.cwd == "cwd-a").count();
2117 prop_assert_eq!(filtered.len(), expected_filtered);
2118 prop_assert!(filtered.iter().all(|meta| meta.cwd == "cwd-a"));
2119 for pair in filtered.windows(2) {
2120 prop_assert!(pair[0].last_modified_ms >= pair[1].last_modified_ms);
2121 }
2122 }
2123 }
2124
2125 proptest! {
2126 #![proptest_config(ProptestConfig { cases: 128, .. ProptestConfig::default() })]
2127
2128 #[test]
2129 fn proptest_index_session_snapshot_roundtrip_metadata(
2130 id in ident_strategy(),
2131 cwd in cwd_strategy(),
2132 timestamp in timestamp_strategy(),
2133 message_count in any::<u64>(),
2134 name in optional_name_strategy(),
2135 content in prop::collection::vec(any::<u8>(), 0..256)
2136 ) {
2137 let harness = TestHarness::new("proptest_index_session_snapshot_roundtrip_metadata");
2138 let root = harness.temp_path("sessions");
2139 fs::create_dir_all(root.join("project")).expect("create project dir");
2140 let index = SessionIndex::for_sessions_root(&root);
2141
2142 let path = root.join("project").join(format!("{id}.jsonl"));
2143 fs::write(&path, &content).expect("write session payload");
2144
2145 let mut header = make_header(&id, &cwd);
2146 header.timestamp = timestamp.clone();
2147 let index_result = index.index_session_snapshot(&path, &header, message_count, name.clone());
2148 if message_count > i64::MAX as u64 {
2149 prop_assert!(
2150 index_result.is_err(),
2151 "expected out-of-range message_count to fail indexing"
2152 );
2153 } else {
2154 index_result.expect("index snapshot");
2155
2156 let listed = index
2157 .list_sessions(Some(&cwd))
2158 .expect("list sessions for cwd");
2159 prop_assert_eq!(listed.len(), 1);
2160
2161 let meta = &listed[0];
2162 let expected_count = message_count;
2163 prop_assert_eq!(&meta.id, &id);
2164 prop_assert_eq!(&meta.cwd, &cwd);
2165 prop_assert_eq!(&meta.timestamp, ×tamp);
2166 prop_assert_eq!(&meta.path, &path.display().to_string());
2167 prop_assert_eq!(meta.message_count, expected_count);
2168 prop_assert_eq!(meta.size_bytes, content.len() as u64);
2169 prop_assert_eq!(&meta.name, &name);
2170 prop_assert!(meta.last_modified_ms >= 0);
2171
2172 let other_cwd = index
2173 .list_sessions(Some("definitely-not-this-cwd"))
2174 .expect("list sessions for unmatched cwd");
2175 prop_assert!(other_cwd.is_empty());
2176 }
2177 }
2178 }
2179}